multica/server/cmd/migrate/main.go
LinYushen b89b9cb4d6 test(migrate): concurrent migration race test using real Postgres (MUL-2956) (#3712)
* test(migrate): add concurrent migration race test using real Postgres (MUL-2956)

Follow-up to MUL-2923 / #3658, which added a Postgres advisory lock to
serialize the migration loop across concurrent runners (multi-replica
backend startup, scale-up, manual `migrate up` overlap). That PR shipped
without a test because cmd/migrate/ had no harness; this commit adds it.

Refactor: extract runMigrations(ctx, pool, runOptions) from main(), with
the lock key, the bookkeeping table, and the file list now injectable.
main() behavior is unchanged. Identifier interpolation goes through
pgx.Identifier{}.Sanitize so callers can pass "schema.schema_migrations"
safely.
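The zero-value defaulting that keeps main() behavior unchanged while making the table and lock key injectable can be sketched in isolation. This is a minimal sketch, not the runMigrations code itself. `opts` and `resolve` are hypothetical names, and only the two default values are copied from cmd/migrate/main.go.

```go
package main

import "fmt"

// Defaults copied from cmd/migrate/main.go.
const migrationAdvisoryLockKey int64 = 7244554146635925501
const defaultSchemaMigrationsTable = "schema_migrations"

// opts is a hypothetical, trimmed-down stand-in for runOptions.
type opts struct {
	SchemaMigrationsTable string
	AdvisoryLockKey       int64
}

// resolve mirrors the defaulting at the top of runMigrations: an empty
// table name or a zero lock key falls back to the production value.
func resolve(o opts) (string, int64) {
	table := o.SchemaMigrationsTable
	if table == "" {
		table = defaultSchemaMigrationsTable
	}
	lockKey := o.AdvisoryLockKey
	if lockKey == 0 {
		lockKey = migrationAdvisoryLockKey
	}
	return table, lockKey
}

func main() {
	// main() passes no overrides, so it gets the production table and key.
	fmt.Println(resolve(opts{}))
	// A test injects a schema-qualified table and its own lock key.
	fmt.Println(resolve(opts{
		SchemaMigrationsTable: "migrate_test_xyz.schema_migrations",
		AdvisoryLockKey:       42,
	}))
}
```

One consequence of using zero as the "unset" sentinel is that 0 itself can never be selected as a lock key through runOptions.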

Tests (cmd/migrate/migrate_concurrent_test.go) — every case isolates
itself in a unique throwaway schema and a unique lock key, so they
never touch the real schema_migrations table or block real production
runners that share the database. Skip cleanly when DATABASE_URL is
unreachable, matching the pattern already used in
internal/handler/handler_test.go and internal/metrics/business_sampler_pgsleep_test.go.

  - TestRunMigrationsConcurrentPending: 16 goroutines apply 5
    deliberately non-idempotent migrations (bare CREATE TABLE +
    ALTER TABLE ADD COLUMN). Without the lock, concurrent CREATE TABLE
    races trip "duplicate key value violates unique constraint
    pg_type_typname_nsp_index" — proving the lock is doing its job.
  - TestRunMigrationsConcurrentAlreadyApplied: 16 goroutines hit the
    EXISTS no-op path against a pre-populated bookkeeping table; the
    state must be unchanged.
  - TestRunMigrationsAdvisoryLockSerializes: an external connection
    holds the same advisory lock; we assert that zero of the 16
    runners complete during a 1 s observation window, then release
    the side lock and let them all finish. Catches the original
    MUL-2923 bug where the lock got attached to a random pooled
    connection.
  - TestRunMigrationsConcurrentMixedPoolStress: same pending case but
    with a deliberately small pool (runners/2), forcing pgxpool.Acquire
    contention to overlap with pg_advisory_lock contention.

Verified locally: `go test -race -count=10 ./cmd/migrate/` passes in
~15 s. Mutation test (lock acquire/release replaced with `SELECT 1`)
confirms the pending and lock-serializes tests both fail loudly,
catching the regression they were written to detect.

go.mod tidy promotes golang.org/x/sync to a direct dependency
(now imported by the test for errgroup) and incidentally fixes a
stale `// indirect` annotation on prometheus/client_model, which is
already imported directly by internal/metrics/testutil.go.

Co-authored-by: multica-agent <github@multica.ai>

* test(migrate): gofmt + address review nits (MUL-2956)

- gofmt -w cmd/migrate/migrate_concurrent_test.go: fixture struct field
  alignment.
- quoteQualifiedIdentifier: actually reject identifiers with more than
  one dot (the previous version split on the first dot only and would
  silently sanitize "a.b.c" into "a"."b.c", contradicting the comment).
  Inline the splitter via strings.Split now that we explicitly check the
  component count.
- Soften the test's lock-key comment from "never collide" to the
  accurate probabilistic statement (~1 in 2^62 collision odds with the
  production constant).
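The stricter identifier check (at most one dot, no empty components) plus per-component quoting can be sketched with the standard library alone. Assumption: `quote` is a hypothetical stand-in that approximates pgx.Identifier.Sanitize (wrap each part in double quotes, double any embedded quote, join with a dot); the real pgx method may handle further edge cases. `quoteQualified` mirrors the validation in quoteQualifiedIdentifier.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// quote approximates pgx.Identifier(parts).Sanitize(): each component is
// double-quoted, embedded double quotes are doubled, parts are dot-joined.
func quote(parts []string) string {
	out := make([]string, len(parts))
	for i, p := range parts {
		out[i] = `"` + strings.ReplaceAll(p, `"`, `""`) + `"`
	}
	return strings.Join(out, ".")
}

// quoteQualified accepts exactly one or two non-empty dot-separated parts.
func quoteQualified(name string) (string, error) {
	if name == "" {
		return "", errors.New("empty identifier")
	}
	parts := strings.Split(name, ".")
	if len(parts) > 2 {
		return "", fmt.Errorf("identifier %q has more than one dot", name)
	}
	for _, p := range parts {
		if p == "" {
			return "", fmt.Errorf("empty component in %q", name)
		}
	}
	return quote(parts), nil
}

func main() {
	names := []string{
		"schema_migrations",
		"migrate_test_xyz.schema_migrations",
		`odd"name`,
		"a.b.c", // rejected instead of becoming "a"."b.c"
		".foo",  // rejected: empty schema component
	}
	for _, name := range names {
		q, err := quoteQualified(name)
		fmt.Println(name, "=>", q, err)
	}
}
```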

go test -race -count=10 ./cmd/migrate/ still passes (~15 s).

Co-authored-by: multica-agent <github@multica.ai>

* test(migrate): direction whitelist + tidy go.mod (MUL-2956)

Address two follow-ups from review:

- runMigrations now whitelist-checks opts.Direction up-front and
  returns an error for anything that is not "up" or "down". The
  previous shape relied on `opts.Direction == "up"` and an else branch,
  so a typo or empty string would silently fall through to the
  rollback path. Add TestRunMigrationsRejectsInvalidDirection covering
  the empty string, "UP"/"DOWN" case mismatches, "rollback", and a
  whitespace-padded value; the check fires before any pool work, so
  the test runs without Postgres.
- go mod tidy: promotes google.golang.org/protobuf to a direct
  dependency (it is imported directly elsewhere in the module and was
  stale-marked indirect).
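The difference between the old `== "up"`/else shape and the new whitelist can be shown side by side. A minimal sketch: `validateDirection` and `legacyRollsBack` are hypothetical names, and the padded value `" up "` is an illustrative guess at the test's whitespace case, not copied from it.

```go
package main

import "fmt"

// validateDirection mirrors the up-front switch at the top of runMigrations:
// only the exact strings "up" and "down" pass.
func validateDirection(d string) error {
	switch d {
	case "up", "down":
		return nil
	default:
		return fmt.Errorf("invalid direction %q (want \"up\" or \"down\")", d)
	}
}

// legacyRollsBack models the previous shape: anything that is not exactly
// "up" fell through to the rollback branch.
func legacyRollsBack(d string) bool {
	return d != "up"
}

func main() {
	for _, d := range []string{"", "UP", "DOWN", "rollback", " up "} {
		fmt.Printf("%q: legacy rollback=%v, now: %v\n", d, legacyRollsBack(d), validateDirection(d))
	}
}
```

Because the check needs nothing but the string, it can run before any pool work, which is why the corresponding test does not need Postgres.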

go test -race -count=10 ./cmd/migrate/ green (~15.7 s, 50/50).

Co-authored-by: multica-agent <github@multica.ai>

---------

Co-authored-by: wei-heshang <wei-heshang@multica.ai>
Co-authored-by: multica-agent <github@multica.ai>
2026-06-08 13:33:16 +08:00


package main

import (
	"context"
	"fmt"
	"log/slog"
	"os"
	"strings"

	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/pgxpool"
	"github.com/multica-ai/multica/server/internal/logger"
	"github.com/multica-ai/multica/server/internal/migrations"
	"github.com/multica-ai/multica/server/internal/taskusagebackfill"
)

// preMigrationHook runs work that must happen before a specific
// migration is applied during `migrate up`. Hooks are idempotent and
// must not depend on the migration loop's session-pinned advisory lock:
// they run on the pool, not on the loop's pinned conn, so they can
// safely acquire other session-level locks (e.g. advisory lock 4246
// for the task_usage hourly rollup).
//
// Returning an error aborts the migration run. The corresponding
// migration is NOT recorded in schema_migrations, so the next run will
// retry the hook + migration.
type preMigrationHook func(ctx context.Context, pool *pgxpool.Pool) error

// preMigrationHooks wires migration version → hook. The version key is
// the file basename without the `.up.sql` suffix, matching what
// `migrations.ExtractVersion` returns.
//
// MUL-2957: the v0.3.4 → current direct-upgrade path needs the hourly
// rollup seeded BEFORE migration 103 evaluates its fail-closed lag
// guard, because at `cmd/migrate up` time the server has not yet
// started so neither the legacy pg_cron job nor the new app scheduler
// can advance the watermark. The hook runs the same idempotent
// monthly-slice backfill that
// `cmd/backfill_task_usage_hourly` exposes to operators.
var preMigrationHooks = map[string]preMigrationHook{
	"103_drop_legacy_daily_rollups": runTaskUsageHourlyHook,
}

func runTaskUsageHourlyHook(ctx context.Context, pool *pgxpool.Pool) error {
	res, err := taskusagebackfill.Hook(ctx, pool, taskusagebackfill.HookOptions{})
	if err != nil {
		return fmt.Errorf("task_usage_hourly pre-103 hook: %w", err)
	}
	if res.Skipped != "" {
		slog.Info("task_usage hourly rollup hook: skipped",
			"reason", res.Skipped,
			"watermark_stamped", res.WatermarkStamped)
		return nil
	}
	slog.Info("task_usage hourly rollup hook: backfill complete",
		"slices", res.SlicesProcessed,
		"rows_touched", res.RowsTouched,
		"from", res.From.Format("2006-01-02T15:04:05Z07:00"),
		"to", res.To.Format("2006-01-02T15:04:05Z07:00"))
	return nil
}

// migrationAdvisoryLockKey is the int64 identifier used with Postgres
// pg_advisory_lock to serialize the migration loop across concurrent
// runners (multi-replica backend Deployment, scale-up, or a manual
// `migrate up` overlapping with pod startup). The exact value is
// arbitrary; it just needs to be stable across every process that runs
// migrations against the same database. See GitHub multica-ai/multica#3647.
const migrationAdvisoryLockKey int64 = 7244554146635925501

// defaultSchemaMigrationsTable is the unqualified name of the bookkeeping
// table that tracks which migrations have been applied. Tests override
// this so a concurrent-race harness can run against the same shared
// Postgres without colliding with the production table.
const defaultSchemaMigrationsTable = "schema_migrations"

// runOptions carries everything runMigrations needs that is not the
// pool itself. Tests use it to inject a hermetic migrations directory,
// a unique per-test bookkeeping table, and a per-test advisory-lock key
// that is vanishingly unlikely to collide with any other migration
// runner sharing the same Postgres instance.
type runOptions struct {
	// Direction is "up" or "down". Any other value is rejected by
	// runMigrations before it touches the pool.
	Direction string

	// Files is the ordered list of .sql files to apply. Production callers
	// pass migrations.Files(direction); tests pass a curated set written
	// to a t.TempDir().
	Files []string

	// SchemaMigrationsTable is the bookkeeping table to read/write.
	// May be schema-qualified (e.g. "migrate_test_xyz.schema_migrations").
	// Empty means defaultSchemaMigrationsTable.
	SchemaMigrationsTable string

	// AdvisoryLockKey is the int64 used with pg_advisory_lock. Zero means
	// migrationAdvisoryLockKey. Tests pass a unique key per run so
	// concurrent test workers do not block on the production migration
	// runner if it happens to share the database.
	AdvisoryLockKey int64

	// Hooks maps migration version → pre-migration hook. The hook
	// receives the pool (not the loop's pinned conn) so it can take
	// its own session-level locks. nil or missing entries mean "no
	// hook" and the migration runs straight through. Production main()
	// passes preMigrationHooks; tests leave this nil.
	Hooks map[string]preMigrationHook
}

func main() {
	logger.Init()

	if len(os.Args) < 2 {
		fmt.Println("Usage: go run ./cmd/migrate <up|down>")
		os.Exit(1)
	}
	direction := os.Args[1]
	if direction != "up" && direction != "down" {
		fmt.Println("Usage: go run ./cmd/migrate <up|down>")
		os.Exit(1)
	}

	dbURL := os.Getenv("DATABASE_URL")
	if dbURL == "" {
		dbURL = "postgres://multica:multica@localhost:5432/multica?sslmode=disable"
	}

	ctx := context.Background()
	pool, err := pgxpool.New(ctx, dbURL)
	if err != nil {
		slog.Error("unable to connect to database", "error", err)
		os.Exit(1)
	}
	defer pool.Close()

	if err := pool.Ping(ctx); err != nil {
		slog.Error("unable to ping database", "error", err)
		os.Exit(1)
	}

	files, err := migrations.Files(direction)
	if err != nil {
		slog.Error("failed to find migration files", "error", err)
		os.Exit(1)
	}

	if err := runMigrations(ctx, pool, runOptions{
		Direction: direction,
		Files:     files,
		Hooks:     preMigrationHooks,
	}); err != nil {
		slog.Error("migration run failed", "error", err)
		os.Exit(1)
	}

	fmt.Println("Done.")
}

// runMigrations applies (direction="up") or rolls back (direction="down")
// the given file list against the supplied pool, serialized through a
// Postgres session-level advisory lock so multiple concurrent runners
// (multi-replica startup, scale-up, manual migrate overlap) take turns
// instead of racing each other.
//
// It is safe to invoke concurrently from multiple goroutines or
// processes against the same database with the same options: every
// caller blocks on pg_advisory_lock, and once it is their turn the
// already-applied EXISTS check turns each finished migration into a
// no-op skip. See GitHub multica-ai/multica#3647 / MUL-2923.
func runMigrations(ctx context.Context, pool *pgxpool.Pool, opts runOptions) error {
	switch opts.Direction {
	case "up", "down":
		// ok
	default:
		return fmt.Errorf("invalid direction %q (want \"up\" or \"down\")", opts.Direction)
	}

	table := opts.SchemaMigrationsTable
	if table == "" {
		table = defaultSchemaMigrationsTable
	}
	tableIdent, err := quoteQualifiedIdentifier(table)
	if err != nil {
		return fmt.Errorf("invalid schema migrations table %q: %w", table, err)
	}

	lockKey := opts.AdvisoryLockKey
	if lockKey == 0 {
		lockKey = migrationAdvisoryLockKey
	}

	// pg_advisory_lock is scoped to a single session, so we must pin one
	// *pgxpool.Conn for the whole run: calling pool.Exec would attach the
	// lock to a random connection that pgxpool could hand back out before
	// the loop finishes, making the lock effectively a no-op. We use the
	// blocking pg_advisory_lock (not pg_try_*) so a late-arriving runner
	// queues behind the current one instead of crash-looping; once it
	// acquires the lock the EXISTS checks below turn finished migrations
	// into no-op skips.
	//
	// We deliberately do NOT wrap the loop in a single transaction: the
	// repo already ships migrations using CREATE INDEX CONCURRENTLY,
	// which Postgres rejects inside a transaction block.
	conn, err := pool.Acquire(ctx)
	if err != nil {
		return fmt.Errorf("acquire migration connection: %w", err)
	}
	defer conn.Release()

	if _, err := conn.Exec(ctx, "SELECT pg_advisory_lock($1)", lockKey); err != nil {
		return fmt.Errorf("acquire migration advisory lock: %w", err)
	}
	// Best-effort explicit unlock. This defer runs on every return from
	// runMigrations, success or error, and before conn.Release (deferred
	// calls run last-in, first-out). If the process dies before it runs
	// (crash, kill), session-level advisory locks are released
	// automatically when the connection closes, so the next runner is
	// never permanently blocked.
	defer func() {
		if _, err := conn.Exec(ctx, "SELECT pg_advisory_unlock($1)", lockKey); err != nil {
			slog.Warn("failed to release migration advisory lock", "error", err)
		}
	}()

	// Create migrations tracking table.
	if _, err := conn.Exec(ctx, fmt.Sprintf(`
		CREATE TABLE IF NOT EXISTS %s (
			version TEXT PRIMARY KEY,
			applied_at TIMESTAMPTZ NOT NULL DEFAULT now()
		)
	`, tableIdent)); err != nil {
		return fmt.Errorf("create migrations table: %w", err)
	}

	existsSQL := fmt.Sprintf("SELECT EXISTS(SELECT 1 FROM %s WHERE version = $1)", tableIdent)
	insertSQL := fmt.Sprintf("INSERT INTO %s (version) VALUES ($1)", tableIdent)
	deleteSQL := fmt.Sprintf("DELETE FROM %s WHERE version = $1", tableIdent)

	for _, file := range opts.Files {
		version := migrations.ExtractVersion(file)

		var exists bool
		if err := conn.QueryRow(ctx, existsSQL, version).Scan(&exists); err != nil {
			return fmt.Errorf("check migration %q: %w", version, err)
		}

		if opts.Direction == "up" {
			if exists {
				fmt.Printf(" skip %s (already applied)\n", version)
				continue
			}
		} else {
			if !exists {
				fmt.Printf(" skip %s (not applied)\n", version)
				continue
			}
		}

		sql, err := os.ReadFile(file)
		if err != nil {
			return fmt.Errorf("read migration %q: %w", file, err)
		}

		// Run any pre-migration hook before the SQL file. Hooks
		// receive the *pgxpool.Pool (not the loop's pinned conn), so
		// they can acquire other session-level locks without
		// colliding with migrationAdvisoryLockKey. Hook failures
		// abort the run before schema_migrations is updated, so the
		// same version retries cleanly on the next invocation.
		if opts.Direction == "up" {
			if hook, ok := opts.Hooks[version]; ok && hook != nil {
				slog.Info("running pre-migration hook", "version", version)
				if err := hook(ctx, pool); err != nil {
					return fmt.Errorf("pre-migration hook for %q: %w", version, err)
				}
			}
		}

		if _, err := conn.Exec(ctx, string(sql)); err != nil {
			return fmt.Errorf("apply migration %q: %w", file, err)
		}

		if opts.Direction == "up" {
			_, err = conn.Exec(ctx, insertSQL, version)
		} else {
			_, err = conn.Exec(ctx, deleteSQL, version)
		}
		if err != nil {
			return fmt.Errorf("record migration %q: %w", version, err)
		}

		fmt.Printf(" %s %s\n", opts.Direction, version)
	}

	return nil
}

// quoteQualifiedIdentifier safely quotes either an unqualified table
// name ("foo") or a schema-qualified name ("schema.foo") for embedding
// into a SQL statement. Postgres does not let parameterized queries
// supply identifiers, so we have to interpolate, but pgx.Identifier
// does the right escaping (double-quotes, embedded-quote handling).
//
// The accepted shape is exactly one or two dot-separated components.
// Names containing more than one dot (e.g. "a.b.c") are rejected
// outright rather than silently sanitized into an "a"."b.c" reference,
// which is valid SQL but almost certainly not what the caller meant.
func quoteQualifiedIdentifier(name string) (string, error) {
	if name == "" {
		return "", fmt.Errorf("empty identifier")
	}
	parts := strings.Split(name, ".")
	if len(parts) > 2 {
		return "", fmt.Errorf("identifier %q has more than one dot; only schema.table is supported", name)
	}
	for _, p := range parts {
		if p == "" {
			return "", fmt.Errorf("empty component in %q", name)
		}
	}
	return pgx.Identifier(parts).Sanitize(), nil
}