Compare commits

...

1 Commits

Author SHA1 Message Date
Eve
bc2a01392f Add business metrics collectors
Co-authored-by: multica-agent <github@multica.ai>
2026-06-03 14:10:22 +08:00
12 changed files with 863 additions and 4 deletions

View File

@@ -259,6 +259,7 @@ func main() {
metricsConfig := obsmetrics.ConfigFromEnv()
var metricsServer *http.Server
var httpMetrics *obsmetrics.HTTPMetrics
var businessMetrics *obsmetrics.BusinessMetrics
if metricsConfig.Enabled() {
metricsRegistry := obsmetrics.NewRegistry(obsmetrics.RegistryOptions{
Pool: pool,
@@ -268,6 +269,7 @@ func main() {
Commit: commit,
})
httpMetrics = metricsRegistry.HTTP
businessMetrics = metricsRegistry.Business
metricsServer = obsmetrics.NewServer(metricsConfig.Addr, metricsRegistry.Gatherer)
if !obsmetrics.IsLoopbackAddr(metricsConfig.Addr) {
slog.Warn(
@@ -285,6 +287,7 @@ func main() {
r := NewRouterWithOptions(pool, hub, bus, analyticsClient, storeRedis, RouterOptions{
HTTPMetrics: httpMetrics,
BusinessMetrics: businessMetrics,
DaemonHub: daemonHub,
DaemonWakeup: daemonWakeup,
HeartbeatScheduler: heartbeatScheduler,
@@ -300,6 +303,7 @@ func main() {
autopilotCtx, autopilotCancel := context.WithCancel(context.Background())
taskSvc := service.NewTaskService(queries, pool, hub, bus, daemonWakeup)
taskSvc.Analytics = analyticsClient
taskSvc.Metrics = businessMetrics
autopilotSvc := service.NewAutopilotService(queries, pool, bus, taskSvc)
registerAutopilotListeners(bus, autopilotSvc)

View File

@@ -98,9 +98,10 @@ func NewRouter(pool *pgxpool.Pool, hub *realtime.Hub, bus *events.Bus, analytics
}
type RouterOptions struct {
HTTPMetrics *obsmetrics.HTTPMetrics
DaemonHub *daemonws.Hub
DaemonWakeup service.TaskWakeupNotifier
HTTPMetrics *obsmetrics.HTTPMetrics
BusinessMetrics *obsmetrics.BusinessMetrics
DaemonHub *daemonws.Hub
DaemonWakeup service.TaskWakeupNotifier
// HeartbeatScheduler, when non-nil, replaces the default synchronous
// passthrough scheduler on the constructed Handler. main.go injects a
// BatchedHeartbeatScheduler here so the caller can also drive Run/Stop;
@@ -141,6 +142,7 @@ func NewRouterWithOptions(pool *pgxpool.Pool, hub *realtime.Hub, bus *events.Bus
CloudRuntimeFleetTimeout: envDuration("MULTICA_CLOUD_FLEET_TIMEOUT", 35*time.Second),
}
h := handler.New(queries, pool, hub, bus, emailSvc, store, cfSigner, analyticsClient, signupConfig, daemonHub)
h.TaskService.Metrics = opts.BusinessMetrics
if opts.DaemonWakeup != nil {
h.TaskService.Wakeup = opts.DaemonWakeup
}

View File

@@ -253,6 +253,7 @@ func sweepStaleTasks(ctx context.Context, queries *db.Queries, taskSvc *service.
}
slog.Info("task sweeper: failed stale tasks", "count", len(failedTasks))
taskSvc.CaptureLeaseExpiredTasks(ctx, failedTasks)
taskSvc.HandleFailedTasks(ctx, failedTasks)
}
@@ -276,6 +277,7 @@ func sweepExpiredQueuedTasks(ctx context.Context, queries *db.Queries, taskSvc *
}
slog.Info("task sweeper: expired stale queued tasks", "count", len(failedTasks))
taskSvc.CaptureQueuedExpiredTasks(ctx, failedTasks)
taskSvc.HandleFailedTasks(ctx, failedTasks)
}

View File

@@ -50,6 +50,7 @@ require (
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
github.com/jackc/puddle/v2 v2.2.2 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/kylelemons/godebug v1.1.0 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/prometheus/client_model v0.6.2 // indirect
github.com/prometheus/common v0.66.1 // indirect

View File

@@ -1860,7 +1860,8 @@ func (h *Handler) ReportTaskUsage(w http.ResponseWriter, r *http.Request) {
taskID := chi.URLParam(r, "taskId")
// Verify the caller owns this task's workspace.
if _, ok := h.requireDaemonTaskAccess(w, r, taskID); !ok {
task, ok := h.requireDaemonTaskAccess(w, r, taskID)
if !ok {
return
}
@@ -1883,7 +1884,9 @@ func (h *Handler) ReportTaskUsage(w http.ResponseWriter, r *http.Request) {
CacheWriteTokens: u.CacheWriteTokens,
}); err != nil {
slog.Warn("upsert task usage failed", "task_id", taskID, "model", u.Model, "error", err)
continue
}
h.TaskService.CaptureTaskUsage(r.Context(), task, u.Provider, u.Model, u.InputTokens, u.OutputTokens, u.CacheReadTokens, u.CacheWriteTokens)
}
writeJSON(w, http.StatusOK, map[string]string{"status": "ok"})

View File

@@ -0,0 +1,328 @@
package metrics
import (
"sync"
"github.com/multica-ai/multica/server/pkg/taskfailure"
"github.com/prometheus/client_golang/prometheus"
)
var taskDurationBuckets = []float64{1, 2.5, 5, 10, 30, 60, 120, 300, 600, 1200, 3600, 7200}
type activeTaskLabels struct {
source string
runtimeMode string
}
type BusinessMetrics struct {
taskEnqueued *prometheus.CounterVec
taskDispatched *prometheus.CounterVec
taskStarted *prometheus.CounterVec
taskTerminal *prometheus.CounterVec
taskFailed *prometheus.CounterVec
taskQueueWait *prometheus.HistogramVec
taskRunSeconds *prometheus.HistogramVec
taskTotalSeconds *prometheus.HistogramVec
taskInProgress *prometheus.GaugeVec
taskIterations *prometheus.HistogramVec
llmTokens *prometheus.CounterVec
llmCostUSD *prometheus.CounterVec
llmUnpricedTokens *prometheus.CounterVec
llmRequests *prometheus.CounterVec
taskQueuedExpired *prometheus.CounterVec
taskLeaseExpired *prometheus.CounterVec
activeMu sync.Mutex
activeTasks map[string]activeTaskLabels
}
func NewBusinessMetrics() *BusinessMetrics {
validateBusinessMetricLabels()
m := &BusinessMetrics{
taskEnqueued: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "multica",
Subsystem: "agent_task",
Name: "enqueued_total",
Help: "Total agent tasks enqueued.",
}, metricLabels("multica_agent_task_enqueued_total")),
taskDispatched: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "multica",
Subsystem: "agent_task",
Name: "dispatched_total",
Help: "Total agent tasks dispatched to a runtime.",
}, metricLabels("multica_agent_task_dispatched_total")),
taskStarted: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "multica",
Subsystem: "agent_task",
Name: "started_total",
Help: "Total agent tasks that reached running state.",
}, metricLabels("multica_agent_task_started_total")),
taskTerminal: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "multica",
Subsystem: "agent_task",
Name: "terminal_total",
Help: "Total agent tasks that reached a terminal state.",
}, metricLabels("multica_agent_task_terminal_total")),
taskFailed: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "multica",
Subsystem: "agent_task",
Name: "failed_total",
Help: "Total failed agent tasks by canonical failure reason.",
}, metricLabels("multica_agent_task_failed_total")),
taskQueueWait: prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "multica",
Subsystem: "agent_task",
Name: "queue_wait_seconds",
Help: "Time agent tasks spent queued before dispatch.",
Buckets: taskDurationBuckets,
}, metricLabels("multica_agent_task_queue_wait_seconds")),
taskRunSeconds: prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "multica",
Subsystem: "agent_task",
Name: "run_seconds",
Help: "Time agent tasks spent running before a terminal state.",
Buckets: taskDurationBuckets,
}, metricLabels("multica_agent_task_run_seconds")),
taskTotalSeconds: prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "multica",
Subsystem: "agent_task",
Name: "total_seconds",
Help: "Total time from agent task creation to terminal state.",
Buckets: taskDurationBuckets,
}, metricLabels("multica_agent_task_total_seconds")),
taskInProgress: prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "multica",
Subsystem: "agent_task",
Name: "in_progress",
Help: "Current agent tasks dispatched by this process and not yet terminal.",
}, metricLabels("multica_agent_task_in_progress")),
taskIterations: prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "multica",
Subsystem: "agent_task",
Name: "iteration_count",
Help: "Retry attempt count observed when an agent task reaches a terminal state.",
Buckets: []float64{1, 2, 3, 4, 5, 10},
}, metricLabels("multica_agent_task_iteration_count")),
llmTokens: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "multica",
Subsystem: "llm",
Name: "tokens_total",
Help: "Total priced LLM tokens by provider, model, token type, runtime mode, and task source.",
}, metricLabels("multica_llm_tokens_total")),
llmCostUSD: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "multica",
Subsystem: "llm",
Name: "cost_usd_total",
Help: "Total estimated priced LLM token cost in USD.",
}, metricLabels("multica_llm_cost_usd_total")),
llmUnpricedTokens: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "multica",
Subsystem: "llm",
Name: "unpriced_tokens_total",
Help: "Total LLM tokens for model aliases without a fixed TSR price.",
}, metricLabels("multica_llm_unpriced_tokens_total")),
llmRequests: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "multica",
Subsystem: "llm",
Name: "request_total",
Help: "Total task usage reports by normalized LLM provider and model.",
}, metricLabels("multica_llm_request_total")),
taskQueuedExpired: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "multica",
Subsystem: "task",
Name: "queued_expired_total",
Help: "Total queued tasks expired by the scheduler.",
}, metricLabels("multica_task_queued_expired_total")),
taskLeaseExpired: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "multica",
Subsystem: "task",
Name: "lease_expired_total",
Help: "Total dispatched or running task leases expired by the scheduler.",
}, metricLabels("multica_task_lease_expired_total")),
activeTasks: map[string]activeTaskLabels{},
}
m.prewarmFailureReasons()
return m
}
func (m *BusinessMetrics) Collectors() []prometheus.Collector {
return []prometheus.Collector{
m.taskEnqueued,
m.taskDispatched,
m.taskStarted,
m.taskTerminal,
m.taskFailed,
m.taskQueueWait,
m.taskRunSeconds,
m.taskTotalSeconds,
m.taskInProgress,
m.taskIterations,
m.llmTokens,
m.llmCostUSD,
m.llmUnpricedTokens,
m.llmRequests,
m.taskQueuedExpired,
m.taskLeaseExpired,
}
}
func (m *BusinessMetrics) RecordTaskEnqueued(source, runtimeMode string) {
if m == nil {
return
}
m.taskEnqueued.WithLabelValues(NormalizeTaskSource(source), NormalizeRuntimeMode(runtimeMode)).Inc()
}
func (m *BusinessMetrics) RecordTaskDispatched(taskID, source, runtimeMode string, queueWaitSeconds float64) {
if m == nil {
return
}
source = NormalizeTaskSource(source)
runtimeMode = NormalizeRuntimeMode(runtimeMode)
m.taskDispatched.WithLabelValues(source, runtimeMode).Inc()
if queueWaitSeconds >= 0 {
m.taskQueueWait.WithLabelValues(source, runtimeMode).Observe(queueWaitSeconds)
}
m.markTaskInProgress(taskID, source, runtimeMode)
}
func (m *BusinessMetrics) RecordTaskStarted(source, runtimeMode, provider string) {
if m == nil {
return
}
m.taskStarted.WithLabelValues(
NormalizeTaskSource(source),
NormalizeRuntimeMode(runtimeMode),
NormalizeRuntimeProvider(provider),
).Inc()
}
func (m *BusinessMetrics) RecordTaskTerminal(taskID, source, runtimeMode, terminalStatus string, runSeconds, totalSeconds float64, attempt int32) {
if m == nil {
return
}
source = NormalizeTaskSource(source)
runtimeMode = NormalizeRuntimeMode(runtimeMode)
terminalStatus = NormalizeTerminalStatus(terminalStatus)
m.taskTerminal.WithLabelValues(source, runtimeMode, terminalStatus).Inc()
if runSeconds >= 0 {
m.taskRunSeconds.WithLabelValues(source, runtimeMode, terminalStatus).Observe(runSeconds)
}
if totalSeconds >= 0 {
m.taskTotalSeconds.WithLabelValues(source, runtimeMode, terminalStatus).Observe(totalSeconds)
}
if attempt < 1 {
attempt = 1
}
m.taskIterations.WithLabelValues(source, terminalStatus).Observe(float64(attempt))
m.clearTaskInProgress(taskID)
}
func (m *BusinessMetrics) RecordTaskFailed(source, runtimeMode, failureReason string) {
if m == nil {
return
}
m.taskFailed.WithLabelValues(
NormalizeTaskSource(source),
NormalizeRuntimeMode(runtimeMode),
NormalizeFailureReason(failureReason),
).Inc()
}
func (m *BusinessMetrics) RecordTaskQueuedExpired(source, runtimeMode string) {
if m == nil {
return
}
m.taskQueuedExpired.WithLabelValues(NormalizeTaskSource(source), NormalizeRuntimeMode(runtimeMode)).Inc()
}
func (m *BusinessMetrics) RecordTaskLeaseExpired(source string) {
if m == nil {
return
}
m.taskLeaseExpired.WithLabelValues(NormalizeTaskSource(source)).Inc()
}
func (m *BusinessMetrics) RecordLLMUsage(source, runtimeMode, rawProvider, modelAlias string, inputTokens, outputTokens, cacheReadTokens, cacheWriteTokens int64) {
if m == nil {
return
}
source = NormalizeTaskSource(source)
runtimeMode = NormalizeRuntimeMode(runtimeMode)
price, priced := PriceForModelAlias(modelAlias)
if !priced {
provider := NormalizeRuntimeProvider(rawProvider)
alias := NormalizeModelAlias(modelAlias)
m.recordUnpricedTokens(provider, alias, "input", inputTokens)
m.recordUnpricedTokens(provider, alias, "output", outputTokens)
m.recordUnpricedTokens(provider, alias, "cache_read", cacheReadTokens)
m.recordUnpricedTokens(provider, alias, "cache_write", cacheWriteTokens)
m.llmRequests.WithLabelValues(provider, "unknown", runtimeMode).Inc()
return
}
m.recordPricedTokens(price.Provider, price.Model, "input", runtimeMode, source, inputTokens, tokenCostUSD(inputTokens, price.InputPerM))
m.recordPricedTokens(price.Provider, price.Model, "output", runtimeMode, source, outputTokens, tokenCostUSD(outputTokens, price.OutputPerM))
m.recordPricedTokens(price.Provider, price.Model, "cache_read", runtimeMode, source, cacheReadTokens, tokenCostUSD(cacheReadTokens, price.CacheReadPerM))
m.recordPricedTokens(price.Provider, price.Model, "cache_write", runtimeMode, source, cacheWriteTokens, tokenCostUSD(cacheWriteTokens, price.CacheWritePerM))
m.llmRequests.WithLabelValues(price.Provider, price.Model, runtimeMode).Inc()
}
func (m *BusinessMetrics) recordPricedTokens(provider, model, tokenType, runtimeMode, source string, tokens int64, cost float64) {
if tokens <= 0 {
return
}
tokenType = NormalizeTokenType(tokenType)
m.llmTokens.WithLabelValues(provider, model, tokenType, runtimeMode, source).Add(float64(tokens))
if cost > 0 {
m.llmCostUSD.WithLabelValues(provider, model, tokenType, runtimeMode, source).Add(cost)
}
}
func (m *BusinessMetrics) recordUnpricedTokens(provider, modelAlias, tokenType string, tokens int64) {
if tokens <= 0 {
return
}
m.llmUnpricedTokens.WithLabelValues(provider, modelAlias, NormalizeTokenType(tokenType)).Add(float64(tokens))
}
func (m *BusinessMetrics) markTaskInProgress(taskID, source, runtimeMode string) {
if taskID == "" {
m.taskInProgress.WithLabelValues(source, runtimeMode).Inc()
return
}
m.activeMu.Lock()
defer m.activeMu.Unlock()
if _, ok := m.activeTasks[taskID]; ok {
return
}
m.activeTasks[taskID] = activeTaskLabels{source: source, runtimeMode: runtimeMode}
m.taskInProgress.WithLabelValues(source, runtimeMode).Inc()
}
func (m *BusinessMetrics) clearTaskInProgress(taskID string) {
if taskID == "" {
return
}
m.activeMu.Lock()
labels, ok := m.activeTasks[taskID]
if ok {
delete(m.activeTasks, taskID)
}
m.activeMu.Unlock()
if ok {
m.taskInProgress.WithLabelValues(labels.source, labels.runtimeMode).Dec()
}
}
func (m *BusinessMetrics) prewarmFailureReasons() {
for _, source := range []string{"issue", "chat", "autopilot", "autopilot_issue", "quick_create", "other"} {
for _, runtimeMode := range []string{"local", "cloud", "unknown"} {
for _, reason := range taskfailure.AllReasons() {
m.taskFailed.WithLabelValues(source, runtimeMode, reason.String()).Add(0)
}
}
}
}

