Compare commits

...

2 Commits

Author SHA1 Message Date
Jiang Bohan
acadd2a3ec chore(autopilot): relax failure monitor defaults to daily / 50 runs
Per review feedback in MUL-1339: 30-min scan was overkill — the 50-run
threshold already provides multi-hour lag, and operational simplicity
matters. Lowering MinRuns from 100 → 50 keeps low-frequency autopilots
in scope (~7 runs/day reaches threshold within 7d window).

Co-authored-by: multica-agent <github@multica.ai>
2026-05-06 17:57:39 +08:00
Jiang Bohan
c5f55124cf feat(autopilot): auto-pause autopilots with sustained high failure rate
Adds a background monitor that pauses any active autopilot whose recent
runs are dominated by failures (defaults: ≥100 terminal runs in 7d, ≥90%
failed). The monitor leaves a severity=attention inbox notification for
the autopilot's creator (or the agent's owner if the autopilot was
agent-created) so a human learns about the auto-pause and can fix the
root cause before re-enabling.

Motivated by MUL-1336 §6 #2: a single broken cron autopilot
(`Registro de ls cada 5 min`, 1,475/1,476 failed in 7d) was burning
~1.5k tasks/tokens per week with no human in the loop.

Tunable via AUTOPILOT_FAIL_MONITOR_{INTERVAL,LOOKBACK,MIN_RUNS,FAIL_RATIO,STARTUP_DELAY};
INTERVAL=0 disables the monitor entirely.

Co-authored-by: multica-agent <github@multica.ai>
2026-05-06 17:32:22 +08:00
5 changed files with 834 additions and 0 deletions

View File

