mirror of
https://github.com/multica-ai/multica.git
synced 2026-06-28 18:09:14 +02:00
Compare commits
2 Commits
agent/lamb
...
agent/j/81
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
acadd2a3ec | ||
|
|
c5f55124cf |
425
server/cmd/server/autopilot_failure_monitor.go
Normal file
425
server/cmd/server/autopilot_failure_monitor.go
Normal file
@@ -0,0 +1,425 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"math"
|
||||
"os"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/jackc/pgx/v5"
|
||||
"github.com/jackc/pgx/v5/pgtype"
|
||||
"github.com/multica-ai/multica/server/internal/events"
|
||||
"github.com/multica-ai/multica/server/internal/util"
|
||||
db "github.com/multica-ai/multica/server/pkg/db/generated"
|
||||
"github.com/multica-ai/multica/server/pkg/protocol"
|
||||
)
|
||||
|
||||
// failureMonitorConfig is the tunable knob set for the autopilot failure
|
||||
// monitor. Defaults match the proposal in MUL-1336 §6 action item #2:
|
||||
// pause autopilots whose recent run history is dominated by failures and that
|
||||
// have run enough times that the failure rate is statistically meaningful.
|
||||
//
|
||||
// All values can be overridden via env vars (see envFailureMonitorConfig).
|
||||
// Setting Interval <= 0 disables the monitor entirely.
|
||||
type failureMonitorConfig struct {
|
||||
Interval time.Duration
|
||||
Lookback time.Duration
|
||||
MinRuns int64
|
||||
FailRatio float64
|
||||
StartupDelay time.Duration
|
||||
}
|
||||
|
||||
func defaultFailureMonitorConfig() failureMonitorConfig {
|
||||
return failureMonitorConfig{
|
||||
Interval: 24 * time.Hour,
|
||||
Lookback: 7 * 24 * time.Hour,
|
||||
MinRuns: 50,
|
||||
FailRatio: 0.9,
|
||||
StartupDelay: 1 * time.Minute,
|
||||
}
|
||||
}
|
||||
|
||||
func envFailureMonitorConfig() failureMonitorConfig {
|
||||
cfg := defaultFailureMonitorConfig()
|
||||
cfg.Interval = envDurationOrZero("AUTOPILOT_FAIL_MONITOR_INTERVAL", cfg.Interval)
|
||||
cfg.Lookback = envDurationPositive("AUTOPILOT_FAIL_MONITOR_LOOKBACK", cfg.Lookback)
|
||||
cfg.StartupDelay = envDurationNonNegative("AUTOPILOT_FAIL_MONITOR_STARTUP_DELAY", cfg.StartupDelay)
|
||||
if v, ok := envInt64Positive("AUTOPILOT_FAIL_MONITOR_MIN_RUNS"); ok {
|
||||
cfg.MinRuns = v
|
||||
}
|
||||
if v, ok := envFloatInUnitInterval("AUTOPILOT_FAIL_MONITOR_FAIL_RATIO"); ok {
|
||||
cfg.FailRatio = v
|
||||
}
|
||||
return cfg
|
||||
}
|
||||
|
||||
// runAutopilotFailureMonitor periodically pauses autopilots whose recent run
|
||||
// history exceeds the configured failure threshold. This stops runaway
|
||||
// scheduled autopilots from burning tasks/tokens on a hot loop (e.g. the
|
||||
// `Registro de ls cada 5 min` case in MUL-1336: 1,475 / 1,476 runs failed
|
||||
// over 7 days, still firing every 5 min). The monitor leaves a
|
||||
// `severity=attention` inbox notification for the autopilot's creator (or the
|
||||
// agent's owner if the autopilot was created by an agent) so somebody human
|
||||
// learns that auto-pause happened.
|
||||
//
|
||||
// Disable with `AUTOPILOT_FAIL_MONITOR_INTERVAL=0`.
|
||||
func runAutopilotFailureMonitor(ctx context.Context, queries *db.Queries, bus *events.Bus, cfg failureMonitorConfig) {
|
||||
if cfg.Interval <= 0 {
|
||||
slog.Info("autopilot failure monitor: disabled (interval <= 0)")
|
||||
return
|
||||
}
|
||||
|
||||
slog.Info(
|
||||
"autopilot failure monitor: starting",
|
||||
"interval", cfg.Interval.String(),
|
||||
"lookback", cfg.Lookback.String(),
|
||||
"min_runs", cfg.MinRuns,
|
||||
"fail_ratio", cfg.FailRatio,
|
||||
)
|
||||
|
||||
// Stagger startup so we don't all-or-nothing hit the DB the moment the
|
||||
// process boots — important during a fleet rolling restart.
|
||||
if cfg.StartupDelay > 0 {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-time.After(cfg.StartupDelay):
|
||||
}
|
||||
}
|
||||
|
||||
// Run once immediately after the startup delay so a freshly-deployed node
|
||||
// catches existing offenders without waiting a full interval.
|
||||
tickAutopilotFailureMonitor(ctx, queries, bus, cfg)
|
||||
|
||||
ticker := time.NewTicker(cfg.Interval)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
tickAutopilotFailureMonitor(ctx, queries, bus, cfg)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// tickAutopilotFailureMonitor performs a single sweep: query candidates,
|
||||
// attempt to pause each, and emit notifications + WS events on success.
|
||||
func tickAutopilotFailureMonitor(ctx context.Context, queries *db.Queries, bus *events.Bus, cfg failureMonitorConfig) {
|
||||
since := time.Now().Add(-cfg.Lookback)
|
||||
candidates, err := queries.SelectAutopilotsExceedingFailureThreshold(
|
||||
ctx,
|
||||
db.SelectAutopilotsExceedingFailureThresholdParams{
|
||||
MinRuns: cfg.MinRuns,
|
||||
FailRatioThreshold: cfg.FailRatio,
|
||||
Since: pgtype.Timestamptz{Time: since, Valid: true},
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
slog.Warn("autopilot failure monitor: failed to query candidates", "error", err)
|
||||
return
|
||||
}
|
||||
if len(candidates) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
slog.Info("autopilot failure monitor: candidates", "count", len(candidates))
|
||||
|
||||
for _, c := range candidates {
|
||||
paused, err := queries.SystemPauseAutopilot(ctx, c.ID)
|
||||
if err != nil {
|
||||
// pgx returns ErrNoRows when the WHERE status='active' clause
|
||||
// matched zero rows — i.e. another caller (manual UI action,
|
||||
// concurrent monitor) paused it first. Treat as a benign no-op.
|
||||
if isNoRows(err) {
|
||||
continue
|
||||
}
|
||||
slog.Warn("autopilot failure monitor: pause failed",
|
||||
"autopilot_id", util.UUIDToString(c.ID),
|
||||
"error", err,
|
||||
)
|
||||
continue
|
||||
}
|
||||
|
||||
failPct := 100.0
|
||||
if c.TotalRuns > 0 {
|
||||
failPct = math.Round(float64(c.FailedRuns)/float64(c.TotalRuns)*1000) / 10 // one decimal place
|
||||
}
|
||||
|
||||
slog.Info(
|
||||
"autopilot failure monitor: paused autopilot",
|
||||
"autopilot_id", util.UUIDToString(c.ID),
|
||||
"workspace_id", util.UUIDToString(c.WorkspaceID),
|
||||
"title", c.Title,
|
||||
"failed_runs", c.FailedRuns,
|
||||
"total_runs", c.TotalRuns,
|
||||
"fail_pct", failPct,
|
||||
)
|
||||
|
||||
emitAutopilotPausedNotifications(ctx, queries, bus, paused, c, cfg, failPct)
|
||||
|
||||
// Fan out the status change so any open UI updates the autopilot row.
|
||||
workspaceID := util.UUIDToString(paused.WorkspaceID)
|
||||
bus.Publish(events.Event{
|
||||
Type: protocol.EventAutopilotUpdated,
|
||||
WorkspaceID: workspaceID,
|
||||
ActorType: "system",
|
||||
Payload: map[string]any{
|
||||
"autopilot": autopilotEventPayload(paused),
|
||||
"reason": "auto_paused_high_failure_rate",
|
||||
},
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// emitAutopilotPausedNotifications creates one inbox_item per relevant
|
||||
// recipient and publishes inbox:new events so each lands live. Recipients:
|
||||
//
|
||||
// 1. The autopilot creator if a member.
|
||||
// 2. If the autopilot creator is an agent, the agent's owner_id (mapped to a
|
||||
// workspace member).
|
||||
//
|
||||
// Resolving against owner_id keeps us from pinging an agent whose inbox isn't
|
||||
// actionable, while still attributing the alert to whoever set the autopilot
|
||||
// up. If neither path lands a human (e.g. agent has no owner), we skip
|
||||
// silently — the WS autopilot:updated event still surfaces the change in the
|
||||
// UI for any logged-in workspace member.
|
||||
func emitAutopilotPausedNotifications(
|
||||
ctx context.Context,
|
||||
queries *db.Queries,
|
||||
bus *events.Bus,
|
||||
autopilot db.Autopilot,
|
||||
candidate db.SelectAutopilotsExceedingFailureThresholdRow,
|
||||
cfg failureMonitorConfig,
|
||||
failPct float64,
|
||||
) {
|
||||
recipients := resolveAutopilotPausedRecipients(ctx, queries, autopilot)
|
||||
if len(recipients) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
title := fmt.Sprintf("Autopilot paused: %s", autopilot.Title)
|
||||
body := fmt.Sprintf(
|
||||
"Auto-paused after %d of %d runs failed (%.1f%%) in the last %s. Investigate the failures, fix the root cause, then re-enable from the autopilot page.",
|
||||
candidate.FailedRuns, candidate.TotalRuns, failPct, formatLookback(cfg.Lookback),
|
||||
)
|
||||
details, _ := json.Marshal(map[string]any{
|
||||
"autopilot_id": util.UUIDToString(autopilot.ID),
|
||||
"autopilot_title": autopilot.Title,
|
||||
"failed_runs": candidate.FailedRuns,
|
||||
"total_runs": candidate.TotalRuns,
|
||||
"fail_pct": failPct,
|
||||
"lookback_seconds": int64(cfg.Lookback.Seconds()),
|
||||
"threshold_min_runs": cfg.MinRuns,
|
||||
"threshold_fail_ratio": cfg.FailRatio,
|
||||
"reason": "auto_paused_high_failure_rate",
|
||||
})
|
||||
|
||||
workspaceID := util.UUIDToString(autopilot.WorkspaceID)
|
||||
autopilotIDStr := util.UUIDToString(autopilot.ID)
|
||||
|
||||
emitted := make(map[string]bool, len(recipients))
|
||||
for _, r := range recipients {
|
||||
key := r.Type + ":" + util.UUIDToString(r.ID)
|
||||
if emitted[key] {
|
||||
continue
|
||||
}
|
||||
emitted[key] = true
|
||||
|
||||
item, err := queries.CreateInboxItem(ctx, db.CreateInboxItemParams{
|
||||
WorkspaceID: autopilot.WorkspaceID,
|
||||
RecipientType: r.Type,
|
||||
RecipientID: r.ID,
|
||||
Type: "autopilot_paused",
|
||||
Severity: "attention",
|
||||
IssueID: pgtype.UUID{},
|
||||
Title: title,
|
||||
Body: util.StrToText(body),
|
||||
ActorType: util.StrToText("system"),
|
||||
ActorID: pgtype.UUID{},
|
||||
Details: details,
|
||||
})
|
||||
if err != nil {
|
||||
slog.Warn("autopilot failure monitor: inbox write failed",
|
||||
"autopilot_id", autopilotIDStr,
|
||||
"recipient_type", r.Type,
|
||||
"recipient_id", util.UUIDToString(r.ID),
|
||||
"error", err,
|
||||
)
|
||||
continue
|
||||
}
|
||||
|
||||
bus.Publish(events.Event{
|
||||
Type: protocol.EventInboxNew,
|
||||
WorkspaceID: workspaceID,
|
||||
ActorType: "system",
|
||||
ActorID: "",
|
||||
Payload: map[string]any{"item": inboxItemToResponse(item)},
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// pausedRecipient identifies a single inbox_item recipient.
|
||||
type pausedRecipient struct {
|
||||
Type string // "member" or "agent"
|
||||
ID pgtype.UUID
|
||||
}
|
||||
|
||||
func resolveAutopilotPausedRecipients(
|
||||
ctx context.Context,
|
||||
queries *db.Queries,
|
||||
autopilot db.Autopilot,
|
||||
) []pausedRecipient {
|
||||
if autopilot.CreatedByType == "member" {
|
||||
return []pausedRecipient{{Type: "member", ID: autopilot.CreatedByID}}
|
||||
}
|
||||
|
||||
// Creator is an agent — find the agent's human owner so the alert lands
|
||||
// somewhere actionable. If we can't resolve a member, skip notification
|
||||
// rather than spam an agent that can't act on it.
|
||||
agent, err := queries.GetAgent(ctx, autopilot.CreatedByID)
|
||||
if err != nil {
|
||||
slog.Debug("autopilot failure monitor: failed to load creator agent",
|
||||
"agent_id", util.UUIDToString(autopilot.CreatedByID),
|
||||
"error", err,
|
||||
)
|
||||
return nil
|
||||
}
|
||||
if !agent.OwnerID.Valid {
|
||||
return nil
|
||||
}
|
||||
|
||||
member, err := queries.GetMemberByUserAndWorkspace(ctx, db.GetMemberByUserAndWorkspaceParams{
|
||||
UserID: agent.OwnerID,
|
||||
WorkspaceID: autopilot.WorkspaceID,
|
||||
})
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
return []pausedRecipient{{Type: "member", ID: member.UserID}}
|
||||
}
|
||||
|
||||
// autopilotEventPayload builds the minimal payload shape consumed by
|
||||
// frontend listeners (mirrors handler.AutopilotResponse). Kept here instead
|
||||
// of importing the handler package to avoid a cycle (handler imports the
|
||||
// service which we're sitting alongside in cmd/server).
|
||||
func autopilotEventPayload(a db.Autopilot) map[string]any {
|
||||
return map[string]any{
|
||||
"id": util.UUIDToString(a.ID),
|
||||
"workspace_id": util.UUIDToString(a.WorkspaceID),
|
||||
"title": a.Title,
|
||||
"description": util.TextToPtr(a.Description),
|
||||
"assignee_id": util.UUIDToString(a.AssigneeID),
|
||||
"status": a.Status,
|
||||
"execution_mode": a.ExecutionMode,
|
||||
"issue_title_template": util.TextToPtr(a.IssueTitleTemplate),
|
||||
"created_by_type": a.CreatedByType,
|
||||
"created_by_id": util.UUIDToString(a.CreatedByID),
|
||||
"last_run_at": util.TimestampToPtr(a.LastRunAt),
|
||||
"created_at": util.TimestampToString(a.CreatedAt),
|
||||
"updated_at": util.TimestampToString(a.UpdatedAt),
|
||||
}
|
||||
}
|
||||
|
||||
// isNoRows wraps the sentinel for pgx :one queries that match no rows. The
|
||||
// SystemPauseAutopilot UPDATE returns no rows when the autopilot was already
|
||||
// paused/archived, which we want to treat as a benign no-op rather than an
|
||||
// error to log.
|
||||
func isNoRows(err error) bool {
|
||||
return errors.Is(err, pgx.ErrNoRows)
|
||||
}
|
||||
|
||||
func formatLookback(d time.Duration) string {
|
||||
if d <= 0 {
|
||||
return "0s"
|
||||
}
|
||||
hours := d / time.Hour
|
||||
if hours >= 24 && d%(24*time.Hour) == 0 {
|
||||
days := hours / 24
|
||||
if days == 1 {
|
||||
return "1 day"
|
||||
}
|
||||
return fmt.Sprintf("%d days", days)
|
||||
}
|
||||
if d%time.Hour == 0 {
|
||||
if hours == 1 {
|
||||
return "1 hour"
|
||||
}
|
||||
return fmt.Sprintf("%d hours", hours)
|
||||
}
|
||||
return d.String()
|
||||
}
|
||||
|
||||
// envDurationOrZero parses a duration env var. An explicit 0/negative is
|
||||
// honored (used to disable the monitor); empty returns the default; an
|
||||
// unparseable value warns and returns the default.
|
||||
func envDurationOrZero(name string, def time.Duration) time.Duration {
|
||||
raw := os.Getenv(name)
|
||||
if raw == "" {
|
||||
return def
|
||||
}
|
||||
v, err := time.ParseDuration(raw)
|
||||
if err != nil {
|
||||
slog.Warn("invalid env var, using default", "name", name, "value", raw, "default", def.String(), "error", err)
|
||||
return def
|
||||
}
|
||||
return v
|
||||
}
|
||||
|
||||
func envDurationPositive(name string, def time.Duration) time.Duration {
|
||||
raw := os.Getenv(name)
|
||||
if raw == "" {
|
||||
return def
|
||||
}
|
||||
v, err := time.ParseDuration(raw)
|
||||
if err != nil || v <= 0 {
|
||||
slog.Warn("invalid env var, using default", "name", name, "value", raw, "default", def.String(), "error", err)
|
||||
return def
|
||||
}
|
||||
return v
|
||||
}
|
||||
|
||||
func envDurationNonNegative(name string, def time.Duration) time.Duration {
|
||||
raw := os.Getenv(name)
|
||||
if raw == "" {
|
||||
return def
|
||||
}
|
||||
v, err := time.ParseDuration(raw)
|
||||
if err != nil || v < 0 {
|
||||
slog.Warn("invalid env var, using default", "name", name, "value", raw, "default", def.String(), "error", err)
|
||||
return def
|
||||
}
|
||||
return v
|
||||
}
|
||||
|
||||
func envInt64Positive(name string) (int64, bool) {
|
||||
raw := os.Getenv(name)
|
||||
if raw == "" {
|
||||
return 0, false
|
||||
}
|
||||
v, err := strconv.ParseInt(raw, 10, 64)
|
||||
if err != nil || v <= 0 {
|
||||
slog.Warn("invalid env var, ignored", "name", name, "value", raw, "error", err)
|
||||
return 0, false
|
||||
}
|
||||
return v, true
|
||||
}
|
||||
|
||||
func envFloatInUnitInterval(name string) (float64, bool) {
|
||||
raw := os.Getenv(name)
|
||||
if raw == "" {
|
||||
return 0, false
|
||||
}
|
||||
v, err := strconv.ParseFloat(raw, 64)
|
||||
if err != nil || v <= 0 || v > 1 {
|
||||
slog.Warn("invalid env var (must be in (0,1]), ignored", "name", name, "value", raw, "error", err)
|
||||
return 0, false
|
||||
}
|
||||
return v, true
|
||||
}
|
||||
259
server/cmd/server/autopilot_failure_monitor_test.go
Normal file
259
server/cmd/server/autopilot_failure_monitor_test.go
Normal file
@@ -0,0 +1,259 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/jackc/pgx/v5/pgtype"
|
||||
"github.com/multica-ai/multica/server/internal/events"
|
||||
"github.com/multica-ai/multica/server/internal/util"
|
||||
db "github.com/multica-ai/multica/server/pkg/db/generated"
|
||||
"github.com/multica-ai/multica/server/pkg/protocol"
|
||||
)
|
||||
|
||||
// pickFixtureAgent grabs the first agent in the workspace fixture. The
|
||||
// integration TestMain seeds exactly one agent, so this is deterministic.
|
||||
func pickFixtureAgent(t *testing.T) pgtype.UUID {
|
||||
t.Helper()
|
||||
var agentID string
|
||||
if err := testPool.QueryRow(context.Background(),
|
||||
`SELECT id::text FROM agent WHERE workspace_id = $1 ORDER BY created_at ASC LIMIT 1`,
|
||||
testWorkspaceID,
|
||||
).Scan(&agentID); err != nil {
|
||||
t.Fatalf("load fixture agent: %v", err)
|
||||
}
|
||||
return parseUUID(agentID)
|
||||
}
|
||||
|
||||
// seedAutopilot creates an autopilot owned by the given creator (member or
|
||||
// agent UUID + type) and registers cleanup. Status defaults to "active".
|
||||
func seedAutopilot(t *testing.T, queries *db.Queries, title, creatorType string, creatorID pgtype.UUID, agentID pgtype.UUID) db.Autopilot {
|
||||
t.Helper()
|
||||
ctx := context.Background()
|
||||
ap, err := queries.CreateAutopilot(ctx, db.CreateAutopilotParams{
|
||||
WorkspaceID: parseUUID(testWorkspaceID),
|
||||
Title: title,
|
||||
AssigneeID: agentID,
|
||||
Status: "active",
|
||||
ExecutionMode: "run_only",
|
||||
CreatedByType: creatorType,
|
||||
CreatedByID: creatorID,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("CreateAutopilot: %v", err)
|
||||
}
|
||||
t.Cleanup(func() {
|
||||
// inbox_item has no FK to autopilot, so clean both up explicitly.
|
||||
testPool.Exec(context.Background(),
|
||||
`DELETE FROM inbox_item WHERE workspace_id = $1 AND details->>'autopilot_id' = $2`,
|
||||
testWorkspaceID, util.UUIDToString(ap.ID))
|
||||
testPool.Exec(context.Background(), `DELETE FROM autopilot WHERE id = $1`, ap.ID)
|
||||
})
|
||||
return ap
|
||||
}
|
||||
|
||||
// seedAutopilotRuns inserts n runs for the given autopilot, the first
|
||||
// `failed` of which have status='failed' and the rest 'completed'. All runs
|
||||
// are timestamped at `runAt` so they fall inside or outside a chosen lookback
|
||||
// window deterministically.
|
||||
func seedAutopilotRuns(t *testing.T, autopilotID pgtype.UUID, total, failed int, runAt time.Time) {
|
||||
t.Helper()
|
||||
ctx := context.Background()
|
||||
for i := 0; i < total; i++ {
|
||||
status := "completed"
|
||||
if i < failed {
|
||||
status = "failed"
|
||||
}
|
||||
if _, err := testPool.Exec(ctx, `
|
||||
INSERT INTO autopilot_run (autopilot_id, source, status, created_at, triggered_at, completed_at)
|
||||
VALUES ($1, 'schedule', $2, $3, $3, $3)
|
||||
`, autopilotID, status, runAt); err != nil {
|
||||
t.Fatalf("seed autopilot_run: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func reloadAutopilotStatus(t *testing.T, queries *db.Queries, id pgtype.UUID) string {
|
||||
t.Helper()
|
||||
ap, err := queries.GetAutopilot(context.Background(), id)
|
||||
if err != nil {
|
||||
t.Fatalf("GetAutopilot: %v", err)
|
||||
}
|
||||
return ap.Status
|
||||
}
|
||||
|
||||
func TestAutopilotFailureMonitor_PausesOffenderAndNotifiesCreator(t *testing.T) {
|
||||
queries := db.New(testPool)
|
||||
bus := events.New()
|
||||
|
||||
cfg := failureMonitorConfig{
|
||||
Interval: time.Hour,
|
||||
Lookback: 7 * 24 * time.Hour,
|
||||
MinRuns: 10,
|
||||
FailRatio: 0.9,
|
||||
}
|
||||
|
||||
agentID := pickFixtureAgent(t)
|
||||
offender := seedAutopilot(t, queries, "Failure monitor: offender", "member", parseUUID(testUserID), agentID)
|
||||
innocent := seedAutopilot(t, queries, "Failure monitor: innocent", "member", parseUUID(testUserID), agentID)
|
||||
|
||||
now := time.Now()
|
||||
// 12 runs in window, 11 failed → 91.6% > 90% and ≥10 min runs.
|
||||
seedAutopilotRuns(t, offender.ID, 12, 11, now.Add(-1*time.Hour))
|
||||
// Innocent: also lots of failures, but they fall outside the lookback.
|
||||
seedAutopilotRuns(t, innocent.ID, 12, 12, now.Add(-30*24*time.Hour))
|
||||
// Innocent: a few recent runs but below min_runs threshold.
|
||||
seedAutopilotRuns(t, innocent.ID, 5, 5, now.Add(-1*time.Hour))
|
||||
|
||||
var inboxEvents []events.Event
|
||||
bus.Subscribe(protocol.EventInboxNew, func(e events.Event) {
|
||||
inboxEvents = append(inboxEvents, e)
|
||||
})
|
||||
var updateEvents []events.Event
|
||||
bus.Subscribe(protocol.EventAutopilotUpdated, func(e events.Event) {
|
||||
updateEvents = append(updateEvents, e)
|
||||
})
|
||||
|
||||
tickAutopilotFailureMonitor(context.Background(), queries, bus, cfg)
|
||||
|
||||
if got := reloadAutopilotStatus(t, queries, offender.ID); got != "paused" {
|
||||
t.Fatalf("expected offender to be paused, got %q", got)
|
||||
}
|
||||
if got := reloadAutopilotStatus(t, queries, innocent.ID); got != "active" {
|
||||
t.Fatalf("expected innocent to stay active, got %q", got)
|
||||
}
|
||||
|
||||
if len(updateEvents) != 1 {
|
||||
t.Fatalf("expected 1 autopilot:updated event, got %d", len(updateEvents))
|
||||
}
|
||||
if got := updateEvents[0].Payload.(map[string]any)["reason"]; got != "auto_paused_high_failure_rate" {
|
||||
t.Fatalf("expected reason auto_paused_high_failure_rate, got %v", got)
|
||||
}
|
||||
|
||||
if len(inboxEvents) != 1 {
|
||||
t.Fatalf("expected 1 inbox:new event, got %d", len(inboxEvents))
|
||||
}
|
||||
item := inboxEvents[0].Payload.(map[string]any)["item"].(map[string]any)
|
||||
if got := item["type"]; got != "autopilot_paused" {
|
||||
t.Fatalf("expected inbox type autopilot_paused, got %v", got)
|
||||
}
|
||||
if got := item["severity"]; got != "attention" {
|
||||
t.Fatalf("expected severity attention, got %v", got)
|
||||
}
|
||||
if got := item["recipient_id"]; got != testUserID {
|
||||
t.Fatalf("expected recipient %s, got %v", testUserID, got)
|
||||
}
|
||||
|
||||
// Confirm the inbox item exists in the DB too.
|
||||
items := inboxItemsForRecipient(t, queries, testUserID)
|
||||
var found bool
|
||||
for _, it := range items {
|
||||
if it.Type == "autopilot_paused" {
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
t.Fatalf("expected an autopilot_paused inbox_item in DB for user %s", testUserID)
|
||||
}
|
||||
}
|
||||
|
||||
func TestAutopilotFailureMonitor_LeavesAlreadyPausedAlone(t *testing.T) {
|
||||
queries := db.New(testPool)
|
||||
bus := events.New()
|
||||
cfg := failureMonitorConfig{
|
||||
Interval: time.Hour,
|
||||
Lookback: 7 * 24 * time.Hour,
|
||||
MinRuns: 10,
|
||||
FailRatio: 0.9,
|
||||
}
|
||||
|
||||
agentID := pickFixtureAgent(t)
|
||||
ap := seedAutopilot(t, queries, "Failure monitor: already paused", "member", parseUUID(testUserID), agentID)
|
||||
seedAutopilotRuns(t, ap.ID, 12, 11, time.Now().Add(-1*time.Hour))
|
||||
|
||||
// Manually pause first.
|
||||
if _, err := testPool.Exec(context.Background(),
|
||||
`UPDATE autopilot SET status = 'paused' WHERE id = $1`, ap.ID); err != nil {
|
||||
t.Fatalf("manual pause: %v", err)
|
||||
}
|
||||
|
||||
var inboxEvents []events.Event
|
||||
bus.Subscribe(protocol.EventInboxNew, func(e events.Event) {
|
||||
inboxEvents = append(inboxEvents, e)
|
||||
})
|
||||
|
||||
tickAutopilotFailureMonitor(context.Background(), queries, bus, cfg)
|
||||
|
||||
if len(inboxEvents) != 0 {
|
||||
t.Fatalf("paused autopilots must not generate notifications, got %d", len(inboxEvents))
|
||||
}
|
||||
}
|
||||
|
||||
func TestAutopilotFailureMonitor_AgentCreatorRoutesToOwner(t *testing.T) {
|
||||
queries := db.New(testPool)
|
||||
bus := events.New()
|
||||
cfg := failureMonitorConfig{
|
||||
Interval: time.Hour,
|
||||
Lookback: 7 * 24 * time.Hour,
|
||||
MinRuns: 10,
|
||||
FailRatio: 0.9,
|
||||
}
|
||||
|
||||
agentID := pickFixtureAgent(t)
|
||||
// The fixture agent's owner_id is testUserID (set in setupIntegrationTestFixture).
|
||||
ap := seedAutopilot(t, queries, "Failure monitor: agent-created", "agent", agentID, agentID)
|
||||
seedAutopilotRuns(t, ap.ID, 11, 10, time.Now().Add(-2*time.Hour))
|
||||
|
||||
var inboxEvents []events.Event
|
||||
bus.Subscribe(protocol.EventInboxNew, func(e events.Event) {
|
||||
inboxEvents = append(inboxEvents, e)
|
||||
})
|
||||
|
||||
tickAutopilotFailureMonitor(context.Background(), queries, bus, cfg)
|
||||
|
||||
if got := reloadAutopilotStatus(t, queries, ap.ID); got != "paused" {
|
||||
t.Fatalf("expected paused, got %q", got)
|
||||
}
|
||||
if len(inboxEvents) != 1 {
|
||||
t.Fatalf("expected 1 inbox event for the agent's owner, got %d", len(inboxEvents))
|
||||
}
|
||||
item := inboxEvents[0].Payload.(map[string]any)["item"].(map[string]any)
|
||||
if got := item["recipient_id"]; got != testUserID {
|
||||
t.Fatalf("expected recipient owner %s, got %v", testUserID, got)
|
||||
}
|
||||
if got := item["recipient_type"]; got != "member" {
|
||||
t.Fatalf("expected member recipient_type, got %v", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestAutopilotFailureMonitor_BelowThresholdNoOp(t *testing.T) {
|
||||
queries := db.New(testPool)
|
||||
bus := events.New()
|
||||
cfg := failureMonitorConfig{
|
||||
Interval: time.Hour,
|
||||
Lookback: 7 * 24 * time.Hour,
|
||||
MinRuns: 10,
|
||||
FailRatio: 0.9,
|
||||
}
|
||||
|
||||
agentID := pickFixtureAgent(t)
|
||||
ap := seedAutopilot(t, queries, "Failure monitor: under threshold", "member", parseUUID(testUserID), agentID)
|
||||
// 12 total, 5 failed → 41.6% < 90%.
|
||||
seedAutopilotRuns(t, ap.ID, 12, 5, time.Now().Add(-1*time.Hour))
|
||||
|
||||
var inboxEvents []events.Event
|
||||
bus.Subscribe(protocol.EventInboxNew, func(e events.Event) {
|
||||
inboxEvents = append(inboxEvents, e)
|
||||
})
|
||||
|
||||
tickAutopilotFailureMonitor(context.Background(), queries, bus, cfg)
|
||||
|
||||
if got := reloadAutopilotStatus(t, queries, ap.ID); got != "active" {
|
||||
t.Fatalf("expected active, got %q", got)
|
||||
}
|
||||
if len(inboxEvents) != 0 {
|
||||
t.Fatalf("expected no inbox events, got %d", len(inboxEvents))
|
||||
}
|
||||
}
|
||||
@@ -307,6 +307,7 @@ func main() {
|
||||
// Start background sweeper to mark stale runtimes as offline.
|
||||
go runRuntimeSweeper(sweepCtx, queries, liveness, taskSvc, bus)
|
||||
go runAutopilotScheduler(autopilotCtx, queries, autopilotSvc)
|
||||
go runAutopilotFailureMonitor(autopilotCtx, queries, bus, envFailureMonitorConfig())
|
||||
go runDBStatsLogger(sweepCtx, pool)
|
||||
|
||||
if metricsServer != nil {
|
||||
|
||||
@@ -686,6 +686,115 @@ func (q *Queries) RecoverLostTriggers(ctx context.Context) ([]RecoverLostTrigger
|
||||
return items, nil
|
||||
}
|
||||
|
||||
const selectAutopilotsExceedingFailureThreshold = `-- name: SelectAutopilotsExceedingFailureThreshold :many
|
||||
|
||||
WITH stats AS (
|
||||
SELECT autopilot_id,
|
||||
count(*) FILTER (WHERE status IN ('completed', 'failed', 'skipped')) AS total,
|
||||
count(*) FILTER (WHERE status = 'failed') AS failed
|
||||
FROM autopilot_run
|
||||
WHERE created_at >= $3::timestamptz
|
||||
GROUP BY autopilot_id
|
||||
)
|
||||
SELECT a.id, a.workspace_id, a.title, a.assignee_id,
|
||||
a.created_by_type, a.created_by_id,
|
||||
s.total::bigint AS total_runs,
|
||||
s.failed::bigint AS failed_runs
|
||||
FROM autopilot a
|
||||
JOIN stats s ON s.autopilot_id = a.id
|
||||
WHERE a.status = 'active'
|
||||
AND s.total >= $1::bigint
|
||||
AND s.failed::float8 / NULLIF(s.total, 0)::float8 >= $2::float8
|
||||
ORDER BY s.failed DESC, a.id ASC
|
||||
`
|
||||
|
||||
type SelectAutopilotsExceedingFailureThresholdParams struct {
|
||||
MinRuns int64 `json:"min_runs"`
|
||||
FailRatioThreshold float64 `json:"fail_ratio_threshold"`
|
||||
Since pgtype.Timestamptz `json:"since"`
|
||||
}
|
||||
|
||||
type SelectAutopilotsExceedingFailureThresholdRow struct {
|
||||
ID pgtype.UUID `json:"id"`
|
||||
WorkspaceID pgtype.UUID `json:"workspace_id"`
|
||||
Title string `json:"title"`
|
||||
AssigneeID pgtype.UUID `json:"assignee_id"`
|
||||
CreatedByType string `json:"created_by_type"`
|
||||
CreatedByID pgtype.UUID `json:"created_by_id"`
|
||||
TotalRuns int64 `json:"total_runs"`
|
||||
FailedRuns int64 `json:"failed_runs"`
|
||||
}
|
||||
|
||||
// =====================
|
||||
// Failure-rate auto-pause
|
||||
// =====================
|
||||
// Find active autopilots whose recent run failure rate exceeds the threshold.
|
||||
// Counts only terminal runs (completed | failed | skipped); pending,
|
||||
// issue_created and running are excluded so in-flight work isn't penalised.
|
||||
// Used by the failure monitor to auto-pause sustained-failure autopilots
|
||||
// (the canonical example from MUL-1336 was an autopilot scheduled every 5 min
|
||||
// that 100% failed for days, burning ~1.5k useless tasks per week).
|
||||
func (q *Queries) SelectAutopilotsExceedingFailureThreshold(ctx context.Context, arg SelectAutopilotsExceedingFailureThresholdParams) ([]SelectAutopilotsExceedingFailureThresholdRow, error) {
|
||||
rows, err := q.db.Query(ctx, selectAutopilotsExceedingFailureThreshold, arg.MinRuns, arg.FailRatioThreshold, arg.Since)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
items := []SelectAutopilotsExceedingFailureThresholdRow{}
|
||||
for rows.Next() {
|
||||
var i SelectAutopilotsExceedingFailureThresholdRow
|
||||
if err := rows.Scan(
|
||||
&i.ID,
|
||||
&i.WorkspaceID,
|
||||
&i.Title,
|
||||
&i.AssigneeID,
|
||||
&i.CreatedByType,
|
||||
&i.CreatedByID,
|
||||
&i.TotalRuns,
|
||||
&i.FailedRuns,
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
items = append(items, i)
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return items, nil
|
||||
}
|
||||
|
||||
const systemPauseAutopilot = `-- name: SystemPauseAutopilot :one
|
||||
UPDATE autopilot
|
||||
SET status = 'paused', updated_at = now()
|
||||
WHERE id = $1 AND status = 'active'
|
||||
RETURNING id, workspace_id, title, description, assignee_id, status, execution_mode, issue_title_template, created_by_type, created_by_id, last_run_at, created_at, updated_at
|
||||
`
|
||||
|
||||
// Atomically pauses an autopilot only if it is currently active. Returns no
|
||||
// rows when the autopilot was already paused/archived (or another worker
|
||||
// raced first), letting the caller treat that as a benign no-op rather than
|
||||
// an error.
|
||||
func (q *Queries) SystemPauseAutopilot(ctx context.Context, id pgtype.UUID) (Autopilot, error) {
|
||||
row := q.db.QueryRow(ctx, systemPauseAutopilot, id)
|
||||
var i Autopilot
|
||||
err := row.Scan(
|
||||
&i.ID,
|
||||
&i.WorkspaceID,
|
||||
&i.Title,
|
||||
&i.Description,
|
||||
&i.AssigneeID,
|
||||
&i.Status,
|
||||
&i.ExecutionMode,
|
||||
&i.IssueTitleTemplate,
|
||||
&i.CreatedByType,
|
||||
&i.CreatedByID,
|
||||
&i.LastRunAt,
|
||||
&i.CreatedAt,
|
||||
&i.UpdatedAt,
|
||||
)
|
||||
return i, err
|
||||
}
|
||||
|
||||
const updateAutopilot = `-- name: UpdateAutopilot :one
|
||||
UPDATE autopilot SET
|
||||
title = COALESCE($2, title),
|
||||
|
||||
@@ -194,3 +194,43 @@ WHERE t.kind = 'schedule'
|
||||
AND t.next_run_at IS NULL
|
||||
AND t.cron_expression IS NOT NULL
|
||||
AND a.status = 'active';
|
||||
|
||||
-- =====================
|
||||
-- Failure-rate auto-pause
|
||||
-- =====================
|
||||
|
||||
-- name: SelectAutopilotsExceedingFailureThreshold :many
|
||||
-- Find active autopilots whose recent run failure rate exceeds the threshold.
|
||||
-- Counts only terminal runs (completed | failed | skipped); pending,
|
||||
-- issue_created and running are excluded so in-flight work isn't penalised.
|
||||
-- Used by the failure monitor to auto-pause sustained-failure autopilots
|
||||
-- (the canonical example from MUL-1336 was an autopilot scheduled every 5 min
|
||||
-- that 100% failed for days, burning ~1.5k useless tasks per week).
|
||||
WITH stats AS (
|
||||
SELECT autopilot_id,
|
||||
count(*) FILTER (WHERE status IN ('completed', 'failed', 'skipped')) AS total,
|
||||
count(*) FILTER (WHERE status = 'failed') AS failed
|
||||
FROM autopilot_run
|
||||
WHERE created_at >= sqlc.arg('since')::timestamptz
|
||||
GROUP BY autopilot_id
|
||||
)
|
||||
SELECT a.id, a.workspace_id, a.title, a.assignee_id,
|
||||
a.created_by_type, a.created_by_id,
|
||||
s.total::bigint AS total_runs,
|
||||
s.failed::bigint AS failed_runs
|
||||
FROM autopilot a
|
||||
JOIN stats s ON s.autopilot_id = a.id
|
||||
WHERE a.status = 'active'
|
||||
AND s.total >= sqlc.arg('min_runs')::bigint
|
||||
AND s.failed::float8 / NULLIF(s.total, 0)::float8 >= sqlc.arg('fail_ratio_threshold')::float8
|
||||
ORDER BY s.failed DESC, a.id ASC;
|
||||
|
||||
-- name: SystemPauseAutopilot :one
|
||||
-- Atomically pauses an autopilot only if it is currently active. Returns no
|
||||
-- rows when the autopilot was already paused/archived (or another worker
|
||||
-- raced first), letting the caller treat that as a benign no-op rather than
|
||||
-- an error.
|
||||
UPDATE autopilot
|
||||
SET status = 'paused', updated_at = now()
|
||||
WHERE id = $1 AND status = 'active'
|
||||
RETURNING *;
|
||||
|
||||
Reference in New Issue
Block a user