MUL-2965 fix(metrics): harden business sampler quality (#3738)

* fix(metrics): harden business sampler quality (MUL-2965)

Co-authored-by: multica-agent <github@multica.ai>

* fix(metrics): alert on sampler acquire failures

Co-authored-by: multica-agent <github@multica.ai>

---------

Co-authored-by: Eve <eve@multica-ai.local>
Co-authored-by: multica-agent <github@multica.ai>
This commit is contained in:
Multica Eve
2026-06-04 11:21:51 +08:00
committed by GitHub
parent 26971e7e45
commit 945d684afd
9 changed files with 113 additions and 21 deletions

View File

@@ -0,0 +1,43 @@
{{- if .Values.monitoring.prometheusRule.enabled }}
{{- $samplerQueryNames := "active_users|active_workspaces|task_queued|task_running|task_stuck|runtime_online|runtime_heartbeat_age|workspace_total" -}}
{{- $samplerErrorNames := printf "acquire|%s" $samplerQueryNames -}}
apiVersion: monitoring.coreos.com/v1
kind: PrometheusRule
metadata:
name: {{ include "multica.backend.fullname" . }}-sampler
labels:
{{- include "multica.labels" . | nindent 4 }}
app.kubernetes.io/component: backend
{{- with .Values.monitoring.prometheusRule.additionalLabels }}
{{- toYaml . | nindent 4 }}
{{- end }}
spec:
groups:
- name: multica-business-sampler
rules:
- alert: MulticaBusinessSamplerQueryErrors
expr: |
sum by (name) (
rate(multica_business_sampler_query_errors_total{name=~"{{ $samplerErrorNames }}"}[10m])
) > 0
for: {{ .Values.monitoring.prometheusRule.samplerQueryErrorsFor | quote }}
labels:
severity: {{ .Values.monitoring.prometheusRule.severity | quote }}
annotations:
summary: "Business sampler query errors"
description: "Business sampler query {{ "{{ $labels.name }}" }} is returning errors. Metrics may go stale if the query keeps timing out."
- alert: MulticaBusinessSamplerQueryLatencyHigh
expr: |
histogram_quantile(
0.95,
sum by (le, name) (
rate(multica_business_sampler_query_seconds_bucket{name=~"{{ $samplerQueryNames }}"}[10m])
)
) > 0.3
for: {{ .Values.monitoring.prometheusRule.samplerQueryLatencyFor | quote }}
labels:
severity: {{ .Values.monitoring.prometheusRule.severity | quote }}
annotations:
summary: "Business sampler query p95 is high"
description: "Business sampler query {{ "{{ $labels.name }}" }} p95 is above 300ms, leaving little headroom before the 500ms statement_timeout."
{{- end }}

View File

@@ -160,3 +160,17 @@ ingress:
# tls:
# - hosts: [multica.dev.lan, api.multica.dev.lan]
# secretName: multica-tls
# -----------------------------------------------------------------------------
# Monitoring
# -----------------------------------------------------------------------------
monitoring:
prometheusRule:
# Requires the Prometheus Operator CRD (monitoring.coreos.com/v1). Kept
# disabled by default so minimal self-host installs without that CRD still
# render and install.
enabled: false
additionalLabels: {}
samplerQueryErrorsFor: 5m
samplerQueryLatencyFor: 10m
severity: warning

View File

@@ -29,12 +29,12 @@ const (
defaultSamplerQueryTimeout = 500 * time.Millisecond
samplerRowLimit = 100
// Active-user / active-workspace windows. The label set is fixed —
// new windows must be added explicitly here so cardinality stays
// bounded under all input.
// Active-user / active-workspace DB windows. Keep this DB-sampled path
// to the short window only: PR2's counters do not carry user/workspace
// IDs, so PromQL cannot derive distinct 1h/24h active estimates without
// adding high-cardinality labels. Long-window actives need a separate
// counter-derived aggregation, not expanding this sampler over history.
windowFiveMinutes = "5m"
windowOneHour = "1h"
windowOneDay = "24h"
// Runtime is considered online if its last_seen_at is within this
// many seconds of `now()`. 60s matches the daemon heartbeat cadence
@@ -54,8 +54,6 @@ var samplerWindows = []struct {
d time.Duration
}{
{windowFiveMinutes, 5 * time.Minute},
{windowOneHour, time.Hour},
{windowOneDay, 24 * time.Hour},
}
// BusinessSamplerOptions configures the BusinessSamplerCollector. A nil

View File

@@ -19,9 +19,10 @@ import (
// queryActiveUsers fills snap.activeUsers with a count of distinct user_ids
// that have either sent a chat message or had an agent task created for one
// of their issues inside each rolling window. We deliberately skip Postgres
// `now() - interval '$1'` substitution and instead pass a typed interval as
// a bind parameter so the caller cannot inject SQL via the window string.
// of their issues inside the short rolling window. We deliberately skip
// Postgres `now() - interval '$1'` substitution and instead pass a typed
// interval as a bind parameter so the caller cannot inject SQL via the
// window string.
//
// Note on cardinality: there is intentionally NO `LIMIT` on the inner
// distinct subquery — capping it would silently truncate the COUNT to 100
@@ -62,9 +63,9 @@ SELECT count(DISTINCT user_id) FROM (
}
// queryActiveWorkspaces counts distinct workspaces with chat or task
// activity in each window. Mirrors queryActiveUsers and intentionally has
// no inner LIMIT for the same reason: a LIMIT inside the distinct subquery
// would truncate the COUNT and report a wrong value.
// activity in the short window. Mirrors queryActiveUsers and intentionally
// has no inner LIMIT for the same reason: a LIMIT inside the distinct
// subquery would truncate the COUNT and report a wrong value.
func (c *BusinessSamplerCollector) queryActiveWorkspaces(
ctx context.Context, tx pgx.Tx, snap *samplerSnapshot,
) error {
@@ -140,7 +141,19 @@ LIMIT 100
func (c *BusinessSamplerCollector) queryTaskRunning(
ctx context.Context, tx pgx.Tx, snap *samplerSnapshot,
) error {
// Keep dispatched and running in separate UNION ALL branches so
// Postgres can match the existing dispatched partial index and the
// running-only partial index from migration 114 independently.
const stmt = `
WITH in_flight AS (
SELECT chat_session_id, autopilot_run_id, issue_id, runtime_id
FROM agent_task_queue
WHERE status = 'dispatched'
UNION ALL
SELECT chat_session_id, autopilot_run_id, issue_id, runtime_id
FROM agent_task_queue
WHERE status = 'running'
)
SELECT
CASE
WHEN atq.chat_session_id IS NOT NULL THEN 'chat'
@@ -150,9 +163,8 @@ SELECT
END AS source,
COALESCE(ar.runtime_mode, 'unknown') AS runtime_mode,
count(*) AS n
FROM agent_task_queue atq
FROM in_flight atq
LEFT JOIN agent_runtime ar ON ar.id = atq.runtime_id
WHERE atq.status IN ('dispatched', 'running')
GROUP BY 1, 2
LIMIT 100
`

View File

@@ -42,11 +42,7 @@ func newTestSampler(t *testing.T, refresh func(ctx context.Context, now time.Tim
func filledSnapshot(now time.Time) *samplerSnapshot {
snap := newSamplerSnapshot(now)
snap.activeUsers[windowFiveMinutes] = 7
snap.activeUsers[windowOneHour] = 42
snap.activeUsers[windowOneDay] = 100
snap.activeWorkspaces[windowFiveMinutes] = 3
snap.activeWorkspaces[windowOneHour] = 11
snap.activeWorkspaces[windowOneDay] = 25
snap.taskQueued["chat"] = 5
snap.taskQueued["issue"] = 2
@@ -110,8 +106,6 @@ func TestBusinessSamplerCollectorEmitsExpectedMetrics(t *testing.T) {
wantSubstrings := []string{
`multica_active_users{window="5m"} 7`,
`multica_active_users{window="1h"} 42`,
`multica_active_users{window="24h"} 100`,
`multica_active_workspaces{window="5m"} 3`,
`multica_agent_task_queued{source="chat"} 5`,
`multica_agent_task_queued{source="issue"} 2`,
@@ -132,6 +126,16 @@ func TestBusinessSamplerCollectorEmitsExpectedMetrics(t *testing.T) {
t.Errorf("metrics body missing %q\nbody:\n%s", want, body)
}
}
for _, removed := range []string{
`multica_active_users{window="1h"}`,
`multica_active_users{window="24h"}`,
`multica_active_workspaces{window="1h"}`,
`multica_active_workspaces{window="24h"}`,
} {
if strings.Contains(body, removed) {
t.Errorf("metrics body still exposes removed long DB window %q\nbody:\n%s", removed, body)
}
}
}
// TestBusinessSamplerSelfIntrospectionHistogramIsExposed observes a value

View File

@@ -0,0 +1 @@
DROP INDEX CONCURRENTLY IF EXISTS idx_agent_task_queue_running_started_at;

View File

@@ -0,0 +1,11 @@
-- Covers sampler reads over running tasks:
-- * task_stuck: status = 'running' AND started_at < now() - interval '30 minutes'
-- * task_running: the running half of the in-flight task count
--
-- agent_task_queue is hot, so this must stay in its own single-statement
-- migration and must use CONCURRENTLY. The migration runner executes files
-- outside an explicit transaction; keeping CONCURRENTLY isolated avoids
-- Postgres' implicit transaction block for multi-statement query strings.
CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_agent_task_queue_running_started_at
ON agent_task_queue (started_at)
WHERE status = 'running';

View File

@@ -0,0 +1 @@
DROP INDEX CONCURRENTLY IF EXISTS idx_agent_runtime_last_seen_at;

View File

@@ -0,0 +1,8 @@
-- Covers sampler reads over runtime heartbeats:
-- * runtime_online: last_seen_at > now() - online window
-- * runtime_heartbeat_age: last_seen_at > now() - 15 minutes ORDER BY last_seen_at DESC
--
-- agent_runtime heartbeat writes are frequent, so this must stay in its own
-- single-statement migration and must use CONCURRENTLY.
CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_agent_runtime_last_seen_at
ON agent_runtime (last_seen_at);