Compare commits

...

2 Commits

Author SHA1 Message Date
Jiang Bohan
054aab2a1a fix(runtime): bucket daily usage by task_usage.created_at, not enqueue time
ListRuntimeUsage was aggregating by DATE(atq.created_at) and filtering
on atq.created_at. agent_task_queue.created_at is the enqueue timestamp,
which drifts from actual token-production time: a task queued at 23:58
and executed at 00:05 was attributed to yesterday; a task sitting in
the queue overnight was counted on the queue day.

The ?days=N cutoff also became a rolling window (now() - N) instead of
a calendar-day boundary, silently clipping the morning of the earliest
day returned.

Switch bucket + filter to task_usage.created_at (~= task completion /
usage-report time) and snap the since cutoff to start-of-day via
DATE_TRUNC.

Add a regression test covering both scenarios: cross-midnight task
attributes to the day tokens were reported, and the earliest day's
pre-cutoff rows are still included.
2026-04-16 18:24:23 +08:00
Jiang Bohan
980b2bb7ef refactor(runtime): derive runtime usage from task_usage only
The daemon used to scan each runtime's local CLI log directory every 5
minutes (Claude Code, Codex, OpenCode, OpenClaw, Hermes) and post daily
aggregates to /api/daemon/runtimes/{id}/usage. Those directories are
shared with the user's own local CLI sessions, so the user's personal
usage was being counted as Daemon-executed usage. Cursor and Gemini had
no scanner at all, so their runtime-level aggregates were always zero.

Switch GetRuntimeUsage to aggregate task_usage (already scoped to
Daemon-executed tasks) via agent_task_queue.runtime_id. Single source of
truth; Cursor/Gemini/Copilot get runtime usage for free; no reliance on
external CLI log formats.

Removes:
- server/internal/daemon/usage/ (all scanners)
- Daemon.usageScanLoop + providerToRuntimeMap
- Client.ReportUsage
- ReportRuntimeUsage handler + POST /api/daemon/runtimes/{id}/usage
- UpsertRuntimeUsage / GetRuntimeUsageSummary queries
- runtime_usage table (migration 046)

Refs: MUL-786
2026-04-16 17:57:22 +08:00
21 changed files with 205 additions and 1769 deletions

View File

@@ -137,7 +137,6 @@ func NewRouter(pool *pgxpool.Pool, hub *realtime.Hub, bus *events.Bus) chi.Route
r.Post("/runtimes/{runtimeId}/tasks/claim", h.ClaimTaskByRuntime)
r.Get("/runtimes/{runtimeId}/tasks/pending", h.ListPendingTasksByRuntime)
r.Post("/runtimes/{runtimeId}/usage", h.ReportRuntimeUsage)
r.Post("/runtimes/{runtimeId}/ping/{pingId}/result", h.ReportPingResult)
r.Post("/runtimes/{runtimeId}/update/{updateId}/result", h.ReportUpdateResult)

View File

