Compare commits

...

1 Commits

Author SHA1 Message Date
J
99bfc0db83 test(rollup): serialise shared-singleton rollup tests across packages (MUL-3980)
`go test ./...` compiles internal/handler and internal/scheduler into
separate binaries and runs them in parallel against the same DATABASE_URL.
Both mutate the global task_usage_hourly_rollup_state singleton (id=1) and
contend for the rollup function's advisory lock 4246, so under `-race` on CI
they interleave and fail flakily:

  - TestRollupTaskUsageHourlyCapsWindowAtOneDay reads the scheduler test's
    forced-back watermark (0.063 days ≈ the scheduler's now-90min) instead of
    "now".
  - TestPgCronConcurrentNoDoubleWrite sees a handler rollup tick advance the
    watermark past its window, yielding winners=0.

Add a dedicated session-level advisory lock (42463980, distinct from the
function's own 4246) that every test touching the singleton acquires for its
duration, serialising them across test processes. Reproduced the exact CI
failures on a concurrent stress loop (5/5 rounds) and confirmed the guard
eliminates them (8/8 rounds green).

Co-authored-by: multica-agent <github@multica.ai>
2026-07-02 17:07:52 +08:00
4 changed files with 131 additions and 0 deletions

View File

@@ -432,6 +432,9 @@ func TestRollupTaskUsageHourlyIdempotentAndWatermark(t *testing.T) {
if testHandler == nil {
t.Skip("database not available")
}
// Serialise against any other package's test that touches the shared
// rollup singleton / advisory lock 4246 (MUL-3980).
lockRollupSingleton(t)
ctx := context.Background()
var runtimeID, agentID string
@@ -1053,6 +1056,10 @@ func TestPruneTaskUsageHourlyDirty(t *testing.T) {
if testHandler == nil {
t.Skip("database not available")
}
// This test calls rollup_task_usage_hourly(), which advances the shared
// watermark as a side effect; serialise so it does not perturb another
// package's rollup test (MUL-3980).
lockRollupSingleton(t)
ctx := context.Background()
// task_usage_hourly_dirty carries no FKs (it is a queue), so synthetic
@@ -1132,6 +1139,11 @@ func TestRollupTaskUsageHourlyCapsWindowAtOneDay(t *testing.T) {
if testHandler == nil {
t.Skip("database not available")
}
// Serialise against any other package's rollup test (MUL-3980). Acquire
// the guard BEFORE registering the watermark-restore cleanup below so
// that cleanup (LIFO) runs while the guard is still held, and the guard
// is released last.
lockRollupSingleton(t)
ctx := context.Background()
// Other tests drive rollup_task_usage_hourly_window directly and never

View File

@@ -0,0 +1,57 @@
package handler
import (
"context"
"testing"
)
// rollupSingletonTestLock guards the global task_usage_hourly_rollup_state
// singleton (id = 1) and the cron entrypoint rollup_task_usage_hourly()
// across concurrently-running test binaries.
//
// `go test ./...` compiles internal/handler and internal/scheduler into
// separate binaries and runs them in parallel against the SAME
// DATABASE_URL. Both mutate that one singleton row and contend for the
// function's own advisory lock 4246. Without a cross-process guard they
// interleave and fail flakily (MUL-3980): the scheduler's
// TestPgCronConcurrentNoDoubleWrite forces the watermark ~90 min back and
// expects exactly one of six concurrent callers to advance it, while a
// handler rollup tick concurrently advances the same watermark past the
// window — so the scheduler sees winners=0, and the handler's
// TestRollupTaskUsageHourlyCapsWindowAtOneDay reads the scheduler's
// 90-min-old watermark (0.063 days) instead of "now".
//
// A dedicated session-level advisory lock — deliberately distinct from the
// rollup function's own 4246 (reusing 4246 would make the function's
// pg_try_advisory_lock fail from other pool connections) — serialises every
// test that touches the singleton. The key 42463980 is the digits 4246
// (rollup family) followed by the digits 3980 (the tracking issue) — a
// concatenation, not an arithmetic sum — and collides with no production key.
const rollupSingletonTestLock int64 = 42463980
// lockRollupSingleton takes the cross-process rollup-singleton guard on
// behalf of the calling test, blocking until the guard is granted. The guard
// and the pool connection pinned to hold it are handed back from a t.Cleanup
// callback.
//
// Invoke it as the FIRST step of any test that writes
// task_usage_hourly_rollup_state or calls rollup_task_usage_hourly().
// Cleanups run last-in-first-out, so registering the release here — ahead of
// anything the test registers afterwards, such as a watermark restore —
// means those later cleanups execute with the guard still held and the guard
// itself is dropped last.
func lockRollupSingleton(t *testing.T) {
	t.Helper()

	// A session-level advisory lock belongs to the connection that took it,
	// so one pooled connection is checked out and kept for as long as the
	// guard is held (same idiom as internal/taskusagebackfill/backfill.go).
	guardConn, acquireErr := testPool.Acquire(context.Background())
	if acquireErr != nil {
		t.Fatalf("acquire rollup-guard connection: %v", acquireErr)
	}

	_, lockErr := guardConn.Exec(context.Background(), `SELECT pg_advisory_lock($1)`, rollupSingletonTestLock)
	if lockErr != nil {
		// Return the pinned connection to the pool before failing the test.
		guardConn.Release()
		t.Fatalf("acquire rollup singleton guard: %v", lockErr)
	}

	releaseGuard := func() {
		// The unlock runs on its own background context, so it does not
		// depend on any context the test itself may have cancelled.
		_, unlockErr := guardConn.Exec(context.Background(), `SELECT pg_advisory_unlock($1)`, rollupSingletonTestLock)
		if unlockErr != nil {
			// Best effort: report the failure but still return the connection.
			t.Logf("release rollup singleton guard: %v", unlockErr)
		}
		guardConn.Release()
	}
	t.Cleanup(releaseGuard)
}

View File

@@ -39,6 +39,13 @@ func TestPgCronConcurrentNoDoubleWrite(t *testing.T) {
pool := integrationPool(t)
ctx := context.Background()
// This test forces the shared rollup watermark backwards and asserts
// exactly one of N concurrent callers advances it. Serialise against any
// other package's rollup test running concurrently against the same DB,
// otherwise a stray rollup tick advances the watermark past our window
// and we see winners=0 (MUL-3980).
lockRollupSingleton(t, pool)
// Seed an isolated workspace/runtime/agent/task and a handful of
// task_usage rows landing in the same UTC hour bucket. The bucket
// math is the SQL helper task_usage_hour_bucket(...).

View File

@@ -0,0 +1,55 @@
package scheduler
import (
"context"
"testing"
"github.com/jackc/pgx/v5/pgxpool"
)
// rollupSingletonTestLock guards the global task_usage_hourly_rollup_state
// singleton (id = 1) and the cron entrypoint rollup_task_usage_hourly()
// across concurrently-running test binaries.
//
// `go test ./...` compiles internal/scheduler and internal/handler into
// separate binaries and runs them in parallel against the SAME DATABASE_URL.
// Both mutate that one singleton row and contend for the function's own
// advisory lock 4246. Without a cross-process guard they interleave and fail
// flakily (MUL-3980): TestPgCronConcurrentNoDoubleWrite forces the watermark
// back and expects exactly one of six concurrent callers to advance it, but a
// handler rollup tick concurrently pushes the same watermark past the window,
// so this test sees winners=0.
//
// A dedicated session-level advisory lock — deliberately distinct from the
// rollup function's own 4246 (reusing 4246 would make the function's
// pg_try_advisory_lock fail from other pool connections) — serialises every
// test that touches the singleton. The key is the digits 4246 (rollup family)
// followed by the digits 3980 (the tracking issue). The value MUST match the
// one in internal/handler/rollup_guard_test.go so the two binaries serialise
// against each other.
const rollupSingletonTestLock int64 = 42463980
// lockRollupSingleton takes the cross-process rollup-singleton guard on
// behalf of the calling test, using a connection checked out of pool, and
// blocks until the guard is granted. The guard and that pinned connection are
// handed back from a t.Cleanup callback.
//
// Invoke it as the FIRST step of any test that writes
// task_usage_hourly_rollup_state or calls rollup_task_usage_hourly().
func lockRollupSingleton(t *testing.T, pool *pgxpool.Pool) {
	t.Helper()

	// A session-level advisory lock belongs to the connection that took it,
	// so one pooled connection is checked out and kept for as long as the
	// guard is held (same idiom as internal/taskusagebackfill/backfill.go).
	guardConn, acquireErr := pool.Acquire(context.Background())
	if acquireErr != nil {
		t.Fatalf("acquire rollup-guard connection: %v", acquireErr)
	}

	_, lockErr := guardConn.Exec(context.Background(), `SELECT pg_advisory_lock($1)`, rollupSingletonTestLock)
	if lockErr != nil {
		// Return the pinned connection to the pool before failing the test.
		guardConn.Release()
		t.Fatalf("acquire rollup singleton guard: %v", lockErr)
	}

	releaseGuard := func() {
		// The unlock runs on its own background context, so it does not
		// depend on any context the test itself may have cancelled.
		_, unlockErr := guardConn.Exec(context.Background(), `SELECT pg_advisory_unlock($1)`, rollupSingletonTestLock)
		if unlockErr != nil {
			// Best effort: report the failure but still return the connection.
			t.Logf("release rollup singleton guard: %v", unlockErr)
		}
		guardConn.Release()
	}
	t.Cleanup(releaseGuard)
}