mirror of
https://github.com/multica-ai/multica.git
synced 2026-06-17 03:38:32 +02:00
* test(migrate): add concurrent migration race test using real Postgres (MUL-2956)

Follow-up to MUL-2923 / #3658, which added a Postgres advisory lock to serialize the migration loop across concurrent runners (multi-replica backend startup, scale-up, manual `migrate up` overlap). That PR shipped without a test because cmd/migrate/ had no harness; this commit adds it.

Refactor: extract runMigrations(ctx, pool, runOptions) from main(), with the lock key, the bookkeeping table, and the file list now injectable. main() behavior is unchanged. Identifier interpolation goes through pgx.Identifier{}.Sanitize so callers can pass "schema.schema_migrations" safely.

Tests (cmd/migrate/migrate_concurrent_test.go): every case isolates itself in a unique throwaway schema and a unique lock key, so they never touch the real schema_migrations table or block real production runners that share the database. Skip cleanly when DATABASE_URL is unreachable, matching the pattern already used in internal/handler/handler_test.go and internal/metrics/business_sampler_pgsleep_test.go.

- TestRunMigrationsConcurrentPending: 16 goroutines apply 5 deliberately non-idempotent migrations (bare CREATE TABLE + ALTER TABLE ADD COLUMN). Without the lock, concurrent CREATE TABLE races trip "duplicate key value violates unique constraint pg_type_typname_nsp_index", proving the lock is doing its job.
- TestRunMigrationsConcurrentAlreadyApplied: 16 goroutines hit the EXISTS no-op path against a pre-populated bookkeeping table; the state must be unchanged.
- TestRunMigrationsAdvisoryLockSerializes: an external connection holds the same advisory lock; we assert that zero of the 16 runners complete during a 1 s observation window, then release the side lock and let them all finish. Catches the original MUL-2923 bug where the lock got attached to a random pooled connection.
- TestRunMigrationsConcurrentMixedPoolStress: same pending case but with a deliberately small pool (runners/2), forcing pgxpool.Acquire contention to overlap with pg_advisory_lock contention.

Verified locally: `go test -race -count=10 ./cmd/migrate/` passes in ~15 s. Mutation test (lock acquire/release replaced with `SELECT 1`) confirms the pending and lock-serializes tests both fail loudly, catching the regression they were written to detect.

go.mod tidy promotes golang.org/x/sync to a direct dependency (now imported by the test for errgroup) and incidentally fixes a stale `// indirect` annotation on prometheus/client_model, which is already imported directly by internal/metrics/testutil.go.

Co-authored-by: multica-agent <github@multica.ai>

* test(migrate): gofmt + address review nits (MUL-2956)

- gofmt -w cmd/migrate/migrate_concurrent_test.go: fixture struct field alignment.
- quoteQualifiedIdentifier: actually reject identifiers with more than one dot (the previous version split on the first dot only and would silently sanitize "a.b.c" into "a"."b.c", contradicting the comment). Inline the splitter via strings.Split now that we explicitly check the component count.
- Soften the test's lock-key comment from "never collide" to the accurate probabilistic statement (~1 in 2^62 collision odds with the production constant).

go test -race -count=10 ./cmd/migrate/ still passes (~15 s).

Co-authored-by: multica-agent <github@multica.ai>

* test(migrate): direction whitelist + tidy go.mod (MUL-2956)

Address two follow-ups from review:

- runMigrations now whitelist-checks opts.Direction up-front and returns an error for anything that is not "up" or "down". The previous shape relied on `opts.Direction == "up"` and an else branch, so a typo or empty string would silently fall through to the rollback path. Add TestRunMigrationsRejectsInvalidDirection covering the empty string, "UP"/"DOWN" case mismatches, "rollback", and a whitespace-padded value; the check fires before any pool work, so the test runs without Postgres.
- go mod tidy: promotes google.golang.org/protobuf to a direct dependency (it is imported directly elsewhere in the module and was stale-marked indirect).

go test -race -count=10 ./cmd/migrate/ green (~15.7 s, 50/50).

Co-authored-by: multica-agent <github@multica.ai>

---------

Co-authored-by: wei-heshang <wei-heshang@multica.ai>
Co-authored-by: multica-agent <github@multica.ai>
318 lines
11 KiB
Go
package main

import (
	"context"
	"fmt"
	"log/slog"
	"os"
	"strings"

	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/pgxpool"
	"github.com/multica-ai/multica/server/internal/logger"
	"github.com/multica-ai/multica/server/internal/migrations"
	"github.com/multica-ai/multica/server/internal/taskusagebackfill"
)

// preMigrationHook runs work that must happen before a specific
// migration is applied during `migrate up`. Hooks are idempotent and
// must not depend on the migration loop's session-pinned advisory lock:
// they run on the pool, not on the loop's pinned conn, so they can
// safely acquire other session-level locks (e.g. advisory lock 4246
// for the task_usage hourly rollup).
//
// Returning an error aborts the migration run. The corresponding
// migration is NOT recorded in schema_migrations, so the next run will
// retry the hook + migration.
type preMigrationHook func(ctx context.Context, pool *pgxpool.Pool) error

// preMigrationHooks wires migration version → hook. The version key is
// the file basename without the `.up.sql` suffix, matching what
// `migrations.ExtractVersion` returns.
//
// MUL-2957: the v0.3.4 → current direct-upgrade path needs the hourly
// rollup seeded BEFORE migration 103 evaluates its fail-closed lag
// guard, because at `cmd/migrate up` time the server has not yet
// started so neither the legacy pg_cron job nor the new app scheduler
// can advance the watermark. The hook runs the same idempotent
// monthly-slice backfill that
// `cmd/backfill_task_usage_hourly` exposes to operators.
var preMigrationHooks = map[string]preMigrationHook{
	"103_drop_legacy_daily_rollups": runTaskUsageHourlyHook,
}

func runTaskUsageHourlyHook(ctx context.Context, pool *pgxpool.Pool) error {
	res, err := taskusagebackfill.Hook(ctx, pool, taskusagebackfill.HookOptions{})
	if err != nil {
		return fmt.Errorf("task_usage_hourly pre-103 hook: %w", err)
	}
	if res.Skipped != "" {
		slog.Info("task_usage hourly rollup hook: skipped",
			"reason", res.Skipped,
			"watermark_stamped", res.WatermarkStamped)
		return nil
	}
	slog.Info("task_usage hourly rollup hook: backfill complete",
		"slices", res.SlicesProcessed,
		"rows_touched", res.RowsTouched,
		"from", res.From.Format("2006-01-02T15:04:05Z07:00"),
		"to", res.To.Format("2006-01-02T15:04:05Z07:00"))
	return nil
}

// migrationAdvisoryLockKey is the int64 identifier used with Postgres
// pg_advisory_lock to serialize the migration loop across concurrent
// runners (multi-replica backend Deployment, scale-up, or a manual
// `migrate up` overlapping with pod startup). The exact value is
// arbitrary; it just needs to be stable across every process that runs
// migrations against the same database. See GitHub multica-ai/multica#3647.
const migrationAdvisoryLockKey int64 = 7244554146635925501

// defaultSchemaMigrationsTable is the unqualified name of the bookkeeping
// table that tracks which migrations have been applied. Tests override
// this so a concurrent-race harness can run against the same shared
// Postgres without colliding with the production table.
const defaultSchemaMigrationsTable = "schema_migrations"

// runOptions carries everything runMigrations needs that is not the
// pool itself. Tests use it to inject a hermetic migrations directory,
// a unique per-test bookkeeping table, and a unique advisory-lock key
// that doesn't collide with any other migration runner sharing the same
// Postgres instance.
type runOptions struct {
	// Direction is "up" or "down".
	Direction string
	// Files is the ordered list of .sql files to apply. Production callers
	// pass migrations.Files(direction); tests pass a curated set written
	// to a t.TempDir().
	Files []string
	// SchemaMigrationsTable is the bookkeeping table to read/write.
	// May be schema-qualified (e.g. "migrate_test_xyz.schema_migrations").
	// Empty means defaultSchemaMigrationsTable.
	SchemaMigrationsTable string
	// AdvisoryLockKey is the int64 used with pg_advisory_lock. Zero means
	// migrationAdvisoryLockKey. Tests pass a unique key per run so
	// concurrent test workers do not block on the production migration
	// runner if it happens to share the database.
	AdvisoryLockKey int64
	// Hooks maps migration version → pre-migration hook. The hook
	// receives the pool (not the loop's pinned conn) so it can take
	// its own session-level locks. nil or missing entries mean "no
	// hook" and the migration runs straight through. Production main()
	// passes preMigrationHooks; tests leave this nil.
	Hooks map[string]preMigrationHook
}

func main() {
	logger.Init()

	if len(os.Args) < 2 {
		fmt.Println("Usage: go run ./cmd/migrate <up|down>")
		os.Exit(1)
	}

	direction := os.Args[1]
	if direction != "up" && direction != "down" {
		fmt.Println("Usage: go run ./cmd/migrate <up|down>")
		os.Exit(1)
	}

	dbURL := os.Getenv("DATABASE_URL")
	if dbURL == "" {
		dbURL = "postgres://multica:multica@localhost:5432/multica?sslmode=disable"
	}

	ctx := context.Background()
	pool, err := pgxpool.New(ctx, dbURL)
	if err != nil {
		slog.Error("unable to connect to database", "error", err)
		os.Exit(1)
	}
	defer pool.Close()

	if err := pool.Ping(ctx); err != nil {
		slog.Error("unable to ping database", "error", err)
		os.Exit(1)
	}

	files, err := migrations.Files(direction)
	if err != nil {
		slog.Error("failed to find migration files", "error", err)
		os.Exit(1)
	}

	if err := runMigrations(ctx, pool, runOptions{
		Direction: direction,
		Files:     files,
		Hooks:     preMigrationHooks,
	}); err != nil {
		slog.Error("migration run failed", "error", err)
		os.Exit(1)
	}

	fmt.Println("Done.")
}

// runMigrations applies (direction="up") or rolls back (direction="down")
// the given file list against the supplied pool, serialized through a
// Postgres session-level advisory lock so multiple concurrent runners
// (multi-replica startup, scale-up, manual migrate overlap) take turns
// instead of racing each other.
//
// It is safe to invoke concurrently from multiple goroutines or
// processes against the same database with the same options: every
// caller blocks on pg_advisory_lock, and once it is their turn the
// already-applied EXISTS check turns each finished migration into a
// no-op skip. See GitHub multica-ai/multica#3647 / MUL-2923.
func runMigrations(ctx context.Context, pool *pgxpool.Pool, opts runOptions) error {
	switch opts.Direction {
	case "up", "down":
		// ok
	default:
		return fmt.Errorf("invalid direction %q (want \"up\" or \"down\")", opts.Direction)
	}

	table := opts.SchemaMigrationsTable
	if table == "" {
		table = defaultSchemaMigrationsTable
	}
	tableIdent, err := quoteQualifiedIdentifier(table)
	if err != nil {
		return fmt.Errorf("invalid schema migrations table %q: %w", table, err)
	}
	lockKey := opts.AdvisoryLockKey
	if lockKey == 0 {
		lockKey = migrationAdvisoryLockKey
	}

	// pg_advisory_lock is scoped to a single session, so we must pin one
	// *pgxpool.Conn for the whole run: calling pool.Exec would attach the
	// lock to a random connection that pgxpool could hand back out before
	// the loop finishes, making the lock effectively a no-op. We use the
	// blocking pg_advisory_lock (not pg_try_*) so a late-arriving runner
	// queues behind the current one instead of crash-looping; once it
	// acquires the lock the EXISTS checks below turn finished migrations
	// into no-op skips.
	//
	// We deliberately do NOT wrap the loop in a single transaction: the
	// repo already ships migrations using CREATE INDEX CONCURRENTLY,
	// which Postgres rejects inside a transaction block.
	conn, err := pool.Acquire(ctx)
	if err != nil {
		return fmt.Errorf("acquire migration connection: %w", err)
	}
	defer conn.Release()

	if _, err := conn.Exec(ctx, "SELECT pg_advisory_lock($1)", lockKey); err != nil {
		return fmt.Errorf("acquire migration advisory lock: %w", err)
	}
	// Best-effort explicit unlock on the success path. On error returns
	// the defer still runs; on os.Exit error paths in main() it does not,
	// but session-level advisory locks are released automatically when
	// the connection closes at process exit, so the next runner is never
	// permanently blocked.
	defer func() {
		if _, err := conn.Exec(ctx, "SELECT pg_advisory_unlock($1)", lockKey); err != nil {
			slog.Warn("failed to release migration advisory lock", "error", err)
		}
	}()

	// Create migrations tracking table.
	if _, err := conn.Exec(ctx, fmt.Sprintf(`
		CREATE TABLE IF NOT EXISTS %s (
			version TEXT PRIMARY KEY,
			applied_at TIMESTAMPTZ NOT NULL DEFAULT now()
		)
	`, tableIdent)); err != nil {
		return fmt.Errorf("create migrations table: %w", err)
	}

	existsSQL := fmt.Sprintf("SELECT EXISTS(SELECT 1 FROM %s WHERE version = $1)", tableIdent)
	insertSQL := fmt.Sprintf("INSERT INTO %s (version) VALUES ($1)", tableIdent)
	deleteSQL := fmt.Sprintf("DELETE FROM %s WHERE version = $1", tableIdent)

	for _, file := range opts.Files {
		version := migrations.ExtractVersion(file)

		var exists bool
		if err := conn.QueryRow(ctx, existsSQL, version).Scan(&exists); err != nil {
			return fmt.Errorf("check migration %q: %w", version, err)
		}

		if opts.Direction == "up" {
			if exists {
				fmt.Printf(" skip %s (already applied)\n", version)
				continue
			}
		} else {
			if !exists {
				fmt.Printf(" skip %s (not applied)\n", version)
				continue
			}
		}

		sql, err := os.ReadFile(file)
		if err != nil {
			return fmt.Errorf("read migration %q: %w", file, err)
		}

		// Run any pre-migration hook before the SQL file. Hooks
		// receive the *pgxpool.Pool (not the loop's pinned conn), so
		// they can acquire other session-level locks without
		// colliding with migrationAdvisoryLockKey. Hook failures
		// abort the run before schema_migrations is updated, so the
		// same version retries cleanly on the next invocation.
		if opts.Direction == "up" {
			if hook, ok := opts.Hooks[version]; ok && hook != nil {
				slog.Info("running pre-migration hook", "version", version)
				if err := hook(ctx, pool); err != nil {
					return fmt.Errorf("pre-migration hook for %q: %w", version, err)
				}
			}
		}

		if _, err := conn.Exec(ctx, string(sql)); err != nil {
			return fmt.Errorf("apply migration %q: %w", file, err)
		}

		if opts.Direction == "up" {
			_, err = conn.Exec(ctx, insertSQL, version)
		} else {
			_, err = conn.Exec(ctx, deleteSQL, version)
		}
		if err != nil {
			return fmt.Errorf("record migration %q: %w", version, err)
		}

		fmt.Printf(" %s %s\n", opts.Direction, version)
	}

	return nil
}

// quoteQualifiedIdentifier safely quotes either an unqualified table
// name ("foo") or a schema-qualified name ("schema.foo") for embedding
// into a SQL statement. Postgres does not let parametrized queries
// supply identifiers, so we have to interpolate, but pgx.Identifier
// does the right escaping (double-quotes, embedded-quote handling).
//
// The accepted shape is exactly one or two dot-separated components.
// Names containing more than one dot are rejected outright rather than
// silently sanitized into a "schema"."b.c" reference, which is valid
// SQL but almost certainly not what the caller meant.
func quoteQualifiedIdentifier(name string) (string, error) {
	if name == "" {
		return "", fmt.Errorf("empty identifier")
	}
	parts := strings.Split(name, ".")
	if len(parts) > 2 {
		return "", fmt.Errorf("identifier %q has more than one dot; only schema.table is supported", name)
	}
	for _, p := range parts {
		if p == "" {
			return "", fmt.Errorf("empty component in %q", name)
		}
	}
	return pgx.Identifier(parts).Sanitize(), nil
}
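
To make the accepted and rejected shapes of quoteQualifiedIdentifier concrete, here is a standalone sketch that needs no pgx dependency. It is an illustration under assumptions, not the file's implementation: `sanitizeIdentifier` is a hypothetical stdlib-only stand-in for `pgx.Identifier.Sanitize` (assumed to double-quote each part, double any embedded quote, and drop NUL bytes), and `quoteQualified` is a hypothetical copy of the validation logic above.

```go
package main

import (
	"fmt"
	"strings"
)

// sanitizeIdentifier is a hypothetical stand-in for pgx.Identifier.Sanitize:
// each part is wrapped in double quotes, embedded double quotes are doubled,
// NUL bytes are removed, and the parts are joined with a dot.
func sanitizeIdentifier(parts []string) string {
	quoted := make([]string, len(parts))
	for i, p := range parts {
		p = strings.ReplaceAll(p, "\x00", "")
		quoted[i] = `"` + strings.ReplaceAll(p, `"`, `""`) + `"`
	}
	return strings.Join(quoted, ".")
}

// quoteQualified (hypothetical name) mirrors the shape check above:
// one or two non-empty dot-separated components, nothing else.
func quoteQualified(name string) (string, error) {
	if name == "" {
		return "", fmt.Errorf("empty identifier")
	}
	parts := strings.Split(name, ".")
	if len(parts) > 2 {
		return "", fmt.Errorf("identifier %q has more than one dot; only schema.table is supported", name)
	}
	for _, p := range parts {
		if p == "" {
			return "", fmt.Errorf("empty component in %q", name)
		}
	}
	return sanitizeIdentifier(parts), nil
}

func main() {
	names := []string{
		"schema_migrations",
		"migrate_test_xyz.schema_migrations",
		"a.b.c", // rejected: more than one dot
		".foo",  // rejected: empty schema component
		`we"ird`,
	}
	for _, name := range names {
		quoted, err := quoteQualified(name)
		fmt.Printf("%q -> %s (err: %v)\n", name, quoted, err)
	}
}
```

Rejecting "a.b.c" instead of guessing where the schema ends keeps the helper's behavior aligned with its comment, and the quoted result is the only form that gets interpolated into the bookkeeping SQL.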