@@ -140,12 +140,6 @@ func (c *Client) GetTaskStatus(ctx context.Context, taskID string) (string, erro
return resp.Status, nil
}
func (c *Client) ReportUsage(ctx context.Context, runtimeID string, entries []map[string]any) error {
return c.postJSON(ctx, fmt.Sprintf("/api/daemon/runtimes/%s/usage", runtimeID), map[string]any{
"entries": entries,
}, nil)
}
// HeartbeatResponse contains the server's response to a heartbeat, including any pending actions.
type HeartbeatResponse struct {
Status string `json:"status"`

View File

@@ -15,7 +15,6 @@ import (
"github.com/multica-ai/multica/server/internal/cli"
"github.com/multica-ai/multica/server/internal/daemon/execenv"
"github.com/multica-ai/multica/server/internal/daemon/repocache"
"github.com/multica-ai/multica/server/internal/daemon/usage"
"github.com/multica-ai/multica/server/pkg/agent"
)
@@ -107,7 +106,6 @@ func (d *Daemon) Run(ctx context.Context) error {
go d.workspaceSyncLoop(ctx)
go d.heartbeatLoop(ctx)
go d.usageScanLoop(ctx)
go d.gcLoop(ctx)
go d.serveHealth(ctx, healthLn, time.Now())
return d.pollLoop(ctx)
@@ -176,17 +174,6 @@ func (d *Daemon) findRuntime(id string) *Runtime {
return nil
}
// providerToRuntimeMap returns a mapping from provider name to runtime ID.
func (d *Daemon) providerToRuntimeMap() map[string]string {
d.mu.Lock()
defer d.mu.Unlock()
m := make(map[string]string)
for id, rt := range d.runtimeIndex {
m[rt.Provider] = id
}
return m
}
func (d *Daemon) registerRuntimesForWorkspace(ctx context.Context, workspaceID string) (*RegisterResponse, error) {
var runtimes []map[string]string
for name, entry := range d.cfg.Agents {
@@ -669,62 +656,6 @@ func (d *Daemon) triggerRestart() {
}
}
func (d *Daemon) usageScanLoop(ctx context.Context) {
scanner := usage.NewScanner(d.logger)
report := func() {
records := scanner.Scan()
if len(records) == 0 {
return
}
// Build provider -> runtime ID mapping from current state.
providerToRuntime := d.providerToRuntimeMap()
// Group records by provider to send to the correct runtime.
byProvider := make(map[string][]map[string]any)
for _, r := range records {
byProvider[r.Provider] = append(byProvider[r.Provider], map[string]any{
"date": r.Date,
"provider": r.Provider,
"model": r.Model,
"input_tokens": r.InputTokens,
"output_tokens": r.OutputTokens,
"cache_read_tokens": r.CacheReadTokens,
"cache_write_tokens": r.CacheWriteTokens,
})
}
for provider, entries := range byProvider {
runtimeID, ok := providerToRuntime[provider]
if !ok {
d.logger.Debug("no runtime for provider, skipping usage report", "provider", provider)
continue
}
if err := d.client.ReportUsage(ctx, runtimeID, entries); err != nil {
d.logger.Warn("usage report failed", "provider", provider, "runtime_id", runtimeID, "error", err)
} else {
d.logger.Info("usage reported", "provider", provider, "runtime_id", runtimeID, "entries", len(entries))
}
}
}
// Initial scan on startup.
report()
ticker := time.NewTicker(5 * time.Minute)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
report()
}
}
}
func (d *Daemon) pollLoop(ctx context.Context) error {
sem := make(chan struct{}, d.cfg.MaxConcurrentTasks)
var wg sync.WaitGroup

View File

@@ -1,173 +0,0 @@
package usage
import (
"bufio"
"encoding/json"
"os"
"path/filepath"
"strings"
"time"
)
// scanClaude reads Claude Code JSONL session logs from ~/.config/claude/projects/**/*.jsonl
// and extracts token usage from "assistant" message lines.
func (s *Scanner) scanClaude() []Record {
roots := claudeLogRoots()
if len(roots) == 0 {
return nil
}
var allRecords []Record
seen := make(map[string]bool) // dedup by "messageId:requestId"
for _, root := range roots {
files, err := filepath.Glob(filepath.Join(root, "**", "*.jsonl"))
if err != nil {
s.logger.Debug("claude glob error", "root", root, "error", err)
continue
}
// Also glob one level deeper for subagent logs
deeper, _ := filepath.Glob(filepath.Join(root, "**", "**", "*.jsonl"))
files = append(files, deeper...)
for _, f := range files {
records := s.parseClaudeFile(f, seen)
allRecords = append(allRecords, records...)
}
}
return mergeRecords(allRecords)
}
// claudeLogRoots returns the directories to scan for Claude JSONL logs.
func claudeLogRoots() []string {
var roots []string
// Check CLAUDE_CONFIG_DIR env var
if configDir := os.Getenv("CLAUDE_CONFIG_DIR"); configDir != "" {
for _, dir := range strings.Split(configDir, ",") {
dir = strings.TrimSpace(dir)
if dir != "" {
roots = append(roots, filepath.Join(dir, "projects"))
}
}
}
// Standard locations
home, err := os.UserHomeDir()
if err != nil {
return roots
}
candidates := []string{
filepath.Join(home, ".config", "claude", "projects"),
filepath.Join(home, ".claude", "projects"),
}
for _, dir := range candidates {
if info, err := os.Stat(dir); err == nil && info.IsDir() {
roots = append(roots, dir)
}
}
return roots
}
// claudeLine represents the subset of a Claude JSONL line we care about.
type claudeLine struct {
Type string `json:"type"`
Timestamp string `json:"timestamp"`
RequestID string `json:"requestId"`
Message *struct {
ID string `json:"id"`
Model string `json:"model"`
Usage *struct {
InputTokens int64 `json:"input_tokens"`
OutputTokens int64 `json:"output_tokens"`
CacheReadInputTokens int64 `json:"cache_read_input_tokens"`
CacheCreationInputTokens int64 `json:"cache_creation_input_tokens"`
} `json:"usage"`
} `json:"message"`
}
func (s *Scanner) parseClaudeFile(path string, seen map[string]bool) []Record {
f, err := os.Open(path)
if err != nil {
return nil
}
defer f.Close()
var records []Record
scanner := bufio.NewScanner(f)
scanner.Buffer(make([]byte, 0, 256*1024), 1024*1024) // up to 1MB lines
for scanner.Scan() {
line := scanner.Bytes()
// Fast pre-filter: skip lines that can't contain what we need
if !bytesContains(line, `"type":"assistant"`) && !bytesContains(line, `"type": "assistant"`) {
continue
}
if !bytesContains(line, `"usage"`) {
continue
}
var entry claudeLine
if err := json.Unmarshal(line, &entry); err != nil {
continue
}
if entry.Type != "assistant" || entry.Message == nil || entry.Message.Usage == nil {
continue
}
// Dedup: Claude streaming produces multiple lines with same message.id + requestId
// with cumulative token counts. Take only the first occurrence.
dedupKey := entry.Message.ID + ":" + entry.RequestID
if dedupKey != ":" && seen[dedupKey] {
continue
}
if dedupKey != ":" {
seen[dedupKey] = true
}
// Parse timestamp to get date
ts, err := time.Parse(time.RFC3339Nano, entry.Timestamp)
if err != nil {
ts, err = time.Parse(time.RFC3339, entry.Timestamp)
if err != nil {
continue
}
}
model := entry.Message.Model
if model == "" {
model = "unknown"
}
records = append(records, Record{
Date: ts.Local().Format("2006-01-02"),
Provider: "claude",
Model: normalizeClaudeModel(model),
InputTokens: entry.Message.Usage.InputTokens,
OutputTokens: entry.Message.Usage.OutputTokens,
CacheReadTokens: entry.Message.Usage.CacheReadInputTokens,
CacheWriteTokens: entry.Message.Usage.CacheCreationInputTokens,
})
}
return records
}
// normalizeClaudeModel strips common prefixes/suffixes from model names.
func normalizeClaudeModel(model string) string {
// Strip "anthropic." prefix
model = strings.TrimPrefix(model, "anthropic.")
// Strip Vertex AI prefixes like "us.anthropic."
if idx := strings.LastIndex(model, "anthropic."); idx >= 0 {
model = model[idx+len("anthropic."):]
}
return model
}
func bytesContains(data []byte, substr string) bool {
return strings.Contains(string(data), substr)
}

View File

@@ -1,178 +0,0 @@
package usage
import (
"bufio"
"encoding/json"
"os"
"path/filepath"
"strings"
)
// scanCodex reads Codex CLI session logs from ~/.codex/sessions/YYYY/MM/DD/*.jsonl
// and extracts token usage from "token_count" event lines.
func (s *Scanner) scanCodex() []Record {
root := codexLogRoot()
if root == "" {
return nil
}
// Glob for session files: ~/.codex/sessions/YYYY/MM/DD/rollout-*.jsonl
pattern := filepath.Join(root, "*", "*", "*", "*.jsonl")
files, err := filepath.Glob(pattern)
if err != nil {
s.logger.Debug("codex glob error", "error", err)
return nil
}
var allRecords []Record
for _, f := range files {
record := s.parseCodexFile(f)
if record != nil {
allRecords = append(allRecords, *record)
}
}
return mergeRecords(allRecords)
}
// codexLogRoot returns the Codex sessions directory.
func codexLogRoot() string {
if codexHome := os.Getenv("CODEX_HOME"); codexHome != "" {
dir := filepath.Join(codexHome, "sessions")
if info, err := os.Stat(dir); err == nil && info.IsDir() {
return dir
}
}
home, err := os.UserHomeDir()
if err != nil {
return ""
}
dir := filepath.Join(home, ".codex", "sessions")
if info, err := os.Stat(dir); err == nil && info.IsDir() {
return dir
}
return ""
}
// codexEvent represents a line in a Codex session JSONL file.
type codexEvent struct {
Type string `json:"type"`
Payload *codexPayload `json:"payload"`
}
type codexPayload struct {
Type string `json:"type"`
Info *codexTokenInfo `json:"info"`
Model string `json:"model"` // present in turn_context events
}
type codexTokenInfo struct {
TotalTokenUsage *codexTokenUsage `json:"total_token_usage"`
LastTokenUsage *codexTokenUsage `json:"last_token_usage"`
Model string `json:"model"`
}
type codexTokenUsage struct {
InputTokens int64 `json:"input_tokens"`
OutputTokens int64 `json:"output_tokens"`
CachedInputTokens int64 `json:"cached_input_tokens"`
CacheReadInputTokens int64 `json:"cache_read_input_tokens"`
ReasoningOutputTokens int64 `json:"reasoning_output_tokens"`
TotalTokens int64 `json:"total_tokens"`
}
// parseCodexFile extracts the final cumulative token_count from a Codex session file.
// Returns nil if no usage data found.
func (s *Scanner) parseCodexFile(path string) *Record {
f, err := os.Open(path)
if err != nil {
return nil
}
defer f.Close()
// Extract date from directory path: .../sessions/YYYY/MM/DD/file.jsonl
date := extractDateFromPath(path)
if date == "" {
return nil
}
var lastUsage *codexTokenUsage
var lastModel string
scanner := bufio.NewScanner(f)
scanner.Buffer(make([]byte, 0, 256*1024), 1024*1024)
for scanner.Scan() {
line := scanner.Bytes()
// Fast pre-filter: only parse lines with token_count or turn_context
hasTokenCount := bytesContains(line, `"token_count"`)
hasTurnContext := bytesContains(line, `"turn_context"`)
if !hasTokenCount && !hasTurnContext {
continue
}
var evt codexEvent
if err := json.Unmarshal(line, &evt); err != nil || evt.Payload == nil {
continue
}
// Track model from turn_context events
if evt.Type == "turn_context" && evt.Payload.Model != "" {
lastModel = evt.Payload.Model
continue
}
// Extract token usage from token_count events
if evt.Payload.Type == "token_count" && evt.Payload.Info != nil {
usage := evt.Payload.Info.TotalTokenUsage
if usage == nil {
usage = evt.Payload.Info.LastTokenUsage
}
if usage != nil {
lastUsage = usage
if evt.Payload.Info.Model != "" {
lastModel = evt.Payload.Info.Model
}
}
}
}
if lastUsage == nil {
return nil
}
model := lastModel
if model == "" {
model = "unknown"
}
cachedTokens := lastUsage.CachedInputTokens
if cachedTokens == 0 {
cachedTokens = lastUsage.CacheReadInputTokens
}
return &Record{
Date: date,
Provider: "codex",
Model: model,
InputTokens: lastUsage.InputTokens,
OutputTokens: lastUsage.OutputTokens + lastUsage.ReasoningOutputTokens,
CacheReadTokens: cachedTokens,
CacheWriteTokens: 0,
}
}
// extractDateFromPath extracts YYYY-MM-DD from a path like .../sessions/2026/03/26/file.jsonl
func extractDateFromPath(path string) string {
parts := strings.Split(filepath.ToSlash(path), "/")
// Look for sessions/YYYY/MM/DD pattern
for i := 0; i < len(parts)-3; i++ {
if parts[i] == "sessions" && len(parts[i+1]) == 4 && len(parts[i+2]) == 2 && len(parts[i+3]) == 2 {
return parts[i+1] + "-" + parts[i+2] + "-" + parts[i+3]
}
}
return ""
}

View File

@@ -1,140 +0,0 @@
package usage
import (
"log/slog"
"os"
"path/filepath"
"testing"
)
func TestParseCodexFile(t *testing.T) {
// Create a temp directory structure: sessions/YYYY/MM/DD/file.jsonl
tmp := t.TempDir()
sessionsDir := filepath.Join(tmp, "sessions", "2026", "01", "14")
if err := os.MkdirAll(sessionsDir, 0o755); err != nil {
t.Fatal(err)
}
// Real Codex JSONL format with turn_context and token_count events
content := `{"timestamp":"2026-01-13T17:41:31.666Z","type":"turn_context","payload":{"cwd":"/tmp","model":"gpt-5.2-codex","effort":"high"}}
{"timestamp":"2026-01-13T17:41:32.916Z","type":"event_msg","payload":{"type":"token_count","info":null,"rate_limits":{"primary":{"used_percent":24.0}}}}
{"timestamp":"2026-01-13T17:44:06.217Z","type":"event_msg","payload":{"type":"token_count","info":{"total_token_usage":{"input_tokens":328894,"cached_input_tokens":287872,"output_tokens":3071,"reasoning_output_tokens":960,"total_tokens":331965},"last_token_usage":{"input_tokens":24525,"cached_input_tokens":3200,"output_tokens":1815,"reasoning_output_tokens":960,"total_tokens":26340},"model_context_window":258400},"rate_limits":{"primary":{"used_percent":26.0}}}}
`
filePath := filepath.Join(sessionsDir, "rollout-test.jsonl")
if err := os.WriteFile(filePath, []byte(content), 0o644); err != nil {
t.Fatal(err)
}
s := NewScanner(slog.Default())
record := s.parseCodexFile(filePath)
if record == nil {
t.Fatal("expected non-nil record")
}
if record.Date != "2026-01-14" {
t.Errorf("date = %q, want %q", record.Date, "2026-01-14")
}
if record.Provider != "codex" {
t.Errorf("provider = %q, want %q", record.Provider, "codex")
}
if record.Model != "gpt-5.2-codex" {
t.Errorf("model = %q, want %q", record.Model, "gpt-5.2-codex")
}
if record.InputTokens != 328894 {
t.Errorf("input_tokens = %d, want %d", record.InputTokens, 328894)
}
// output_tokens + reasoning_output_tokens
if record.OutputTokens != 3071+960 {
t.Errorf("output_tokens = %d, want %d", record.OutputTokens, 3071+960)
}
if record.CacheReadTokens != 287872 {
t.Errorf("cache_read_tokens = %d, want %d", record.CacheReadTokens, 287872)
}
}
func TestParseCodexFile_NullInfo(t *testing.T) {
// When all token_count events have info:null, should return nil
tmp := t.TempDir()
sessionsDir := filepath.Join(tmp, "sessions", "2026", "01", "14")
if err := os.MkdirAll(sessionsDir, 0o755); err != nil {
t.Fatal(err)
}
content := `{"timestamp":"2026-01-13T17:41:32.916Z","type":"event_msg","payload":{"type":"token_count","info":null}}
`
filePath := filepath.Join(sessionsDir, "rollout-test.jsonl")
if err := os.WriteFile(filePath, []byte(content), 0o644); err != nil {
t.Fatal(err)
}
s := NewScanner(slog.Default())
record := s.parseCodexFile(filePath)
if record != nil {
t.Errorf("expected nil record for null info, got %+v", record)
}
}
func TestParseCodexFile_LastTokenUsageFallback(t *testing.T) {
// When total_token_usage is absent but last_token_usage exists
tmp := t.TempDir()
sessionsDir := filepath.Join(tmp, "sessions", "2026", "03", "27")
if err := os.MkdirAll(sessionsDir, 0o755); err != nil {
t.Fatal(err)
}
content := `{"timestamp":"2026-03-27T10:00:00Z","type":"turn_context","payload":{"model":"gpt-5"}}
{"timestamp":"2026-03-27T10:01:00Z","type":"event_msg","payload":{"type":"token_count","info":{"last_token_usage":{"input_tokens":1000,"cached_input_tokens":200,"output_tokens":500}}}}
`
filePath := filepath.Join(sessionsDir, "rollout-test.jsonl")
if err := os.WriteFile(filePath, []byte(content), 0o644); err != nil {
t.Fatal(err)
}
s := NewScanner(slog.Default())
record := s.parseCodexFile(filePath)
if record == nil {
t.Fatal("expected non-nil record")
}
if record.InputTokens != 1000 {
t.Errorf("input_tokens = %d, want %d", record.InputTokens, 1000)
}
if record.OutputTokens != 500 {
t.Errorf("output_tokens = %d, want %d", record.OutputTokens, 500)
}
if record.CacheReadTokens != 200 {
t.Errorf("cache_read_tokens = %d, want %d", record.CacheReadTokens, 200)
}
}
func TestParseCodexFile_CacheReadInputTokens(t *testing.T) {
// Test the alternative field name cache_read_input_tokens
tmp := t.TempDir()
sessionsDir := filepath.Join(tmp, "sessions", "2026", "03", "27")
if err := os.MkdirAll(sessionsDir, 0o755); err != nil {
t.Fatal(err)
}
content := `{"timestamp":"2026-03-27T10:00:00Z","type":"event_msg","payload":{"type":"token_count","info":{"total_token_usage":{"input_tokens":5000,"cache_read_input_tokens":3000,"output_tokens":800},"model":"gpt-5.2-codex"}}}
`
filePath := filepath.Join(sessionsDir, "rollout-test.jsonl")
if err := os.WriteFile(filePath, []byte(content), 0o644); err != nil {
t.Fatal(err)
}
s := NewScanner(slog.Default())
record := s.parseCodexFile(filePath)
if record == nil {
t.Fatal("expected non-nil record")
}
if record.CacheReadTokens != 3000 {
t.Errorf("cache_read_tokens = %d, want %d", record.CacheReadTokens, 3000)
}
if record.Model != "gpt-5.2-codex" {
t.Errorf("model = %q, want %q", record.Model, "gpt-5.2-codex")
}
}

View File

@@ -1,173 +0,0 @@
package usage
import (
"bufio"
"encoding/json"
"os"
"path/filepath"
"time"
)
// scanHermes reads Hermes JSONL session files from
// ~/.hermes/sessions/*.jsonl
// and extracts token usage from assistant message and usage_update entries.
//
// Hermes communicates via the ACP (Agent Communication Protocol) and logs
// session events as JSONL. Token usage appears in:
// - "assistant" messages with a "usage" field
// - "usage_update" notification entries with cumulative token snapshots
func (s *Scanner) scanHermes() []Record {
root := hermesSessionRoot()
if root == "" {
return nil
}
// Glob for session files: sessions/*.jsonl
pattern := filepath.Join(root, "*.jsonl")
files, err := filepath.Glob(pattern)
if err != nil {
s.logger.Debug("hermes glob error", "error", err)
return nil
}
var allRecords []Record
for _, f := range files {
record := s.parseHermesFile(f)
if record != nil {
allRecords = append(allRecords, *record)
}
}
return mergeRecords(allRecords)
}
// hermesSessionRoot returns the Hermes sessions directory.
func hermesSessionRoot() string {
if hermesHome := os.Getenv("HERMES_HOME"); hermesHome != "" {
dir := filepath.Join(hermesHome, "sessions")
if info, err := os.Stat(dir); err == nil && info.IsDir() {
return dir
}
}
home, err := os.UserHomeDir()
if err != nil {
return ""
}
// Check common locations.
candidates := []string{
filepath.Join(home, ".hermes", "sessions"),
filepath.Join(home, ".local", "share", "hermes", "sessions"),
filepath.Join(home, ".config", "hermes", "sessions"),
}
for _, dir := range candidates {
if info, err := os.Stat(dir); err == nil && info.IsDir() {
return dir
}
}
return ""
}
// hermesLine represents a line in a Hermes session JSONL file.
// Hermes session logs contain both message events and notification events.
type hermesLine struct {
Type string `json:"type"`
Timestamp string `json:"timestamp"` // RFC3339
Model string `json:"model"`
Usage *struct {
InputTokens int64 `json:"inputTokens"`
OutputTokens int64 `json:"outputTokens"`
CachedReadTokens int64 `json:"cachedReadTokens"`
ThoughtTokens int64 `json:"thoughtTokens"`
} `json:"usage"`
}
// parseHermesFile extracts the final cumulative token usage from a Hermes session file.
// Hermes usage_update events are cumulative snapshots — the last one in the file
// represents the total usage for the session. Returns nil if no usage data found.
func (s *Scanner) parseHermesFile(path string) *Record {
f, err := os.Open(path)
if err != nil {
return nil
}
defer f.Close()
var lastUsage *struct {
InputTokens int64 `json:"inputTokens"`
OutputTokens int64 `json:"outputTokens"`
CachedReadTokens int64 `json:"cachedReadTokens"`
ThoughtTokens int64 `json:"thoughtTokens"`
}
var lastModel string
var lastTimestamp string
scanner := bufio.NewScanner(f)
scanner.Buffer(make([]byte, 0, 256*1024), 1024*1024)
for scanner.Scan() {
line := scanner.Bytes()
// Fast pre-filter.
if !bytesContains(line, `"usage"`) && !bytesContains(line, `"inputTokens"`) {
continue
}
var entry hermesLine
if err := json.Unmarshal(line, &entry); err != nil {
continue
}
if entry.Usage == nil {
continue
}
// Take the latest usage snapshot (cumulative).
lastUsage = entry.Usage
if entry.Model != "" {
lastModel = entry.Model
}
if entry.Timestamp != "" {
lastTimestamp = entry.Timestamp
}
}
if lastUsage == nil {
return nil
}
if lastUsage.InputTokens == 0 && lastUsage.OutputTokens == 0 {
return nil
}
// Parse timestamp for date.
var date string
if lastTimestamp != "" {
if ts, err := time.Parse(time.RFC3339Nano, lastTimestamp); err == nil {
date = ts.Local().Format("2006-01-02")
} else if ts, err := time.Parse(time.RFC3339, lastTimestamp); err == nil {
date = ts.Local().Format("2006-01-02")
}
}
if date == "" {
// Fall back to file modification time.
if info, err := os.Stat(path); err == nil {
date = info.ModTime().Local().Format("2006-01-02")
} else {
return nil
}
}
model := lastModel
if model == "" {
model = "unknown"
}
return &Record{
Date: date,
Provider: "hermes",
Model: model,
InputTokens: lastUsage.InputTokens,
OutputTokens: lastUsage.OutputTokens + lastUsage.ThoughtTokens,
CacheReadTokens: lastUsage.CachedReadTokens,
}
}

View File

@@ -1,99 +0,0 @@
package usage
import (
"log/slog"
"os"
"path/filepath"
"testing"
)
func TestParseHermesFile(t *testing.T) {
tmp := t.TempDir()
// Hermes session JSONL with usage_update entries (cumulative snapshots)
content := `{"type":"session_start","timestamp":"2026-04-10T14:00:00.000Z","model":"claude-sonnet-4-5"}
{"type":"usage_update","timestamp":"2026-04-10T14:01:00.000Z","model":"claude-sonnet-4-5","usage":{"inputTokens":1000,"outputTokens":200,"cachedReadTokens":500,"thoughtTokens":50}}
{"type":"usage_update","timestamp":"2026-04-10T14:02:00.000Z","model":"claude-sonnet-4-5","usage":{"inputTokens":3000,"outputTokens":600,"cachedReadTokens":1500,"thoughtTokens":100}}
`
filePath := filepath.Join(tmp, "session-001.jsonl")
if err := os.WriteFile(filePath, []byte(content), 0o644); err != nil {
t.Fatal(err)
}
s := NewScanner(slog.Default())
record := s.parseHermesFile(filePath)
if record == nil {
t.Fatal("expected non-nil record")
}
if record.Provider != "hermes" {
t.Errorf("provider = %q, want %q", record.Provider, "hermes")
}
if record.Model != "claude-sonnet-4-5" {
t.Errorf("model = %q, want %q", record.Model, "claude-sonnet-4-5")
}
if record.Date != "2026-04-10" {
t.Errorf("date = %q, want %q", record.Date, "2026-04-10")
}
// Should take the last (cumulative) snapshot
if record.InputTokens != 3000 {
t.Errorf("input_tokens = %d, want %d", record.InputTokens, 3000)
}
// output_tokens + thought_tokens
if record.OutputTokens != 700 {
t.Errorf("output_tokens = %d, want %d (600 + 100)", record.OutputTokens, 700)
}
if record.CacheReadTokens != 1500 {
t.Errorf("cache_read_tokens = %d, want %d", record.CacheReadTokens, 1500)
}
}
func TestParseHermesFile_NoUsage(t *testing.T) {
tmp := t.TempDir()
content := `{"type":"session_start","timestamp":"2026-04-10T14:00:00.000Z","model":"test-model"}
{"type":"message","timestamp":"2026-04-10T14:01:00.000Z","content":"hello"}
`
filePath := filepath.Join(tmp, "session-empty.jsonl")
if err := os.WriteFile(filePath, []byte(content), 0o644); err != nil {
t.Fatal(err)
}
s := NewScanner(slog.Default())
record := s.parseHermesFile(filePath)
if record != nil {
t.Errorf("expected nil record for no usage data, got %+v", record)
}
}
func TestParseHermesFile_SingleUsage(t *testing.T) {
tmp := t.TempDir()
content := `{"type":"usage_update","timestamp":"2026-04-10T14:01:00.000Z","model":"hermes-3","usage":{"inputTokens":500,"outputTokens":100,"cachedReadTokens":0,"thoughtTokens":0}}
`
filePath := filepath.Join(tmp, "session-single.jsonl")
if err := os.WriteFile(filePath, []byte(content), 0o644); err != nil {
t.Fatal(err)
}
s := NewScanner(slog.Default())
record := s.parseHermesFile(filePath)
if record == nil {
t.Fatal("expected non-nil record")
}
if record.InputTokens != 500 {
t.Errorf("input_tokens = %d, want %d", record.InputTokens, 500)
}
if record.OutputTokens != 100 {
t.Errorf("output_tokens = %d, want %d", record.OutputTokens, 100)
}
if record.Model != "hermes-3" {
t.Errorf("model = %q, want %q", record.Model, "hermes-3")
}
}

View File

@@ -1,154 +0,0 @@
package usage
import (
"bufio"
"encoding/json"
"os"
"path/filepath"
"strings"
"time"
)
// scanOpenClaw reads OpenClaw JSONL session files from
// ~/.openclaw/agents/*/sessions/*.jsonl
// and extracts token usage from assistant message entries.
func (s *Scanner) scanOpenClaw() []Record {
root := openClawSessionRoot()
if root == "" {
return nil
}
// Glob for session files: agents/*/sessions/*.jsonl
pattern := filepath.Join(root, "*", "sessions", "*.jsonl")
files, err := filepath.Glob(pattern)
if err != nil {
s.logger.Debug("openclaw glob error", "error", err)
return nil
}
var allRecords []Record
for _, f := range files {
records := s.parseOpenClawFile(f)
allRecords = append(allRecords, records...)
}
return mergeRecords(allRecords)
}
// openClawSessionRoot returns the OpenClaw agents directory.
func openClawSessionRoot() string {
if openclawHome := os.Getenv("OPENCLAW_HOME"); openclawHome != "" {
dir := filepath.Join(openclawHome, "agents")
if info, err := os.Stat(dir); err == nil && info.IsDir() {
return dir
}
}
home, err := os.UserHomeDir()
if err != nil {
return ""
}
dir := filepath.Join(home, ".openclaw", "agents")
if info, err := os.Stat(dir); err == nil && info.IsDir() {
return dir
}
return ""
}
// openClawLine represents a line in an OpenClaw JSONL session file.
type openClawLine struct {
Type string `json:"type"`
Timestamp string `json:"timestamp"` // RFC3339
Message *struct {
Role string `json:"role"`
Provider string `json:"provider"`
Model string `json:"model"`
Usage *struct {
Input int64 `json:"input"`
Output int64 `json:"output"`
CacheRead int64 `json:"cacheRead"`
CacheWrite int64 `json:"cacheWrite"`
} `json:"usage"`
} `json:"message"`
}
// parseOpenClawFile extracts token usage records from an OpenClaw session JSONL file.
func (s *Scanner) parseOpenClawFile(path string) []Record {
f, err := os.Open(path)
if err != nil {
return nil
}
defer f.Close()
var records []Record
scanner := bufio.NewScanner(f)
scanner.Buffer(make([]byte, 0, 256*1024), 1024*1024)
for scanner.Scan() {
line := scanner.Bytes()
// Fast pre-filter: skip lines that don't contain relevant data.
if !bytesContains(line, `"usage"`) {
continue
}
if !bytesContains(line, `"assistant"`) {
continue
}
var entry openClawLine
if err := json.Unmarshal(line, &entry); err != nil {
continue
}
if entry.Type != "message" || entry.Message == nil || entry.Message.Role != "assistant" || entry.Message.Usage == nil {
continue
}
u := entry.Message.Usage
if u.Input == 0 && u.Output == 0 {
continue
}
// Parse timestamp to get date.
ts, err := time.Parse(time.RFC3339Nano, entry.Timestamp)
if err != nil {
ts, err = time.Parse(time.RFC3339, entry.Timestamp)
if err != nil {
continue
}
}
model := entry.Message.Model
if model == "" {
model = "unknown"
}
// Construct provider string: if the session has a provider, use "openclaw/<provider>"
// for attribution, but the Record.Provider field should be "openclaw".
provider := "openclaw"
_ = entry.Message.Provider // available but not used in provider field
records = append(records, Record{
Date: ts.Local().Format("2006-01-02"),
Provider: provider,
Model: normalizeOpenClawModel(entry.Message.Provider, model),
InputTokens: u.Input,
OutputTokens: u.Output,
CacheReadTokens: u.CacheRead,
CacheWriteTokens: u.CacheWrite,
})
}
return records
}
// normalizeOpenClawModel returns a model identifier. If the provider is known,
// it prefixes the model name for clarity (e.g. "deepseek/deepseek-chat").
func normalizeOpenClawModel(provider, model string) string {
provider = strings.TrimSpace(provider)
model = strings.TrimSpace(model)
if provider != "" && !strings.Contains(model, "/") {
return provider + "/" + model
}
return model
}

View File

@@ -1,93 +0,0 @@
package usage
import (
"log/slog"
"os"
"path/filepath"
"testing"
)
func TestParseOpenClawFile(t *testing.T) {
tmp := t.TempDir()
// Real OpenClaw session JSONL with session header, model_change, and assistant messages
content := `{"type":"session","version":3,"id":"multica-test","timestamp":"2026-04-11T13:53:05.847Z"}
{"type":"model_change","id":"03c18aae","timestamp":"2026-04-11T13:53:05.855Z","provider":"deepseek","modelId":"deepseek-chat"}
{"type":"message","id":"162ce1b7","parentId":"c90ecabe","timestamp":"2026-04-11T13:53:09.986Z","message":{"role":"assistant","content":[{"type":"text","text":"I'll start by getting the issue details."}],"api":"openai-completions","provider":"deepseek","model":"deepseek-chat","usage":{"input":133,"output":81,"cacheRead":16448,"cacheWrite":0,"totalTokens":16662}}}
{"type":"message","id":"3c063300","parentId":"50e4feb6","timestamp":"2026-04-11T13:53:14.750Z","message":{"role":"assistant","content":[{"type":"text","text":"Let me check the workspace."}],"provider":"deepseek","model":"deepseek-chat","usage":{"input":286,"output":94,"cacheRead":16448,"cacheWrite":0}}}
{"type":"message","id":"user001","timestamp":"2026-04-11T13:54:00.000Z","message":{"role":"user","content":[{"type":"text","text":"hello"}]}}
`
filePath := filepath.Join(tmp, "session.jsonl")
if err := os.WriteFile(filePath, []byte(content), 0o644); err != nil {
t.Fatal(err)
}
s := NewScanner(slog.Default())
records := s.parseOpenClawFile(filePath)
if len(records) != 2 {
t.Fatalf("expected 2 records, got %d", len(records))
}
r := records[0]
if r.Provider != "openclaw" {
t.Errorf("provider = %q, want %q", r.Provider, "openclaw")
}
if r.Model != "deepseek/deepseek-chat" {
t.Errorf("model = %q, want %q", r.Model, "deepseek/deepseek-chat")
}
if r.InputTokens != 133 {
t.Errorf("input_tokens = %d, want %d", r.InputTokens, 133)
}
if r.OutputTokens != 81 {
t.Errorf("output_tokens = %d, want %d", r.OutputTokens, 81)
}
if r.CacheReadTokens != 16448 {
t.Errorf("cache_read_tokens = %d, want %d", r.CacheReadTokens, 16448)
}
if r.Date != "2026-04-11" {
t.Errorf("date = %q, want %q", r.Date, "2026-04-11")
}
}
func TestParseOpenClawFile_NoUsage(t *testing.T) {
tmp := t.TempDir()
// Session with no assistant messages containing usage
content := `{"type":"session","version":3,"id":"empty-session","timestamp":"2026-04-11T13:53:05.847Z"}
{"type":"message","id":"user001","timestamp":"2026-04-11T13:54:00.000Z","message":{"role":"user","content":[{"type":"text","text":"hello"}]}}
`
filePath := filepath.Join(tmp, "session.jsonl")
if err := os.WriteFile(filePath, []byte(content), 0o644); err != nil {
t.Fatal(err)
}
s := NewScanner(slog.Default())
records := s.parseOpenClawFile(filePath)
if len(records) != 0 {
t.Errorf("expected 0 records, got %d", len(records))
}
}
func TestNormalizeOpenClawModel(t *testing.T) {
tests := []struct {
provider string
model string
want string
}{
{"deepseek", "deepseek-chat", "deepseek/deepseek-chat"},
{"anthropic", "claude-sonnet-4-5", "anthropic/claude-sonnet-4-5"},
{"", "gpt-4o", "gpt-4o"},
{"openai", "openai/gpt-4o", "openai/gpt-4o"}, // already has /
}
for _, tt := range tests {
got := normalizeOpenClawModel(tt.provider, tt.model)
if got != tt.want {
t.Errorf("normalizeOpenClawModel(%q, %q) = %q, want %q", tt.provider, tt.model, got, tt.want)
}
}
}

View File

@@ -1,122 +0,0 @@
package usage
import (
"encoding/json"
"os"
"path/filepath"
"time"
)
// scanOpenCode reads OpenCode message JSON files from
// ~/.local/share/opencode/storage/message/ses_*/*.json
// and extracts token usage from assistant messages.
func (s *Scanner) scanOpenCode() []Record {
root := openCodeStorageRoot()
if root == "" {
return nil
}
// Glob for message files: storage/message/ses_*/*.json
pattern := filepath.Join(root, "ses_*", "*.json")
files, err := filepath.Glob(pattern)
if err != nil {
s.logger.Debug("opencode glob error", "error", err)
return nil
}
var allRecords []Record
for _, f := range files {
record := s.parseOpenCodeFile(f)
if record != nil {
allRecords = append(allRecords, *record)
}
}
return mergeRecords(allRecords)
}
// openCodeStorageRoot returns the OpenCode message storage directory.
func openCodeStorageRoot() string {
// Check XDG_DATA_HOME first, then fall back to ~/.local/share
dataHome := os.Getenv("XDG_DATA_HOME")
if dataHome == "" {
home, err := os.UserHomeDir()
if err != nil {
return ""
}
dataHome = filepath.Join(home, ".local", "share")
}
dir := filepath.Join(dataHome, "opencode", "storage", "message")
if info, err := os.Stat(dir); err == nil && info.IsDir() {
return dir
}
return ""
}
// openCodeMessage represents the subset of an OpenCode message JSON file we need.
type openCodeMessage struct {
Role string `json:"role"`
ModelID string `json:"modelID"`
ProviderID string `json:"providerID"`
Time *struct {
Created int64 `json:"created"` // unix milliseconds
} `json:"time"`
Tokens *struct {
Input int64 `json:"input"`
Output int64 `json:"output"`
Reasoning int64 `json:"reasoning"`
Cache *struct {
Read int64 `json:"read"`
Write int64 `json:"write"`
} `json:"cache"`
} `json:"tokens"`
}
// parseOpenCodeFile reads a single OpenCode message JSON file and returns a Record
// if it contains assistant token usage. Returns nil otherwise.
func (s *Scanner) parseOpenCodeFile(path string) *Record {
data, err := os.ReadFile(path)
if err != nil {
return nil
}
var msg openCodeMessage
if err := json.Unmarshal(data, &msg); err != nil {
return nil
}
// Only count assistant messages with token usage.
if msg.Role != "assistant" || msg.Tokens == nil || msg.Time == nil {
return nil
}
// Skip messages with no meaningful token usage.
if msg.Tokens.Input == 0 && msg.Tokens.Output == 0 {
return nil
}
ts := time.UnixMilli(msg.Time.Created)
date := ts.Local().Format("2006-01-02")
model := msg.ModelID
if model == "" {
model = "unknown"
}
var cacheRead, cacheWrite int64
if msg.Tokens.Cache != nil {
cacheRead = msg.Tokens.Cache.Read
cacheWrite = msg.Tokens.Cache.Write
}
return &Record{
Date: date,
Provider: "opencode",
Model: model,
InputTokens: msg.Tokens.Input,
OutputTokens: msg.Tokens.Output + msg.Tokens.Reasoning,
CacheReadTokens: cacheRead,
CacheWriteTokens: cacheWrite,
}
}

View File

@@ -1,141 +0,0 @@
package usage
import (
"log/slog"
"os"
"path/filepath"
"testing"
)
func TestParseOpenCodeFile(t *testing.T) {
tmp := t.TempDir()
// Real OpenCode message JSON format with token usage
content := `{
"id": "msg_test001",
"sessionID": "ses_test001",
"role": "assistant",
"time": {"created": 1768332037518, "completed": 1768332039410},
"modelID": "claude-sonnet-4-5",
"providerID": "anthropic",
"tokens": {"input": 10916, "output": 5, "reasoning": 100, "cache": {"read": 448, "write": 50}}
}`
filePath := filepath.Join(tmp, "msg_test001.json")
if err := os.WriteFile(filePath, []byte(content), 0o644); err != nil {
t.Fatal(err)
}
s := NewScanner(slog.Default())
record := s.parseOpenCodeFile(filePath)
if record == nil {
t.Fatal("expected non-nil record")
}
if record.Provider != "opencode" {
t.Errorf("provider = %q, want %q", record.Provider, "opencode")
}
if record.Model != "claude-sonnet-4-5" {
t.Errorf("model = %q, want %q", record.Model, "claude-sonnet-4-5")
}
if record.InputTokens != 10916 {
t.Errorf("input_tokens = %d, want %d", record.InputTokens, 10916)
}
// output_tokens + reasoning
if record.OutputTokens != 105 {
t.Errorf("output_tokens = %d, want %d", record.OutputTokens, 105)
}
if record.CacheReadTokens != 448 {
t.Errorf("cache_read_tokens = %d, want %d", record.CacheReadTokens, 448)
}
if record.CacheWriteTokens != 50 {
t.Errorf("cache_write_tokens = %d, want %d", record.CacheWriteTokens, 50)
}
}
func TestParseOpenCodeFile_UserMessage(t *testing.T) {
tmp := t.TempDir()
// User messages should be ignored (no token usage to report)
content := `{
"id": "msg_user001",
"sessionID": "ses_test001",
"role": "user",
"time": {"created": 1768332037000}
}`
filePath := filepath.Join(tmp, "msg_user001.json")
if err := os.WriteFile(filePath, []byte(content), 0o644); err != nil {
t.Fatal(err)
}
s := NewScanner(slog.Default())
record := s.parseOpenCodeFile(filePath)
if record != nil {
t.Errorf("expected nil record for user message, got %+v", record)
}
}
func TestParseOpenCodeFile_NoCache(t *testing.T) {
tmp := t.TempDir()
// Message without cache field
content := `{
"id": "msg_test002",
"sessionID": "ses_test002",
"role": "assistant",
"time": {"created": 1768332037518},
"modelID": "gpt-4o",
"providerID": "openai",
"tokens": {"input": 500, "output": 200, "reasoning": 0}
}`
filePath := filepath.Join(tmp, "msg_test002.json")
if err := os.WriteFile(filePath, []byte(content), 0o644); err != nil {
t.Fatal(err)
}
s := NewScanner(slog.Default())
record := s.parseOpenCodeFile(filePath)
if record == nil {
t.Fatal("expected non-nil record")
}
if record.CacheReadTokens != 0 {
t.Errorf("cache_read_tokens = %d, want 0", record.CacheReadTokens)
}
if record.CacheWriteTokens != 0 {
t.Errorf("cache_write_tokens = %d, want 0", record.CacheWriteTokens)
}
if record.Model != "gpt-4o" {
t.Errorf("model = %q, want %q", record.Model, "gpt-4o")
}
}
func TestParseOpenCodeFile_ZeroTokens(t *testing.T) {
tmp := t.TempDir()
// Message with zero tokens should return nil
content := `{
"id": "msg_test003",
"sessionID": "ses_test003",
"role": "assistant",
"time": {"created": 1768332037518},
"modelID": "test-model",
"tokens": {"input": 0, "output": 0, "reasoning": 0}
}`
filePath := filepath.Join(tmp, "msg_test003.json")
if err := os.WriteFile(filePath, []byte(content), 0o644); err != nil {
t.Fatal(err)
}
s := NewScanner(slog.Default())
record := s.parseOpenCodeFile(filePath)
if record != nil {
t.Errorf("expected nil record for zero tokens, got %+v", record)
}
}

View File

@@ -1,126 +0,0 @@
package usage
import (
"bufio"
"encoding/json"
"os"
"path/filepath"
"time"
"github.com/multica-ai/multica/server/pkg/agent"
)
// scanPi reads Pi session JSONL logs produced by the multica daemon and
// extracts token usage from assistant `message` events.
//
// The agent backend writes every run's session file into
// ~/.multica/pi-sessions/ (see agent.PiSessionDir). Pi appends events to
// the file as it runs; each assistant `message` event carries cumulative
// usage for that turn in the shape:
//
// {"type":"message","timestamp":"...",
// "message":{"role":"assistant","model":"...",
// "usage":{"input":N,"output":N,"cacheRead":N,"cacheWrite":N,...}}}
func (s *Scanner) scanPi() []Record {
root, err := agent.PiSessionDir()
if err != nil || root == "" {
return nil
}
if info, err := os.Stat(root); err != nil || !info.IsDir() {
return nil
}
files, err := filepath.Glob(filepath.Join(root, "*.jsonl"))
if err != nil {
s.logger.Debug("pi glob error", "error", err)
return nil
}
var records []Record
for _, f := range files {
records = append(records, s.parsePiFile(f)...)
}
return mergeRecords(records)
}
type piSessionLine struct {
Type string `json:"type"`
Timestamp string `json:"timestamp"`
Message *struct {
Role string `json:"role"`
Model string `json:"model"`
Usage *struct {
Input int64 `json:"input"`
Output int64 `json:"output"`
CacheRead int64 `json:"cacheRead"`
CacheWrite int64 `json:"cacheWrite"`
} `json:"usage"`
} `json:"message"`
}
// parsePiFile walks a single session file and emits one Record per
// assistant message with non-zero usage. Each assistant message carries
// the cost for that specific turn (not cumulative), so they can be
// summed by mergeRecords downstream.
func (s *Scanner) parsePiFile(path string) []Record {
f, err := os.Open(path)
if err != nil {
return nil
}
defer f.Close()
scanner := bufio.NewScanner(f)
scanner.Buffer(make([]byte, 0, 256*1024), 8*1024*1024)
var records []Record
for scanner.Scan() {
line := scanner.Bytes()
if !bytesContains(line, `"usage"`) {
continue
}
var entry piSessionLine
if err := json.Unmarshal(line, &entry); err != nil {
continue
}
if entry.Type != "message" || entry.Message == nil || entry.Message.Usage == nil {
continue
}
if entry.Message.Role != "assistant" {
continue
}
u := entry.Message.Usage
if u.Input == 0 && u.Output == 0 && u.CacheRead == 0 && u.CacheWrite == 0 {
continue
}
date := ""
if entry.Timestamp != "" {
if ts, err := time.Parse(time.RFC3339Nano, entry.Timestamp); err == nil {
date = ts.Local().Format("2006-01-02")
}
}
if date == "" {
info, err := os.Stat(path)
if err != nil {
continue
}
date = info.ModTime().Local().Format("2006-01-02")
}
model := entry.Message.Model
if model == "" {
model = "unknown"
}
records = append(records, Record{
Date: date,
Provider: "pi",
Model: model,
InputTokens: u.Input,
OutputTokens: u.Output,
CacheReadTokens: u.CacheRead,
CacheWriteTokens: u.CacheWrite,
})
}
return records
}

View File

@@ -1,82 +0,0 @@
package usage
import (
"log/slog"
)
// Record represents aggregated token usage for one (date, provider, model) tuple.
type Record struct {
Date string `json:"date"` // "2006-01-02"
Provider string `json:"provider"` // "claude" or "codex"
Model string `json:"model"`
InputTokens int64 `json:"input_tokens"`
OutputTokens int64 `json:"output_tokens"`
CacheReadTokens int64 `json:"cache_read_tokens"`
CacheWriteTokens int64 `json:"cache_write_tokens"`
}
// Scanner scans local CLI log files for token usage data.
type Scanner struct {
logger *slog.Logger
}
// NewScanner creates a new usage scanner.
func NewScanner(logger *slog.Logger) *Scanner {
return &Scanner{logger: logger}
}
// Scan reads local log files for all supported agent runtimes (Claude Code,
// Codex, OpenCode, OpenClaw, Hermes) and returns aggregated usage records
// keyed by (date, provider, model). Supports Claude Code, Codex, OpenCode,
// OpenClaw, Hermes, and Pi.
func (s *Scanner) Scan() []Record {
var records []Record
claudeRecords := s.scanClaude()
records = append(records, claudeRecords...)
codexRecords := s.scanCodex()
records = append(records, codexRecords...)
openCodeRecords := s.scanOpenCode()
records = append(records, openCodeRecords...)
openClawRecords := s.scanOpenClaw()
records = append(records, openClawRecords...)
hermesRecords := s.scanHermes()
records = append(records, hermesRecords...)
piRecords := s.scanPi()
records = append(records, piRecords...)
return records
}
// aggregation key for merging records.
type aggKey struct {
Date string
Provider string
Model string
}
func mergeRecords(records []Record) []Record {
m := make(map[aggKey]*Record)
for _, r := range records {
k := aggKey{Date: r.Date, Provider: r.Provider, Model: r.Model}
if existing, ok := m[k]; ok {
existing.InputTokens += r.InputTokens
existing.OutputTokens += r.OutputTokens
existing.CacheReadTokens += r.CacheReadTokens
existing.CacheWriteTokens += r.CacheWriteTokens
} else {
copy := r
m[k] = &copy
}
}
result := make([]Record, 0, len(m))
for _, r := range m {
result = append(result, *r)
}
return result
}

View File

@@ -59,16 +59,6 @@ func runtimeToResponse(rt db.AgentRuntime) AgentRuntimeResponse {
// Runtime Usage
// ---------------------------------------------------------------------------
type RuntimeUsageEntry struct {
Date string `json:"date"`
Provider string `json:"provider"`
Model string `json:"model"`
InputTokens int64 `json:"input_tokens"`
OutputTokens int64 `json:"output_tokens"`
CacheReadTokens int64 `json:"cache_read_tokens"`
CacheWriteTokens int64 `json:"cache_write_tokens"`
}
type RuntimeUsageResponse struct {
RuntimeID string `json:"runtime_id"`
Date string `json:"date"`
@@ -80,48 +70,10 @@ type RuntimeUsageResponse struct {
CacheWriteTokens int64 `json:"cache_write_tokens"`
}
// ReportRuntimeUsage receives usage data from the daemon.
func (h *Handler) ReportRuntimeUsage(w http.ResponseWriter, r *http.Request) {
runtimeID := chi.URLParam(r, "runtimeId")
if runtimeID == "" {
writeError(w, http.StatusBadRequest, "runtimeId is required")
return
}
// Verify the caller owns this runtime's workspace.
if _, ok := h.requireDaemonRuntimeAccess(w, r, runtimeID); !ok {
return
}
var req struct {
Entries []RuntimeUsageEntry `json:"entries"`
}
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
writeError(w, http.StatusBadRequest, "invalid request body")
return
}
for _, entry := range req.Entries {
date, err := time.Parse("2006-01-02", entry.Date)
if err != nil {
continue
}
h.Queries.UpsertRuntimeUsage(r.Context(), db.UpsertRuntimeUsageParams{
RuntimeID: parseUUID(runtimeID),
Date: pgtype.Date{Time: date, Valid: true},
Provider: entry.Provider,
Model: entry.Model,
InputTokens: entry.InputTokens,
OutputTokens: entry.OutputTokens,
CacheReadTokens: entry.CacheReadTokens,
CacheWriteTokens: entry.CacheWriteTokens,
})
}
writeJSON(w, http.StatusOK, map[string]string{"status": "ok"})
}
// GetRuntimeUsage returns usage data for a runtime (protected route).
// GetRuntimeUsage returns daily token usage for a runtime, aggregated from
// per-task usage records captured by the daemon. This is scoped to
// Daemon-executed tasks only (i.e. excludes users' local CLI usage of the
// same tool).
func (h *Handler) GetRuntimeUsage(w http.ResponseWriter, r *http.Request) {
runtimeID := chi.URLParam(r, "runtimeId")
@@ -135,17 +87,11 @@ func (h *Handler) GetRuntimeUsage(w http.ResponseWriter, r *http.Request) {
return
}
days := 90
if d := r.URL.Query().Get("days"); d != "" {
if parsed, err := strconv.Atoi(d); err == nil && parsed > 0 && parsed <= 365 {
days = parsed
}
}
since := pgtype.Date{Time: time.Now().AddDate(0, 0, -days), Valid: true}
since := parseSinceParam(r, 90)
rows, err := h.Queries.ListRuntimeUsage(r.Context(), db.ListRuntimeUsageParams{
RuntimeID: parseUUID(runtimeID),
Date: since,
Since: since,
})
if err != nil {
writeError(w, http.StatusInternalServerError, "failed to list usage")

View File

@@ -0,0 +1,117 @@
package handler
import (
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"testing"
"time"
)
// TestGetRuntimeUsage_BucketsByUsageTime ensures a task that was enqueued on
// one calendar day but whose tokens were reported the next day (e.g. execution
// crossed midnight, or the task sat in the queue) is attributed to the day
// tokens were actually produced, not the enqueue day. It also verifies the
// ?days=N cutoff covers the full earliest calendar day, not just "now minus N
// days" which would clip the morning of that day.
func TestGetRuntimeUsage_BucketsByUsageTime(t *testing.T) {
if testHandler == nil {
t.Skip("database not available")
}
ctx := context.Background()
// Pick a runtime bound to the fixture workspace.
var runtimeID string
if err := testPool.QueryRow(ctx, `
SELECT id FROM agent_runtime WHERE workspace_id = $1 LIMIT 1
`, testWorkspaceID).Scan(&runtimeID); err != nil {
t.Fatalf("fetch runtime: %v", err)
}
var agentID string
if err := testPool.QueryRow(ctx, `
SELECT id FROM agent WHERE workspace_id = $1 LIMIT 1
`, testWorkspaceID).Scan(&agentID); err != nil {
t.Fatalf("fetch agent: %v", err)
}
// Create an issue for the tasks to reference.
var issueID string
if err := testPool.QueryRow(ctx, `
INSERT INTO issue (workspace_id, title, creator_id, creator_type)
VALUES ($1, 'runtime usage test', $2, 'member')
RETURNING id
`, testWorkspaceID, testUserID).Scan(&issueID); err != nil {
t.Fatalf("create issue: %v", err)
}
t.Cleanup(func() {
testPool.Exec(ctx, `DELETE FROM issue WHERE id = $1`, issueID)
})
// enqueued yesterday 23:58 UTC, finished today 00:05 UTC — tokens belong to today.
now := time.Now().UTC()
today := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.UTC)
yesterdayLate := today.Add(-2 * time.Minute)
todayEarly := today.Add(5 * time.Minute)
// Task that ran entirely yesterday around 05:00 — used to verify the
// ?days cutoff isn't clipping yesterday's morning.
yesterdayMorning := today.Add(-19 * time.Hour)
insertTaskWithUsage := func(enqueueAt, usageAt time.Time, inputTokens int64) string {
var taskID string
if err := testPool.QueryRow(ctx, `
INSERT INTO agent_task_queue (agent_id, issue_id, runtime_id, status, created_at)
VALUES ($1, $2, $3, 'completed', $4)
RETURNING id
`, agentID, issueID, runtimeID, enqueueAt).Scan(&taskID); err != nil {
t.Fatalf("insert task: %v", err)
}
if _, err := testPool.Exec(ctx, `
INSERT INTO task_usage (task_id, provider, model, input_tokens, output_tokens, created_at)
VALUES ($1, 'claude', 'claude-3-5-sonnet', $2, 0, $3)
`, taskID, inputTokens, usageAt); err != nil {
t.Fatalf("insert task_usage: %v", err)
}
t.Cleanup(func() {
testPool.Exec(ctx, `DELETE FROM agent_task_queue WHERE id = $1`, taskID)
})
return taskID
}
insertTaskWithUsage(yesterdayLate, todayEarly, 1000) // cross-midnight
insertTaskWithUsage(yesterdayMorning, yesterdayMorning, 2000) // full-day yesterday
// Call the handler with ?days=1 at whatever "now" is. That should include
// both today and yesterday in full.
w := httptest.NewRecorder()
req := newRequest("GET", "/api/runtimes/"+runtimeID+"/usage?days=1", nil)
req = withURLParam(req, "runtimeId", runtimeID)
testHandler.GetRuntimeUsage(w, req)
if w.Code != http.StatusOK {
t.Fatalf("GetRuntimeUsage: expected 200, got %d: %s", w.Code, w.Body.String())
}
var resp []RuntimeUsageResponse
if err := json.NewDecoder(w.Body).Decode(&resp); err != nil {
t.Fatalf("decode response: %v", err)
}
byDate := make(map[string]int64)
for _, r := range resp {
byDate[r.Date] += r.InputTokens
}
todayKey := today.Format("2006-01-02")
yesterdayKey := today.Add(-24 * time.Hour).Format("2006-01-02")
// Cross-midnight task must attribute to today (tu.created_at), not yesterday
// (atq.created_at). Before the fix this was 0 on today / 1000 on yesterday.
if byDate[todayKey] != 1000 {
t.Errorf("cross-midnight task: today bucket expected 1000 input tokens, got %d (full map: %v)", byDate[todayKey], byDate)
}
// Yesterday's morning task must still be included — this is what breaks
// when ?days=N is interpreted as a rolling window instead of calendar days.
if byDate[yesterdayKey] != 2000 {
t.Errorf("yesterday morning task: yesterday bucket expected 2000 input tokens, got %d (full map: %v)", byDate[yesterdayKey], byDate)
}
}

View File

@@ -0,0 +1,16 @@
CREATE TABLE IF NOT EXISTS runtime_usage (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
runtime_id UUID NOT NULL REFERENCES agent_runtime(id) ON DELETE CASCADE,
date DATE NOT NULL,
provider TEXT NOT NULL,
model TEXT NOT NULL DEFAULT '',
input_tokens BIGINT NOT NULL DEFAULT 0,
output_tokens BIGINT NOT NULL DEFAULT 0,
cache_read_tokens BIGINT NOT NULL DEFAULT 0,
cache_write_tokens BIGINT NOT NULL DEFAULT 0,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
UNIQUE (runtime_id, date, provider, model)
);
CREATE INDEX IF NOT EXISTS idx_runtime_usage_runtime_date ON runtime_usage(runtime_id, date DESC);

View File

@@ -0,0 +1 @@
DROP TABLE IF EXISTS runtime_usage;

View File

@@ -336,20 +336,6 @@ type Project struct {
Priority string `json:"priority"`
}
type RuntimeUsage struct {
ID pgtype.UUID `json:"id"`
RuntimeID pgtype.UUID `json:"runtime_id"`
Date pgtype.Date `json:"date"`
Provider string `json:"provider"`
Model string `json:"model"`
InputTokens int64 `json:"input_tokens"`
OutputTokens int64 `json:"output_tokens"`
CacheReadTokens int64 `json:"cache_read_tokens"`
CacheWriteTokens int64 `json:"cache_write_tokens"`
CreatedAt pgtype.Timestamptz `json:"created_at"`
UpdatedAt pgtype.Timestamptz `json:"updated_at"`
}
type Skill struct {
ID pgtype.UUID `json:"id"`
WorkspaceID pgtype.UUID `json:"workspace_id"`

View File

@@ -44,112 +44,29 @@ func (q *Queries) GetRuntimeTaskHourlyActivity(ctx context.Context, runtimeID pg
return items, nil
}
const getRuntimeUsageSummary = `-- name: GetRuntimeUsageSummary :many
SELECT provider, model,
SUM(input_tokens)::bigint AS total_input_tokens,
SUM(output_tokens)::bigint AS total_output_tokens,
SUM(cache_read_tokens)::bigint AS total_cache_read_tokens,
SUM(cache_write_tokens)::bigint AS total_cache_write_tokens
FROM runtime_usage
WHERE runtime_id = $1
GROUP BY provider, model
ORDER BY provider, model
`
type GetRuntimeUsageSummaryRow struct {
Provider string `json:"provider"`
Model string `json:"model"`
TotalInputTokens int64 `json:"total_input_tokens"`
TotalOutputTokens int64 `json:"total_output_tokens"`
TotalCacheReadTokens int64 `json:"total_cache_read_tokens"`
TotalCacheWriteTokens int64 `json:"total_cache_write_tokens"`
}
func (q *Queries) GetRuntimeUsageSummary(ctx context.Context, runtimeID pgtype.UUID) ([]GetRuntimeUsageSummaryRow, error) {
rows, err := q.db.Query(ctx, getRuntimeUsageSummary, runtimeID)
if err != nil {
return nil, err
}
defer rows.Close()
items := []GetRuntimeUsageSummaryRow{}
for rows.Next() {
var i GetRuntimeUsageSummaryRow
if err := rows.Scan(
&i.Provider,
&i.Model,
&i.TotalInputTokens,
&i.TotalOutputTokens,
&i.TotalCacheReadTokens,
&i.TotalCacheWriteTokens,
); err != nil {
return nil, err
}
items = append(items, i)
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const listRuntimeUsage = `-- name: ListRuntimeUsage :many
SELECT id, runtime_id, date, provider, model, input_tokens, output_tokens, cache_read_tokens, cache_write_tokens, created_at, updated_at FROM runtime_usage
WHERE runtime_id = $1
AND date >= $2
ORDER BY date DESC
SELECT
DATE(tu.created_at) AS date,
tu.provider,
tu.model,
SUM(tu.input_tokens)::bigint AS input_tokens,
SUM(tu.output_tokens)::bigint AS output_tokens,
SUM(tu.cache_read_tokens)::bigint AS cache_read_tokens,
SUM(tu.cache_write_tokens)::bigint AS cache_write_tokens
FROM task_usage tu
JOIN agent_task_queue atq ON atq.id = tu.task_id
WHERE atq.runtime_id = $1
AND tu.created_at >= DATE_TRUNC('day', $2::timestamptz)
GROUP BY DATE(tu.created_at), tu.provider, tu.model
ORDER BY DATE(tu.created_at) DESC, tu.provider, tu.model
`
type ListRuntimeUsageParams struct {
RuntimeID pgtype.UUID `json:"runtime_id"`
Date pgtype.Date `json:"date"`
RuntimeID pgtype.UUID `json:"runtime_id"`
Since pgtype.Timestamptz `json:"since"`
}
func (q *Queries) ListRuntimeUsage(ctx context.Context, arg ListRuntimeUsageParams) ([]RuntimeUsage, error) {
rows, err := q.db.Query(ctx, listRuntimeUsage, arg.RuntimeID, arg.Date)
if err != nil {
return nil, err
}
defer rows.Close()
items := []RuntimeUsage{}
for rows.Next() {
var i RuntimeUsage
if err := rows.Scan(
&i.ID,
&i.RuntimeID,
&i.Date,
&i.Provider,
&i.Model,
&i.InputTokens,
&i.OutputTokens,
&i.CacheReadTokens,
&i.CacheWriteTokens,
&i.CreatedAt,
&i.UpdatedAt,
); err != nil {
return nil, err
}
items = append(items, i)
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const upsertRuntimeUsage = `-- name: UpsertRuntimeUsage :exec
INSERT INTO runtime_usage (runtime_id, date, provider, model, input_tokens, output_tokens, cache_read_tokens, cache_write_tokens)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
ON CONFLICT (runtime_id, date, provider, model)
DO UPDATE SET
input_tokens = EXCLUDED.input_tokens,
output_tokens = EXCLUDED.output_tokens,
cache_read_tokens = EXCLUDED.cache_read_tokens,
cache_write_tokens = EXCLUDED.cache_write_tokens,
updated_at = now()
`
type UpsertRuntimeUsageParams struct {
RuntimeID pgtype.UUID `json:"runtime_id"`
type ListRuntimeUsageRow struct {
Date pgtype.Date `json:"date"`
Provider string `json:"provider"`
Model string `json:"model"`
@@ -159,16 +76,34 @@ type UpsertRuntimeUsageParams struct {
CacheWriteTokens int64 `json:"cache_write_tokens"`
}
func (q *Queries) UpsertRuntimeUsage(ctx context.Context, arg UpsertRuntimeUsageParams) error {
_, err := q.db.Exec(ctx, upsertRuntimeUsage,
arg.RuntimeID,
arg.Date,
arg.Provider,
arg.Model,
arg.InputTokens,
arg.OutputTokens,
arg.CacheReadTokens,
arg.CacheWriteTokens,
)
return err
// Bucket by tu.created_at (usage report time, ~= task completion time), not
// atq.created_at (task enqueue time), so tasks that queue one day and execute
// the next are attributed to the day tokens were actually produced. The since
// cutoff is truncated to start-of-day so `days=N` yields full calendar days.
func (q *Queries) ListRuntimeUsage(ctx context.Context, arg ListRuntimeUsageParams) ([]ListRuntimeUsageRow, error) {
rows, err := q.db.Query(ctx, listRuntimeUsage, arg.RuntimeID, arg.Since)
if err != nil {
return nil, err
}
defer rows.Close()
items := []ListRuntimeUsageRow{}
for rows.Next() {
var i ListRuntimeUsageRow
if err := rows.Scan(
&i.Date,
&i.Provider,
&i.Model,
&i.InputTokens,
&i.OutputTokens,
&i.CacheReadTokens,
&i.CacheWriteTokens,
); err != nil {
return nil, err
}
items = append(items, i)
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}

View File

@@ -1,30 +1,22 @@
-- name: UpsertRuntimeUsage :exec
INSERT INTO runtime_usage (runtime_id, date, provider, model, input_tokens, output_tokens, cache_read_tokens, cache_write_tokens)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
ON CONFLICT (runtime_id, date, provider, model)
DO UPDATE SET
input_tokens = EXCLUDED.input_tokens,
output_tokens = EXCLUDED.output_tokens,
cache_read_tokens = EXCLUDED.cache_read_tokens,
cache_write_tokens = EXCLUDED.cache_write_tokens,
updated_at = now();
-- name: ListRuntimeUsage :many
SELECT * FROM runtime_usage
WHERE runtime_id = $1
AND date >= $2
ORDER BY date DESC;
-- name: GetRuntimeUsageSummary :many
SELECT provider, model,
SUM(input_tokens)::bigint AS total_input_tokens,
SUM(output_tokens)::bigint AS total_output_tokens,
SUM(cache_read_tokens)::bigint AS total_cache_read_tokens,
SUM(cache_write_tokens)::bigint AS total_cache_write_tokens
FROM runtime_usage
WHERE runtime_id = $1
GROUP BY provider, model
ORDER BY provider, model;
-- Bucket by tu.created_at (usage report time, ~= task completion time), not
-- atq.created_at (task enqueue time), so tasks that queue one day and execute
-- the next are attributed to the day tokens were actually produced. The since
-- cutoff is truncated to start-of-day so `days=N` yields full calendar days.
SELECT
DATE(tu.created_at) AS date,
tu.provider,
tu.model,
SUM(tu.input_tokens)::bigint AS input_tokens,
SUM(tu.output_tokens)::bigint AS output_tokens,
SUM(tu.cache_read_tokens)::bigint AS cache_read_tokens,
SUM(tu.cache_write_tokens)::bigint AS cache_write_tokens
FROM task_usage tu
JOIN agent_task_queue atq ON atq.id = tu.task_id
WHERE atq.runtime_id = $1
AND tu.created_at >= DATE_TRUNC('day', @since::timestamptz)
GROUP BY DATE(tu.created_at), tu.provider, tu.model
ORDER BY DATE(tu.created_at) DESC, tu.provider, tu.model;
-- name: GetRuntimeTaskHourlyActivity :many
SELECT EXTRACT(HOUR FROM started_at)::int AS hour, COUNT(*)::int AS count