mirror of
https://github.com/multica-ai/multica.git
synced 2026-06-17 11:48:42 +02:00
Compare commits
3 Commits
v0.3.23
...
mul-2956-c
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
098003c2a5 | ||
|
|
87d172daa0 | ||
|
|
3bb757d214 |
@@ -5,7 +5,9 @@ import (
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"os"
|
||||
"strings"
|
||||
|
||||
"github.com/jackc/pgx/v5"
|
||||
"github.com/jackc/pgx/v5/pgxpool"
|
||||
"github.com/multica-ai/multica/server/internal/logger"
|
||||
"github.com/multica-ai/multica/server/internal/migrations"
|
||||
@@ -66,6 +68,41 @@ func runTaskUsageHourlyHook(ctx context.Context, pool *pgxpool.Pool) error {
|
||||
// migrations against the same database. See GitHub multica-ai/multica#3647.
|
||||
const migrationAdvisoryLockKey int64 = 7244554146635925501
|
||||
|
||||
// defaultSchemaMigrationsTable is the unqualified name of the bookkeeping
// table that tracks which migrations have been applied. Tests override
// this so a concurrent-race harness can run against the same shared
// Postgres without colliding with the production table.
const defaultSchemaMigrationsTable = "schema_migrations"
|
||||
|
||||
// runOptions carries everything runMigrations needs that is not the
|
||||
// pool itself. Tests use it to inject a hermetic migrations directory,
|
||||
// a unique per-test bookkeeping table, and a unique advisory-lock key
|
||||
// that doesn't collide with any other migration runner sharing the same
|
||||
// Postgres instance.
|
||||
type runOptions struct {
|
||||
// Direction is "up" or "down".
|
||||
Direction string
|
||||
// Files is the ordered list of .sql files to apply. Production callers
|
||||
// pass migrations.Files(direction); tests pass a curated set written
|
||||
// to a t.TempDir().
|
||||
Files []string
|
||||
// SchemaMigrationsTable is the bookkeeping table to read/write.
|
||||
// May be schema-qualified (e.g. "migrate_test_xyz.schema_migrations").
|
||||
// Empty means defaultSchemaMigrationsTable.
|
||||
SchemaMigrationsTable string
|
||||
// AdvisoryLockKey is the int64 used with pg_advisory_lock. Zero means
|
||||
// migrationAdvisoryLockKey. Tests pass a unique key per run so
|
||||
// concurrent test workers do not block on the production migration
|
||||
// runner if it happens to share the database.
|
||||
AdvisoryLockKey int64
|
||||
// Hooks maps migration version → pre-migration hook. The hook
|
||||
// receives the pool (not the loop's pinned conn) so it can take
|
||||
// its own session-level locks. nil or missing entries mean "no
|
||||
// hook" and the migration runs straight through. Production main()
|
||||
// passes preMigrationHooks; tests leave this nil.
|
||||
Hooks map[string]preMigrationHook
|
||||
}
|
||||
|
||||
func main() {
|
||||
logger.Init()
|
||||
|
||||
@@ -98,81 +135,116 @@ func main() {
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
// Serialize the entire migration run with a Postgres session-level
|
||||
// advisory lock. pg_advisory_lock is scoped to a single session, so we
|
||||
// must pin one *pgxpool.Conn for the whole run — calling pool.Exec
|
||||
// would attach the lock to a random connection that pgxpool could
|
||||
// hand back out before the loop finishes, making the lock effectively
|
||||
// a no-op. We use the blocking pg_advisory_lock (not pg_try_*) so a
|
||||
// late-arriving pod queues behind the current runner instead of
|
||||
// crash-looping; once it acquires the lock the EXISTS checks below
|
||||
// turn into a no-op skip. See GitHub multica-ai/multica#3647.
|
||||
//
|
||||
// We deliberately do NOT wrap the loop in a single transaction: the
|
||||
// repo already ships migrations using CREATE INDEX CONCURRENTLY,
|
||||
// which Postgres rejects inside a transaction block.
|
||||
conn, err := pool.Acquire(ctx)
|
||||
if err != nil {
|
||||
slog.Error("unable to acquire migration connection", "error", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
defer conn.Release()
|
||||
|
||||
if _, err := conn.Exec(ctx, "SELECT pg_advisory_lock($1)", migrationAdvisoryLockKey); err != nil {
|
||||
slog.Error("failed to acquire migration advisory lock", "error", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
// Best-effort explicit unlock on the success path. On os.Exit error
|
||||
// paths this defer does not run, but session-level advisory locks are
|
||||
// released automatically when the connection closes at process exit,
|
||||
// so the next runner is never permanently blocked.
|
||||
defer func() {
|
||||
if _, err := conn.Exec(ctx, "SELECT pg_advisory_unlock($1)", migrationAdvisoryLockKey); err != nil {
|
||||
slog.Warn("failed to release migration advisory lock", "error", err)
|
||||
}
|
||||
}()
|
||||
|
||||
// Create migrations tracking table
|
||||
_, err = conn.Exec(ctx, `
|
||||
CREATE TABLE IF NOT EXISTS schema_migrations (
|
||||
version TEXT PRIMARY KEY,
|
||||
applied_at TIMESTAMPTZ NOT NULL DEFAULT now()
|
||||
)
|
||||
`)
|
||||
if err != nil {
|
||||
slog.Error("failed to create migrations table", "error", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
files, err := migrations.Files(direction)
|
||||
if err != nil {
|
||||
slog.Error("failed to find migration files", "error", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
for _, file := range files {
|
||||
if err := runMigrations(ctx, pool, runOptions{
|
||||
Direction: direction,
|
||||
Files: files,
|
||||
Hooks: preMigrationHooks,
|
||||
}); err != nil {
|
||||
slog.Error("migration run failed", "error", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
fmt.Println("Done.")
|
||||
}
|
||||
|
||||
// runMigrations applies (direction="up") or rolls back (direction="down")
|
||||
// the given file list against the supplied pool, serialized through a
|
||||
// Postgres session-level advisory lock so multiple concurrent runners
|
||||
// (multi-replica startup, scale-up, manual migrate overlap) take turns
|
||||
// instead of racing each other.
|
||||
//
|
||||
// It is safe to invoke concurrently from multiple goroutines or
|
||||
// processes against the same database with the same options: every
|
||||
// caller blocks on pg_advisory_lock, and once it is their turn the
|
||||
// already-applied EXISTS check turns each finished migration into a
|
||||
// no-op skip. See GitHub multica-ai/multica#3647 / MUL-2923.
|
||||
func runMigrations(ctx context.Context, pool *pgxpool.Pool, opts runOptions) error {
|
||||
switch opts.Direction {
|
||||
case "up", "down":
|
||||
// ok
|
||||
default:
|
||||
return fmt.Errorf("invalid direction %q (want \"up\" or \"down\")", opts.Direction)
|
||||
}
|
||||
|
||||
table := opts.SchemaMigrationsTable
|
||||
if table == "" {
|
||||
table = defaultSchemaMigrationsTable
|
||||
}
|
||||
tableIdent, err := quoteQualifiedIdentifier(table)
|
||||
if err != nil {
|
||||
return fmt.Errorf("invalid schema migrations table %q: %w", table, err)
|
||||
}
|
||||
lockKey := opts.AdvisoryLockKey
|
||||
if lockKey == 0 {
|
||||
lockKey = migrationAdvisoryLockKey
|
||||
}
|
||||
|
||||
// pg_advisory_lock is scoped to a single session, so we must pin one
|
||||
// *pgxpool.Conn for the whole run — calling pool.Exec would attach the
|
||||
// lock to a random connection that pgxpool could hand back out before
|
||||
// the loop finishes, making the lock effectively a no-op. We use the
|
||||
// blocking pg_advisory_lock (not pg_try_*) so a late-arriving runner
|
||||
// queues behind the current one instead of crash-looping; once it
|
||||
// acquires the lock the EXISTS checks below turn finished migrations
|
||||
// into no-op skips.
|
||||
//
|
||||
// We deliberately do NOT wrap the loop in a single transaction: the
|
||||
// repo already ships migrations using CREATE INDEX CONCURRENTLY,
|
||||
// which Postgres rejects inside a transaction block.
|
||||
conn, err := pool.Acquire(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("acquire migration connection: %w", err)
|
||||
}
|
||||
defer conn.Release()
|
||||
|
||||
if _, err := conn.Exec(ctx, "SELECT pg_advisory_lock($1)", lockKey); err != nil {
|
||||
return fmt.Errorf("acquire migration advisory lock: %w", err)
|
||||
}
|
||||
// Best-effort explicit unlock on the success path. On error returns
|
||||
// the defer still runs; on os.Exit error paths in main() it does not,
|
||||
// but session-level advisory locks are released automatically when
|
||||
// the connection closes at process exit, so the next runner is never
|
||||
// permanently blocked.
|
||||
defer func() {
|
||||
if _, err := conn.Exec(ctx, "SELECT pg_advisory_unlock($1)", lockKey); err != nil {
|
||||
slog.Warn("failed to release migration advisory lock", "error", err)
|
||||
}
|
||||
}()
|
||||
|
||||
// Create migrations tracking table.
|
||||
if _, err := conn.Exec(ctx, fmt.Sprintf(`
|
||||
CREATE TABLE IF NOT EXISTS %s (
|
||||
version TEXT PRIMARY KEY,
|
||||
applied_at TIMESTAMPTZ NOT NULL DEFAULT now()
|
||||
)
|
||||
`, tableIdent)); err != nil {
|
||||
return fmt.Errorf("create migrations table: %w", err)
|
||||
}
|
||||
|
||||
existsSQL := fmt.Sprintf("SELECT EXISTS(SELECT 1 FROM %s WHERE version = $1)", tableIdent)
|
||||
insertSQL := fmt.Sprintf("INSERT INTO %s (version) VALUES ($1)", tableIdent)
|
||||
deleteSQL := fmt.Sprintf("DELETE FROM %s WHERE version = $1", tableIdent)
|
||||
|
||||
for _, file := range opts.Files {
|
||||
version := migrations.ExtractVersion(file)
|
||||
|
||||
if direction == "up" {
|
||||
// Check if already applied
|
||||
var exists bool
|
||||
err := conn.QueryRow(ctx, "SELECT EXISTS(SELECT 1 FROM schema_migrations WHERE version = $1)", version).Scan(&exists)
|
||||
if err != nil {
|
||||
slog.Error("failed to check migration status", "version", version, "error", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
var exists bool
|
||||
if err := conn.QueryRow(ctx, existsSQL, version).Scan(&exists); err != nil {
|
||||
return fmt.Errorf("check migration %q: %w", version, err)
|
||||
}
|
||||
|
||||
if opts.Direction == "up" {
|
||||
if exists {
|
||||
fmt.Printf(" skip %s (already applied)\n", version)
|
||||
continue
|
||||
}
|
||||
} else {
|
||||
// Check if applied (only rollback applied ones)
|
||||
var exists bool
|
||||
err := conn.QueryRow(ctx, "SELECT EXISTS(SELECT 1 FROM schema_migrations WHERE version = $1)", version).Scan(&exists)
|
||||
if err != nil {
|
||||
slog.Error("failed to check migration status", "version", version, "error", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
if !exists {
|
||||
fmt.Printf(" skip %s (not applied)\n", version)
|
||||
continue
|
||||
@@ -181,44 +253,65 @@ func main() {
|
||||
|
||||
sql, err := os.ReadFile(file)
|
||||
if err != nil {
|
||||
slog.Error("failed to read migration file", "file", file, "error", err)
|
||||
os.Exit(1)
|
||||
return fmt.Errorf("read migration %q: %w", file, err)
|
||||
}
|
||||
|
||||
// Run any pre-migration hook before the SQL file. The hook
|
||||
// uses the pool, not the conn pinned for migrationAdvisoryLockKey,
|
||||
// so it can acquire other session-level locks without
|
||||
// colliding with the migrate loop's lock. Hook failures abort
|
||||
// the run before schema_migrations is updated, so the same
|
||||
// version can be retried cleanly on the next invocation.
|
||||
if direction == "up" {
|
||||
if hook, ok := preMigrationHooks[version]; ok {
|
||||
// Run any pre-migration hook before the SQL file. Hooks
|
||||
// receive the *pgxpool.Pool (not the loop's pinned conn), so
|
||||
// they can acquire other session-level locks without
|
||||
// colliding with migrationAdvisoryLockKey. Hook failures
|
||||
// abort the run before schema_migrations is updated, so the
|
||||
// same version retries cleanly on the next invocation.
|
||||
if opts.Direction == "up" {
|
||||
if hook, ok := opts.Hooks[version]; ok && hook != nil {
|
||||
slog.Info("running pre-migration hook", "version", version)
|
||||
if err := hook(ctx, pool); err != nil {
|
||||
slog.Error("pre-migration hook failed", "version", version, "error", err)
|
||||
os.Exit(1)
|
||||
return fmt.Errorf("pre-migration hook for %q: %w", version, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
_, err = conn.Exec(ctx, string(sql))
|
||||
if err != nil {
|
||||
slog.Error("failed to run migration", "file", file, "error", err)
|
||||
os.Exit(1)
|
||||
if _, err := conn.Exec(ctx, string(sql)); err != nil {
|
||||
return fmt.Errorf("apply migration %q: %w", file, err)
|
||||
}
|
||||
|
||||
if direction == "up" {
|
||||
_, err = conn.Exec(ctx, "INSERT INTO schema_migrations (version) VALUES ($1)", version)
|
||||
if opts.Direction == "up" {
|
||||
_, err = conn.Exec(ctx, insertSQL, version)
|
||||
} else {
|
||||
_, err = conn.Exec(ctx, "DELETE FROM schema_migrations WHERE version = $1", version)
|
||||
_, err = conn.Exec(ctx, deleteSQL, version)
|
||||
}
|
||||
if err != nil {
|
||||
slog.Error("failed to record migration", "version", version, "error", err)
|
||||
os.Exit(1)
|
||||
return fmt.Errorf("record migration %q: %w", version, err)
|
||||
}
|
||||
|
||||
fmt.Printf(" %s %s\n", direction, version)
|
||||
fmt.Printf(" %s %s\n", opts.Direction, version)
|
||||
}
|
||||
|
||||
fmt.Println("Done.")
|
||||
return nil
|
||||
}
|
||||
|
||||
// quoteQualifiedIdentifier safely quotes either an unqualified table
|
||||
// name ("foo") or a schema-qualified name ("schema.foo") for embedding
|
||||
// into a SQL statement. Postgres does not let parametrized queries
|
||||
// supply identifiers, so we have to interpolate, but pgx.Identifier
|
||||
// does the right escaping (double-quotes, embedded-quote handling).
|
||||
//
|
||||
// The accepted shape is exactly one or two dot-separated components.
|
||||
// Names containing more than one dot are rejected outright rather than
|
||||
// silently sanitized into a "schema"."b.c" reference, which is valid
|
||||
// SQL but almost certainly not what the caller meant.
|
||||
func quoteQualifiedIdentifier(name string) (string, error) {
|
||||
if name == "" {
|
||||
return "", fmt.Errorf("empty identifier")
|
||||
}
|
||||
parts := strings.Split(name, ".")
|
||||
if len(parts) > 2 {
|
||||
return "", fmt.Errorf("identifier %q has more than one dot; only schema.table is supported", name)
|
||||
}
|
||||
for _, p := range parts {
|
||||
if p == "" {
|
||||
return "", fmt.Errorf("empty component in %q", name)
|
||||
}
|
||||
}
|
||||
return pgx.Identifier(parts).Sanitize(), nil
|
||||
}
|
||||
|
||||
499
server/cmd/migrate/migrate_concurrent_test.go
Normal file
499
server/cmd/migrate/migrate_concurrent_test.go
Normal file
@@ -0,0 +1,499 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"math/rand/v2"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/jackc/pgx/v5"
|
||||
"github.com/jackc/pgx/v5/pgxpool"
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
// MUL-2956 — concurrent migration race test.
|
||||
//
|
||||
// PR multica-ai/multica#3658 (MUL-2923) added a Postgres advisory lock
|
||||
// around the migration loop to serialize concurrent runners. This file
|
||||
// is the live-Postgres test that proves the lock is actually doing its
|
||||
// job. We run N goroutines that all call runMigrations against the same
|
||||
// database with the same options, and assert:
|
||||
//
|
||||
// 1. Pending: when migrations have NOT been applied, every goroutine
|
||||
// returns nil and exactly one application of each migration lands
|
||||
// in the bookkeeping table — no duplicate-key blow-ups, no missing
|
||||
// rows, and (since our test fixtures are deliberately non-idempotent
|
||||
// bare CREATE TABLE / ALTER TABLE) no "relation already exists"
|
||||
// failures from the SQL itself, which would prove the lock isn't
|
||||
// serializing.
|
||||
// 2. Already applied: rerunning the same N-way race against the just-
|
||||
// populated bookkeeping table sends every goroutine down the EXISTS
|
||||
// no-op path; nobody re-applies anything and the underlying schema
|
||||
// is unchanged.
|
||||
// 3. Lock serialization: while one connection holds the same advisory
|
||||
// lock externally, every concurrent runMigrations is observed to
|
||||
// wait, and only after the external holder releases does the lock
|
||||
// get acquired. This catches the regression where the lock would
|
||||
// get attached to a random pooled connection (the bug fixed in
|
||||
// MUL-2923 / #3658) and effectively become a no-op.
|
||||
//
|
||||
// The test connects to whatever DATABASE_URL points at (default
|
||||
// postgres://multica:multica@localhost:5432/multica?sslmode=disable),
|
||||
// matching the harness pattern already used in
|
||||
// server/internal/handler/handler_test.go and
|
||||
// server/internal/metrics/business_sampler_pgsleep_test.go. If
|
||||
// Postgres is unreachable the suite skips cleanly, the same way every
|
||||
// other live-Postgres test in the repo skips, so CI without a database
|
||||
// sees SKIP rather than failure.
|
||||
//
|
||||
// Each test isolates itself by creating a unique throwaway schema
|
||||
// (migrate_test_<timestamp>_<rand>) and using a unique advisory-lock
|
||||
// key per run. That means the test never touches the real
|
||||
// schema_migrations table and never blocks behind a real production
|
||||
// migration runner sharing the same database. The schema is dropped
|
||||
// during cleanup.
|
||||
|
||||
const (
	// concurrentRunners is the goroutine count for the race tests. Set
	// large enough that a missing lock would reliably trip on a multi-
	// core box with -race, but small enough to keep the suite fast on a
	// single shared Postgres.
	concurrentRunners = 16

	// raceTestTimeout bounds every individual concurrent step; if the
	// lock implementation regresses into a deadlock we fail loudly
	// instead of hanging the suite.
	raceTestTimeout = 60 * time.Second
)
|
||||
|
||||
func openTestPool(t *testing.T) *pgxpool.Pool {
|
||||
t.Helper()
|
||||
dbURL := os.Getenv("DATABASE_URL")
|
||||
if dbURL == "" {
|
||||
dbURL = "postgres://multica:multica@localhost:5432/multica?sslmode=disable"
|
||||
}
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
pool, err := pgxpool.New(ctx, dbURL)
|
||||
if err != nil {
|
||||
t.Skipf("could not connect to %s: %v", dbURL, err)
|
||||
}
|
||||
if err := pool.Ping(ctx); err != nil {
|
||||
pool.Close()
|
||||
t.Skipf("database not reachable at %s: %v", dbURL, err)
|
||||
}
|
||||
t.Cleanup(pool.Close)
|
||||
return pool
|
||||
}
|
||||
|
||||
// fixture is the per-test sandbox: a private schema, a unique advisory
|
||||
// lock key, and a temp directory full of deliberately non-idempotent
|
||||
// migration SQL files.
|
||||
type fixture struct {
|
||||
pool *pgxpool.Pool
|
||||
schema string
|
||||
tableFQN string // e.g. "migrate_test_xyz"."schema_migrations"
|
||||
lockKey int64
|
||||
files []string // sorted .up.sql paths
|
||||
versions []string // matching versions
|
||||
tableNames []string // distinct test tables each migration creates
|
||||
}
|
||||
|
||||
func newFixture(t *testing.T) *fixture {
|
||||
t.Helper()
|
||||
pool := openTestPool(t)
|
||||
|
||||
// Unique schema and lock key per test invocation. We salt with both
|
||||
// nanos and a process-local random so re-running with -count=N still
|
||||
// gets a distinct sandbox per iteration even if the wall clock has
|
||||
// not visibly advanced.
|
||||
suffix := fmt.Sprintf("%d_%d", time.Now().UnixNano(), rand.Uint32())
|
||||
schema := "migrate_test_" + suffix
|
||||
tableFQN := schema + ".schema_migrations"
|
||||
// Random non-zero positive int64. The high bit is masked off to
|
||||
// keep this in the same numeric range pg_advisory_lock expects, and
|
||||
// the OR with 1 guarantees we never end up at zero. Collision with
|
||||
// the production migrationAdvisoryLockKey constant is not strictly
|
||||
// impossible — both are int64 — but the probability is on the order
|
||||
// of 1 in 2^62, which is negligible for a unit-test sandbox.
|
||||
lockKey := int64(rand.Uint64()&0x7fffffffffffffff) | 1
|
||||
|
||||
ctx := context.Background()
|
||||
if _, err := pool.Exec(ctx, fmt.Sprintf(`CREATE SCHEMA %s`, pgx.Identifier{schema}.Sanitize())); err != nil {
|
||||
t.Fatalf("create schema: %v", err)
|
||||
}
|
||||
t.Cleanup(func() {
|
||||
// Use a fresh context so cleanup still runs if the test ctx was
|
||||
// cancelled. Drop CASCADE to take everything down even when a
|
||||
// half-applied migration left orphan tables behind.
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||
defer cancel()
|
||||
if _, err := pool.Exec(ctx, fmt.Sprintf(`DROP SCHEMA IF EXISTS %s CASCADE`, pgx.Identifier{schema}.Sanitize())); err != nil {
|
||||
t.Logf("drop schema %s: %v", schema, err)
|
||||
}
|
||||
})
|
||||
|
||||
// Build a small set of deliberately non-idempotent migrations. Each
|
||||
// one creates a distinct table inside our schema. Bare CREATE TABLE
|
||||
// (no IF NOT EXISTS) and ALTER TABLE ADD COLUMN (no IF NOT EXISTS)
|
||||
// guarantee that if two goroutines actually ran the same migration
|
||||
// in parallel, the second one would error with "relation already
|
||||
// exists" / "column already exists" — which is exactly the failure
|
||||
// signature we want the test to catch when the lock regresses.
|
||||
dir := t.TempDir()
|
||||
const numFiles = 5
|
||||
files := make([]string, 0, numFiles)
|
||||
versions := make([]string, 0, numFiles)
|
||||
tableNames := make([]string, 0, numFiles)
|
||||
for i := 0; i < numFiles; i++ {
|
||||
version := fmt.Sprintf("%03d_test_%s", i+1, suffix)
|
||||
tableName := fmt.Sprintf("t_%s_%d", suffix, i+1)
|
||||
// Reference both the schema and the table, then add a column in
|
||||
// a follow-up statement. Either statement run twice (i.e.
|
||||
// concurrent re-application by another goroutine that won the
|
||||
// race past the EXISTS check) would error.
|
||||
body := fmt.Sprintf(
|
||||
"CREATE TABLE %s.%s (id BIGSERIAL PRIMARY KEY);\n"+
|
||||
"ALTER TABLE %s.%s ADD COLUMN payload TEXT NOT NULL DEFAULT '';\n",
|
||||
pgx.Identifier{schema}.Sanitize(), pgx.Identifier{tableName}.Sanitize(),
|
||||
pgx.Identifier{schema}.Sanitize(), pgx.Identifier{tableName}.Sanitize(),
|
||||
)
|
||||
path := filepath.Join(dir, version+".up.sql")
|
||||
if err := os.WriteFile(path, []byte(body), 0o600); err != nil {
|
||||
t.Fatalf("write migration: %v", err)
|
||||
}
|
||||
files = append(files, path)
|
||||
versions = append(versions, version)
|
||||
tableNames = append(tableNames, tableName)
|
||||
}
|
||||
sort.Strings(files)
|
||||
|
||||
return &fixture{
|
||||
pool: pool,
|
||||
schema: schema,
|
||||
tableFQN: tableFQN,
|
||||
lockKey: lockKey,
|
||||
files: files,
|
||||
versions: versions,
|
||||
tableNames: tableNames,
|
||||
}
|
||||
}
|
||||
|
||||
func (f *fixture) opts() runOptions {
|
||||
return runOptions{
|
||||
Direction: "up",
|
||||
Files: f.files,
|
||||
SchemaMigrationsTable: f.tableFQN,
|
||||
AdvisoryLockKey: f.lockKey,
|
||||
}
|
||||
}
|
||||
|
||||
// appliedVersions returns the versions recorded in the bookkeeping
|
||||
// table, sorted ascending. Empty slice means the table is empty (or
|
||||
// does not yet exist, which the helper reports as a fatal error).
|
||||
func (f *fixture) appliedVersions(t *testing.T) []string {
|
||||
t.Helper()
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
defer cancel()
|
||||
rows, err := f.pool.Query(ctx,
|
||||
fmt.Sprintf(`SELECT version FROM %s ORDER BY version`,
|
||||
pgx.Identifier{f.schema, "schema_migrations"}.Sanitize()))
|
||||
if err != nil {
|
||||
t.Fatalf("read schema_migrations: %v", err)
|
||||
}
|
||||
defer rows.Close()
|
||||
var got []string
|
||||
for rows.Next() {
|
||||
var v string
|
||||
if err := rows.Scan(&v); err != nil {
|
||||
t.Fatalf("scan version: %v", err)
|
||||
}
|
||||
got = append(got, v)
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
t.Fatalf("rows.Err: %v", err)
|
||||
}
|
||||
return got
|
||||
}
|
||||
|
||||
// tableExists checks that a given table is present inside the fixture
|
||||
// schema. Used to confirm migrations actually executed, not just that
|
||||
// the bookkeeping rows landed.
|
||||
func (f *fixture) tableExists(t *testing.T, name string) bool {
|
||||
t.Helper()
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
var exists bool
|
||||
if err := f.pool.QueryRow(ctx, `
|
||||
SELECT EXISTS (
|
||||
SELECT 1 FROM information_schema.tables
|
||||
WHERE table_schema = $1 AND table_name = $2
|
||||
)`, f.schema, name).Scan(&exists); err != nil {
|
||||
t.Fatalf("check table %s.%s: %v", f.schema, name, err)
|
||||
}
|
||||
return exists
|
||||
}
|
||||
|
||||
// TestRunMigrationsConcurrentPending fires N goroutines at runMigrations
|
||||
// against a fresh schema where none of the migrations have been applied
|
||||
// yet. The advisory lock must serialize them so that exactly one of
|
||||
// them executes each CREATE TABLE / ALTER TABLE and exactly one row
|
||||
// per migration lands in schema_migrations. If the lock is broken,
|
||||
// either the SQL fails ("relation already exists") or the bookkeeping
|
||||
// table picks up duplicate-key violations on the version primary key.
|
||||
func TestRunMigrationsConcurrentPending(t *testing.T) {
|
||||
f := newFixture(t)
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), raceTestTimeout)
|
||||
defer cancel()
|
||||
|
||||
g, gctx := errgroup.WithContext(ctx)
|
||||
// errgroup limits concurrency only when SetLimit is called; we want
|
||||
// every goroutine running at once so they all queue on the lock.
|
||||
for i := 0; i < concurrentRunners; i++ {
|
||||
g.Go(func() error {
|
||||
return runMigrations(gctx, f.pool, f.opts())
|
||||
})
|
||||
}
|
||||
if err := g.Wait(); err != nil {
|
||||
t.Fatalf("concurrent runMigrations(up) on pending schema returned error: %v", err)
|
||||
}
|
||||
|
||||
got := f.appliedVersions(t)
|
||||
if want := f.versions; !equalStrings(got, want) {
|
||||
t.Fatalf("schema_migrations contents = %v, want %v", got, want)
|
||||
}
|
||||
for _, tbl := range f.tableNames {
|
||||
if !f.tableExists(t, tbl) {
|
||||
t.Fatalf("expected table %s.%s to exist after concurrent up, missing", f.schema, tbl)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TestRunMigrationsConcurrentAlreadyApplied first applies the
|
||||
// migrations once (single-threaded, to establish a clean baseline) and
|
||||
// then fires N goroutines at runMigrations again. Every goroutine must
|
||||
// hit the EXISTS no-op path and return nil, the bookkeeping table must
|
||||
// stay exactly the way the baseline left it, and the underlying tables
|
||||
// must not have been touched (no duplicate CREATE / ALTER blow-ups).
|
||||
//
|
||||
// This is the path that matters in production: most pod restarts find
|
||||
// the database fully migrated and just need to confirm-and-skip.
|
||||
func TestRunMigrationsConcurrentAlreadyApplied(t *testing.T) {
|
||||
f := newFixture(t)
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), raceTestTimeout)
|
||||
defer cancel()
|
||||
|
||||
// Baseline single-threaded run.
|
||||
if err := runMigrations(ctx, f.pool, f.opts()); err != nil {
|
||||
t.Fatalf("baseline runMigrations: %v", err)
|
||||
}
|
||||
baseline := f.appliedVersions(t)
|
||||
if !equalStrings(baseline, f.versions) {
|
||||
t.Fatalf("baseline schema_migrations = %v, want %v", baseline, f.versions)
|
||||
}
|
||||
|
||||
// Concurrent re-run: every goroutine should hit the EXISTS no-op
|
||||
// branch and return nil.
|
||||
g, gctx := errgroup.WithContext(ctx)
|
||||
for i := 0; i < concurrentRunners; i++ {
|
||||
g.Go(func() error {
|
||||
return runMigrations(gctx, f.pool, f.opts())
|
||||
})
|
||||
}
|
||||
if err := g.Wait(); err != nil {
|
||||
t.Fatalf("concurrent runMigrations(up) on already-applied schema returned error: %v", err)
|
||||
}
|
||||
|
||||
got := f.appliedVersions(t)
|
||||
if !equalStrings(got, baseline) {
|
||||
t.Fatalf("schema_migrations changed after concurrent re-run: got %v, want %v", got, baseline)
|
||||
}
|
||||
}
|
||||
|
||||
// TestRunMigrationsAdvisoryLockSerializes proves the lock genuinely
|
||||
// blocks contenders. We acquire the same advisory key on a side
|
||||
// connection BEFORE spawning any runMigrations goroutine, then start N
|
||||
// goroutines and watch how many of them have made it past the lock
|
||||
// acquire. The expectation:
|
||||
//
|
||||
// - While the side connection holds the lock, zero goroutines have
|
||||
// completed (we observe via a small delay + count-check).
|
||||
// - The moment the side connection releases the lock, the goroutines
|
||||
// start unblocking and finish in well under the test timeout.
|
||||
//
|
||||
// If the advisory lock had regressed back to attaching to a random
|
||||
// pooled connection (the original MUL-2923 bug), the side-held lock
|
||||
// would not actually block a fresh pool.Acquire from grabbing its own
|
||||
// connection without the lock, and the goroutines would all complete
|
||||
// while the lock was still "held" — which is exactly what this test
|
||||
// detects.
|
||||
func TestRunMigrationsAdvisoryLockSerializes(t *testing.T) {
|
||||
f := newFixture(t)
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), raceTestTimeout)
|
||||
defer cancel()
|
||||
|
||||
// Acquire the lock on a pinned side connection. We use a *pgx.Conn
|
||||
// (not pool.Acquire) so the lock holder is not reachable through
|
||||
// the pool the runMigrations goroutines draw from — the lock is
|
||||
// session-scoped and we want the behaviour to be "the next pool
|
||||
// connection that calls pg_advisory_lock blocks", not "the same
|
||||
// connection re-enters". (pg_advisory_lock is reentrant on the same
|
||||
// session, so re-acquiring on the same conn would not actually
|
||||
// prove serialization.)
|
||||
dbURL := os.Getenv("DATABASE_URL")
|
||||
if dbURL == "" {
|
||||
dbURL = "postgres://multica:multica@localhost:5432/multica?sslmode=disable"
|
||||
}
|
||||
holder, err := pgx.Connect(ctx, dbURL)
|
||||
if err != nil {
|
||||
t.Fatalf("side connect: %v", err)
|
||||
}
|
||||
defer holder.Close(context.Background())
|
||||
if _, err := holder.Exec(ctx, "SELECT pg_advisory_lock($1)", f.lockKey); err != nil {
|
||||
t.Fatalf("side acquire lock: %v", err)
|
||||
}
|
||||
|
||||
var done int64
|
||||
var startedAt = time.Now()
|
||||
g, gctx := errgroup.WithContext(ctx)
|
||||
for i := 0; i < concurrentRunners; i++ {
|
||||
g.Go(func() error {
|
||||
err := runMigrations(gctx, f.pool, f.opts())
|
||||
atomic.AddInt64(&done, 1)
|
||||
return err
|
||||
})
|
||||
}
|
||||
|
||||
// Watchdog: while the side holder still has the lock, no
|
||||
// runMigrations goroutine should have completed. We sample for a
|
||||
// generous window (1 s) — much longer than the trivial migration
|
||||
// set takes to apply on the unlocked path — to give a regressed
|
||||
// implementation room to incorrectly succeed.
|
||||
const observeWindow = 1 * time.Second
|
||||
const observeStep = 50 * time.Millisecond
|
||||
deadline := time.Now().Add(observeWindow)
|
||||
for time.Now().Before(deadline) {
|
||||
if n := atomic.LoadInt64(&done); n != 0 {
|
||||
t.Fatalf("advisory lock did not block: %d/%d goroutines finished while side connection held the lock for %s",
|
||||
n, concurrentRunners, time.Since(startedAt))
|
||||
}
|
||||
time.Sleep(observeStep)
|
||||
}
|
||||
|
||||
// Release the side lock and wait for all goroutines to finish.
|
||||
if _, err := holder.Exec(ctx, "SELECT pg_advisory_unlock($1)", f.lockKey); err != nil {
|
||||
t.Fatalf("side release lock: %v", err)
|
||||
}
|
||||
if err := g.Wait(); err != nil {
|
||||
t.Fatalf("concurrent runMigrations after lock release returned error: %v", err)
|
||||
}
|
||||
|
||||
// Sanity: state ended up as the pending case did — exactly one
|
||||
// application of every migration.
|
||||
if got, want := f.appliedVersions(t), f.versions; !equalStrings(got, want) {
|
||||
t.Fatalf("schema_migrations after lock-release race = %v, want %v", got, want)
|
||||
}
|
||||
}
|
||||
|
||||
// TestRunMigrationsConcurrentMixedPoolStress runs the pending case
|
||||
// against a deliberately under-sized pool to put pressure on the
|
||||
// "every runner needs its own pinned connection for the lock" code
|
||||
// path. If runMigrations ever regresses into using pool.Exec (which
|
||||
// could give the lock and the migration steps different connections),
|
||||
// this test will deadlock or produce SQL races. Pool size strictly
|
||||
// less than runners is the interesting configuration.
|
||||
func TestRunMigrationsConcurrentMixedPoolStress(t *testing.T) {
|
||||
dbURL := os.Getenv("DATABASE_URL")
|
||||
if dbURL == "" {
|
||||
dbURL = "postgres://multica:multica@localhost:5432/multica?sslmode=disable"
|
||||
}
|
||||
cfg, err := pgxpool.ParseConfig(dbURL)
|
||||
if err != nil {
|
||||
t.Skipf("parse DATABASE_URL: %v", err)
|
||||
}
|
||||
// Small pool: half the runner count, minimum 2. This forces
|
||||
// runners to wait on pgxpool.Acquire AND on pg_advisory_lock,
|
||||
// exercising the same connection lifecycle a real multi-replica
|
||||
// startup would.
|
||||
cfg.MaxConns = int32(concurrentRunners / 2)
|
||||
if cfg.MaxConns < 2 {
|
||||
cfg.MaxConns = 2
|
||||
}
|
||||
ctx, cancel := context.WithTimeout(context.Background(), raceTestTimeout)
|
||||
defer cancel()
|
||||
pool, err := pgxpool.NewWithConfig(ctx, cfg)
|
||||
if err != nil {
|
||||
t.Skipf("could not open small pool: %v", err)
|
||||
}
|
||||
defer pool.Close()
|
||||
if err := pool.Ping(ctx); err != nil {
|
||||
t.Skipf("small pool not reachable: %v", err)
|
||||
}
|
||||
|
||||
// Reuse the standard fixture's schema/files/lock-key wiring but
|
||||
// substitute the small-pool *pgxpool.Pool so the test exercises a
|
||||
// different connection budget.
|
||||
big := newFixture(t)
|
||||
f := *big
|
||||
f.pool = pool
|
||||
|
||||
g, gctx := errgroup.WithContext(ctx)
|
||||
var startedOnce sync.Once
|
||||
startedAt := time.Time{}
|
||||
for i := 0; i < concurrentRunners; i++ {
|
||||
g.Go(func() error {
|
||||
startedOnce.Do(func() { startedAt = time.Now() })
|
||||
return runMigrations(gctx, f.pool, f.opts())
|
||||
})
|
||||
}
|
||||
if err := g.Wait(); err != nil {
|
||||
t.Fatalf("small-pool concurrent runMigrations error after %s: %v", time.Since(startedAt), err)
|
||||
}
|
||||
if got, want := big.appliedVersions(t), big.versions; !equalStrings(got, want) {
|
||||
t.Fatalf("small-pool schema_migrations = %v, want %v", got, want)
|
||||
}
|
||||
}
|
||||
|
||||
// equalStrings reports whether a and b have the same length and hold
// equal strings at every index. A nil slice and an empty slice compare
// equal because only length and element values are inspected.
func equalStrings(a, b []string) bool {
	if len(a) != len(b) {
		return false
	}
	for i := range a {
		if a[i] != b[i] {
			return false
		}
	}
	return true
}
|
||||
|
||||
// TestRunMigrationsRejectsInvalidDirection pins the direction
|
||||
// whitelist contract: anything other than "up" or "down" must error
|
||||
// before runMigrations touches the pool. This prevents the subtle bug
|
||||
// where an empty or typo'd direction silently fell through to the
|
||||
// "down" branch (`opts.Direction == "up"` is false → else branch
|
||||
// handles it as a rollback).
|
||||
//
|
||||
// The check runs ahead of any pool/conn use, so passing nil is safe
|
||||
// and lets this case execute without a live Postgres.
|
||||
func TestRunMigrationsRejectsInvalidDirection(t *testing.T) {
|
||||
bad := []string{"", "UP", "DOWN", "rollback", "x", " up "}
|
||||
for _, dir := range bad {
|
||||
err := runMigrations(context.Background(), nil, runOptions{Direction: dir})
|
||||
if err == nil {
|
||||
t.Errorf("direction %q: want error, got nil", dir)
|
||||
continue
|
||||
}
|
||||
if !strings.Contains(err.Error(), "invalid direction") {
|
||||
t.Errorf("direction %q: error %q does not mention 'invalid direction'", dir, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -19,10 +19,13 @@ require (
|
||||
github.com/oklog/ulid/v2 v2.1.1
|
||||
github.com/pelletier/go-toml/v2 v2.3.0
|
||||
github.com/prometheus/client_golang v1.23.2
|
||||
github.com/prometheus/client_model v0.6.2
|
||||
github.com/redis/go-redis/v9 v9.18.0
|
||||
github.com/resend/resend-go/v2 v2.28.0
|
||||
github.com/robfig/cron/v3 v3.0.1
|
||||
github.com/spf13/cobra v1.10.2
|
||||
golang.org/x/sync v0.20.0
|
||||
google.golang.org/protobuf v1.36.8
|
||||
gopkg.in/yaml.v3 v3.0.1
|
||||
)
|
||||
|
||||
@@ -52,14 +55,11 @@ require (
|
||||
github.com/kr/text v0.2.0 // indirect
|
||||
github.com/kylelemons/godebug v1.1.0 // indirect
|
||||
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
|
||||
github.com/prometheus/client_model v0.6.2 // indirect
|
||||
github.com/prometheus/common v0.66.1 // indirect
|
||||
github.com/prometheus/procfs v0.16.1 // indirect
|
||||
github.com/spf13/pflag v1.0.9 // indirect
|
||||
go.uber.org/atomic v1.11.0 // indirect
|
||||
go.yaml.in/yaml/v2 v2.4.2 // indirect
|
||||
golang.org/x/sync v0.20.0 // indirect
|
||||
golang.org/x/sys v0.35.0 // indirect
|
||||
golang.org/x/text v0.35.0 // indirect
|
||||
google.golang.org/protobuf v1.36.8 // indirect
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user