@@ -0,0 +1,425 @@
package main
import (
"context"
"encoding/json"
"errors"
"fmt"
"log/slog"
"math"
"os"
"strconv"
"time"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgtype"
"github.com/multica-ai/multica/server/internal/events"
"github.com/multica-ai/multica/server/internal/util"
db "github.com/multica-ai/multica/server/pkg/db/generated"
"github.com/multica-ai/multica/server/pkg/protocol"
)
// failureMonitorConfig is the tunable knob set for the autopilot failure
// monitor. Defaults match the proposal in MUL-1336 §6 action item #2:
// pause autopilots whose recent run history is dominated by failures and that
// have run enough times that the failure rate is statistically meaningful.
//
// All values can be overridden via env vars (see envFailureMonitorConfig).
// Setting Interval <= 0 disables the monitor entirely.
type failureMonitorConfig struct {
Interval time.Duration
Lookback time.Duration
MinRuns int64
FailRatio float64
StartupDelay time.Duration
}
func defaultFailureMonitorConfig() failureMonitorConfig {
return failureMonitorConfig{
Interval: 24 * time.Hour,
Lookback: 7 * 24 * time.Hour,
MinRuns: 50,
FailRatio: 0.9,
StartupDelay: 1 * time.Minute,
}
}
func envFailureMonitorConfig() failureMonitorConfig {
cfg := defaultFailureMonitorConfig()
cfg.Interval = envDurationOrZero("AUTOPILOT_FAIL_MONITOR_INTERVAL", cfg.Interval)
cfg.Lookback = envDurationPositive("AUTOPILOT_FAIL_MONITOR_LOOKBACK", cfg.Lookback)
cfg.StartupDelay = envDurationNonNegative("AUTOPILOT_FAIL_MONITOR_STARTUP_DELAY", cfg.StartupDelay)
if v, ok := envInt64Positive("AUTOPILOT_FAIL_MONITOR_MIN_RUNS"); ok {
cfg.MinRuns = v
}
if v, ok := envFloatInUnitInterval("AUTOPILOT_FAIL_MONITOR_FAIL_RATIO"); ok {
cfg.FailRatio = v
}
return cfg
}
// runAutopilotFailureMonitor periodically pauses autopilots whose recent run
// history exceeds the configured failure threshold. This stops runaway
// scheduled autopilots from burning tasks/tokens on a hot loop (e.g. the
// `Registro de ls cada 5 min` case in MUL-1336: 1,475 / 1,476 runs failed
// over 7 days, still firing every 5 min). The monitor leaves a
// `severity=attention` inbox notification for the autopilot's creator (or the
// agent's owner if the autopilot was created by an agent) so somebody human
// learns that auto-pause happened.
//
// Disable with `AUTOPILOT_FAIL_MONITOR_INTERVAL=0`.
func runAutopilotFailureMonitor(ctx context.Context, queries *db.Queries, bus *events.Bus, cfg failureMonitorConfig) {
if cfg.Interval <= 0 {
slog.Info("autopilot failure monitor: disabled (interval <= 0)")
return
}
slog.Info(
"autopilot failure monitor: starting",
"interval", cfg.Interval.String(),
"lookback", cfg.Lookback.String(),
"min_runs", cfg.MinRuns,
"fail_ratio", cfg.FailRatio,
)
// Stagger startup so we don't all-or-nothing hit the DB the moment the
// process boots — important during a fleet rolling restart.
if cfg.StartupDelay > 0 {
select {
case <-ctx.Done():
return
case <-time.After(cfg.StartupDelay):
}
}
// Run once immediately after the startup delay so a freshly-deployed node
// catches existing offenders without waiting a full interval.
tickAutopilotFailureMonitor(ctx, queries, bus, cfg)
ticker := time.NewTicker(cfg.Interval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
tickAutopilotFailureMonitor(ctx, queries, bus, cfg)
}
}
}
// tickAutopilotFailureMonitor performs a single sweep: query candidates,
// attempt to pause each, and emit notifications + WS events on success.
func tickAutopilotFailureMonitor(ctx context.Context, queries *db.Queries, bus *events.Bus, cfg failureMonitorConfig) {
since := time.Now().Add(-cfg.Lookback)
candidates, err := queries.SelectAutopilotsExceedingFailureThreshold(
ctx,
db.SelectAutopilotsExceedingFailureThresholdParams{
MinRuns: cfg.MinRuns,
FailRatioThreshold: cfg.FailRatio,
Since: pgtype.Timestamptz{Time: since, Valid: true},
},
)
if err != nil {
slog.Warn("autopilot failure monitor: failed to query candidates", "error", err)
return
}
if len(candidates) == 0 {
return
}
slog.Info("autopilot failure monitor: candidates", "count", len(candidates))
for _, c := range candidates {
paused, err := queries.SystemPauseAutopilot(ctx, c.ID)
if err != nil {
// pgx returns ErrNoRows when the WHERE status='active' clause
// matched zero rows — i.e. another caller (manual UI action,
// concurrent monitor) paused it first. Treat as a benign no-op.
if isNoRows(err) {
continue
}
slog.Warn("autopilot failure monitor: pause failed",
"autopilot_id", util.UUIDToString(c.ID),
"error", err,
)
continue
}
failPct := 100.0
if c.TotalRuns > 0 {
failPct = math.Round(float64(c.FailedRuns)/float64(c.TotalRuns)*1000) / 10 // one decimal place
}
slog.Info(
"autopilot failure monitor: paused autopilot",
"autopilot_id", util.UUIDToString(c.ID),
"workspace_id", util.UUIDToString(c.WorkspaceID),
"title", c.Title,
"failed_runs", c.FailedRuns,
"total_runs", c.TotalRuns,
"fail_pct", failPct,
)
emitAutopilotPausedNotifications(ctx, queries, bus, paused, c, cfg, failPct)
// Fan out the status change so any open UI updates the autopilot row.
workspaceID := util.UUIDToString(paused.WorkspaceID)
bus.Publish(events.Event{
Type: protocol.EventAutopilotUpdated,
WorkspaceID: workspaceID,
ActorType: "system",
Payload: map[string]any{
"autopilot": autopilotEventPayload(paused),
"reason": "auto_paused_high_failure_rate",
},
})
}
}
// emitAutopilotPausedNotifications creates one inbox_item per relevant
// recipient and publishes inbox:new events so each lands live. Recipients:
//
// 1. The autopilot creator if a member.
// 2. If the autopilot creator is an agent, the agent's owner_id (mapped to a
// workspace member).
//
// Resolving against owner_id keeps us from pinging an agent whose inbox isn't
// actionable, while still attributing the alert to whoever set the autopilot
// up. If neither path lands a human (e.g. agent has no owner), we skip
// silently — the WS autopilot:updated event still surfaces the change in the
// UI for any logged-in workspace member.
func emitAutopilotPausedNotifications(
ctx context.Context,
queries *db.Queries,
bus *events.Bus,
autopilot db.Autopilot,
candidate db.SelectAutopilotsExceedingFailureThresholdRow,
cfg failureMonitorConfig,
failPct float64,
) {
recipients := resolveAutopilotPausedRecipients(ctx, queries, autopilot)
if len(recipients) == 0 {
return
}
title := fmt.Sprintf("Autopilot paused: %s", autopilot.Title)
body := fmt.Sprintf(
"Auto-paused after %d of %d runs failed (%.1f%%) in the last %s. Investigate the failures, fix the root cause, then re-enable from the autopilot page.",
candidate.FailedRuns, candidate.TotalRuns, failPct, formatLookback(cfg.Lookback),
)
details, _ := json.Marshal(map[string]any{
"autopilot_id": util.UUIDToString(autopilot.ID),
"autopilot_title": autopilot.Title,
"failed_runs": candidate.FailedRuns,
"total_runs": candidate.TotalRuns,
"fail_pct": failPct,
"lookback_seconds": int64(cfg.Lookback.Seconds()),
"threshold_min_runs": cfg.MinRuns,
"threshold_fail_ratio": cfg.FailRatio,
"reason": "auto_paused_high_failure_rate",
})
workspaceID := util.UUIDToString(autopilot.WorkspaceID)
autopilotIDStr := util.UUIDToString(autopilot.ID)
emitted := make(map[string]bool, len(recipients))
for _, r := range recipients {
key := r.Type + ":" + util.UUIDToString(r.ID)
if emitted[key] {
continue
}
emitted[key] = true
item, err := queries.CreateInboxItem(ctx, db.CreateInboxItemParams{
WorkspaceID: autopilot.WorkspaceID,
RecipientType: r.Type,
RecipientID: r.ID,
Type: "autopilot_paused",
Severity: "attention",
IssueID: pgtype.UUID{},
Title: title,
Body: util.StrToText(body),
ActorType: util.StrToText("system"),
ActorID: pgtype.UUID{},
Details: details,
})
if err != nil {
slog.Warn("autopilot failure monitor: inbox write failed",
"autopilot_id", autopilotIDStr,
"recipient_type", r.Type,
"recipient_id", util.UUIDToString(r.ID),
"error", err,
)
continue
}
bus.Publish(events.Event{
Type: protocol.EventInboxNew,
WorkspaceID: workspaceID,
ActorType: "system",
ActorID: "",
Payload: map[string]any{"item": inboxItemToResponse(item)},
})
}
}
// pausedRecipient identifies a single inbox_item recipient.
type pausedRecipient struct {
Type string // "member" or "agent"
ID pgtype.UUID
}
func resolveAutopilotPausedRecipients(
ctx context.Context,
queries *db.Queries,
autopilot db.Autopilot,
) []pausedRecipient {
if autopilot.CreatedByType == "member" {
return []pausedRecipient{{Type: "member", ID: autopilot.CreatedByID}}
}
// Creator is an agent — find the agent's human owner so the alert lands
// somewhere actionable. If we can't resolve a member, skip notification
// rather than spam an agent that can't act on it.
agent, err := queries.GetAgent(ctx, autopilot.CreatedByID)
if err != nil {
slog.Debug("autopilot failure monitor: failed to load creator agent",
"agent_id", util.UUIDToString(autopilot.CreatedByID),
"error", err,
)
return nil
}
if !agent.OwnerID.Valid {
return nil
}
member, err := queries.GetMemberByUserAndWorkspace(ctx, db.GetMemberByUserAndWorkspaceParams{
UserID: agent.OwnerID,
WorkspaceID: autopilot.WorkspaceID,
})
if err != nil {
return nil
}
return []pausedRecipient{{Type: "member", ID: member.UserID}}
}
// autopilotEventPayload builds the minimal payload shape consumed by
// frontend listeners (mirrors handler.AutopilotResponse). Kept here instead
// of importing the handler package to avoid a cycle (handler imports the
// service which we're sitting alongside in cmd/server).
func autopilotEventPayload(a db.Autopilot) map[string]any {
return map[string]any{
"id": util.UUIDToString(a.ID),
"workspace_id": util.UUIDToString(a.WorkspaceID),
"title": a.Title,
"description": util.TextToPtr(a.Description),
"assignee_id": util.UUIDToString(a.AssigneeID),
"status": a.Status,
"execution_mode": a.ExecutionMode,
"issue_title_template": util.TextToPtr(a.IssueTitleTemplate),
"created_by_type": a.CreatedByType,
"created_by_id": util.UUIDToString(a.CreatedByID),
"last_run_at": util.TimestampToPtr(a.LastRunAt),
"created_at": util.TimestampToString(a.CreatedAt),
"updated_at": util.TimestampToString(a.UpdatedAt),
}
}
// isNoRows wraps the sentinel for pgx :one queries that match no rows. The
// SystemPauseAutopilot UPDATE returns no rows when the autopilot was already
// paused/archived, which we want to treat as a benign no-op rather than an
// error to log.
func isNoRows(err error) bool {
return errors.Is(err, pgx.ErrNoRows)
}
func formatLookback(d time.Duration) string {
if d <= 0 {
return "0s"
}
hours := d / time.Hour
if hours >= 24 && d%(24*time.Hour) == 0 {
days := hours / 24
if days == 1 {
return "1 day"
}
return fmt.Sprintf("%d days", days)
}
if d%time.Hour == 0 {
if hours == 1 {
return "1 hour"
}
return fmt.Sprintf("%d hours", hours)
}
return d.String()
}
// envDurationOrZero parses a duration env var. An explicit 0/negative is
// honored (used to disable the monitor); empty returns the default; an
// unparseable value warns and returns the default.
func envDurationOrZero(name string, def time.Duration) time.Duration {
raw := os.Getenv(name)
if raw == "" {
return def
}
v, err := time.ParseDuration(raw)
if err != nil {
slog.Warn("invalid env var, using default", "name", name, "value", raw, "default", def.String(), "error", err)
return def
}
return v
}
func envDurationPositive(name string, def time.Duration) time.Duration {
raw := os.Getenv(name)
if raw == "" {
return def
}
v, err := time.ParseDuration(raw)
if err != nil || v <= 0 {
slog.Warn("invalid env var, using default", "name", name, "value", raw, "default", def.String(), "error", err)
return def
}
return v
}
func envDurationNonNegative(name string, def time.Duration) time.Duration {
raw := os.Getenv(name)
if raw == "" {
return def
}
v, err := time.ParseDuration(raw)
if err != nil || v < 0 {
slog.Warn("invalid env var, using default", "name", name, "value", raw, "default", def.String(), "error", err)
return def
}
return v
}
func envInt64Positive(name string) (int64, bool) {
raw := os.Getenv(name)
if raw == "" {
return 0, false
}
v, err := strconv.ParseInt(raw, 10, 64)
if err != nil || v <= 0 {
slog.Warn("invalid env var, ignored", "name", name, "value", raw, "error", err)
return 0, false
}
return v, true
}
func envFloatInUnitInterval(name string) (float64, bool) {
raw := os.Getenv(name)
if raw == "" {
return 0, false
}
v, err := strconv.ParseFloat(raw, 64)
if err != nil || v <= 0 || v > 1 {
slog.Warn("invalid env var (must be in (0,1]), ignored", "name", name, "value", raw, "error", err)
return 0, false
}
return v, true
}

