// Mirror of https://github.com/multica-ai/multica.git
// (synced 2026-06-17 11:48:42 +02:00; 400 lines, 13 KiB, Go).

package taskusagebackfill_test
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"os"
|
|
"path/filepath"
|
|
"runtime"
|
|
"sort"
|
|
"strings"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/jackc/pgx/v5"
|
|
"github.com/jackc/pgx/v5/pgxpool"
|
|
|
|
"github.com/multica-ai/multica/server/internal/migrations"
|
|
"github.com/multica-ai/multica/server/internal/taskusagebackfill"
|
|
)
|
|
|
|
// TestHook_DirectV034Upgrade simulates the path described in
|
|
// docs/db-backed-execution-scheduler-rfc.md §12.3 ("从 v0.3.4 直接升级
|
|
// 到带 scheduler 的版本") and RFC §14 row "direct v0.3.4 upgrade":
|
|
//
|
|
// - apply migrations through 102 (legacy daily rollups + new hourly
|
|
// schema/pipeline are both live);
|
|
// - seed a `task_usage` row dated > 1 hour ago, watermark left at
|
|
// 1970-01-01 (the post-101 default);
|
|
// - migration 103's fail-closed guard would normally abort `migrate
|
|
// up` here because watermark < max_event - 1h;
|
|
// - run taskusagebackfill.Hook (the migrator hook from MUL-2957);
|
|
// - the hook performs an idempotent monthly-slice backfill, stamps
|
|
// the watermark, and migration 103 then passes.
|
|
//
|
|
// The test creates and drops a temporary database so it does not
|
|
// disturb the shared development DB. If the postgres server is not
|
|
// reachable the test skips (mirrors the rest of the integration suite).
|
|
func TestHook_DirectV034Upgrade(t *testing.T) {
|
|
adminURL := os.Getenv("DATABASE_URL")
|
|
if adminURL == "" {
|
|
adminURL = "postgres://multica:multica@localhost:5432/multica?sslmode=disable"
|
|
}
|
|
|
|
ctx := context.Background()
|
|
if !databaseReachable(ctx, adminURL) {
|
|
t.Skip("integration test requires Postgres at DATABASE_URL")
|
|
}
|
|
|
|
tmpDB := fmt.Sprintf("multica_v034_upgrade_%d", time.Now().UnixNano())
|
|
if err := createDatabase(ctx, adminURL, tmpDB); err != nil {
|
|
t.Fatalf("create temp database %s: %v", tmpDB, err)
|
|
}
|
|
t.Cleanup(func() {
|
|
if err := dropDatabase(context.Background(), adminURL, tmpDB); err != nil {
|
|
t.Logf("drop temp database %s: %v", tmpDB, err)
|
|
}
|
|
})
|
|
|
|
tmpURL := replaceDatabase(adminURL, tmpDB)
|
|
pool, err := pgxpool.New(ctx, tmpURL)
|
|
if err != nil {
|
|
t.Fatalf("connect to temp database: %v", err)
|
|
}
|
|
t.Cleanup(pool.Close)
|
|
|
|
// Apply migrations through 102 (the simulated v0.3.4-equivalent
|
|
// state where the new hourly schema and pipeline are present, but
|
|
// 103 has not yet dropped the legacy daily rollups).
|
|
if err := applyMigrationsUpTo(ctx, pool, "102_task_usage_hourly_pipeline"); err != nil {
|
|
t.Fatalf("apply migrations to 102: %v", err)
|
|
}
|
|
|
|
// Seed: one task_usage row dated 2 hours ago. Triggers fire on
|
|
// INSERT/UPDATE post-102 only via the watermark window in
|
|
// rollup_task_usage_hourly_window, so we leave the watermark at
|
|
// the post-101 default of 1970-01-01.
|
|
wsID, runtimeID, agentID, taskID := seedTaskUsageFixture(t, ctx, pool)
|
|
|
|
twoHoursAgo := time.Now().UTC().Add(-2 * time.Hour)
|
|
if _, err := pool.Exec(ctx, `
|
|
INSERT INTO task_usage (
|
|
task_id, provider, model,
|
|
input_tokens, output_tokens, cache_read_tokens, cache_write_tokens,
|
|
created_at
|
|
) VALUES ($1, 'openai', 'gpt-test', 10, 20, 0, 0, $2)
|
|
`, taskID, twoHoursAgo); err != nil {
|
|
t.Fatalf("seed task_usage: %v", err)
|
|
}
|
|
_ = wsID
|
|
_ = runtimeID
|
|
_ = agentID
|
|
|
|
// Confirm migration 103's guard would abort if applied now.
|
|
pendingErr := tryApplyMigration(ctx, pool, "103_drop_legacy_daily_rollups")
|
|
if pendingErr == nil {
|
|
t.Fatalf("migration 103 guard did not trip with stale watermark; preconditions invalid")
|
|
}
|
|
if !strings.Contains(pendingErr.Error(), "refusing to drop legacy daily rollups") {
|
|
t.Fatalf("expected fail-closed guard error, got %v", pendingErr)
|
|
}
|
|
// Roll back the failed transaction state (Postgres connections are
|
|
// fine; we just did not commit anything destructive).
|
|
|
|
// Run the MUL-2957 migrator hook. The hook should:
|
|
// * Read MIN/MAX from task_usage.
|
|
// * Acquire advisory lock 4246 on its own conn.
|
|
// * Walk monthly slices via rollup_task_usage_hourly_window.
|
|
// * Stamp the watermark to now() - 5 minutes.
|
|
res, err := taskusagebackfill.Hook(ctx, pool, taskusagebackfill.HookOptions{})
|
|
if err != nil {
|
|
t.Fatalf("Hook returned error: %v", err)
|
|
}
|
|
if res.Skipped != "" {
|
|
t.Fatalf("Hook should have run a backfill; got skipped=%q", res.Skipped)
|
|
}
|
|
if !res.WatermarkStamped {
|
|
t.Fatalf("Hook should have stamped the watermark")
|
|
}
|
|
if res.SlicesProcessed < 1 {
|
|
t.Fatalf("expected at least 1 slice processed, got %d", res.SlicesProcessed)
|
|
}
|
|
|
|
// Hourly bucket exists now.
|
|
var hourlyRows int
|
|
if err := pool.QueryRow(ctx, `
|
|
SELECT COUNT(*) FROM task_usage_hourly
|
|
`).Scan(&hourlyRows); err != nil {
|
|
t.Fatalf("count task_usage_hourly: %v", err)
|
|
}
|
|
if hourlyRows == 0 {
|
|
t.Fatalf("backfill did not produce any hourly rows; check seed fixture")
|
|
}
|
|
|
|
// Watermark stamped close to "now - 5 min" (loose bound to absorb
|
|
// CI clock skew).
|
|
var watermark time.Time
|
|
if err := pool.QueryRow(ctx, `
|
|
SELECT watermark_at FROM task_usage_hourly_rollup_state WHERE id = 1
|
|
`).Scan(&watermark); err != nil {
|
|
t.Fatalf("read watermark: %v", err)
|
|
}
|
|
expected := time.Now().UTC().Add(-5 * time.Minute)
|
|
if delta := watermark.UTC().Sub(expected); delta > time.Minute || delta < -2*time.Minute {
|
|
t.Fatalf("watermark %s far from expected %s (delta=%s)",
|
|
watermark.Format(time.RFC3339), expected.Format(time.RFC3339), delta)
|
|
}
|
|
|
|
// And now migration 103 must apply cleanly.
|
|
if err := tryApplyMigration(ctx, pool, "103_drop_legacy_daily_rollups"); err != nil {
|
|
t.Fatalf("migration 103 still fails after hook: %v", err)
|
|
}
|
|
}
|
|
|
|
// TestHook_FreshDatabaseStampsWatermarkOnly covers the empty-history
|
|
// branch: a brand new self-host install runs the hook before migration
|
|
// 103, but task_usage is empty. The hook should stamp the watermark
|
|
// (so the guard's "fresh DB" branch passes) and not touch any rows.
|
|
func TestHook_FreshDatabaseStampsWatermarkOnly(t *testing.T) {
|
|
adminURL := os.Getenv("DATABASE_URL")
|
|
if adminURL == "" {
|
|
adminURL = "postgres://multica:multica@localhost:5432/multica?sslmode=disable"
|
|
}
|
|
ctx := context.Background()
|
|
if !databaseReachable(ctx, adminURL) {
|
|
t.Skip("integration test requires Postgres at DATABASE_URL")
|
|
}
|
|
|
|
tmpDB := fmt.Sprintf("multica_v034_fresh_%d", time.Now().UnixNano())
|
|
if err := createDatabase(ctx, adminURL, tmpDB); err != nil {
|
|
t.Fatalf("create temp database: %v", err)
|
|
}
|
|
t.Cleanup(func() {
|
|
_ = dropDatabase(context.Background(), adminURL, tmpDB)
|
|
})
|
|
|
|
pool, err := pgxpool.New(ctx, replaceDatabase(adminURL, tmpDB))
|
|
if err != nil {
|
|
t.Fatalf("pool: %v", err)
|
|
}
|
|
t.Cleanup(pool.Close)
|
|
|
|
if err := applyMigrationsUpTo(ctx, pool, "102_task_usage_hourly_pipeline"); err != nil {
|
|
t.Fatalf("apply migrations to 102: %v", err)
|
|
}
|
|
|
|
res, err := taskusagebackfill.Hook(ctx, pool, taskusagebackfill.HookOptions{})
|
|
if err != nil {
|
|
t.Fatalf("Hook on empty DB: %v", err)
|
|
}
|
|
if res.Skipped != "task_usage_empty" {
|
|
t.Fatalf("expected skipped=task_usage_empty, got %q", res.Skipped)
|
|
}
|
|
if !res.WatermarkStamped {
|
|
t.Fatalf("watermark must still be stamped on empty DB so 103 can pass")
|
|
}
|
|
|
|
// Migration 103 applies cleanly on the fresh DB — its early-out
|
|
// branch (no task_usage rows) plus the stamped watermark let it
|
|
// through.
|
|
if err := tryApplyMigration(ctx, pool, "103_drop_legacy_daily_rollups"); err != nil {
|
|
t.Fatalf("migration 103 fresh-DB path failed: %v", err)
|
|
}
|
|
}
|
|
|
|
// ---------------------------------------------------------------------
|
|
// Test helpers — minimal harness so we can apply migrations to a temp
|
|
// database without coupling to cmd/migrate's exit-on-error model.
|
|
// ---------------------------------------------------------------------
|
|
|
|
// resolveMigrationsDir returns the path of server/migrations, derived
// from the location of this source file rather than from the working
// directory.
//
// The shared migrations.ResolveDir() walks parent directories looking
// for any folder literally named "migrations", which collides with the
// Go package server/internal/migrations and returns the wrong
// directory. Anchoring on runtime.Caller's file path avoids that
// ambiguity entirely.
//
// It returns an error when the caller information is unavailable or
// when the computed path is missing or is not a directory.
func resolveMigrationsDir() (string, error) {
	_, thisFile, _, found := runtime.Caller(0)
	if !found {
		return "", fmt.Errorf("runtime.Caller(0) failed")
	}

	// thisFile = .../server/internal/taskusagebackfill/hook_test.go,
	// so two levels up is server/ and migrations sits beside internal/.
	// filepath.Join already returns a cleaned path.
	candidate := filepath.Join(filepath.Dir(thisFile), "..", "..", "migrations")

	stat, statErr := os.Stat(candidate)
	if statErr == nil && stat.IsDir() {
		return candidate, nil
	}
	return "", fmt.Errorf("migrations dir not at %s", candidate)
}
|
|
|
|
func databaseReachable(ctx context.Context, url string) bool {
|
|
pool, err := pgxpool.New(ctx, url)
|
|
if err != nil {
|
|
return false
|
|
}
|
|
defer pool.Close()
|
|
return pool.Ping(ctx) == nil
|
|
}
|
|
|
|
func createDatabase(ctx context.Context, adminURL, name string) error {
|
|
conn, err := pgx.Connect(ctx, adminURL)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer conn.Close(ctx)
|
|
if _, err := conn.Exec(ctx, fmt.Sprintf(`CREATE DATABASE %q`, name)); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func dropDatabase(ctx context.Context, adminURL, name string) error {
|
|
conn, err := pgx.Connect(ctx, adminURL)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer conn.Close(ctx)
|
|
if _, err := conn.Exec(ctx, fmt.Sprintf(`DROP DATABASE IF EXISTS %q WITH (FORCE)`, name)); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// replaceDatabase swaps the database segment of a postgres URL.
// "postgres://u:p@h:5432/old?x=1" with name="new" -> ".../new?x=1".
//
// The query string is split off before the path is searched, so a "/"
// inside a parameter value (e.g. "?host=/var/run/postgresql") is never
// mistaken for the path separator. The "//" of the scheme separator is
// skipped as well, so a URL without a path ("postgres://h:5432") gains
// "/name" instead of having its authority overwritten.
//
// Inputs with neither a scheme nor a "/" (for example keyword/value
// DSNs) are returned unchanged.
func replaceDatabase(url, name string) string {
	// Detach "?query"; it is re-attached verbatim at the end.
	base, query := url, ""
	if q := strings.Index(url, "?"); q >= 0 {
		base, query = url[:q], url[q:]
	}

	// Only search for the path separator after "scheme://".
	hasScheme := false
	start := 0
	if s := strings.Index(base, "://"); s >= 0 {
		hasScheme = true
		start = s + len("://")
	}

	slash := strings.LastIndex(base[start:], "/")
	switch {
	case slash >= 0:
		// Keep everything up to and including the last path slash.
		return base[:start+slash+1] + name + query
	case hasScheme:
		// URL without any path: add one rather than clobber the host.
		return base + "/" + name + query
	default:
		return url
	}
}
|
|
|
|
func applyMigrationsUpTo(ctx context.Context, pool *pgxpool.Pool, lastVersion string) error {
|
|
dir, err := resolveMigrationsDir()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
files, err := filepath.Glob(filepath.Join(dir, "*.up.sql"))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
sort.Strings(files)
|
|
|
|
if _, err := pool.Exec(ctx, `
|
|
CREATE TABLE IF NOT EXISTS schema_migrations (
|
|
version TEXT PRIMARY KEY,
|
|
applied_at TIMESTAMPTZ NOT NULL DEFAULT now()
|
|
)
|
|
`); err != nil {
|
|
return err
|
|
}
|
|
|
|
for _, f := range files {
|
|
v := migrations.ExtractVersion(f)
|
|
sql, err := os.ReadFile(f)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if _, err := pool.Exec(ctx, string(sql)); err != nil {
|
|
return fmt.Errorf("apply %s: %w", v, err)
|
|
}
|
|
if _, err := pool.Exec(ctx,
|
|
`INSERT INTO schema_migrations (version) VALUES ($1) ON CONFLICT DO NOTHING`,
|
|
v); err != nil {
|
|
return err
|
|
}
|
|
if v == lastVersion {
|
|
return nil
|
|
}
|
|
}
|
|
return fmt.Errorf("migration %q not found", lastVersion)
|
|
}
|
|
|
|
func tryApplyMigration(ctx context.Context, pool *pgxpool.Pool, version string) error {
|
|
dir, err := resolveMigrationsDir()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
path := filepath.Join(dir, version+".up.sql")
|
|
sql, err := os.ReadFile(path)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if _, err := pool.Exec(ctx, string(sql)); err != nil {
|
|
return err
|
|
}
|
|
_, err = pool.Exec(ctx,
|
|
`INSERT INTO schema_migrations (version) VALUES ($1) ON CONFLICT DO NOTHING`,
|
|
version)
|
|
return err
|
|
}
|
|
|
|
// seedTaskUsageFixture inserts the minimal joined rows
|
|
// (workspace, runtime, agent, agent_task_queue) needed for a task_usage
|
|
// row to participate in the hourly rollup. Returns the IDs in
|
|
// (workspace, runtime, agent, task) order.
|
|
func seedTaskUsageFixture(t *testing.T, ctx context.Context, pool *pgxpool.Pool) (string, string, string, string) {
|
|
t.Helper()
|
|
|
|
var wsID, runtimeID, agentID, taskID string
|
|
if err := pool.QueryRow(ctx, `
|
|
INSERT INTO workspace (name, slug)
|
|
VALUES ('upgrade-test', 'upgrade-test')
|
|
RETURNING id
|
|
`).Scan(&wsID); err != nil {
|
|
t.Fatalf("seed workspace: %v", err)
|
|
}
|
|
if err := pool.QueryRow(ctx, `
|
|
INSERT INTO agent_runtime (
|
|
workspace_id, daemon_id, name, runtime_mode, provider, status,
|
|
device_info, metadata, last_seen_at
|
|
)
|
|
VALUES ($1, NULL, 'upgrade-runtime', 'cloud', 'p', 'online',
|
|
'{}'::jsonb, '{}'::jsonb, now())
|
|
RETURNING id
|
|
`, wsID).Scan(&runtimeID); err != nil {
|
|
t.Fatalf("seed runtime: %v", err)
|
|
}
|
|
if err := pool.QueryRow(ctx, `
|
|
INSERT INTO agent (
|
|
workspace_id, name, description, runtime_mode, runtime_config,
|
|
runtime_id, visibility, max_concurrent_tasks
|
|
)
|
|
VALUES ($1, 'upgrade-agent', '', 'cloud', '{}'::jsonb, $2, 'workspace', 1)
|
|
RETURNING id
|
|
`, wsID, runtimeID).Scan(&agentID); err != nil {
|
|
t.Fatalf("seed agent: %v", err)
|
|
}
|
|
if err := pool.QueryRow(ctx, `
|
|
INSERT INTO agent_task_queue (
|
|
agent_id, runtime_id, status, payload
|
|
)
|
|
VALUES ($1, $2, 'queued', '{}'::jsonb)
|
|
RETURNING id
|
|
`, agentID, runtimeID).Scan(&taskID); err != nil {
|
|
// agent_task_queue schema may differ; fall back to inferring
|
|
// the smallest column set.
|
|
var altErr error
|
|
altErr = pool.QueryRow(ctx, `
|
|
INSERT INTO agent_task_queue (agent_id, runtime_id)
|
|
VALUES ($1, $2)
|
|
RETURNING id
|
|
`, agentID, runtimeID).Scan(&taskID)
|
|
if altErr != nil {
|
|
t.Fatalf("seed agent_task_queue: %v / %v", err, altErr)
|
|
}
|
|
}
|
|
return wsID, runtimeID, agentID, taskID
|
|
}
|
|
|
|
// Keep the "errors" import referenced: Go rejects unused imports at
// compile time, and nothing else visible in this file uses the package.
|
|
var _ = errors.New
|