mirror of
https://github.com/multica-ai/multica.git
synced 2026-06-17 11:48:42 +02:00
391 lines
11 KiB
Go
391 lines
11 KiB
Go
package scheduler
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"time"
|
|
|
|
"github.com/google/uuid"
|
|
"github.com/jackc/pgx/v5"
|
|
"github.com/jackc/pgx/v5/pgtype"
|
|
"github.com/jackc/pgx/v5/pgxpool"
|
|
)
|
|
|
|
// ErrLeaseLost is returned by heartbeat / terminal-update primitives
|
|
// when the row is no longer owned by the calling runner. The handler
|
|
// should stop, and the scheduler will not write a terminal status for
|
|
// the lost lease.
|
|
var ErrLeaseLost = errors.New("scheduler: lease lost")
|
|
|
|
// errTerminalIgnored is the sentinel finishSuccess / finishFailure use
|
|
// to convey a successful UPDATE that affected zero rows. Internal to
|
|
// this package.
|
|
var errTerminalIgnored = ErrLeaseLost
|
|
|
|
// dbNow returns Postgres's notion of "now" as the canonical clock. The
|
|
// scheduler uses DB time for every plan calculation and lease window so
|
|
// instances with skewed clocks still agree on the same plan_time.
|
|
func dbNow(ctx context.Context, pool *pgxpool.Pool) (time.Time, error) {
|
|
var t time.Time
|
|
if err := pool.QueryRow(ctx, "SELECT now()").Scan(&t); err != nil {
|
|
return time.Time{}, fmt.Errorf("scheduler: read db now: %w", err)
|
|
}
|
|
return t.UTC(), nil
|
|
}
|
|
|
|
// claim is the result of trying to acquire a plan. Only one of the
|
|
// boolean outcomes is true at a time.
|
|
type claim struct {
|
|
ID uuid.UUID
|
|
LeaseToken uuid.UUID
|
|
Attempt int
|
|
Won bool // fresh insert
|
|
Stole bool // stale-steal or FAILED retry
|
|
Conflicted bool // another runner already owns this plan, or attempts exhausted
|
|
}
|
|
|
|
// tryClaim attempts to acquire the lease for (job, scope, plan_time).
|
|
// Returns Won=true on a fresh insert, Stole=true on a stale-steal or
|
|
// retry-after-FAILED, and Conflicted=true if another runner owns the
|
|
// plan, the row is already SUCCESS, the FAILED row is not yet
|
|
// retry-eligible (next_retry_at in the future), or attempts are
|
|
// exhausted. The caller treats every Conflicted outcome the same way
|
|
// (no-op); the scheduler distinguishes them through audit-row metrics
|
|
// rather than per-call return fields.
|
|
func tryClaim(
|
|
ctx context.Context,
|
|
pool *pgxpool.Pool,
|
|
job *JobSpec,
|
|
scope Scope,
|
|
planTime time.Time,
|
|
dbTime time.Time,
|
|
runnerID string,
|
|
) (claim, error) {
|
|
// Fresh-insert path. ON CONFLICT DO NOTHING means losers do not
|
|
// touch the existing row — we follow up with the steal/retry path
|
|
// only if this insert was a conflict.
|
|
insertSQL := `
|
|
INSERT INTO sys_cron_executions (
|
|
job_name, scope_kind, scope_id, plan_time,
|
|
status, attempt, max_attempts,
|
|
runner_id, lease_token,
|
|
heartbeat_at, stale_after,
|
|
started_at, updated_at
|
|
) VALUES (
|
|
$1, $2, $3, $4,
|
|
'RUNNING', 1, $5,
|
|
$6, gen_random_uuid(),
|
|
$7::timestamptz, $7::timestamptz + make_interval(secs => $8),
|
|
$7::timestamptz, $7::timestamptz
|
|
)
|
|
ON CONFLICT ON CONSTRAINT uq_sys_cron_execution DO NOTHING
|
|
RETURNING id, lease_token, attempt
|
|
`
|
|
staleSecs := int64(job.StaleTimeout / time.Second)
|
|
if staleSecs <= 0 {
|
|
staleSecs = 1
|
|
}
|
|
|
|
var c claim
|
|
err := pool.QueryRow(ctx, insertSQL,
|
|
job.Name, scope.Kind, scope.ID, planTime,
|
|
job.MaxAttempts,
|
|
runnerID,
|
|
dbTime, staleSecs,
|
|
).Scan(&c.ID, &c.LeaseToken, &c.Attempt)
|
|
if err == nil {
|
|
c.Won = true
|
|
return c, nil
|
|
}
|
|
if !errors.Is(err, pgx.ErrNoRows) {
|
|
return claim{}, fmt.Errorf("scheduler: claim insert: %w", err)
|
|
}
|
|
|
|
// Conflict path. Try retry-after-FAILED or stale-steal in a single
|
|
// statement. The WHERE clause encodes both branches; the choice is
|
|
// made server-side from the existing row.
|
|
stealSQL := `
|
|
UPDATE sys_cron_executions
|
|
SET status = 'RUNNING',
|
|
attempt = attempt + 1,
|
|
runner_id = $1,
|
|
lease_token = gen_random_uuid(),
|
|
heartbeat_at = $2::timestamptz,
|
|
stale_after = $2::timestamptz + make_interval(secs => $3),
|
|
started_at = $2::timestamptz,
|
|
finished_at = NULL,
|
|
duration_ms = NULL,
|
|
next_retry_at = NULL,
|
|
error_code = NULL,
|
|
error_msg = NULL,
|
|
updated_at = $2::timestamptz
|
|
WHERE job_name = $4
|
|
AND scope_kind = $5
|
|
AND scope_id = $6
|
|
AND plan_time = $7
|
|
AND attempt < max_attempts
|
|
AND (
|
|
(status = 'FAILED' AND COALESCE(next_retry_at, $2::timestamptz) <= $2::timestamptz)
|
|
OR
|
|
(status = 'RUNNING' AND stale_after < $2::timestamptz AND $8)
|
|
)
|
|
RETURNING id, lease_token, attempt
|
|
`
|
|
err = pool.QueryRow(ctx, stealSQL,
|
|
runnerID,
|
|
dbTime, staleSecs,
|
|
job.Name, scope.Kind, scope.ID, planTime,
|
|
job.AllowStaleReentry,
|
|
).Scan(&c.ID, &c.LeaseToken, &c.Attempt)
|
|
if err == nil {
|
|
c.Stole = true
|
|
return c, nil
|
|
}
|
|
if !errors.Is(err, pgx.ErrNoRows) {
|
|
return claim{}, fmt.Errorf("scheduler: claim steal: %w", err)
|
|
}
|
|
|
|
// No-row update — somebody else owns a fresh RUNNING lease, the
|
|
// row is already SUCCESS, the row is FAILED but not yet
|
|
// retry-eligible, or attempts are exhausted. The caller treats
|
|
// these no-op cases identically.
|
|
c.Conflicted = true
|
|
return c, nil
|
|
}
|
|
|
|
// markStaleAsFailed transitions a single stale RUNNING row to FAILED
|
|
// when its job has AllowStaleReentry=false. Returns RowsAffected so the
|
|
// sweeper can log how many rows were closed.
|
|
func markStaleAsFailed(
|
|
ctx context.Context,
|
|
pool *pgxpool.Pool,
|
|
jobName string,
|
|
dbTime time.Time,
|
|
) (int64, error) {
|
|
tag, err := pool.Exec(ctx, `
|
|
UPDATE sys_cron_executions
|
|
SET status = 'FAILED',
|
|
finished_at = $2,
|
|
error_code = 'stale_timeout',
|
|
error_msg = 'lease expired without heartbeat',
|
|
updated_at = $2
|
|
WHERE job_name = $1
|
|
AND status = 'RUNNING'
|
|
AND stale_after < $2
|
|
`, jobName, dbTime)
|
|
if err != nil {
|
|
return 0, fmt.Errorf("scheduler: mark stale failed: %w", err)
|
|
}
|
|
return tag.RowsAffected(), nil
|
|
}
|
|
|
|
// heartbeat extends stale_after for the runner that holds the lease.
|
|
// Returns ErrLeaseLost if the row is no longer ours (stolen or already
|
|
// finalised).
|
|
func heartbeat(
|
|
ctx context.Context,
|
|
pool *pgxpool.Pool,
|
|
id, leaseToken uuid.UUID,
|
|
staleTimeout time.Duration,
|
|
) error {
|
|
staleSecs := int64(staleTimeout / time.Second)
|
|
if staleSecs <= 0 {
|
|
staleSecs = 1
|
|
}
|
|
tag, err := pool.Exec(ctx, `
|
|
UPDATE sys_cron_executions
|
|
SET heartbeat_at = now(),
|
|
stale_after = now() + make_interval(secs => $3),
|
|
updated_at = now()
|
|
WHERE id = $1
|
|
AND lease_token = $2
|
|
AND status = 'RUNNING'
|
|
`, id, leaseToken, staleSecs)
|
|
if err != nil {
|
|
return fmt.Errorf("scheduler: heartbeat: %w", err)
|
|
}
|
|
if tag.RowsAffected() == 0 {
|
|
return ErrLeaseLost
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// finishSuccess writes a terminal SUCCESS row. The lease_token guard
|
|
// prevents an ex-lease-holder from overwriting a newer attempt's state.
|
|
func finishSuccess(
|
|
ctx context.Context,
|
|
pool *pgxpool.Pool,
|
|
id, leaseToken uuid.UUID,
|
|
dbTime time.Time,
|
|
durationMs int64,
|
|
res HandlerResult,
|
|
) error {
|
|
resultJSON, err := encodeResult(res.Result)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
tag, err := pool.Exec(ctx, `
|
|
UPDATE sys_cron_executions
|
|
SET status = 'SUCCESS',
|
|
finished_at = $3,
|
|
duration_ms = $4,
|
|
rows_affected = $5,
|
|
result = $6::jsonb,
|
|
error_code = NULL,
|
|
error_msg = NULL,
|
|
updated_at = $3
|
|
WHERE id = $1
|
|
AND lease_token = $2
|
|
AND status = 'RUNNING'
|
|
`, id, leaseToken, dbTime, durationMs, res.RowsAffected, resultJSON)
|
|
if err != nil {
|
|
return fmt.Errorf("scheduler: finish success: %w", err)
|
|
}
|
|
if tag.RowsAffected() == 0 {
|
|
return errTerminalIgnored
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// finishFailure writes a terminal FAILED row. nextRetryAt may be the
|
|
// zero value if no retry is due (max_attempts reached).
|
|
func finishFailure(
|
|
ctx context.Context,
|
|
pool *pgxpool.Pool,
|
|
id, leaseToken uuid.UUID,
|
|
dbTime time.Time,
|
|
durationMs int64,
|
|
errorCode, errorMsg string,
|
|
nextRetryAt time.Time,
|
|
) error {
|
|
var nextRetry pgtype.Timestamptz
|
|
if !nextRetryAt.IsZero() {
|
|
nextRetry = pgtype.Timestamptz{Time: nextRetryAt, Valid: true}
|
|
}
|
|
|
|
if errorCode == "" {
|
|
errorCode = "handler_error"
|
|
}
|
|
if len(errorMsg) > 4000 {
|
|
errorMsg = errorMsg[:4000]
|
|
}
|
|
|
|
tag, err := pool.Exec(ctx, `
|
|
UPDATE sys_cron_executions
|
|
SET status = 'FAILED',
|
|
finished_at = $3,
|
|
duration_ms = $4,
|
|
next_retry_at = $5,
|
|
error_code = $6,
|
|
error_msg = $7,
|
|
updated_at = $3
|
|
WHERE id = $1
|
|
AND lease_token = $2
|
|
AND status = 'RUNNING'
|
|
`, id, leaseToken, dbTime, durationMs, nextRetry, errorCode, errorMsg)
|
|
if err != nil {
|
|
return fmt.Errorf("scheduler: finish failure: %w", err)
|
|
}
|
|
if tag.RowsAffected() == 0 {
|
|
return errTerminalIgnored
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// encodeResult serialises a handler's small structured result to a
|
|
// JSONB-ready string. Empty/nil maps encode as `{}` so the column
|
|
// default is preserved if no result was set.
|
|
func encodeResult(in map[string]any) (string, error) {
|
|
if len(in) == 0 {
|
|
return "{}", nil
|
|
}
|
|
b, err := json.Marshal(in)
|
|
if err != nil {
|
|
return "", fmt.Errorf("scheduler: marshal result: %w", err)
|
|
}
|
|
// Defensive cap — keep result-row size predictable. Bigger
|
|
// payloads belong in structured logs.
|
|
if len(b) > 16*1024 {
|
|
return "", fmt.Errorf("scheduler: result payload too large (%d bytes); keep it small or use logs", len(b))
|
|
}
|
|
return string(b), nil
|
|
}
|
|
|
|
// latestPlanInfo returns the latest known plan_time for (job, scope)
|
|
// plus the fields the catch-up planner needs to decide whether the row
|
|
// is still claimable at the same plan_time (FAILED-with-retry) or
|
|
// finished and the next plan_time should advance past it.
|
|
type latestPlanInfo struct {
|
|
Found bool
|
|
PlanTime time.Time
|
|
Status string
|
|
Attempt int
|
|
MaxAttempts int
|
|
// NextRetryAt is zero (NULL in DB) when the row is not in a
|
|
// retry-eligible state, or when the next retry is due immediately
|
|
// (FAILED with no backoff configured).
|
|
NextRetryAt time.Time
|
|
}
|
|
|
|
// RetryEligible reports whether the latest stored row should still be
|
|
// considered for the same plan_time on the next tick. True for FAILED
|
|
// rows that have remaining attempts and whose next_retry_at has
|
|
// passed; the every_plan planner uses this to keep the cursor on the
|
|
// retry-eligible bucket so tryClaim's retry-from-FAILED branch can
|
|
// fire.
|
|
func (i latestPlanInfo) RetryEligible(now time.Time) bool {
|
|
if !i.Found {
|
|
return false
|
|
}
|
|
if i.Status != "FAILED" {
|
|
return false
|
|
}
|
|
if i.Attempt >= i.MaxAttempts {
|
|
return false
|
|
}
|
|
if i.NextRetryAt.IsZero() {
|
|
// COALESCE-style: NULL next_retry_at means "as soon as
|
|
// possible", which is right now.
|
|
return true
|
|
}
|
|
return !i.NextRetryAt.After(now)
|
|
}
|
|
|
|
func latestPlan(
|
|
ctx context.Context,
|
|
pool *pgxpool.Pool,
|
|
jobName string,
|
|
scope Scope,
|
|
) (latestPlanInfo, error) {
|
|
var info latestPlanInfo
|
|
var nextRetry pgtype.Timestamptz
|
|
err := pool.QueryRow(ctx, `
|
|
SELECT plan_time, status, attempt, max_attempts, next_retry_at
|
|
FROM sys_cron_executions
|
|
WHERE job_name = $1
|
|
AND scope_kind = $2
|
|
AND scope_id = $3
|
|
ORDER BY plan_time DESC
|
|
LIMIT 1
|
|
`, jobName, scope.Kind, scope.ID).Scan(
|
|
&info.PlanTime, &info.Status,
|
|
&info.Attempt, &info.MaxAttempts,
|
|
&nextRetry,
|
|
)
|
|
if err != nil {
|
|
if errors.Is(err, pgx.ErrNoRows) {
|
|
return info, nil
|
|
}
|
|
return info, fmt.Errorf("scheduler: read latest plan: %w", err)
|
|
}
|
|
info.Found = true
|
|
info.PlanTime = info.PlanTime.UTC()
|
|
if nextRetry.Valid {
|
|
info.NextRetryAt = nextRetry.Time.UTC()
|
|
}
|
|
return info, nil
|
|
}
|