mirror of
https://github.com/multica-ai/multica.git
synced 2026-06-27 01:19:26 +02:00
Compare commits
1 Commits
codex/agen
...
agent/agen
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
bc2a01392f |
@@ -259,6 +259,7 @@ func main() {
|
||||
metricsConfig := obsmetrics.ConfigFromEnv()
|
||||
var metricsServer *http.Server
|
||||
var httpMetrics *obsmetrics.HTTPMetrics
|
||||
var businessMetrics *obsmetrics.BusinessMetrics
|
||||
if metricsConfig.Enabled() {
|
||||
metricsRegistry := obsmetrics.NewRegistry(obsmetrics.RegistryOptions{
|
||||
Pool: pool,
|
||||
@@ -268,6 +269,7 @@ func main() {
|
||||
Commit: commit,
|
||||
})
|
||||
httpMetrics = metricsRegistry.HTTP
|
||||
businessMetrics = metricsRegistry.Business
|
||||
metricsServer = obsmetrics.NewServer(metricsConfig.Addr, metricsRegistry.Gatherer)
|
||||
if !obsmetrics.IsLoopbackAddr(metricsConfig.Addr) {
|
||||
slog.Warn(
|
||||
@@ -285,6 +287,7 @@ func main() {
|
||||
|
||||
r := NewRouterWithOptions(pool, hub, bus, analyticsClient, storeRedis, RouterOptions{
|
||||
HTTPMetrics: httpMetrics,
|
||||
BusinessMetrics: businessMetrics,
|
||||
DaemonHub: daemonHub,
|
||||
DaemonWakeup: daemonWakeup,
|
||||
HeartbeatScheduler: heartbeatScheduler,
|
||||
@@ -300,6 +303,7 @@ func main() {
|
||||
autopilotCtx, autopilotCancel := context.WithCancel(context.Background())
|
||||
taskSvc := service.NewTaskService(queries, pool, hub, bus, daemonWakeup)
|
||||
taskSvc.Analytics = analyticsClient
|
||||
taskSvc.Metrics = businessMetrics
|
||||
autopilotSvc := service.NewAutopilotService(queries, pool, bus, taskSvc)
|
||||
registerAutopilotListeners(bus, autopilotSvc)
|
||||
|
||||
|
||||
@@ -98,9 +98,10 @@ func NewRouter(pool *pgxpool.Pool, hub *realtime.Hub, bus *events.Bus, analytics
|
||||
}
|
||||
|
||||
type RouterOptions struct {
|
||||
HTTPMetrics *obsmetrics.HTTPMetrics
|
||||
DaemonHub *daemonws.Hub
|
||||
DaemonWakeup service.TaskWakeupNotifier
|
||||
HTTPMetrics *obsmetrics.HTTPMetrics
|
||||
BusinessMetrics *obsmetrics.BusinessMetrics
|
||||
DaemonHub *daemonws.Hub
|
||||
DaemonWakeup service.TaskWakeupNotifier
|
||||
// HeartbeatScheduler, when non-nil, replaces the default synchronous
|
||||
// passthrough scheduler on the constructed Handler. main.go injects a
|
||||
// BatchedHeartbeatScheduler here so the caller can also drive Run/Stop;
|
||||
@@ -141,6 +142,7 @@ func NewRouterWithOptions(pool *pgxpool.Pool, hub *realtime.Hub, bus *events.Bus
|
||||
CloudRuntimeFleetTimeout: envDuration("MULTICA_CLOUD_FLEET_TIMEOUT", 35*time.Second),
|
||||
}
|
||||
h := handler.New(queries, pool, hub, bus, emailSvc, store, cfSigner, analyticsClient, signupConfig, daemonHub)
|
||||
h.TaskService.Metrics = opts.BusinessMetrics
|
||||
if opts.DaemonWakeup != nil {
|
||||
h.TaskService.Wakeup = opts.DaemonWakeup
|
||||
}
|
||||
|
||||
@@ -253,6 +253,7 @@ func sweepStaleTasks(ctx context.Context, queries *db.Queries, taskSvc *service.
|
||||
}
|
||||
|
||||
slog.Info("task sweeper: failed stale tasks", "count", len(failedTasks))
|
||||
taskSvc.CaptureLeaseExpiredTasks(ctx, failedTasks)
|
||||
taskSvc.HandleFailedTasks(ctx, failedTasks)
|
||||
}
|
||||
|
||||
@@ -276,6 +277,7 @@ func sweepExpiredQueuedTasks(ctx context.Context, queries *db.Queries, taskSvc *
|
||||
}
|
||||
|
||||
slog.Info("task sweeper: expired stale queued tasks", "count", len(failedTasks))
|
||||
taskSvc.CaptureQueuedExpiredTasks(ctx, failedTasks)
|
||||
taskSvc.HandleFailedTasks(ctx, failedTasks)
|
||||
}
|
||||
|
||||
|
||||
@@ -50,6 +50,7 @@ require (
|
||||
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
|
||||
github.com/jackc/puddle/v2 v2.2.2 // indirect
|
||||
github.com/kr/text v0.2.0 // indirect
|
||||
github.com/kylelemons/godebug v1.1.0 // indirect
|
||||
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
|
||||
github.com/prometheus/client_model v0.6.2 // indirect
|
||||
github.com/prometheus/common v0.66.1 // indirect
|
||||
|
||||
@@ -1860,7 +1860,8 @@ func (h *Handler) ReportTaskUsage(w http.ResponseWriter, r *http.Request) {
|
||||
taskID := chi.URLParam(r, "taskId")
|
||||
|
||||
// Verify the caller owns this task's workspace.
|
||||
if _, ok := h.requireDaemonTaskAccess(w, r, taskID); !ok {
|
||||
task, ok := h.requireDaemonTaskAccess(w, r, taskID)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
@@ -1883,7 +1884,9 @@ func (h *Handler) ReportTaskUsage(w http.ResponseWriter, r *http.Request) {
|
||||
CacheWriteTokens: u.CacheWriteTokens,
|
||||
}); err != nil {
|
||||
slog.Warn("upsert task usage failed", "task_id", taskID, "model", u.Model, "error", err)
|
||||
continue
|
||||
}
|
||||
h.TaskService.CaptureTaskUsage(r.Context(), task, u.Provider, u.Model, u.InputTokens, u.OutputTokens, u.CacheReadTokens, u.CacheWriteTokens)
|
||||
}
|
||||
|
||||
writeJSON(w, http.StatusOK, map[string]string{"status": "ok"})
|
||||
|
||||
328
server/internal/metrics/business.go
Normal file
328
server/internal/metrics/business.go
Normal file
@@ -0,0 +1,328 @@
|
||||
package metrics
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"github.com/multica-ai/multica/server/pkg/taskfailure"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
)
|
||||
|
||||
var taskDurationBuckets = []float64{1, 2.5, 5, 10, 30, 60, 120, 300, 600, 1200, 3600, 7200}
|
||||
|
||||
type activeTaskLabels struct {
|
||||
source string
|
||||
runtimeMode string
|
||||
}
|
||||
|
||||
type BusinessMetrics struct {
|
||||
taskEnqueued *prometheus.CounterVec
|
||||
taskDispatched *prometheus.CounterVec
|
||||
taskStarted *prometheus.CounterVec
|
||||
taskTerminal *prometheus.CounterVec
|
||||
taskFailed *prometheus.CounterVec
|
||||
taskQueueWait *prometheus.HistogramVec
|
||||
taskRunSeconds *prometheus.HistogramVec
|
||||
taskTotalSeconds *prometheus.HistogramVec
|
||||
taskInProgress *prometheus.GaugeVec
|
||||
taskIterations *prometheus.HistogramVec
|
||||
|
||||
llmTokens *prometheus.CounterVec
|
||||
llmCostUSD *prometheus.CounterVec
|
||||
llmUnpricedTokens *prometheus.CounterVec
|
||||
llmRequests *prometheus.CounterVec
|
||||
|
||||
taskQueuedExpired *prometheus.CounterVec
|
||||
taskLeaseExpired *prometheus.CounterVec
|
||||
|
||||
activeMu sync.Mutex
|
||||
activeTasks map[string]activeTaskLabels
|
||||
}
|
||||
|
||||
func NewBusinessMetrics() *BusinessMetrics {
|
||||
validateBusinessMetricLabels()
|
||||
m := &BusinessMetrics{
|
||||
taskEnqueued: prometheus.NewCounterVec(prometheus.CounterOpts{
|
||||
Namespace: "multica",
|
||||
Subsystem: "agent_task",
|
||||
Name: "enqueued_total",
|
||||
Help: "Total agent tasks enqueued.",
|
||||
}, metricLabels("multica_agent_task_enqueued_total")),
|
||||
taskDispatched: prometheus.NewCounterVec(prometheus.CounterOpts{
|
||||
Namespace: "multica",
|
||||
Subsystem: "agent_task",
|
||||
Name: "dispatched_total",
|
||||
Help: "Total agent tasks dispatched to a runtime.",
|
||||
}, metricLabels("multica_agent_task_dispatched_total")),
|
||||
taskStarted: prometheus.NewCounterVec(prometheus.CounterOpts{
|
||||
Namespace: "multica",
|
||||
Subsystem: "agent_task",
|
||||
Name: "started_total",
|
||||
Help: "Total agent tasks that reached running state.",
|
||||
}, metricLabels("multica_agent_task_started_total")),
|
||||
taskTerminal: prometheus.NewCounterVec(prometheus.CounterOpts{
|
||||
Namespace: "multica",
|
||||
Subsystem: "agent_task",
|
||||
Name: "terminal_total",
|
||||
Help: "Total agent tasks that reached a terminal state.",
|
||||
}, metricLabels("multica_agent_task_terminal_total")),
|
||||
taskFailed: prometheus.NewCounterVec(prometheus.CounterOpts{
|
||||
Namespace: "multica",
|
||||
Subsystem: "agent_task",
|
||||
Name: "failed_total",
|
||||
Help: "Total failed agent tasks by canonical failure reason.",
|
||||
}, metricLabels("multica_agent_task_failed_total")),
|
||||
taskQueueWait: prometheus.NewHistogramVec(prometheus.HistogramOpts{
|
||||
Namespace: "multica",
|
||||
Subsystem: "agent_task",
|
||||
Name: "queue_wait_seconds",
|
||||
Help: "Time agent tasks spent queued before dispatch.",
|
||||
Buckets: taskDurationBuckets,
|
||||
}, metricLabels("multica_agent_task_queue_wait_seconds")),
|
||||
taskRunSeconds: prometheus.NewHistogramVec(prometheus.HistogramOpts{
|
||||
Namespace: "multica",
|
||||
Subsystem: "agent_task",
|
||||
Name: "run_seconds",
|
||||
Help: "Time agent tasks spent running before a terminal state.",
|
||||
Buckets: taskDurationBuckets,
|
||||
}, metricLabels("multica_agent_task_run_seconds")),
|
||||
taskTotalSeconds: prometheus.NewHistogramVec(prometheus.HistogramOpts{
|
||||
Namespace: "multica",
|
||||
Subsystem: "agent_task",
|
||||
Name: "total_seconds",
|
||||
Help: "Total time from agent task creation to terminal state.",
|
||||
Buckets: taskDurationBuckets,
|
||||
}, metricLabels("multica_agent_task_total_seconds")),
|
||||
taskInProgress: prometheus.NewGaugeVec(prometheus.GaugeOpts{
|
||||
Namespace: "multica",
|
||||
Subsystem: "agent_task",
|
||||
Name: "in_progress",
|
||||
Help: "Current agent tasks dispatched by this process and not yet terminal.",
|
||||
}, metricLabels("multica_agent_task_in_progress")),
|
||||
taskIterations: prometheus.NewHistogramVec(prometheus.HistogramOpts{
|
||||
Namespace: "multica",
|
||||
Subsystem: "agent_task",
|
||||
Name: "iteration_count",
|
||||
Help: "Retry attempt count observed when an agent task reaches a terminal state.",
|
||||
Buckets: []float64{1, 2, 3, 4, 5, 10},
|
||||
}, metricLabels("multica_agent_task_iteration_count")),
|
||||
llmTokens: prometheus.NewCounterVec(prometheus.CounterOpts{
|
||||
Namespace: "multica",
|
||||
Subsystem: "llm",
|
||||
Name: "tokens_total",
|
||||
Help: "Total priced LLM tokens by provider, model, token type, runtime mode, and task source.",
|
||||
}, metricLabels("multica_llm_tokens_total")),
|
||||
llmCostUSD: prometheus.NewCounterVec(prometheus.CounterOpts{
|
||||
Namespace: "multica",
|
||||
Subsystem: "llm",
|
||||
Name: "cost_usd_total",
|
||||
Help: "Total estimated priced LLM token cost in USD.",
|
||||
}, metricLabels("multica_llm_cost_usd_total")),
|
||||
llmUnpricedTokens: prometheus.NewCounterVec(prometheus.CounterOpts{
|
||||
Namespace: "multica",
|
||||
Subsystem: "llm",
|
||||
Name: "unpriced_tokens_total",
|
||||
Help: "Total LLM tokens for model aliases without a fixed TSR price.",
|
||||
}, metricLabels("multica_llm_unpriced_tokens_total")),
|
||||
llmRequests: prometheus.NewCounterVec(prometheus.CounterOpts{
|
||||
Namespace: "multica",
|
||||
Subsystem: "llm",
|
||||
Name: "request_total",
|
||||
Help: "Total task usage reports by normalized LLM provider and model.",
|
||||
}, metricLabels("multica_llm_request_total")),
|
||||
taskQueuedExpired: prometheus.NewCounterVec(prometheus.CounterOpts{
|
||||
Namespace: "multica",
|
||||
Subsystem: "task",
|
||||
Name: "queued_expired_total",
|
||||
Help: "Total queued tasks expired by the scheduler.",
|
||||
}, metricLabels("multica_task_queued_expired_total")),
|
||||
taskLeaseExpired: prometheus.NewCounterVec(prometheus.CounterOpts{
|
||||
Namespace: "multica",
|
||||
Subsystem: "task",
|
||||
Name: "lease_expired_total",
|
||||
Help: "Total dispatched or running task leases expired by the scheduler.",
|
||||
}, metricLabels("multica_task_lease_expired_total")),
|
||||
activeTasks: map[string]activeTaskLabels{},
|
||||
}
|
||||
m.prewarmFailureReasons()
|
||||
return m
|
||||
}
|
||||
|
||||
func (m *BusinessMetrics) Collectors() []prometheus.Collector {
|
||||
return []prometheus.Collector{
|
||||
m.taskEnqueued,
|
||||
m.taskDispatched,
|
||||
m.taskStarted,
|
||||
m.taskTerminal,
|
||||
m.taskFailed,
|
||||
m.taskQueueWait,
|
||||
m.taskRunSeconds,
|
||||
m.taskTotalSeconds,
|
||||
m.taskInProgress,
|
||||
m.taskIterations,
|
||||
m.llmTokens,
|
||||
m.llmCostUSD,
|
||||
m.llmUnpricedTokens,
|
||||
m.llmRequests,
|
||||
m.taskQueuedExpired,
|
||||
m.taskLeaseExpired,
|
||||
}
|
||||
}
|
||||
|
||||
func (m *BusinessMetrics) RecordTaskEnqueued(source, runtimeMode string) {
|
||||
if m == nil {
|
||||
return
|
||||
}
|
||||
m.taskEnqueued.WithLabelValues(NormalizeTaskSource(source), NormalizeRuntimeMode(runtimeMode)).Inc()
|
||||
}
|
||||
|
||||
func (m *BusinessMetrics) RecordTaskDispatched(taskID, source, runtimeMode string, queueWaitSeconds float64) {
|
||||
if m == nil {
|
||||
return
|
||||
}
|
||||
source = NormalizeTaskSource(source)
|
||||
runtimeMode = NormalizeRuntimeMode(runtimeMode)
|
||||
m.taskDispatched.WithLabelValues(source, runtimeMode).Inc()
|
||||
if queueWaitSeconds >= 0 {
|
||||
m.taskQueueWait.WithLabelValues(source, runtimeMode).Observe(queueWaitSeconds)
|
||||
}
|
||||
m.markTaskInProgress(taskID, source, runtimeMode)
|
||||
}
|
||||
|
||||
func (m *BusinessMetrics) RecordTaskStarted(source, runtimeMode, provider string) {
|
||||
if m == nil {
|
||||
return
|
||||
}
|
||||
m.taskStarted.WithLabelValues(
|
||||
NormalizeTaskSource(source),
|
||||
NormalizeRuntimeMode(runtimeMode),
|
||||
NormalizeRuntimeProvider(provider),
|
||||
).Inc()
|
||||
}
|
||||
|
||||
func (m *BusinessMetrics) RecordTaskTerminal(taskID, source, runtimeMode, terminalStatus string, runSeconds, totalSeconds float64, attempt int32) {
|
||||
if m == nil {
|
||||
return
|
||||
}
|
||||
source = NormalizeTaskSource(source)
|
||||
runtimeMode = NormalizeRuntimeMode(runtimeMode)
|
||||
terminalStatus = NormalizeTerminalStatus(terminalStatus)
|
||||
m.taskTerminal.WithLabelValues(source, runtimeMode, terminalStatus).Inc()
|
||||
if runSeconds >= 0 {
|
||||
m.taskRunSeconds.WithLabelValues(source, runtimeMode, terminalStatus).Observe(runSeconds)
|
||||
}
|
||||
if totalSeconds >= 0 {
|
||||
m.taskTotalSeconds.WithLabelValues(source, runtimeMode, terminalStatus).Observe(totalSeconds)
|
||||
}
|
||||
if attempt < 1 {
|
||||
attempt = 1
|
||||
}
|
||||
m.taskIterations.WithLabelValues(source, terminalStatus).Observe(float64(attempt))
|
||||
m.clearTaskInProgress(taskID)
|
||||
}
|
||||
|
||||
func (m *BusinessMetrics) RecordTaskFailed(source, runtimeMode, failureReason string) {
|
||||
if m == nil {
|
||||
return
|
||||
}
|
||||
m.taskFailed.WithLabelValues(
|
||||
NormalizeTaskSource(source),
|
||||
NormalizeRuntimeMode(runtimeMode),
|
||||
NormalizeFailureReason(failureReason),
|
||||
).Inc()
|
||||
}
|
||||
|
||||
func (m *BusinessMetrics) RecordTaskQueuedExpired(source, runtimeMode string) {
|
||||
if m == nil {
|
||||
return
|
||||
}
|
||||
m.taskQueuedExpired.WithLabelValues(NormalizeTaskSource(source), NormalizeRuntimeMode(runtimeMode)).Inc()
|
||||
}
|
||||
|
||||
func (m *BusinessMetrics) RecordTaskLeaseExpired(source string) {
|
||||
if m == nil {
|
||||
return
|
||||
}
|
||||
m.taskLeaseExpired.WithLabelValues(NormalizeTaskSource(source)).Inc()
|
||||
}
|
||||
|
||||
func (m *BusinessMetrics) RecordLLMUsage(source, runtimeMode, rawProvider, modelAlias string, inputTokens, outputTokens, cacheReadTokens, cacheWriteTokens int64) {
|
||||
if m == nil {
|
||||
return
|
||||
}
|
||||
source = NormalizeTaskSource(source)
|
||||
runtimeMode = NormalizeRuntimeMode(runtimeMode)
|
||||
price, priced := PriceForModelAlias(modelAlias)
|
||||
if !priced {
|
||||
provider := NormalizeRuntimeProvider(rawProvider)
|
||||
alias := NormalizeModelAlias(modelAlias)
|
||||
m.recordUnpricedTokens(provider, alias, "input", inputTokens)
|
||||
m.recordUnpricedTokens(provider, alias, "output", outputTokens)
|
||||
m.recordUnpricedTokens(provider, alias, "cache_read", cacheReadTokens)
|
||||
m.recordUnpricedTokens(provider, alias, "cache_write", cacheWriteTokens)
|
||||
m.llmRequests.WithLabelValues(provider, "unknown", runtimeMode).Inc()
|
||||
return
|
||||
}
|
||||
|
||||
m.recordPricedTokens(price.Provider, price.Model, "input", runtimeMode, source, inputTokens, tokenCostUSD(inputTokens, price.InputPerM))
|
||||
m.recordPricedTokens(price.Provider, price.Model, "output", runtimeMode, source, outputTokens, tokenCostUSD(outputTokens, price.OutputPerM))
|
||||
m.recordPricedTokens(price.Provider, price.Model, "cache_read", runtimeMode, source, cacheReadTokens, tokenCostUSD(cacheReadTokens, price.CacheReadPerM))
|
||||
m.recordPricedTokens(price.Provider, price.Model, "cache_write", runtimeMode, source, cacheWriteTokens, tokenCostUSD(cacheWriteTokens, price.CacheWritePerM))
|
||||
m.llmRequests.WithLabelValues(price.Provider, price.Model, runtimeMode).Inc()
|
||||
}
|
||||
|
||||
func (m *BusinessMetrics) recordPricedTokens(provider, model, tokenType, runtimeMode, source string, tokens int64, cost float64) {
|
||||
if tokens <= 0 {
|
||||
return
|
||||
}
|
||||
tokenType = NormalizeTokenType(tokenType)
|
||||
m.llmTokens.WithLabelValues(provider, model, tokenType, runtimeMode, source).Add(float64(tokens))
|
||||
if cost > 0 {
|
||||
m.llmCostUSD.WithLabelValues(provider, model, tokenType, runtimeMode, source).Add(cost)
|
||||
}
|
||||
}
|
||||
|
||||
func (m *BusinessMetrics) recordUnpricedTokens(provider, modelAlias, tokenType string, tokens int64) {
|
||||
if tokens <= 0 {
|
||||
return
|
||||
}
|
||||
m.llmUnpricedTokens.WithLabelValues(provider, modelAlias, NormalizeTokenType(tokenType)).Add(float64(tokens))
|
||||
}
|
||||
|
||||
func (m *BusinessMetrics) markTaskInProgress(taskID, source, runtimeMode string) {
|
||||
if taskID == "" {
|
||||
m.taskInProgress.WithLabelValues(source, runtimeMode).Inc()
|
||||
return
|
||||
}
|
||||
m.activeMu.Lock()
|
||||
defer m.activeMu.Unlock()
|
||||
if _, ok := m.activeTasks[taskID]; ok {
|
||||
return
|
||||
}
|
||||
m.activeTasks[taskID] = activeTaskLabels{source: source, runtimeMode: runtimeMode}
|
||||
m.taskInProgress.WithLabelValues(source, runtimeMode).Inc()
|
||||
}
|
||||
|
||||
func (m *BusinessMetrics) clearTaskInProgress(taskID string) {
|
||||
if taskID == "" {
|
||||
return
|
||||
}
|
||||
m.activeMu.Lock()
|
||||
labels, ok := m.activeTasks[taskID]
|
||||
if ok {
|
||||
delete(m.activeTasks, taskID)
|
||||
}
|
||||
m.activeMu.Unlock()
|
||||
if ok {
|
||||
m.taskInProgress.WithLabelValues(labels.source, labels.runtimeMode).Dec()
|
||||
}
|
||||
}
|
||||
|
||||
func (m *BusinessMetrics) prewarmFailureReasons() {
|
||||
for _, source := range []string{"issue", "chat", "autopilot", "autopilot_issue", "quick_create", "other"} {
|
||||
for _, runtimeMode := range []string{"local", "cloud", "unknown"} {
|
||||
for _, reason := range taskfailure.AllReasons() {
|
||||
m.taskFailed.WithLabelValues(source, runtimeMode, reason.String()).Add(0)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
125
server/internal/metrics/business_test.go
Normal file
125
server/internal/metrics/business_test.go
Normal file
@@ -0,0 +1,125 @@
|
||||
package metrics
|
||||
|
||||
import (
|
||||
"strconv"
|
||||
"testing"
|
||||
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/client_golang/prometheus/testutil"
|
||||
|
||||
"github.com/multica-ai/multica/server/pkg/taskfailure"
|
||||
)
|
||||
|
||||
func TestBusinessMetricsLifecycleCountersAndGauge(t *testing.T) {
|
||||
m := NewBusinessMetrics()
|
||||
|
||||
m.RecordTaskEnqueued("issue", "local")
|
||||
for i := 0; i < 100; i++ {
|
||||
m.RecordTaskDispatched("task-"+strconv.Itoa(i), "issue", "local", 2.5)
|
||||
}
|
||||
m.RecordTaskStarted("issue", "local", "codex")
|
||||
m.RecordTaskTerminal("task-0", "issue", "local", "completed", 10, 20, 1)
|
||||
|
||||
if got := testutil.ToFloat64(m.taskEnqueued.WithLabelValues("issue", "local")); got != 1 {
|
||||
t.Fatalf("enqueued counter = %v, want 1", got)
|
||||
}
|
||||
if got := testutil.ToFloat64(m.taskDispatched.WithLabelValues("issue", "local")); got != 100 {
|
||||
t.Fatalf("dispatched counter = %v, want 100", got)
|
||||
}
|
||||
if got := testutil.ToFloat64(m.taskStarted.WithLabelValues("issue", "local", "codex")); got != 1 {
|
||||
t.Fatalf("started counter = %v, want 1", got)
|
||||
}
|
||||
if got := testutil.ToFloat64(m.taskTerminal.WithLabelValues("issue", "local", "completed")); got != 1 {
|
||||
t.Fatalf("terminal counter = %v, want 1", got)
|
||||
}
|
||||
if got := testutil.CollectAndCount(m.taskInProgress); got != 1 {
|
||||
t.Fatalf("in_progress series count = %d, want 1 despite 100 task ids", got)
|
||||
}
|
||||
if got := testutil.ToFloat64(m.taskInProgress.WithLabelValues("issue", "local")); got != 99 {
|
||||
t.Fatalf("in_progress gauge = %v, want 99", got)
|
||||
}
|
||||
if got := testutil.CollectAndCount(m.taskQueueWait); got != 1 {
|
||||
t.Fatalf("queue wait series count = %d, want 1", got)
|
||||
}
|
||||
if got := testutil.CollectAndCount(m.taskRunSeconds); got != 1 {
|
||||
t.Fatalf("run seconds series count = %d, want 1", got)
|
||||
}
|
||||
if got := testutil.CollectAndCount(m.taskTotalSeconds); got != 1 {
|
||||
t.Fatalf("total seconds series count = %d, want 1", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestBusinessMetricsFailureReasonUsesCanonicalClassifier(t *testing.T) {
|
||||
m := NewBusinessMetrics()
|
||||
|
||||
rawError := `API Error: 429 {"error":"overloaded"}`
|
||||
m.RecordTaskFailed("issue", "local", rawError)
|
||||
|
||||
wantReason := taskfailure.ReasonAgentProviderCapacityOrRateLimit.String()
|
||||
if got := testutil.ToFloat64(m.taskFailed.WithLabelValues("issue", "local", wantReason)); got != 1 {
|
||||
t.Fatalf("classified failure counter = %v, want 1", got)
|
||||
}
|
||||
if got := testutil.ToFloat64(m.taskFailed.WithLabelValues("issue", "local", taskfailure.ReasonAgentUnknown.String())); got != 0 {
|
||||
t.Fatalf("unknown failure counter = %v, want 0", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestBusinessMetricsLLMPricingAndUnpricedTokens(t *testing.T) {
|
||||
m := NewBusinessMetrics()
|
||||
|
||||
m.RecordLLMUsage("chat", "cloud", "codex", "gpt-5.4", 1_000_000, 2_000_000, 3_000_000, 4_000_000)
|
||||
|
||||
if got := testutil.ToFloat64(m.llmTokens.WithLabelValues("openai", "gpt-5.4", "input", "cloud", "chat")); got != 1_000_000 {
|
||||
t.Fatalf("priced input tokens = %v, want 1000000", got)
|
||||
}
|
||||
if got := testutil.ToFloat64(m.llmTokens.WithLabelValues("openai", "gpt-5.4", "output", "cloud", "chat")); got != 2_000_000 {
|
||||
t.Fatalf("priced output tokens = %v, want 2000000", got)
|
||||
}
|
||||
if got := testutil.ToFloat64(m.llmCostUSD.WithLabelValues("openai", "gpt-5.4", "input", "cloud", "chat")); got != 2.5 {
|
||||
t.Fatalf("priced input cost = %v, want 2.5", got)
|
||||
}
|
||||
if got := testutil.ToFloat64(m.llmCostUSD.WithLabelValues("openai", "gpt-5.4", "output", "cloud", "chat")); got != 30 {
|
||||
t.Fatalf("priced output cost = %v, want 30", got)
|
||||
}
|
||||
if got := testutil.ToFloat64(m.llmRequests.WithLabelValues("openai", "gpt-5.4", "cloud")); got != 1 {
|
||||
t.Fatalf("priced request counter = %v, want 1", got)
|
||||
}
|
||||
|
||||
m.RecordLLMUsage("issue", "local", "custom-provider", "Free Model!!", 7, 0, 0, 0)
|
||||
if got := testutil.ToFloat64(m.llmUnpricedTokens.WithLabelValues("other", "free_model_", "input")); got != 7 {
|
||||
t.Fatalf("unpriced input tokens = %v, want 7", got)
|
||||
}
|
||||
if got := testutil.ToFloat64(m.llmRequests.WithLabelValues("other", "unknown", "local")); got != 1 {
|
||||
t.Fatalf("unpriced request counter = %v, want 1", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestBusinessMetricsRegistryExposesAllFamilies(t *testing.T) {
|
||||
registry := prometheus.NewRegistry()
|
||||
m := NewBusinessMetrics()
|
||||
registry.MustRegister(m.Collectors()...)
|
||||
|
||||
m.RecordTaskEnqueued("issue", "local")
|
||||
m.RecordTaskDispatched("task-1", "issue", "local", 1)
|
||||
m.RecordTaskStarted("issue", "local", "codex")
|
||||
m.RecordTaskTerminal("task-1", "issue", "local", "completed", 2, 3, 1)
|
||||
m.RecordTaskFailed("issue", "local", taskfailure.ReasonTimeout.String())
|
||||
m.RecordTaskQueuedExpired("issue", "local")
|
||||
m.RecordTaskLeaseExpired("issue")
|
||||
m.RecordLLMUsage("issue", "local", "codex", "gpt-5.4", 1, 1, 1, 1)
|
||||
m.RecordLLMUsage("issue", "local", "custom-provider", "custom-model", 1, 0, 0, 0)
|
||||
|
||||
families, err := registry.Gather()
|
||||
if err != nil {
|
||||
t.Fatalf("gather: %v", err)
|
||||
}
|
||||
seen := make(map[string]bool, len(families))
|
||||
for _, family := range families {
|
||||
seen[family.GetName()] = true
|
||||
}
|
||||
for metric := range businessMetricLabels {
|
||||
if !seen[metric] {
|
||||
t.Fatalf("registry did not expose metric family %s", metric)
|
||||
}
|
||||
}
|
||||
}
|
||||
182
server/internal/metrics/labels.go
Normal file
182
server/internal/metrics/labels.go
Normal file
@@ -0,0 +1,182 @@
|
||||
package metrics
|
||||
|
||||
import (
|
||||
"regexp"
|
||||
"strings"
|
||||
|
||||
"github.com/multica-ai/multica/server/pkg/taskfailure"
|
||||
)
|
||||
|
||||
const (
|
||||
labelSource = "source"
|
||||
labelRuntimeMode = "runtime_mode"
|
||||
labelProvider = "provider"
|
||||
labelTerminalStatus = "terminal_status"
|
||||
labelFailureReason = "failure_reason"
|
||||
labelTokenType = "token_type"
|
||||
labelModel = "model"
|
||||
labelModelAlias = "model_alias"
|
||||
)
|
||||
|
||||
var businessMetricLabels = map[string][]string{
|
||||
"multica_agent_task_enqueued_total": {labelSource, labelRuntimeMode},
|
||||
"multica_agent_task_dispatched_total": {labelSource, labelRuntimeMode},
|
||||
"multica_agent_task_started_total": {labelSource, labelRuntimeMode, labelProvider},
|
||||
"multica_agent_task_terminal_total": {labelSource, labelRuntimeMode, labelTerminalStatus},
|
||||
"multica_agent_task_failed_total": {labelSource, labelRuntimeMode, labelFailureReason},
|
||||
"multica_agent_task_queue_wait_seconds": {labelSource, labelRuntimeMode},
|
||||
"multica_agent_task_run_seconds": {labelSource, labelRuntimeMode, labelTerminalStatus},
|
||||
"multica_agent_task_total_seconds": {labelSource, labelRuntimeMode, labelTerminalStatus},
|
||||
"multica_agent_task_in_progress": {labelSource, labelRuntimeMode},
|
||||
"multica_agent_task_iteration_count": {labelSource, labelTerminalStatus},
|
||||
"multica_llm_tokens_total": {labelProvider, labelModel, labelTokenType, labelRuntimeMode, labelSource},
|
||||
"multica_llm_cost_usd_total": {labelProvider, labelModel, labelTokenType, labelRuntimeMode, labelSource},
|
||||
"multica_llm_unpriced_tokens_total": {labelProvider, labelModelAlias, labelTokenType},
|
||||
"multica_llm_request_total": {labelProvider, labelModel, labelRuntimeMode},
|
||||
"multica_task_queued_expired_total": {labelSource, labelRuntimeMode},
|
||||
"multica_task_lease_expired_total": {labelSource},
|
||||
}
|
||||
|
||||
var forbiddenMetricLabels = map[string]struct{}{
|
||||
"workspace_id": {},
|
||||
"user_id": {},
|
||||
"agent_id": {},
|
||||
"task_id": {},
|
||||
"issue_id": {},
|
||||
"runtime_id": {},
|
||||
"session_id": {},
|
||||
"ip": {},
|
||||
}
|
||||
|
||||
var (
|
||||
knownSources = map[string]string{
|
||||
"issue": "issue",
|
||||
"chat": "chat",
|
||||
"autopilot": "autopilot",
|
||||
"autopilot_issue": "autopilot_issue",
|
||||
"quick_create": "quick_create",
|
||||
"manual": "manual",
|
||||
"api": "api",
|
||||
"other": "other",
|
||||
}
|
||||
knownRuntimeModes = map[string]string{
|
||||
"local": "local",
|
||||
"cloud": "cloud",
|
||||
"unknown": "unknown",
|
||||
}
|
||||
knownRuntimeProviders = map[string]string{
|
||||
"antigravity": "antigravity",
|
||||
"claude": "claude",
|
||||
"codex": "codex",
|
||||
"copilot": "copilot",
|
||||
"cursor": "cursor",
|
||||
"gemini": "gemini",
|
||||
"hermes": "hermes",
|
||||
"kiro": "kiro",
|
||||
"kimi": "kimi",
|
||||
"multica_agent": "multica_agent",
|
||||
"openclaw": "openclaw",
|
||||
"opencode": "opencode",
|
||||
"pi": "pi",
|
||||
"other": "other",
|
||||
}
|
||||
knownTerminalStatuses = map[string]string{
|
||||
"completed": "completed",
|
||||
"failed": "failed",
|
||||
"cancelled": "cancelled",
|
||||
"blocked": "blocked",
|
||||
"other": "other",
|
||||
}
|
||||
knownTokenTypes = map[string]string{
|
||||
"input": "input",
|
||||
"output": "output",
|
||||
"cache_read": "cache_read",
|
||||
"cache_write": "cache_write",
|
||||
}
|
||||
knownFailureReasons = map[string]string{}
|
||||
modelAliasUnsafeRe = regexp.MustCompile(`[^a-z0-9._:/+-]+`)
|
||||
)
|
||||
|
||||
func init() {
|
||||
for _, reason := range taskfailure.AllReasons() {
|
||||
knownFailureReasons[reason.String()] = reason.String()
|
||||
}
|
||||
}
|
||||
|
||||
func validateBusinessMetricLabels() {
|
||||
for metric, labels := range businessMetricLabels {
|
||||
for _, label := range labels {
|
||||
if _, forbidden := forbiddenMetricLabels[label]; forbidden {
|
||||
panic("forbidden high-cardinality label " + label + " on " + metric)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func metricLabels(metric string) []string {
|
||||
labels, ok := businessMetricLabels[metric]
|
||||
if !ok {
|
||||
panic("missing business metric label definition for " + metric)
|
||||
}
|
||||
return labels
|
||||
}
|
||||
|
||||
func NormalizeTaskSource(value string) string {
|
||||
value = strings.ToLower(strings.TrimSpace(value))
|
||||
if normalized, ok := knownSources[value]; ok {
|
||||
return normalized
|
||||
}
|
||||
return "other"
|
||||
}
|
||||
|
||||
func NormalizeRuntimeMode(value string) string {
|
||||
value = strings.ToLower(strings.TrimSpace(value))
|
||||
if normalized, ok := knownRuntimeModes[value]; ok {
|
||||
return normalized
|
||||
}
|
||||
return "unknown"
|
||||
}
|
||||
|
||||
func NormalizeRuntimeProvider(value string) string {
|
||||
value = strings.ToLower(strings.TrimSpace(value))
|
||||
if normalized, ok := knownRuntimeProviders[value]; ok {
|
||||
return normalized
|
||||
}
|
||||
return "other"
|
||||
}
|
||||
|
||||
func NormalizeTerminalStatus(value string) string {
|
||||
value = strings.ToLower(strings.TrimSpace(value))
|
||||
if normalized, ok := knownTerminalStatuses[value]; ok {
|
||||
return normalized
|
||||
}
|
||||
return "other"
|
||||
}
|
||||
|
||||
func NormalizeFailureReason(value string) string {
|
||||
value = strings.TrimSpace(value)
|
||||
if normalized, ok := knownFailureReasons[value]; ok {
|
||||
return normalized
|
||||
}
|
||||
return taskfailure.Classify(value).String()
|
||||
}
|
||||
|
||||
func NormalizeTokenType(value string) string {
|
||||
value = strings.ToLower(strings.TrimSpace(value))
|
||||
if normalized, ok := knownTokenTypes[value]; ok {
|
||||
return normalized
|
||||
}
|
||||
return "input"
|
||||
}
|
||||
|
||||
func NormalizeModelAlias(value string) string {
|
||||
value = strings.ToLower(strings.TrimSpace(value))
|
||||
if value == "" {
|
||||
return "unknown"
|
||||
}
|
||||
value = modelAliasUnsafeRe.ReplaceAllString(value, "_")
|
||||
if len(value) > 128 {
|
||||
return value[:128]
|
||||
}
|
||||
return value
|
||||
}
|
||||
25
server/internal/metrics/labels_test.go
Normal file
25
server/internal/metrics/labels_test.go
Normal file
@@ -0,0 +1,25 @@
|
||||
package metrics
|
||||
|
||||
import "testing"
|
||||
|
||||
func TestBusinessMetricLabelsRejectHighCardinalityNames(t *testing.T) {
|
||||
for metric, labels := range businessMetricLabels {
|
||||
for _, label := range labels {
|
||||
if _, forbidden := forbiddenMetricLabels[label]; forbidden {
|
||||
t.Fatalf("metric %s uses forbidden label %s", metric, label)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestNormalizeLabelsCollapseUnknownValues(t *testing.T) {
|
||||
if got := NormalizeRuntimeProvider("provider-from-user-input"); got != "other" {
|
||||
t.Fatalf("NormalizeRuntimeProvider unknown = %q, want other", got)
|
||||
}
|
||||
if got := NormalizeRuntimeMode("workspace-123"); got != "unknown" {
|
||||
t.Fatalf("NormalizeRuntimeMode unknown = %q, want unknown", got)
|
||||
}
|
||||
if got := NormalizeTaskSource("task-123"); got != "other" {
|
||||
t.Fatalf("NormalizeTaskSource unknown = %q, want other", got)
|
||||
}
|
||||
}
|
||||
80
server/internal/metrics/pricing.go
Normal file
80
server/internal/metrics/pricing.go
Normal file
@@ -0,0 +1,80 @@
|
||||
package metrics
|
||||
|
||||
import (
|
||||
"regexp"
|
||||
"strings"
|
||||
)
|
||||
|
||||
type ModelPrice struct {
|
||||
Provider string
|
||||
Model string
|
||||
InputPerM float64
|
||||
CacheReadPerM float64
|
||||
CacheWritePerM float64
|
||||
OutputPerM float64
|
||||
}
|
||||
|
||||
var modelPrices = map[string]ModelPrice{
|
||||
"openai:gpt-5.5": {Provider: "openai", Model: "gpt-5.5", InputPerM: 5.00, CacheReadPerM: 0.50, CacheWritePerM: 0.50, OutputPerM: 30.00},
|
||||
"openai:gpt-5.4": {Provider: "openai", Model: "gpt-5.4", InputPerM: 2.50, CacheReadPerM: 0.25, CacheWritePerM: 0.25, OutputPerM: 15.00},
|
||||
"openai:gpt-5.4-mini": {Provider: "openai", Model: "gpt-5.4-mini", InputPerM: 0.75, CacheReadPerM: 0.075, CacheWritePerM: 0.075, OutputPerM: 4.50},
|
||||
"openai:gpt-5.3-codex": {Provider: "openai", Model: "gpt-5.3-codex", InputPerM: 1.75, CacheReadPerM: 0.175, CacheWritePerM: 0.175, OutputPerM: 14.00},
|
||||
"openai:gpt-5.2-codex": {Provider: "openai", Model: "gpt-5.2-codex", InputPerM: 1.75, CacheReadPerM: 0.175, CacheWritePerM: 0.175, OutputPerM: 14.00},
|
||||
"anthropic:claude-opus-4.7": {Provider: "anthropic", Model: "claude-opus-4.7", InputPerM: 5.00, CacheReadPerM: 0.50, CacheWritePerM: 6.25, OutputPerM: 25.00},
|
||||
"anthropic:claude-opus-4.6": {Provider: "anthropic", Model: "claude-opus-4.6", InputPerM: 5.00, CacheReadPerM: 0.50, CacheWritePerM: 6.25, OutputPerM: 25.00},
|
||||
"anthropic:claude-opus-4.5": {Provider: "anthropic", Model: "claude-opus-4.5", InputPerM: 5.00, CacheReadPerM: 0.50, CacheWritePerM: 6.25, OutputPerM: 25.00},
|
||||
"anthropic:claude-sonnet-4.6": {Provider: "anthropic", Model: "claude-sonnet-4.6", InputPerM: 3.00, CacheReadPerM: 0.30, CacheWritePerM: 3.75, OutputPerM: 15.00},
|
||||
"anthropic:claude-sonnet-4.5": {Provider: "anthropic", Model: "claude-sonnet-4.5", InputPerM: 3.00, CacheReadPerM: 0.30, CacheWritePerM: 3.75, OutputPerM: 15.00},
|
||||
"anthropic:claude-haiku-4.5": {Provider: "anthropic", Model: "claude-haiku-4.5", InputPerM: 1.00, CacheReadPerM: 0.10, CacheWritePerM: 1.25, OutputPerM: 5.00},
|
||||
"deepseek:v4-pro": {Provider: "deepseek", Model: "v4-pro", InputPerM: 1.74, CacheReadPerM: 0.0145, CacheWritePerM: 1.74, OutputPerM: 3.48},
|
||||
"deepseek:v4-flash": {Provider: "deepseek", Model: "v4-flash", InputPerM: 0.56, CacheReadPerM: 0.0112, CacheWritePerM: 0.56, OutputPerM: 1.12},
|
||||
"minimax:m2.7": {Provider: "minimax", Model: "m2.7", InputPerM: 0.30, CacheReadPerM: 0.06, CacheWritePerM: 0.375, OutputPerM: 1.20},
|
||||
"minimax:m2.7-highspeed": {Provider: "minimax", Model: "m2.7-highspeed", InputPerM: 0.60, CacheReadPerM: 0.06, CacheWritePerM: 0.375, OutputPerM: 2.40},
|
||||
"google:gemini-3-flash": {Provider: "google", Model: "gemini-3-flash", InputPerM: 0.50, CacheReadPerM: 0.05, CacheWritePerM: 0.50, OutputPerM: 3.00},
|
||||
"google:gemini-3.1-pro": {Provider: "google", Model: "gemini-3.1-pro", InputPerM: 2.00, CacheReadPerM: 0.20, CacheWritePerM: 2.00, OutputPerM: 12.00},
|
||||
"google:gemini-2.5-pro": {Provider: "google", Model: "gemini-2.5-pro", InputPerM: 1.25, CacheReadPerM: 0.31, CacheWritePerM: 1.25, OutputPerM: 10.00},
|
||||
"google:gemini-2.5-flash": {Provider: "google", Model: "gemini-2.5-flash", InputPerM: 0.30, CacheReadPerM: 0.03, CacheWritePerM: 0.30, OutputPerM: 2.50},
|
||||
}
|
||||
|
||||
var modelAliasRules = []struct {
|
||||
re *regexp.Regexp
|
||||
priceKey string
|
||||
}{
|
||||
{regexp.MustCompile(`(^|/|:)gpt-5[.-]5$|^gpt-5-5$`), "openai:gpt-5.5"},
|
||||
{regexp.MustCompile(`(^|/|:)gpt-5[.-]4($|-2026-03-05|-xhigh)`), "openai:gpt-5.4"},
|
||||
{regexp.MustCompile(`(^|/|:)gpt-5[.-]4-mini($|[^a-z0-9])`), "openai:gpt-5.4-mini"},
|
||||
{regexp.MustCompile(`(^|/|:)gpt-5[.-]3-codex$`), "openai:gpt-5.3-codex"},
|
||||
{regexp.MustCompile(`(^|/|:)gpt-5[.-]2-codex$`), "openai:gpt-5.2-codex"},
|
||||
{regexp.MustCompile(`claude-opus-4[-.]7`), "anthropic:claude-opus-4.7"},
|
||||
{regexp.MustCompile(`claude-opus-4[-.]6`), "anthropic:claude-opus-4.6"},
|
||||
{regexp.MustCompile(`claude-opus-4[-.]5`), "anthropic:claude-opus-4.5"},
|
||||
{regexp.MustCompile(`claude-sonnet-4[-.]6|claude-4[-.]6-sonnet`), "anthropic:claude-sonnet-4.6"},
|
||||
{regexp.MustCompile(`claude-sonnet-4[-.]5|claude-4[-.]5-sonnet`), "anthropic:claude-sonnet-4.5"},
|
||||
{regexp.MustCompile(`claude-haiku-4[-.]5`), "anthropic:claude-haiku-4.5"},
|
||||
{regexp.MustCompile(`deepseek-v4-pro`), "deepseek:v4-pro"},
|
||||
{regexp.MustCompile(`deepseek-v4-flash|^deepseek-chat$|^deepseek-reasoner$`), "deepseek:v4-flash"},
|
||||
{regexp.MustCompile(`minimax-m2[.]7.*highspeed|highspeed.*minimax-m2[.]7`), "minimax:m2.7-highspeed"},
|
||||
{regexp.MustCompile(`minimax-m2[.]7`), "minimax:m2.7"},
|
||||
{regexp.MustCompile(`gemini-3-flash`), "google:gemini-3-flash"},
|
||||
{regexp.MustCompile(`gemini-3[.]1-pro`), "google:gemini-3.1-pro"},
|
||||
{regexp.MustCompile(`gemini-2[.]5-pro`), "google:gemini-2.5-pro"},
|
||||
{regexp.MustCompile(`gemini-2[.]5-flash`), "google:gemini-2.5-flash"},
|
||||
}
|
||||
|
||||
func PriceForModelAlias(model string) (ModelPrice, bool) {
|
||||
model = strings.ToLower(strings.TrimSpace(model))
|
||||
for _, rule := range modelAliasRules {
|
||||
if rule.re.MatchString(model) {
|
||||
price, ok := modelPrices[rule.priceKey]
|
||||
return price, ok
|
||||
}
|
||||
}
|
||||
return ModelPrice{}, false
|
||||
}
|
||||
|
||||
func tokenCostUSD(tokens int64, pricePerM float64) float64 {
|
||||
if tokens <= 0 || pricePerM <= 0 {
|
||||
return 0
|
||||
}
|
||||
return float64(tokens) * pricePerM / 1_000_000
|
||||
}
|
||||
@@ -22,6 +22,7 @@ type RegistryOptions struct {
|
||||
type Registry struct {
|
||||
Gatherer prometheus.Gatherer
|
||||
HTTP *HTTPMetrics
|
||||
Business *BusinessMetrics
|
||||
}
|
||||
|
||||
func NewRegistry(opts RegistryOptions) *Registry {
|
||||
@@ -39,6 +40,9 @@ func NewRegistry(opts RegistryOptions) *Registry {
|
||||
httpMetrics := NewHTTPMetrics()
|
||||
reg.MustRegister(httpMetrics.Collectors()...)
|
||||
|
||||
businessMetrics := NewBusinessMetrics()
|
||||
reg.MustRegister(businessMetrics.Collectors()...)
|
||||
|
||||
if opts.Pool != nil {
|
||||
reg.MustRegister(NewDBCollector(opts.Pool))
|
||||
}
|
||||
@@ -52,6 +56,7 @@ func NewRegistry(opts RegistryOptions) *Registry {
|
||||
return &Registry{
|
||||
Gatherer: reg,
|
||||
HTTP: httpMetrics,
|
||||
Business: businessMetrics,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -16,6 +16,7 @@ import (
|
||||
"github.com/multica-ai/multica/server/internal/analytics"
|
||||
"github.com/multica-ai/multica/server/internal/events"
|
||||
"github.com/multica-ai/multica/server/internal/mention"
|
||||
obsmetrics "github.com/multica-ai/multica/server/internal/metrics"
|
||||
"github.com/multica-ai/multica/server/internal/realtime"
|
||||
"github.com/multica-ai/multica/server/internal/util"
|
||||
db "github.com/multica-ai/multica/server/pkg/db/generated"
|
||||
@@ -30,6 +31,7 @@ type TaskService struct {
|
||||
Hub *realtime.Hub
|
||||
Bus *events.Bus
|
||||
Analytics analytics.Client
|
||||
Metrics *obsmetrics.BusinessMetrics
|
||||
Wakeup TaskWakeupNotifier
|
||||
// EmptyClaim caches "this runtime has no queued task" so the daemon
|
||||
// poll path can skip a Postgres scan on the steady-state empty case.
|
||||
@@ -134,10 +136,18 @@ func isTrivialDoneOutput(output string) bool {
|
||||
}
|
||||
|
||||
func (s *TaskService) captureTaskQueued(ctx context.Context, task db.AgentTaskQueue) {
|
||||
if s.Metrics != nil {
|
||||
source, runtimeMode, _ := s.taskMetricsContext(ctx, task)
|
||||
s.Metrics.RecordTaskEnqueued(source, runtimeMode)
|
||||
}
|
||||
s.captureTaskEvent(ctx, analytics.AgentTaskQueued(s.taskAnalyticsContext(ctx, task)))
|
||||
}
|
||||
|
||||
func (s *TaskService) captureTaskDispatched(ctx context.Context, task db.AgentTaskQueue) {
|
||||
if s.Metrics != nil {
|
||||
source, runtimeMode, _ := s.taskMetricsContext(ctx, task)
|
||||
s.Metrics.RecordTaskDispatched(util.UUIDToString(task.ID), source, runtimeMode, taskQueueWaitSeconds(task))
|
||||
}
|
||||
s.captureTaskEvent(ctx, analytics.AgentTaskDispatched(s.taskAnalyticsContext(ctx, task)))
|
||||
}
|
||||
|
||||
@@ -146,10 +156,18 @@ func (s *TaskService) AnalyticsContextForTask(ctx context.Context, task db.Agent
|
||||
}
|
||||
|
||||
func (s *TaskService) captureTaskStarted(ctx context.Context, task db.AgentTaskQueue) {
|
||||
if s.Metrics != nil {
|
||||
source, runtimeMode, provider := s.taskMetricsContext(ctx, task)
|
||||
s.Metrics.RecordTaskStarted(source, runtimeMode, provider)
|
||||
}
|
||||
s.captureTaskEvent(ctx, analytics.AgentTaskStarted(s.taskAnalyticsContext(ctx, task)))
|
||||
}
|
||||
|
||||
func (s *TaskService) captureTaskCompleted(ctx context.Context, task db.AgentTaskQueue) {
|
||||
if s.Metrics != nil {
|
||||
source, runtimeMode, _ := s.taskMetricsContext(ctx, task)
|
||||
s.Metrics.RecordTaskTerminal(util.UUIDToString(task.ID), source, runtimeMode, task.Status, taskRunSeconds(task), taskTotalSeconds(task), task.Attempt)
|
||||
}
|
||||
s.captureTaskEvent(ctx, analytics.AgentTaskCompleted(
|
||||
s.taskAnalyticsContext(ctx, task),
|
||||
taskDurationMS(task),
|
||||
@@ -158,6 +176,11 @@ func (s *TaskService) captureTaskCompleted(ctx context.Context, task db.AgentTas
|
||||
|
||||
func (s *TaskService) captureTaskFailed(ctx context.Context, task db.AgentTaskQueue) {
|
||||
failureReason := taskFailureReason(task)
|
||||
if s.Metrics != nil {
|
||||
source, runtimeMode, _ := s.taskMetricsContext(ctx, task)
|
||||
s.Metrics.RecordTaskTerminal(util.UUIDToString(task.ID), source, runtimeMode, task.Status, taskRunSeconds(task), taskTotalSeconds(task), task.Attempt)
|
||||
s.Metrics.RecordTaskFailed(source, runtimeMode, failureReason)
|
||||
}
|
||||
s.captureTaskEvent(ctx, analytics.AgentTaskFailed(
|
||||
s.taskAnalyticsContext(ctx, task),
|
||||
taskDurationMS(task),
|
||||
@@ -168,6 +191,10 @@ func (s *TaskService) captureTaskFailed(ctx context.Context, task db.AgentTaskQu
|
||||
}
|
||||
|
||||
func (s *TaskService) captureTaskCancelled(ctx context.Context, task db.AgentTaskQueue) {
|
||||
if s.Metrics != nil {
|
||||
source, runtimeMode, _ := s.taskMetricsContext(ctx, task)
|
||||
s.Metrics.RecordTaskTerminal(util.UUIDToString(task.ID), source, runtimeMode, task.Status, taskRunSeconds(task), taskTotalSeconds(task), task.Attempt)
|
||||
}
|
||||
s.captureTaskEvent(ctx, analytics.AgentTaskCancelled(
|
||||
s.taskAnalyticsContext(ctx, task),
|
||||
taskDurationMS(task),
|
||||
@@ -184,6 +211,34 @@ func (s *TaskService) captureTaskCancelled(ctx context.Context, task db.AgentTas
|
||||
}
|
||||
}
|
||||
|
||||
func (s *TaskService) CaptureTaskUsage(ctx context.Context, task db.AgentTaskQueue, provider, model string, inputTokens, outputTokens, cacheReadTokens, cacheWriteTokens int64) {
|
||||
if s.Metrics == nil {
|
||||
return
|
||||
}
|
||||
source, runtimeMode, _ := s.taskMetricsContext(ctx, task)
|
||||
s.Metrics.RecordLLMUsage(source, runtimeMode, provider, model, inputTokens, outputTokens, cacheReadTokens, cacheWriteTokens)
|
||||
}
|
||||
|
||||
func (s *TaskService) CaptureQueuedExpiredTasks(ctx context.Context, tasks []db.AgentTaskQueue) {
|
||||
if s.Metrics == nil {
|
||||
return
|
||||
}
|
||||
for _, task := range tasks {
|
||||
source, runtimeMode, _ := s.taskMetricsContext(ctx, task)
|
||||
s.Metrics.RecordTaskQueuedExpired(source, runtimeMode)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *TaskService) CaptureLeaseExpiredTasks(ctx context.Context, tasks []db.AgentTaskQueue) {
|
||||
if s.Metrics == nil {
|
||||
return
|
||||
}
|
||||
for _, task := range tasks {
|
||||
source, _, _ := s.taskMetricsContext(ctx, task)
|
||||
s.Metrics.RecordTaskLeaseExpired(source)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *TaskService) captureTaskEvent(ctx context.Context, event analytics.Event) {
|
||||
if s.Analytics == nil {
|
||||
return
|
||||
@@ -246,6 +301,30 @@ func taskAnalyticsContextKey(task db.AgentTaskQueue) string {
|
||||
}, "|")
|
||||
}
|
||||
|
||||
func (s *TaskService) taskMetricsContext(ctx context.Context, task db.AgentTaskQueue) (source, runtimeMode, provider string) {
|
||||
tc := s.taskAnalyticsContext(ctx, task)
|
||||
source = "other"
|
||||
switch {
|
||||
case task.ChatSessionID.Valid:
|
||||
source = "chat"
|
||||
case task.IssueID.Valid:
|
||||
if tc.Source == analytics.SourceAutopilot {
|
||||
source = "autopilot_issue"
|
||||
} else {
|
||||
source = "issue"
|
||||
}
|
||||
case task.AutopilotRunID.Valid:
|
||||
source = "autopilot"
|
||||
default:
|
||||
if _, ok := s.parseQuickCreateContext(task); ok {
|
||||
source = "quick_create"
|
||||
} else if tc.Source != "" {
|
||||
source = tc.Source
|
||||
}
|
||||
}
|
||||
return source, tc.RuntimeMode, tc.Provider
|
||||
}
|
||||
|
||||
func (s *TaskService) taskAnalyticsContext(ctx context.Context, task db.AgentTaskQueue) analytics.TaskContext {
|
||||
if tc, ok := s.cachedTaskAnalyticsContext(task); ok {
|
||||
return tc
|
||||
@@ -351,6 +430,29 @@ func taskDurationMS(task db.AgentTaskQueue) int64 {
|
||||
return ms
|
||||
}
|
||||
|
||||
func taskQueueWaitSeconds(task db.AgentTaskQueue) float64 {
|
||||
return durationSeconds(task.CreatedAt, task.DispatchedAt)
|
||||
}
|
||||
|
||||
func taskRunSeconds(task db.AgentTaskQueue) float64 {
|
||||
return durationSeconds(task.StartedAt, task.CompletedAt)
|
||||
}
|
||||
|
||||
func taskTotalSeconds(task db.AgentTaskQueue) float64 {
|
||||
return durationSeconds(task.CreatedAt, task.CompletedAt)
|
||||
}
|
||||
|
||||
func durationSeconds(start, end pgtype.Timestamptz) float64 {
|
||||
if !start.Valid || !end.Valid {
|
||||
return -1
|
||||
}
|
||||
seconds := end.Time.Sub(start.Time).Seconds()
|
||||
if seconds < 0 {
|
||||
return 0
|
||||
}
|
||||
return seconds
|
||||
}
|
||||
|
||||
func taskFailureReason(task db.AgentTaskQueue) string {
|
||||
if task.FailureReason.Valid && task.FailureReason.String != "" {
|
||||
return task.FailureReason.String
|
||||
|
||||
Reference in New Issue
Block a user