View File

@@ -0,0 +1,125 @@
package metrics
import (
"strconv"
"testing"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/multica-ai/multica/server/pkg/taskfailure"
)
func TestBusinessMetricsLifecycleCountersAndGauge(t *testing.T) {
m := NewBusinessMetrics()
m.RecordTaskEnqueued("issue", "local")
for i := 0; i < 100; i++ {
m.RecordTaskDispatched("task-"+strconv.Itoa(i), "issue", "local", 2.5)
}
m.RecordTaskStarted("issue", "local", "codex")
m.RecordTaskTerminal("task-0", "issue", "local", "completed", 10, 20, 1)
if got := testutil.ToFloat64(m.taskEnqueued.WithLabelValues("issue", "local")); got != 1 {
t.Fatalf("enqueued counter = %v, want 1", got)
}
if got := testutil.ToFloat64(m.taskDispatched.WithLabelValues("issue", "local")); got != 100 {
t.Fatalf("dispatched counter = %v, want 100", got)
}
if got := testutil.ToFloat64(m.taskStarted.WithLabelValues("issue", "local", "codex")); got != 1 {
t.Fatalf("started counter = %v, want 1", got)
}
if got := testutil.ToFloat64(m.taskTerminal.WithLabelValues("issue", "local", "completed")); got != 1 {
t.Fatalf("terminal counter = %v, want 1", got)
}
if got := testutil.CollectAndCount(m.taskInProgress); got != 1 {
t.Fatalf("in_progress series count = %d, want 1 despite 100 task ids", got)
}
if got := testutil.ToFloat64(m.taskInProgress.WithLabelValues("issue", "local")); got != 99 {
t.Fatalf("in_progress gauge = %v, want 99", got)
}
if got := testutil.CollectAndCount(m.taskQueueWait); got != 1 {
t.Fatalf("queue wait series count = %d, want 1", got)
}
if got := testutil.CollectAndCount(m.taskRunSeconds); got != 1 {
t.Fatalf("run seconds series count = %d, want 1", got)
}
if got := testutil.CollectAndCount(m.taskTotalSeconds); got != 1 {
t.Fatalf("total seconds series count = %d, want 1", got)
}
}
func TestBusinessMetricsFailureReasonUsesCanonicalClassifier(t *testing.T) {
m := NewBusinessMetrics()
rawError := `API Error: 429 {"error":"overloaded"}`
m.RecordTaskFailed("issue", "local", rawError)
wantReason := taskfailure.ReasonAgentProviderCapacityOrRateLimit.String()
if got := testutil.ToFloat64(m.taskFailed.WithLabelValues("issue", "local", wantReason)); got != 1 {
t.Fatalf("classified failure counter = %v, want 1", got)
}
if got := testutil.ToFloat64(m.taskFailed.WithLabelValues("issue", "local", taskfailure.ReasonAgentUnknown.String())); got != 0 {
t.Fatalf("unknown failure counter = %v, want 0", got)
}
}
func TestBusinessMetricsLLMPricingAndUnpricedTokens(t *testing.T) {
m := NewBusinessMetrics()
m.RecordLLMUsage("chat", "cloud", "codex", "gpt-5.4", 1_000_000, 2_000_000, 3_000_000, 4_000_000)
if got := testutil.ToFloat64(m.llmTokens.WithLabelValues("openai", "gpt-5.4", "input", "cloud", "chat")); got != 1_000_000 {
t.Fatalf("priced input tokens = %v, want 1000000", got)
}
if got := testutil.ToFloat64(m.llmTokens.WithLabelValues("openai", "gpt-5.4", "output", "cloud", "chat")); got != 2_000_000 {
t.Fatalf("priced output tokens = %v, want 2000000", got)
}
if got := testutil.ToFloat64(m.llmCostUSD.WithLabelValues("openai", "gpt-5.4", "input", "cloud", "chat")); got != 2.5 {
t.Fatalf("priced input cost = %v, want 2.5", got)
}
if got := testutil.ToFloat64(m.llmCostUSD.WithLabelValues("openai", "gpt-5.4", "output", "cloud", "chat")); got != 30 {
t.Fatalf("priced output cost = %v, want 30", got)
}
if got := testutil.ToFloat64(m.llmRequests.WithLabelValues("openai", "gpt-5.4", "cloud")); got != 1 {
t.Fatalf("priced request counter = %v, want 1", got)
}
m.RecordLLMUsage("issue", "local", "custom-provider", "Free Model!!", 7, 0, 0, 0)
if got := testutil.ToFloat64(m.llmUnpricedTokens.WithLabelValues("other", "free_model_", "input")); got != 7 {
t.Fatalf("unpriced input tokens = %v, want 7", got)
}
if got := testutil.ToFloat64(m.llmRequests.WithLabelValues("other", "unknown", "local")); got != 1 {
t.Fatalf("unpriced request counter = %v, want 1", got)
}
}
func TestBusinessMetricsRegistryExposesAllFamilies(t *testing.T) {
registry := prometheus.NewRegistry()
m := NewBusinessMetrics()
registry.MustRegister(m.Collectors()...)
m.RecordTaskEnqueued("issue", "local")
m.RecordTaskDispatched("task-1", "issue", "local", 1)
m.RecordTaskStarted("issue", "local", "codex")
m.RecordTaskTerminal("task-1", "issue", "local", "completed", 2, 3, 1)
m.RecordTaskFailed("issue", "local", taskfailure.ReasonTimeout.String())
m.RecordTaskQueuedExpired("issue", "local")
m.RecordTaskLeaseExpired("issue")
m.RecordLLMUsage("issue", "local", "codex", "gpt-5.4", 1, 1, 1, 1)
m.RecordLLMUsage("issue", "local", "custom-provider", "custom-model", 1, 0, 0, 0)
families, err := registry.Gather()
if err != nil {
t.Fatalf("gather: %v", err)
}
seen := make(map[string]bool, len(families))
for _, family := range families {
seen[family.GetName()] = true
}
for metric := range businessMetricLabels {
if !seen[metric] {
t.Fatalf("registry did not expose metric family %s", metric)
}
}
}