View File

@@ -0,0 +1,259 @@
package main
import (
"context"
"testing"
"time"
"github.com/jackc/pgx/v5/pgtype"
"github.com/multica-ai/multica/server/internal/events"
"github.com/multica-ai/multica/server/internal/util"
db "github.com/multica-ai/multica/server/pkg/db/generated"
"github.com/multica-ai/multica/server/pkg/protocol"
)
// pickFixtureAgent grabs the first agent in the workspace fixture. The
// integration TestMain seeds exactly one agent, so this is deterministic.
func pickFixtureAgent(t *testing.T) pgtype.UUID {
t.Helper()
var agentID string
if err := testPool.QueryRow(context.Background(),
`SELECT id::text FROM agent WHERE workspace_id = $1 ORDER BY created_at ASC LIMIT 1`,
testWorkspaceID,
).Scan(&agentID); err != nil {
t.Fatalf("load fixture agent: %v", err)
}
return parseUUID(agentID)
}
// seedAutopilot creates an autopilot owned by the given creator (member or
// agent UUID + type) and registers cleanup. Status defaults to "active".
func seedAutopilot(t *testing.T, queries *db.Queries, title, creatorType string, creatorID pgtype.UUID, agentID pgtype.UUID) db.Autopilot {
t.Helper()
ctx := context.Background()
ap, err := queries.CreateAutopilot(ctx, db.CreateAutopilotParams{
WorkspaceID: parseUUID(testWorkspaceID),
Title: title,
AssigneeID: agentID,
Status: "active",
ExecutionMode: "run_only",
CreatedByType: creatorType,
CreatedByID: creatorID,
})
if err != nil {
t.Fatalf("CreateAutopilot: %v", err)
}
t.Cleanup(func() {
// inbox_item has no FK to autopilot, so clean both up explicitly.
testPool.Exec(context.Background(),
`DELETE FROM inbox_item WHERE workspace_id = $1 AND details->>'autopilot_id' = $2`,
testWorkspaceID, util.UUIDToString(ap.ID))
testPool.Exec(context.Background(), `DELETE FROM autopilot WHERE id = $1`, ap.ID)
})
return ap
}
// seedAutopilotRuns inserts n runs for the given autopilot, the first
// `failed` of which have status='failed' and the rest 'completed'. All runs
// are timestamped at `runAt` so they fall inside or outside a chosen lookback
// window deterministically.
func seedAutopilotRuns(t *testing.T, autopilotID pgtype.UUID, total, failed int, runAt time.Time) {
t.Helper()
ctx := context.Background()
for i := 0; i < total; i++ {
status := "completed"
if i < failed {
status = "failed"
}
if _, err := testPool.Exec(ctx, `
INSERT INTO autopilot_run (autopilot_id, source, status, created_at, triggered_at, completed_at)
VALUES ($1, 'schedule', $2, $3, $3, $3)
`, autopilotID, status, runAt); err != nil {
t.Fatalf("seed autopilot_run: %v", err)
}
}
}
func reloadAutopilotStatus(t *testing.T, queries *db.Queries, id pgtype.UUID) string {
t.Helper()
ap, err := queries.GetAutopilot(context.Background(), id)
if err != nil {
t.Fatalf("GetAutopilot: %v", err)
}
return ap.Status
}
func TestAutopilotFailureMonitor_PausesOffenderAndNotifiesCreator(t *testing.T) {
queries := db.New(testPool)
bus := events.New()
cfg := failureMonitorConfig{
Interval: time.Hour,
Lookback: 7 * 24 * time.Hour,
MinRuns: 10,
FailRatio: 0.9,
}
agentID := pickFixtureAgent(t)
offender := seedAutopilot(t, queries, "Failure monitor: offender", "member", parseUUID(testUserID), agentID)
innocent := seedAutopilot(t, queries, "Failure monitor: innocent", "member", parseUUID(testUserID), agentID)
now := time.Now()
// 12 runs in window, 11 failed → 91.6% > 90% and ≥10 min runs.
seedAutopilotRuns(t, offender.ID, 12, 11, now.Add(-1*time.Hour))
// Innocent: also lots of failures, but they fall outside the lookback.
seedAutopilotRuns(t, innocent.ID, 12, 12, now.Add(-30*24*time.Hour))
// Innocent: a few recent runs but below min_runs threshold.
seedAutopilotRuns(t, innocent.ID, 5, 5, now.Add(-1*time.Hour))
var inboxEvents []events.Event
bus.Subscribe(protocol.EventInboxNew, func(e events.Event) {
inboxEvents = append(inboxEvents, e)
})
var updateEvents []events.Event
bus.Subscribe(protocol.EventAutopilotUpdated, func(e events.Event) {
updateEvents = append(updateEvents, e)
})
tickAutopilotFailureMonitor(context.Background(), queries, bus, cfg)
if got := reloadAutopilotStatus(t, queries, offender.ID); got != "paused" {
t.Fatalf("expected offender to be paused, got %q", got)
}
if got := reloadAutopilotStatus(t, queries, innocent.ID); got != "active" {
t.Fatalf("expected innocent to stay active, got %q", got)
}
if len(updateEvents) != 1 {
t.Fatalf("expected 1 autopilot:updated event, got %d", len(updateEvents))
}
if got := updateEvents[0].Payload.(map[string]any)["reason"]; got != "auto_paused_high_failure_rate" {
t.Fatalf("expected reason auto_paused_high_failure_rate, got %v", got)
}
if len(inboxEvents) != 1 {
t.Fatalf("expected 1 inbox:new event, got %d", len(inboxEvents))
}
item := inboxEvents[0].Payload.(map[string]any)["item"].(map[string]any)
if got := item["type"]; got != "autopilot_paused" {
t.Fatalf("expected inbox type autopilot_paused, got %v", got)
}
if got := item["severity"]; got != "attention" {
t.Fatalf("expected severity attention, got %v", got)
}
if got := item["recipient_id"]; got != testUserID {
t.Fatalf("expected recipient %s, got %v", testUserID, got)
}
// Confirm the inbox item exists in the DB too.
items := inboxItemsForRecipient(t, queries, testUserID)
var found bool
for _, it := range items {
if it.Type == "autopilot_paused" {
found = true
break
}
}
if !found {
t.Fatalf("expected an autopilot_paused inbox_item in DB for user %s", testUserID)
}
}
func TestAutopilotFailureMonitor_LeavesAlreadyPausedAlone(t *testing.T) {
queries := db.New(testPool)
bus := events.New()
cfg := failureMonitorConfig{
Interval: time.Hour,
Lookback: 7 * 24 * time.Hour,
MinRuns: 10,
FailRatio: 0.9,
}
agentID := pickFixtureAgent(t)
ap := seedAutopilot(t, queries, "Failure monitor: already paused", "member", parseUUID(testUserID), agentID)
seedAutopilotRuns(t, ap.ID, 12, 11, time.Now().Add(-1*time.Hour))
// Manually pause first.
if _, err := testPool.Exec(context.Background(),
`UPDATE autopilot SET status = 'paused' WHERE id = $1`, ap.ID); err != nil {
t.Fatalf("manual pause: %v", err)
}
var inboxEvents []events.Event
bus.Subscribe(protocol.EventInboxNew, func(e events.Event) {
inboxEvents = append(inboxEvents, e)
})
tickAutopilotFailureMonitor(context.Background(), queries, bus, cfg)
if len(inboxEvents) != 0 {
t.Fatalf("paused autopilots must not generate notifications, got %d", len(inboxEvents))
}
}
func TestAutopilotFailureMonitor_AgentCreatorRoutesToOwner(t *testing.T) {
queries := db.New(testPool)
bus := events.New()
cfg := failureMonitorConfig{
Interval: time.Hour,
Lookback: 7 * 24 * time.Hour,
MinRuns: 10,
FailRatio: 0.9,
}
agentID := pickFixtureAgent(t)
// The fixture agent's owner_id is testUserID (set in setupIntegrationTestFixture).
ap := seedAutopilot(t, queries, "Failure monitor: agent-created", "agent", agentID, agentID)
seedAutopilotRuns(t, ap.ID, 11, 10, time.Now().Add(-2*time.Hour))
var inboxEvents []events.Event
bus.Subscribe(protocol.EventInboxNew, func(e events.Event) {
inboxEvents = append(inboxEvents, e)
})
tickAutopilotFailureMonitor(context.Background(), queries, bus, cfg)
if got := reloadAutopilotStatus(t, queries, ap.ID); got != "paused" {
t.Fatalf("expected paused, got %q", got)
}
if len(inboxEvents) != 1 {
t.Fatalf("expected 1 inbox event for the agent's owner, got %d", len(inboxEvents))
}
item := inboxEvents[0].Payload.(map[string]any)["item"].(map[string]any)
if got := item["recipient_id"]; got != testUserID {
t.Fatalf("expected recipient owner %s, got %v", testUserID, got)
}
if got := item["recipient_type"]; got != "member" {
t.Fatalf("expected member recipient_type, got %v", got)
}
}
func TestAutopilotFailureMonitor_BelowThresholdNoOp(t *testing.T) {
queries := db.New(testPool)
bus := events.New()
cfg := failureMonitorConfig{
Interval: time.Hour,
Lookback: 7 * 24 * time.Hour,
MinRuns: 10,
FailRatio: 0.9,
}
agentID := pickFixtureAgent(t)
ap := seedAutopilot(t, queries, "Failure monitor: under threshold", "member", parseUUID(testUserID), agentID)
// 12 total, 5 failed → 41.6% < 90%.
seedAutopilotRuns(t, ap.ID, 12, 5, time.Now().Add(-1*time.Hour))
var inboxEvents []events.Event
bus.Subscribe(protocol.EventInboxNew, func(e events.Event) {
inboxEvents = append(inboxEvents, e)
})
tickAutopilotFailureMonitor(context.Background(), queries, bus, cfg)
if got := reloadAutopilotStatus(t, queries, ap.ID); got != "active" {
t.Fatalf("expected active, got %q", got)
}
if len(inboxEvents) != 0 {
t.Fatalf("expected no inbox events, got %d", len(inboxEvents))
}
}