View File

@@ -0,0 +1,182 @@
package metrics
import (
"regexp"
"strings"
"github.com/multica-ai/multica/server/pkg/taskfailure"
)
const (
labelSource = "source"
labelRuntimeMode = "runtime_mode"
labelProvider = "provider"
labelTerminalStatus = "terminal_status"
labelFailureReason = "failure_reason"
labelTokenType = "token_type"
labelModel = "model"
labelModelAlias = "model_alias"
)
var businessMetricLabels = map[string][]string{
"multica_agent_task_enqueued_total": {labelSource, labelRuntimeMode},
"multica_agent_task_dispatched_total": {labelSource, labelRuntimeMode},
"multica_agent_task_started_total": {labelSource, labelRuntimeMode, labelProvider},
"multica_agent_task_terminal_total": {labelSource, labelRuntimeMode, labelTerminalStatus},
"multica_agent_task_failed_total": {labelSource, labelRuntimeMode, labelFailureReason},
"multica_agent_task_queue_wait_seconds": {labelSource, labelRuntimeMode},
"multica_agent_task_run_seconds": {labelSource, labelRuntimeMode, labelTerminalStatus},
"multica_agent_task_total_seconds": {labelSource, labelRuntimeMode, labelTerminalStatus},
"multica_agent_task_in_progress": {labelSource, labelRuntimeMode},
"multica_agent_task_iteration_count": {labelSource, labelTerminalStatus},
"multica_llm_tokens_total": {labelProvider, labelModel, labelTokenType, labelRuntimeMode, labelSource},
"multica_llm_cost_usd_total": {labelProvider, labelModel, labelTokenType, labelRuntimeMode, labelSource},
"multica_llm_unpriced_tokens_total": {labelProvider, labelModelAlias, labelTokenType},
"multica_llm_request_total": {labelProvider, labelModel, labelRuntimeMode},
"multica_task_queued_expired_total": {labelSource, labelRuntimeMode},
"multica_task_lease_expired_total": {labelSource},
}
var forbiddenMetricLabels = map[string]struct{}{
"workspace_id": {},
"user_id": {},
"agent_id": {},
"task_id": {},
"issue_id": {},
"runtime_id": {},
"session_id": {},
"ip": {},
}
var (
knownSources = map[string]string{
"issue": "issue",
"chat": "chat",
"autopilot": "autopilot",
"autopilot_issue": "autopilot_issue",
"quick_create": "quick_create",
"manual": "manual",
"api": "api",
"other": "other",
}
knownRuntimeModes = map[string]string{
"local": "local",
"cloud": "cloud",
"unknown": "unknown",
}
knownRuntimeProviders = map[string]string{
"antigravity": "antigravity",
"claude": "claude",
"codex": "codex",
"copilot": "copilot",
"cursor": "cursor",
"gemini": "gemini",
"hermes": "hermes",
"kiro": "kiro",
"kimi": "kimi",
"multica_agent": "multica_agent",
"openclaw": "openclaw",
"opencode": "opencode",
"pi": "pi",
"other": "other",
}
knownTerminalStatuses = map[string]string{
"completed": "completed",
"failed": "failed",
"cancelled": "cancelled",
"blocked": "blocked",
"other": "other",
}
knownTokenTypes = map[string]string{
"input": "input",
"output": "output",
"cache_read": "cache_read",
"cache_write": "cache_write",
}
knownFailureReasons = map[string]string{}
modelAliasUnsafeRe = regexp.MustCompile(`[^a-z0-9._:/+-]+`)
)
func init() {
for _, reason := range taskfailure.AllReasons() {
knownFailureReasons[reason.String()] = reason.String()
}
}
func validateBusinessMetricLabels() {
for metric, labels := range businessMetricLabels {
for _, label := range labels {
if _, forbidden := forbiddenMetricLabels[label]; forbidden {
panic("forbidden high-cardinality label " + label + " on " + metric)
}
}
}
}
func metricLabels(metric string) []string {
labels, ok := businessMetricLabels[metric]
if !ok {
panic("missing business metric label definition for " + metric)
}
return labels
}
func NormalizeTaskSource(value string) string {
value = strings.ToLower(strings.TrimSpace(value))
if normalized, ok := knownSources[value]; ok {
return normalized
}
return "other"
}
func NormalizeRuntimeMode(value string) string {
value = strings.ToLower(strings.TrimSpace(value))
if normalized, ok := knownRuntimeModes[value]; ok {
return normalized
}
return "unknown"
}
func NormalizeRuntimeProvider(value string) string {
value = strings.ToLower(strings.TrimSpace(value))
if normalized, ok := knownRuntimeProviders[value]; ok {
return normalized
}
return "other"
}
func NormalizeTerminalStatus(value string) string {
value = strings.ToLower(strings.TrimSpace(value))
if normalized, ok := knownTerminalStatuses[value]; ok {
return normalized
}
return "other"
}
func NormalizeFailureReason(value string) string {
value = strings.TrimSpace(value)
if normalized, ok := knownFailureReasons[value]; ok {
return normalized
}
return taskfailure.Classify(value).String()
}
func NormalizeTokenType(value string) string {
value = strings.ToLower(strings.TrimSpace(value))
if normalized, ok := knownTokenTypes[value]; ok {
return normalized
}
return "input"
}
func NormalizeModelAlias(value string) string {
value = strings.ToLower(strings.TrimSpace(value))
if value == "" {
return "unknown"
}
value = modelAliasUnsafeRe.ReplaceAllString(value, "_")
if len(value) > 128 {
return value[:128]
}
return value
}

View File

@@ -0,0 +1,25 @@
package metrics
import "testing"
func TestBusinessMetricLabelsRejectHighCardinalityNames(t *testing.T) {
for metric, labels := range businessMetricLabels {
for _, label := range labels {
if _, forbidden := forbiddenMetricLabels[label]; forbidden {
t.Fatalf("metric %s uses forbidden label %s", metric, label)
}
}
}
}
func TestNormalizeLabelsCollapseUnknownValues(t *testing.T) {
if got := NormalizeRuntimeProvider("provider-from-user-input"); got != "other" {
t.Fatalf("NormalizeRuntimeProvider unknown = %q, want other", got)
}
if got := NormalizeRuntimeMode("workspace-123"); got != "unknown" {
t.Fatalf("NormalizeRuntimeMode unknown = %q, want unknown", got)
}
if got := NormalizeTaskSource("task-123"); got != "other" {
t.Fatalf("NormalizeTaskSource unknown = %q, want other", got)
}
}

View File

@@ -0,0 +1,80 @@
package metrics
import (
"regexp"
"strings"
)
type ModelPrice struct {
Provider string
Model string
InputPerM float64
CacheReadPerM float64
CacheWritePerM float64
OutputPerM float64
}
var modelPrices = map[string]ModelPrice{
"openai:gpt-5.5": {Provider: "openai", Model: "gpt-5.5", InputPerM: 5.00, CacheReadPerM: 0.50, CacheWritePerM: 0.50, OutputPerM: 30.00},
"openai:gpt-5.4": {Provider: "openai", Model: "gpt-5.4", InputPerM: 2.50, CacheReadPerM: 0.25, CacheWritePerM: 0.25, OutputPerM: 15.00},
"openai:gpt-5.4-mini": {Provider: "openai", Model: "gpt-5.4-mini", InputPerM: 0.75, CacheReadPerM: 0.075, CacheWritePerM: 0.075, OutputPerM: 4.50},
"openai:gpt-5.3-codex": {Provider: "openai", Model: "gpt-5.3-codex", InputPerM: 1.75, CacheReadPerM: 0.175, CacheWritePerM: 0.175, OutputPerM: 14.00},
"openai:gpt-5.2-codex": {Provider: "openai", Model: "gpt-5.2-codex", InputPerM: 1.75, CacheReadPerM: 0.175, CacheWritePerM: 0.175, OutputPerM: 14.00},
"anthropic:claude-opus-4.7": {Provider: "anthropic", Model: "claude-opus-4.7", InputPerM: 5.00, CacheReadPerM: 0.50, CacheWritePerM: 6.25, OutputPerM: 25.00},
"anthropic:claude-opus-4.6": {Provider: "anthropic", Model: "claude-opus-4.6", InputPerM: 5.00, CacheReadPerM: 0.50, CacheWritePerM: 6.25, OutputPerM: 25.00},
"anthropic:claude-opus-4.5": {Provider: "anthropic", Model: "claude-opus-4.5", InputPerM: 5.00, CacheReadPerM: 0.50, CacheWritePerM: 6.25, OutputPerM: 25.00},
"anthropic:claude-sonnet-4.6": {Provider: "anthropic", Model: "claude-sonnet-4.6", InputPerM: 3.00, CacheReadPerM: 0.30, CacheWritePerM: 3.75, OutputPerM: 15.00},
"anthropic:claude-sonnet-4.5": {Provider: "anthropic", Model: "claude-sonnet-4.5", InputPerM: 3.00, CacheReadPerM: 0.30, CacheWritePerM: 3.75, OutputPerM: 15.00},
"anthropic:claude-haiku-4.5": {Provider: "anthropic", Model: "claude-haiku-4.5", InputPerM: 1.00, CacheReadPerM: 0.10, CacheWritePerM: 1.25, OutputPerM: 5.00},
"deepseek:v4-pro": {Provider: "deepseek", Model: "v4-pro", InputPerM: 1.74, CacheReadPerM: 0.0145, CacheWritePerM: 1.74, OutputPerM: 3.48},
"deepseek:v4-flash": {Provider: "deepseek", Model: "v4-flash", InputPerM: 0.56, CacheReadPerM: 0.0112, CacheWritePerM: 0.56, OutputPerM: 1.12},
"minimax:m2.7": {Provider: "minimax", Model: "m2.7", InputPerM: 0.30, CacheReadPerM: 0.06, CacheWritePerM: 0.375, OutputPerM: 1.20},
"minimax:m2.7-highspeed": {Provider: "minimax", Model: "m2.7-highspeed", InputPerM: 0.60, CacheReadPerM: 0.06, CacheWritePerM: 0.375, OutputPerM: 2.40},
"google:gemini-3-flash": {Provider: "google", Model: "gemini-3-flash", InputPerM: 0.50, CacheReadPerM: 0.05, CacheWritePerM: 0.50, OutputPerM: 3.00},
"google:gemini-3.1-pro": {Provider: "google", Model: "gemini-3.1-pro", InputPerM: 2.00, CacheReadPerM: 0.20, CacheWritePerM: 2.00, OutputPerM: 12.00},
"google:gemini-2.5-pro": {Provider: "google", Model: "gemini-2.5-pro", InputPerM: 1.25, CacheReadPerM: 0.31, CacheWritePerM: 1.25, OutputPerM: 10.00},
"google:gemini-2.5-flash": {Provider: "google", Model: "gemini-2.5-flash", InputPerM: 0.30, CacheReadPerM: 0.03, CacheWritePerM: 0.30, OutputPerM: 2.50},
}
var modelAliasRules = []struct {
re *regexp.Regexp
priceKey string
}{
{regexp.MustCompile(`(^|/|:)gpt-5[.-]5$|^gpt-5-5$`), "openai:gpt-5.5"},
{regexp.MustCompile(`(^|/|:)gpt-5[.-]4($|-2026-03-05|-xhigh)`), "openai:gpt-5.4"},
{regexp.MustCompile(`(^|/|:)gpt-5[.-]4-mini($|[^a-z0-9])`), "openai:gpt-5.4-mini"},
{regexp.MustCompile(`(^|/|:)gpt-5[.-]3-codex$`), "openai:gpt-5.3-codex"},
{regexp.MustCompile(`(^|/|:)gpt-5[.-]2-codex$`), "openai:gpt-5.2-codex"},
{regexp.MustCompile(`claude-opus-4[-.]7`), "anthropic:claude-opus-4.7"},
{regexp.MustCompile(`claude-opus-4[-.]6`), "anthropic:claude-opus-4.6"},
{regexp.MustCompile(`claude-opus-4[-.]5`), "anthropic:claude-opus-4.5"},
{regexp.MustCompile(`claude-sonnet-4[-.]6|claude-4[-.]6-sonnet`), "anthropic:claude-sonnet-4.6"},
{regexp.MustCompile(`claude-sonnet-4[-.]5|claude-4[-.]5-sonnet`), "anthropic:claude-sonnet-4.5"},
{regexp.MustCompile(`claude-haiku-4[-.]5`), "anthropic:claude-haiku-4.5"},
{regexp.MustCompile(`deepseek-v4-pro`), "deepseek:v4-pro"},
{regexp.MustCompile(`deepseek-v4-flash|^deepseek-chat$|^deepseek-reasoner$`), "deepseek:v4-flash"},
{regexp.MustCompile(`minimax-m2[.]7.*highspeed|highspeed.*minimax-m2[.]7`), "minimax:m2.7-highspeed"},
{regexp.MustCompile(`minimax-m2[.]7`), "minimax:m2.7"},
{regexp.MustCompile(`gemini-3-flash`), "google:gemini-3-flash"},
{regexp.MustCompile(`gemini-3[.]1-pro`), "google:gemini-3.1-pro"},
{regexp.MustCompile(`gemini-2[.]5-pro`), "google:gemini-2.5-pro"},
{regexp.MustCompile(`gemini-2[.]5-flash`), "google:gemini-2.5-flash"},
}
func PriceForModelAlias(model string) (ModelPrice, bool) {
model = strings.ToLower(strings.TrimSpace(model))
for _, rule := range modelAliasRules {
if rule.re.MatchString(model) {
price, ok := modelPrices[rule.priceKey]
return price, ok
}
}
return ModelPrice{}, false
}
func tokenCostUSD(tokens int64, pricePerM float64) float64 {
if tokens <= 0 || pricePerM <= 0 {
return 0
}
return float64(tokens) * pricePerM / 1_000_000
}