View File

@@ -307,6 +307,7 @@ func main() {
// Start background sweeper to mark stale runtimes as offline.
go runRuntimeSweeper(sweepCtx, queries, liveness, taskSvc, bus)
go runAutopilotScheduler(autopilotCtx, queries, autopilotSvc)
go runAutopilotFailureMonitor(autopilotCtx, queries, bus, envFailureMonitorConfig())
go runDBStatsLogger(sweepCtx, pool)
if metricsServer != nil {

View File

@@ -686,6 +686,115 @@ func (q *Queries) RecoverLostTriggers(ctx context.Context) ([]RecoverLostTrigger
return items, nil
}
const selectAutopilotsExceedingFailureThreshold = `-- name: SelectAutopilotsExceedingFailureThreshold :many
WITH stats AS (
SELECT autopilot_id,
count(*) FILTER (WHERE status IN ('completed', 'failed', 'skipped')) AS total,
count(*) FILTER (WHERE status = 'failed') AS failed
FROM autopilot_run
WHERE created_at >= $3::timestamptz
GROUP BY autopilot_id
)
SELECT a.id, a.workspace_id, a.title, a.assignee_id,
a.created_by_type, a.created_by_id,
s.total::bigint AS total_runs,
s.failed::bigint AS failed_runs
FROM autopilot a
JOIN stats s ON s.autopilot_id = a.id
WHERE a.status = 'active'
AND s.total >= $1::bigint
AND s.failed::float8 / NULLIF(s.total, 0)::float8 >= $2::float8
ORDER BY s.failed DESC, a.id ASC
`
type SelectAutopilotsExceedingFailureThresholdParams struct {
MinRuns int64 `json:"min_runs"`
FailRatioThreshold float64 `json:"fail_ratio_threshold"`
Since pgtype.Timestamptz `json:"since"`
}
type SelectAutopilotsExceedingFailureThresholdRow struct {
ID pgtype.UUID `json:"id"`
WorkspaceID pgtype.UUID `json:"workspace_id"`
Title string `json:"title"`
AssigneeID pgtype.UUID `json:"assignee_id"`
CreatedByType string `json:"created_by_type"`
CreatedByID pgtype.UUID `json:"created_by_id"`
TotalRuns int64 `json:"total_runs"`
FailedRuns int64 `json:"failed_runs"`
}
// =====================
// Failure-rate auto-pause
// =====================
// Find active autopilots whose recent run failure rate exceeds the threshold.
// Counts only terminal runs (completed | failed | skipped); pending,
// issue_created and running are excluded so in-flight work isn't penalised.
// Used by the failure monitor to auto-pause sustained-failure autopilots
// (the canonical example from MUL-1336 was an autopilot scheduled every 5 min
// that 100% failed for days, burning ~1.5k useless tasks per week).
func (q *Queries) SelectAutopilotsExceedingFailureThreshold(ctx context.Context, arg SelectAutopilotsExceedingFailureThresholdParams) ([]SelectAutopilotsExceedingFailureThresholdRow, error) {
rows, err := q.db.Query(ctx, selectAutopilotsExceedingFailureThreshold, arg.MinRuns, arg.FailRatioThreshold, arg.Since)
if err != nil {
return nil, err
}
defer rows.Close()
items := []SelectAutopilotsExceedingFailureThresholdRow{}
for rows.Next() {
var i SelectAutopilotsExceedingFailureThresholdRow
if err := rows.Scan(
&i.ID,
&i.WorkspaceID,
&i.Title,
&i.AssigneeID,
&i.CreatedByType,
&i.CreatedByID,
&i.TotalRuns,
&i.FailedRuns,
); err != nil {
return nil, err
}
items = append(items, i)
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const systemPauseAutopilot = `-- name: SystemPauseAutopilot :one
UPDATE autopilot
SET status = 'paused', updated_at = now()
WHERE id = $1 AND status = 'active'
RETURNING id, workspace_id, title, description, assignee_id, status, execution_mode, issue_title_template, created_by_type, created_by_id, last_run_at, created_at, updated_at
`
// Atomically pauses an autopilot only if it is currently active. Returns no
// rows when the autopilot was already paused/archived (or another worker
// raced first), letting the caller treat that as a benign no-op rather than
// an error.
func (q *Queries) SystemPauseAutopilot(ctx context.Context, id pgtype.UUID) (Autopilot, error) {
row := q.db.QueryRow(ctx, systemPauseAutopilot, id)
var i Autopilot
err := row.Scan(
&i.ID,
&i.WorkspaceID,
&i.Title,
&i.Description,
&i.AssigneeID,
&i.Status,
&i.ExecutionMode,
&i.IssueTitleTemplate,
&i.CreatedByType,
&i.CreatedByID,
&i.LastRunAt,
&i.CreatedAt,
&i.UpdatedAt,
)
return i, err
}
const updateAutopilot = `-- name: UpdateAutopilot :one
UPDATE autopilot SET
title = COALESCE($2, title),

View File

@@ -194,3 +194,43 @@ WHERE t.kind = 'schedule'
AND t.next_run_at IS NULL
AND t.cron_expression IS NOT NULL
AND a.status = 'active';
-- =====================
-- Failure-rate auto-pause
-- =====================
-- name: SelectAutopilotsExceedingFailureThreshold :many
-- Find active autopilots whose recent run failure rate exceeds the threshold.
-- Counts only terminal runs (completed | failed | skipped); pending,
-- issue_created and running are excluded so in-flight work isn't penalised.
-- Used by the failure monitor to auto-pause sustained-failure autopilots
-- (the canonical example from MUL-1336 was an autopilot scheduled every 5 min
-- that 100% failed for days, burning ~1.5k useless tasks per week).
WITH stats AS (
SELECT autopilot_id,
count(*) FILTER (WHERE status IN ('completed', 'failed', 'skipped')) AS total,
count(*) FILTER (WHERE status = 'failed') AS failed
FROM autopilot_run
WHERE created_at >= sqlc.arg('since')::timestamptz
GROUP BY autopilot_id
)
SELECT a.id, a.workspace_id, a.title, a.assignee_id,
a.created_by_type, a.created_by_id,
s.total::bigint AS total_runs,
s.failed::bigint AS failed_runs
FROM autopilot a
JOIN stats s ON s.autopilot_id = a.id
WHERE a.status = 'active'
AND s.total >= sqlc.arg('min_runs')::bigint
AND s.failed::float8 / NULLIF(s.total, 0)::float8 >= sqlc.arg('fail_ratio_threshold')::float8
ORDER BY s.failed DESC, a.id ASC;
-- name: SystemPauseAutopilot :one
-- Atomically pauses an autopilot only if it is currently active. Returns no
-- rows when the autopilot was already paused/archived (or another worker
-- raced first), letting the caller treat that as a benign no-op rather than
-- an error.
UPDATE autopilot
SET status = 'paused', updated_at = now()
WHERE id = $1 AND status = 'active'
RETURNING *;