View File

@@ -22,6 +22,7 @@ type RegistryOptions struct {
type Registry struct {
Gatherer prometheus.Gatherer
HTTP *HTTPMetrics
Business *BusinessMetrics
}
func NewRegistry(opts RegistryOptions) *Registry {
@@ -39,6 +40,9 @@ func NewRegistry(opts RegistryOptions) *Registry {
httpMetrics := NewHTTPMetrics()
reg.MustRegister(httpMetrics.Collectors()...)
businessMetrics := NewBusinessMetrics()
reg.MustRegister(businessMetrics.Collectors()...)
if opts.Pool != nil {
reg.MustRegister(NewDBCollector(opts.Pool))
}
@@ -52,6 +56,7 @@ func NewRegistry(opts RegistryOptions) *Registry {
return &Registry{
Gatherer: reg,
HTTP: httpMetrics,
Business: businessMetrics,
}
}

View File

@@ -16,6 +16,7 @@ import (
"github.com/multica-ai/multica/server/internal/analytics"
"github.com/multica-ai/multica/server/internal/events"
"github.com/multica-ai/multica/server/internal/mention"
obsmetrics "github.com/multica-ai/multica/server/internal/metrics"
"github.com/multica-ai/multica/server/internal/realtime"
"github.com/multica-ai/multica/server/internal/util"
db "github.com/multica-ai/multica/server/pkg/db/generated"
@@ -30,6 +31,7 @@ type TaskService struct {
Hub *realtime.Hub
Bus *events.Bus
Analytics analytics.Client
Metrics *obsmetrics.BusinessMetrics
Wakeup TaskWakeupNotifier
// EmptyClaim caches "this runtime has no queued task" so the daemon
// poll path can skip a Postgres scan on the steady-state empty case.
@@ -134,10 +136,18 @@ func isTrivialDoneOutput(output string) bool {
}
func (s *TaskService) captureTaskQueued(ctx context.Context, task db.AgentTaskQueue) {
if s.Metrics != nil {
source, runtimeMode, _ := s.taskMetricsContext(ctx, task)
s.Metrics.RecordTaskEnqueued(source, runtimeMode)
}
s.captureTaskEvent(ctx, analytics.AgentTaskQueued(s.taskAnalyticsContext(ctx, task)))
}
func (s *TaskService) captureTaskDispatched(ctx context.Context, task db.AgentTaskQueue) {
if s.Metrics != nil {
source, runtimeMode, _ := s.taskMetricsContext(ctx, task)
s.Metrics.RecordTaskDispatched(util.UUIDToString(task.ID), source, runtimeMode, taskQueueWaitSeconds(task))
}
s.captureTaskEvent(ctx, analytics.AgentTaskDispatched(s.taskAnalyticsContext(ctx, task)))
}
@@ -146,10 +156,18 @@ func (s *TaskService) AnalyticsContextForTask(ctx context.Context, task db.Agent
}
func (s *TaskService) captureTaskStarted(ctx context.Context, task db.AgentTaskQueue) {
if s.Metrics != nil {
source, runtimeMode, provider := s.taskMetricsContext(ctx, task)
s.Metrics.RecordTaskStarted(source, runtimeMode, provider)
}
s.captureTaskEvent(ctx, analytics.AgentTaskStarted(s.taskAnalyticsContext(ctx, task)))
}
func (s *TaskService) captureTaskCompleted(ctx context.Context, task db.AgentTaskQueue) {
if s.Metrics != nil {
source, runtimeMode, _ := s.taskMetricsContext(ctx, task)
s.Metrics.RecordTaskTerminal(util.UUIDToString(task.ID), source, runtimeMode, task.Status, taskRunSeconds(task), taskTotalSeconds(task), task.Attempt)
}
s.captureTaskEvent(ctx, analytics.AgentTaskCompleted(
s.taskAnalyticsContext(ctx, task),
taskDurationMS(task),
@@ -158,6 +176,11 @@ func (s *TaskService) captureTaskCompleted(ctx context.Context, task db.AgentTas
func (s *TaskService) captureTaskFailed(ctx context.Context, task db.AgentTaskQueue) {
failureReason := taskFailureReason(task)
if s.Metrics != nil {
source, runtimeMode, _ := s.taskMetricsContext(ctx, task)
s.Metrics.RecordTaskTerminal(util.UUIDToString(task.ID), source, runtimeMode, task.Status, taskRunSeconds(task), taskTotalSeconds(task), task.Attempt)
s.Metrics.RecordTaskFailed(source, runtimeMode, failureReason)
}
s.captureTaskEvent(ctx, analytics.AgentTaskFailed(
s.taskAnalyticsContext(ctx, task),
taskDurationMS(task),
@@ -168,6 +191,10 @@ func (s *TaskService) captureTaskFailed(ctx context.Context, task db.AgentTaskQu
}
func (s *TaskService) captureTaskCancelled(ctx context.Context, task db.AgentTaskQueue) {
if s.Metrics != nil {
source, runtimeMode, _ := s.taskMetricsContext(ctx, task)
s.Metrics.RecordTaskTerminal(util.UUIDToString(task.ID), source, runtimeMode, task.Status, taskRunSeconds(task), taskTotalSeconds(task), task.Attempt)
}
s.captureTaskEvent(ctx, analytics.AgentTaskCancelled(
s.taskAnalyticsContext(ctx, task),
taskDurationMS(task),
@@ -184,6 +211,34 @@ func (s *TaskService) captureTaskCancelled(ctx context.Context, task db.AgentTas
}
}
func (s *TaskService) CaptureTaskUsage(ctx context.Context, task db.AgentTaskQueue, provider, model string, inputTokens, outputTokens, cacheReadTokens, cacheWriteTokens int64) {
if s.Metrics == nil {
return
}
source, runtimeMode, _ := s.taskMetricsContext(ctx, task)
s.Metrics.RecordLLMUsage(source, runtimeMode, provider, model, inputTokens, outputTokens, cacheReadTokens, cacheWriteTokens)
}
func (s *TaskService) CaptureQueuedExpiredTasks(ctx context.Context, tasks []db.AgentTaskQueue) {
if s.Metrics == nil {
return
}
for _, task := range tasks {
source, runtimeMode, _ := s.taskMetricsContext(ctx, task)
s.Metrics.RecordTaskQueuedExpired(source, runtimeMode)
}
}
func (s *TaskService) CaptureLeaseExpiredTasks(ctx context.Context, tasks []db.AgentTaskQueue) {
if s.Metrics == nil {
return
}
for _, task := range tasks {
source, _, _ := s.taskMetricsContext(ctx, task)
s.Metrics.RecordTaskLeaseExpired(source)
}
}
func (s *TaskService) captureTaskEvent(ctx context.Context, event analytics.Event) {
if s.Analytics == nil {
return
@@ -246,6 +301,30 @@ func taskAnalyticsContextKey(task db.AgentTaskQueue) string {
}, "|")
}
func (s *TaskService) taskMetricsContext(ctx context.Context, task db.AgentTaskQueue) (source, runtimeMode, provider string) {
tc := s.taskAnalyticsContext(ctx, task)
source = "other"
switch {
case task.ChatSessionID.Valid:
source = "chat"
case task.IssueID.Valid:
if tc.Source == analytics.SourceAutopilot {
source = "autopilot_issue"
} else {
source = "issue"
}
case task.AutopilotRunID.Valid:
source = "autopilot"
default:
if _, ok := s.parseQuickCreateContext(task); ok {
source = "quick_create"
} else if tc.Source != "" {
source = tc.Source
}
}
return source, tc.RuntimeMode, tc.Provider
}
func (s *TaskService) taskAnalyticsContext(ctx context.Context, task db.AgentTaskQueue) analytics.TaskContext {
if tc, ok := s.cachedTaskAnalyticsContext(task); ok {
return tc
@@ -351,6 +430,29 @@ func taskDurationMS(task db.AgentTaskQueue) int64 {
return ms
}
func taskQueueWaitSeconds(task db.AgentTaskQueue) float64 {
return durationSeconds(task.CreatedAt, task.DispatchedAt)
}
func taskRunSeconds(task db.AgentTaskQueue) float64 {
return durationSeconds(task.StartedAt, task.CompletedAt)
}
func taskTotalSeconds(task db.AgentTaskQueue) float64 {
return durationSeconds(task.CreatedAt, task.CompletedAt)
}
func durationSeconds(start, end pgtype.Timestamptz) float64 {
if !start.Valid || !end.Valid {
return -1
}
seconds := end.Time.Sub(start.Time).Seconds()
if seconds < 0 {
return 0
}
return seconds
}
func taskFailureReason(task db.AgentTaskQueue) string {
if task.FailureReason.Valid && task.FailureReason.String != "" {
return task.FailureReason.String