mirror of
https://github.com/multica-ai/multica.git
synced 2026-06-26 17:09:14 +02:00
Compare commits
2 Commits
agent/lamb
...
agent/j/6c
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
054aab2a1a | ||
|
|
980b2bb7ef |
@@ -137,7 +137,6 @@ func NewRouter(pool *pgxpool.Pool, hub *realtime.Hub, bus *events.Bus) chi.Route
|
||||
|
||||
r.Post("/runtimes/{runtimeId}/tasks/claim", h.ClaimTaskByRuntime)
|
||||
r.Get("/runtimes/{runtimeId}/tasks/pending", h.ListPendingTasksByRuntime)
|
||||
r.Post("/runtimes/{runtimeId}/usage", h.ReportRuntimeUsage)
|
||||
r.Post("/runtimes/{runtimeId}/ping/{pingId}/result", h.ReportPingResult)
|
||||
r.Post("/runtimes/{runtimeId}/update/{updateId}/result", h.ReportUpdateResult)
|
||||
|
||||
|
||||
@@ -140,12 +140,6 @@ func (c *Client) GetTaskStatus(ctx context.Context, taskID string) (string, erro
|
||||
return resp.Status, nil
|
||||
}
|
||||
|
||||
func (c *Client) ReportUsage(ctx context.Context, runtimeID string, entries []map[string]any) error {
|
||||
return c.postJSON(ctx, fmt.Sprintf("/api/daemon/runtimes/%s/usage", runtimeID), map[string]any{
|
||||
"entries": entries,
|
||||
}, nil)
|
||||
}
|
||||
|
||||
// HeartbeatResponse contains the server's response to a heartbeat, including any pending actions.
|
||||
type HeartbeatResponse struct {
|
||||
Status string `json:"status"`
|
||||
|
||||
@@ -15,7 +15,6 @@ import (
|
||||
"github.com/multica-ai/multica/server/internal/cli"
|
||||
"github.com/multica-ai/multica/server/internal/daemon/execenv"
|
||||
"github.com/multica-ai/multica/server/internal/daemon/repocache"
|
||||
"github.com/multica-ai/multica/server/internal/daemon/usage"
|
||||
"github.com/multica-ai/multica/server/pkg/agent"
|
||||
)
|
||||
|
||||
@@ -107,7 +106,6 @@ func (d *Daemon) Run(ctx context.Context) error {
|
||||
go d.workspaceSyncLoop(ctx)
|
||||
|
||||
go d.heartbeatLoop(ctx)
|
||||
go d.usageScanLoop(ctx)
|
||||
go d.gcLoop(ctx)
|
||||
go d.serveHealth(ctx, healthLn, time.Now())
|
||||
return d.pollLoop(ctx)
|
||||
@@ -176,17 +174,6 @@ func (d *Daemon) findRuntime(id string) *Runtime {
|
||||
return nil
|
||||
}
|
||||
|
||||
// providerToRuntimeMap returns a mapping from provider name to runtime ID.
|
||||
func (d *Daemon) providerToRuntimeMap() map[string]string {
|
||||
d.mu.Lock()
|
||||
defer d.mu.Unlock()
|
||||
m := make(map[string]string)
|
||||
for id, rt := range d.runtimeIndex {
|
||||
m[rt.Provider] = id
|
||||
}
|
||||
return m
|
||||
}
|
||||
|
||||
func (d *Daemon) registerRuntimesForWorkspace(ctx context.Context, workspaceID string) (*RegisterResponse, error) {
|
||||
var runtimes []map[string]string
|
||||
for name, entry := range d.cfg.Agents {
|
||||
@@ -669,62 +656,6 @@ func (d *Daemon) triggerRestart() {
|
||||
}
|
||||
}
|
||||
|
||||
func (d *Daemon) usageScanLoop(ctx context.Context) {
|
||||
scanner := usage.NewScanner(d.logger)
|
||||
|
||||
report := func() {
|
||||
records := scanner.Scan()
|
||||
if len(records) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
// Build provider -> runtime ID mapping from current state.
|
||||
providerToRuntime := d.providerToRuntimeMap()
|
||||
|
||||
// Group records by provider to send to the correct runtime.
|
||||
byProvider := make(map[string][]map[string]any)
|
||||
for _, r := range records {
|
||||
byProvider[r.Provider] = append(byProvider[r.Provider], map[string]any{
|
||||
"date": r.Date,
|
||||
"provider": r.Provider,
|
||||
"model": r.Model,
|
||||
"input_tokens": r.InputTokens,
|
||||
"output_tokens": r.OutputTokens,
|
||||
"cache_read_tokens": r.CacheReadTokens,
|
||||
"cache_write_tokens": r.CacheWriteTokens,
|
||||
})
|
||||
}
|
||||
|
||||
for provider, entries := range byProvider {
|
||||
runtimeID, ok := providerToRuntime[provider]
|
||||
if !ok {
|
||||
d.logger.Debug("no runtime for provider, skipping usage report", "provider", provider)
|
||||
continue
|
||||
}
|
||||
if err := d.client.ReportUsage(ctx, runtimeID, entries); err != nil {
|
||||
d.logger.Warn("usage report failed", "provider", provider, "runtime_id", runtimeID, "error", err)
|
||||
} else {
|
||||
d.logger.Info("usage reported", "provider", provider, "runtime_id", runtimeID, "entries", len(entries))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Initial scan on startup.
|
||||
report()
|
||||
|
||||
ticker := time.NewTicker(5 * time.Minute)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
report()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (d *Daemon) pollLoop(ctx context.Context) error {
|
||||
sem := make(chan struct{}, d.cfg.MaxConcurrentTasks)
|
||||
var wg sync.WaitGroup
|
||||
|
||||
@@ -1,173 +0,0 @@
|
||||
package usage
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"encoding/json"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
// scanClaude reads Claude Code JSONL session logs from ~/.config/claude/projects/**/*.jsonl
|
||||
// and extracts token usage from "assistant" message lines.
|
||||
func (s *Scanner) scanClaude() []Record {
|
||||
roots := claudeLogRoots()
|
||||
if len(roots) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
var allRecords []Record
|
||||
seen := make(map[string]bool) // dedup by "messageId:requestId"
|
||||
|
||||
for _, root := range roots {
|
||||
files, err := filepath.Glob(filepath.Join(root, "**", "*.jsonl"))
|
||||
if err != nil {
|
||||
s.logger.Debug("claude glob error", "root", root, "error", err)
|
||||
continue
|
||||
}
|
||||
// Also glob one level deeper for subagent logs
|
||||
deeper, _ := filepath.Glob(filepath.Join(root, "**", "**", "*.jsonl"))
|
||||
files = append(files, deeper...)
|
||||
|
||||
for _, f := range files {
|
||||
records := s.parseClaudeFile(f, seen)
|
||||
allRecords = append(allRecords, records...)
|
||||
}
|
||||
}
|
||||
|
||||
return mergeRecords(allRecords)
|
||||
}
|
||||
|
||||
// claudeLogRoots returns the directories to scan for Claude JSONL logs.
|
||||
func claudeLogRoots() []string {
|
||||
var roots []string
|
||||
|
||||
// Check CLAUDE_CONFIG_DIR env var
|
||||
if configDir := os.Getenv("CLAUDE_CONFIG_DIR"); configDir != "" {
|
||||
for _, dir := range strings.Split(configDir, ",") {
|
||||
dir = strings.TrimSpace(dir)
|
||||
if dir != "" {
|
||||
roots = append(roots, filepath.Join(dir, "projects"))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Standard locations
|
||||
home, err := os.UserHomeDir()
|
||||
if err != nil {
|
||||
return roots
|
||||
}
|
||||
|
||||
candidates := []string{
|
||||
filepath.Join(home, ".config", "claude", "projects"),
|
||||
filepath.Join(home, ".claude", "projects"),
|
||||
}
|
||||
for _, dir := range candidates {
|
||||
if info, err := os.Stat(dir); err == nil && info.IsDir() {
|
||||
roots = append(roots, dir)
|
||||
}
|
||||
}
|
||||
|
||||
return roots
|
||||
}
|
||||
|
||||
// claudeLine represents the subset of a Claude JSONL line we care about.
|
||||
type claudeLine struct {
|
||||
Type string `json:"type"`
|
||||
Timestamp string `json:"timestamp"`
|
||||
RequestID string `json:"requestId"`
|
||||
Message *struct {
|
||||
ID string `json:"id"`
|
||||
Model string `json:"model"`
|
||||
Usage *struct {
|
||||
InputTokens int64 `json:"input_tokens"`
|
||||
OutputTokens int64 `json:"output_tokens"`
|
||||
CacheReadInputTokens int64 `json:"cache_read_input_tokens"`
|
||||
CacheCreationInputTokens int64 `json:"cache_creation_input_tokens"`
|
||||
} `json:"usage"`
|
||||
} `json:"message"`
|
||||
}
|
||||
|
||||
func (s *Scanner) parseClaudeFile(path string, seen map[string]bool) []Record {
|
||||
f, err := os.Open(path)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
var records []Record
|
||||
scanner := bufio.NewScanner(f)
|
||||
scanner.Buffer(make([]byte, 0, 256*1024), 1024*1024) // up to 1MB lines
|
||||
|
||||
for scanner.Scan() {
|
||||
line := scanner.Bytes()
|
||||
|
||||
// Fast pre-filter: skip lines that can't contain what we need
|
||||
if !bytesContains(line, `"type":"assistant"`) && !bytesContains(line, `"type": "assistant"`) {
|
||||
continue
|
||||
}
|
||||
if !bytesContains(line, `"usage"`) {
|
||||
continue
|
||||
}
|
||||
|
||||
var entry claudeLine
|
||||
if err := json.Unmarshal(line, &entry); err != nil {
|
||||
continue
|
||||
}
|
||||
if entry.Type != "assistant" || entry.Message == nil || entry.Message.Usage == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
// Dedup: Claude streaming produces multiple lines with same message.id + requestId
|
||||
// with cumulative token counts. Take only the first occurrence.
|
||||
dedupKey := entry.Message.ID + ":" + entry.RequestID
|
||||
if dedupKey != ":" && seen[dedupKey] {
|
||||
continue
|
||||
}
|
||||
if dedupKey != ":" {
|
||||
seen[dedupKey] = true
|
||||
}
|
||||
|
||||
// Parse timestamp to get date
|
||||
ts, err := time.Parse(time.RFC3339Nano, entry.Timestamp)
|
||||
if err != nil {
|
||||
ts, err = time.Parse(time.RFC3339, entry.Timestamp)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
model := entry.Message.Model
|
||||
if model == "" {
|
||||
model = "unknown"
|
||||
}
|
||||
|
||||
records = append(records, Record{
|
||||
Date: ts.Local().Format("2006-01-02"),
|
||||
Provider: "claude",
|
||||
Model: normalizeClaudeModel(model),
|
||||
InputTokens: entry.Message.Usage.InputTokens,
|
||||
OutputTokens: entry.Message.Usage.OutputTokens,
|
||||
CacheReadTokens: entry.Message.Usage.CacheReadInputTokens,
|
||||
CacheWriteTokens: entry.Message.Usage.CacheCreationInputTokens,
|
||||
})
|
||||
}
|
||||
|
||||
return records
|
||||
}
|
||||
|
||||
// normalizeClaudeModel strips common prefixes/suffixes from model names.
|
||||
func normalizeClaudeModel(model string) string {
|
||||
// Strip "anthropic." prefix
|
||||
model = strings.TrimPrefix(model, "anthropic.")
|
||||
// Strip Vertex AI prefixes like "us.anthropic."
|
||||
if idx := strings.LastIndex(model, "anthropic."); idx >= 0 {
|
||||
model = model[idx+len("anthropic."):]
|
||||
}
|
||||
return model
|
||||
}
|
||||
|
||||
func bytesContains(data []byte, substr string) bool {
|
||||
return strings.Contains(string(data), substr)
|
||||
}
|
||||
@@ -1,178 +0,0 @@
|
||||
package usage
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"encoding/json"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
)
|
||||
|
||||
// scanCodex reads Codex CLI session logs from ~/.codex/sessions/YYYY/MM/DD/*.jsonl
|
||||
// and extracts token usage from "token_count" event lines.
|
||||
func (s *Scanner) scanCodex() []Record {
|
||||
root := codexLogRoot()
|
||||
if root == "" {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Glob for session files: ~/.codex/sessions/YYYY/MM/DD/rollout-*.jsonl
|
||||
pattern := filepath.Join(root, "*", "*", "*", "*.jsonl")
|
||||
files, err := filepath.Glob(pattern)
|
||||
if err != nil {
|
||||
s.logger.Debug("codex glob error", "error", err)
|
||||
return nil
|
||||
}
|
||||
|
||||
var allRecords []Record
|
||||
for _, f := range files {
|
||||
record := s.parseCodexFile(f)
|
||||
if record != nil {
|
||||
allRecords = append(allRecords, *record)
|
||||
}
|
||||
}
|
||||
|
||||
return mergeRecords(allRecords)
|
||||
}
|
||||
|
||||
// codexLogRoot returns the Codex sessions directory.
|
||||
func codexLogRoot() string {
|
||||
if codexHome := os.Getenv("CODEX_HOME"); codexHome != "" {
|
||||
dir := filepath.Join(codexHome, "sessions")
|
||||
if info, err := os.Stat(dir); err == nil && info.IsDir() {
|
||||
return dir
|
||||
}
|
||||
}
|
||||
|
||||
home, err := os.UserHomeDir()
|
||||
if err != nil {
|
||||
return ""
|
||||
}
|
||||
|
||||
dir := filepath.Join(home, ".codex", "sessions")
|
||||
if info, err := os.Stat(dir); err == nil && info.IsDir() {
|
||||
return dir
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
// codexEvent represents a line in a Codex session JSONL file.
|
||||
type codexEvent struct {
|
||||
Type string `json:"type"`
|
||||
Payload *codexPayload `json:"payload"`
|
||||
}
|
||||
|
||||
type codexPayload struct {
|
||||
Type string `json:"type"`
|
||||
Info *codexTokenInfo `json:"info"`
|
||||
Model string `json:"model"` // present in turn_context events
|
||||
}
|
||||
|
||||
type codexTokenInfo struct {
|
||||
TotalTokenUsage *codexTokenUsage `json:"total_token_usage"`
|
||||
LastTokenUsage *codexTokenUsage `json:"last_token_usage"`
|
||||
Model string `json:"model"`
|
||||
}
|
||||
|
||||
type codexTokenUsage struct {
|
||||
InputTokens int64 `json:"input_tokens"`
|
||||
OutputTokens int64 `json:"output_tokens"`
|
||||
CachedInputTokens int64 `json:"cached_input_tokens"`
|
||||
CacheReadInputTokens int64 `json:"cache_read_input_tokens"`
|
||||
ReasoningOutputTokens int64 `json:"reasoning_output_tokens"`
|
||||
TotalTokens int64 `json:"total_tokens"`
|
||||
}
|
||||
|
||||
// parseCodexFile extracts the final cumulative token_count from a Codex session file.
|
||||
// Returns nil if no usage data found.
|
||||
func (s *Scanner) parseCodexFile(path string) *Record {
|
||||
f, err := os.Open(path)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
// Extract date from directory path: .../sessions/YYYY/MM/DD/file.jsonl
|
||||
date := extractDateFromPath(path)
|
||||
if date == "" {
|
||||
return nil
|
||||
}
|
||||
|
||||
var lastUsage *codexTokenUsage
|
||||
var lastModel string
|
||||
|
||||
scanner := bufio.NewScanner(f)
|
||||
scanner.Buffer(make([]byte, 0, 256*1024), 1024*1024)
|
||||
|
||||
for scanner.Scan() {
|
||||
line := scanner.Bytes()
|
||||
|
||||
// Fast pre-filter: only parse lines with token_count or turn_context
|
||||
hasTokenCount := bytesContains(line, `"token_count"`)
|
||||
hasTurnContext := bytesContains(line, `"turn_context"`)
|
||||
if !hasTokenCount && !hasTurnContext {
|
||||
continue
|
||||
}
|
||||
|
||||
var evt codexEvent
|
||||
if err := json.Unmarshal(line, &evt); err != nil || evt.Payload == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
// Track model from turn_context events
|
||||
if evt.Type == "turn_context" && evt.Payload.Model != "" {
|
||||
lastModel = evt.Payload.Model
|
||||
continue
|
||||
}
|
||||
|
||||
// Extract token usage from token_count events
|
||||
if evt.Payload.Type == "token_count" && evt.Payload.Info != nil {
|
||||
usage := evt.Payload.Info.TotalTokenUsage
|
||||
if usage == nil {
|
||||
usage = evt.Payload.Info.LastTokenUsage
|
||||
}
|
||||
if usage != nil {
|
||||
lastUsage = usage
|
||||
if evt.Payload.Info.Model != "" {
|
||||
lastModel = evt.Payload.Info.Model
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if lastUsage == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
model := lastModel
|
||||
if model == "" {
|
||||
model = "unknown"
|
||||
}
|
||||
|
||||
cachedTokens := lastUsage.CachedInputTokens
|
||||
if cachedTokens == 0 {
|
||||
cachedTokens = lastUsage.CacheReadInputTokens
|
||||
}
|
||||
|
||||
return &Record{
|
||||
Date: date,
|
||||
Provider: "codex",
|
||||
Model: model,
|
||||
InputTokens: lastUsage.InputTokens,
|
||||
OutputTokens: lastUsage.OutputTokens + lastUsage.ReasoningOutputTokens,
|
||||
CacheReadTokens: cachedTokens,
|
||||
CacheWriteTokens: 0,
|
||||
}
|
||||
}
|
||||
|
||||
// extractDateFromPath extracts YYYY-MM-DD from a path like .../sessions/2026/03/26/file.jsonl
|
||||
func extractDateFromPath(path string) string {
|
||||
parts := strings.Split(filepath.ToSlash(path), "/")
|
||||
// Look for sessions/YYYY/MM/DD pattern
|
||||
for i := 0; i < len(parts)-3; i++ {
|
||||
if parts[i] == "sessions" && len(parts[i+1]) == 4 && len(parts[i+2]) == 2 && len(parts[i+3]) == 2 {
|
||||
return parts[i+1] + "-" + parts[i+2] + "-" + parts[i+3]
|
||||
}
|
||||
}
|
||||
return ""
|
||||
}
|
||||
@@ -1,140 +0,0 @@
|
||||
package usage
|
||||
|
||||
import (
|
||||
"log/slog"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestParseCodexFile(t *testing.T) {
|
||||
// Create a temp directory structure: sessions/YYYY/MM/DD/file.jsonl
|
||||
tmp := t.TempDir()
|
||||
sessionsDir := filepath.Join(tmp, "sessions", "2026", "01", "14")
|
||||
if err := os.MkdirAll(sessionsDir, 0o755); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Real Codex JSONL format with turn_context and token_count events
|
||||
content := `{"timestamp":"2026-01-13T17:41:31.666Z","type":"turn_context","payload":{"cwd":"/tmp","model":"gpt-5.2-codex","effort":"high"}}
|
||||
{"timestamp":"2026-01-13T17:41:32.916Z","type":"event_msg","payload":{"type":"token_count","info":null,"rate_limits":{"primary":{"used_percent":24.0}}}}
|
||||
{"timestamp":"2026-01-13T17:44:06.217Z","type":"event_msg","payload":{"type":"token_count","info":{"total_token_usage":{"input_tokens":328894,"cached_input_tokens":287872,"output_tokens":3071,"reasoning_output_tokens":960,"total_tokens":331965},"last_token_usage":{"input_tokens":24525,"cached_input_tokens":3200,"output_tokens":1815,"reasoning_output_tokens":960,"total_tokens":26340},"model_context_window":258400},"rate_limits":{"primary":{"used_percent":26.0}}}}
|
||||
`
|
||||
|
||||
filePath := filepath.Join(sessionsDir, "rollout-test.jsonl")
|
||||
if err := os.WriteFile(filePath, []byte(content), 0o644); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
s := NewScanner(slog.Default())
|
||||
record := s.parseCodexFile(filePath)
|
||||
|
||||
if record == nil {
|
||||
t.Fatal("expected non-nil record")
|
||||
}
|
||||
|
||||
if record.Date != "2026-01-14" {
|
||||
t.Errorf("date = %q, want %q", record.Date, "2026-01-14")
|
||||
}
|
||||
if record.Provider != "codex" {
|
||||
t.Errorf("provider = %q, want %q", record.Provider, "codex")
|
||||
}
|
||||
if record.Model != "gpt-5.2-codex" {
|
||||
t.Errorf("model = %q, want %q", record.Model, "gpt-5.2-codex")
|
||||
}
|
||||
if record.InputTokens != 328894 {
|
||||
t.Errorf("input_tokens = %d, want %d", record.InputTokens, 328894)
|
||||
}
|
||||
// output_tokens + reasoning_output_tokens
|
||||
if record.OutputTokens != 3071+960 {
|
||||
t.Errorf("output_tokens = %d, want %d", record.OutputTokens, 3071+960)
|
||||
}
|
||||
if record.CacheReadTokens != 287872 {
|
||||
t.Errorf("cache_read_tokens = %d, want %d", record.CacheReadTokens, 287872)
|
||||
}
|
||||
}
|
||||
|
||||
func TestParseCodexFile_NullInfo(t *testing.T) {
|
||||
// When all token_count events have info:null, should return nil
|
||||
tmp := t.TempDir()
|
||||
sessionsDir := filepath.Join(tmp, "sessions", "2026", "01", "14")
|
||||
if err := os.MkdirAll(sessionsDir, 0o755); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
content := `{"timestamp":"2026-01-13T17:41:32.916Z","type":"event_msg","payload":{"type":"token_count","info":null}}
|
||||
`
|
||||
filePath := filepath.Join(sessionsDir, "rollout-test.jsonl")
|
||||
if err := os.WriteFile(filePath, []byte(content), 0o644); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
s := NewScanner(slog.Default())
|
||||
record := s.parseCodexFile(filePath)
|
||||
|
||||
if record != nil {
|
||||
t.Errorf("expected nil record for null info, got %+v", record)
|
||||
}
|
||||
}
|
||||
|
||||
func TestParseCodexFile_LastTokenUsageFallback(t *testing.T) {
|
||||
// When total_token_usage is absent but last_token_usage exists
|
||||
tmp := t.TempDir()
|
||||
sessionsDir := filepath.Join(tmp, "sessions", "2026", "03", "27")
|
||||
if err := os.MkdirAll(sessionsDir, 0o755); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
content := `{"timestamp":"2026-03-27T10:00:00Z","type":"turn_context","payload":{"model":"gpt-5"}}
|
||||
{"timestamp":"2026-03-27T10:01:00Z","type":"event_msg","payload":{"type":"token_count","info":{"last_token_usage":{"input_tokens":1000,"cached_input_tokens":200,"output_tokens":500}}}}
|
||||
`
|
||||
filePath := filepath.Join(sessionsDir, "rollout-test.jsonl")
|
||||
if err := os.WriteFile(filePath, []byte(content), 0o644); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
s := NewScanner(slog.Default())
|
||||
record := s.parseCodexFile(filePath)
|
||||
|
||||
if record == nil {
|
||||
t.Fatal("expected non-nil record")
|
||||
}
|
||||
if record.InputTokens != 1000 {
|
||||
t.Errorf("input_tokens = %d, want %d", record.InputTokens, 1000)
|
||||
}
|
||||
if record.OutputTokens != 500 {
|
||||
t.Errorf("output_tokens = %d, want %d", record.OutputTokens, 500)
|
||||
}
|
||||
if record.CacheReadTokens != 200 {
|
||||
t.Errorf("cache_read_tokens = %d, want %d", record.CacheReadTokens, 200)
|
||||
}
|
||||
}
|
||||
|
||||
func TestParseCodexFile_CacheReadInputTokens(t *testing.T) {
|
||||
// Test the alternative field name cache_read_input_tokens
|
||||
tmp := t.TempDir()
|
||||
sessionsDir := filepath.Join(tmp, "sessions", "2026", "03", "27")
|
||||
if err := os.MkdirAll(sessionsDir, 0o755); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
content := `{"timestamp":"2026-03-27T10:00:00Z","type":"event_msg","payload":{"type":"token_count","info":{"total_token_usage":{"input_tokens":5000,"cache_read_input_tokens":3000,"output_tokens":800},"model":"gpt-5.2-codex"}}}
|
||||
`
|
||||
filePath := filepath.Join(sessionsDir, "rollout-test.jsonl")
|
||||
if err := os.WriteFile(filePath, []byte(content), 0o644); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
s := NewScanner(slog.Default())
|
||||
record := s.parseCodexFile(filePath)
|
||||
|
||||
if record == nil {
|
||||
t.Fatal("expected non-nil record")
|
||||
}
|
||||
if record.CacheReadTokens != 3000 {
|
||||
t.Errorf("cache_read_tokens = %d, want %d", record.CacheReadTokens, 3000)
|
||||
}
|
||||
if record.Model != "gpt-5.2-codex" {
|
||||
t.Errorf("model = %q, want %q", record.Model, "gpt-5.2-codex")
|
||||
}
|
||||
}
|
||||
@@ -1,173 +0,0 @@
|
||||
package usage
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"encoding/json"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"time"
|
||||
)
|
||||
|
||||
// scanHermes reads Hermes JSONL session files from
|
||||
// ~/.hermes/sessions/*.jsonl
|
||||
// and extracts token usage from assistant message and usage_update entries.
|
||||
//
|
||||
// Hermes communicates via the ACP (Agent Communication Protocol) and logs
|
||||
// session events as JSONL. Token usage appears in:
|
||||
// - "assistant" messages with a "usage" field
|
||||
// - "usage_update" notification entries with cumulative token snapshots
|
||||
func (s *Scanner) scanHermes() []Record {
|
||||
root := hermesSessionRoot()
|
||||
if root == "" {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Glob for session files: sessions/*.jsonl
|
||||
pattern := filepath.Join(root, "*.jsonl")
|
||||
files, err := filepath.Glob(pattern)
|
||||
if err != nil {
|
||||
s.logger.Debug("hermes glob error", "error", err)
|
||||
return nil
|
||||
}
|
||||
|
||||
var allRecords []Record
|
||||
for _, f := range files {
|
||||
record := s.parseHermesFile(f)
|
||||
if record != nil {
|
||||
allRecords = append(allRecords, *record)
|
||||
}
|
||||
}
|
||||
|
||||
return mergeRecords(allRecords)
|
||||
}
|
||||
|
||||
// hermesSessionRoot returns the Hermes sessions directory.
|
||||
func hermesSessionRoot() string {
|
||||
if hermesHome := os.Getenv("HERMES_HOME"); hermesHome != "" {
|
||||
dir := filepath.Join(hermesHome, "sessions")
|
||||
if info, err := os.Stat(dir); err == nil && info.IsDir() {
|
||||
return dir
|
||||
}
|
||||
}
|
||||
|
||||
home, err := os.UserHomeDir()
|
||||
if err != nil {
|
||||
return ""
|
||||
}
|
||||
|
||||
// Check common locations.
|
||||
candidates := []string{
|
||||
filepath.Join(home, ".hermes", "sessions"),
|
||||
filepath.Join(home, ".local", "share", "hermes", "sessions"),
|
||||
filepath.Join(home, ".config", "hermes", "sessions"),
|
||||
}
|
||||
for _, dir := range candidates {
|
||||
if info, err := os.Stat(dir); err == nil && info.IsDir() {
|
||||
return dir
|
||||
}
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
// hermesLine represents a line in a Hermes session JSONL file.
|
||||
// Hermes session logs contain both message events and notification events.
|
||||
type hermesLine struct {
|
||||
Type string `json:"type"`
|
||||
Timestamp string `json:"timestamp"` // RFC3339
|
||||
Model string `json:"model"`
|
||||
Usage *struct {
|
||||
InputTokens int64 `json:"inputTokens"`
|
||||
OutputTokens int64 `json:"outputTokens"`
|
||||
CachedReadTokens int64 `json:"cachedReadTokens"`
|
||||
ThoughtTokens int64 `json:"thoughtTokens"`
|
||||
} `json:"usage"`
|
||||
}
|
||||
|
||||
// parseHermesFile extracts the final cumulative token usage from a Hermes session file.
|
||||
// Hermes usage_update events are cumulative snapshots — the last one in the file
|
||||
// represents the total usage for the session. Returns nil if no usage data found.
|
||||
func (s *Scanner) parseHermesFile(path string) *Record {
|
||||
f, err := os.Open(path)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
var lastUsage *struct {
|
||||
InputTokens int64 `json:"inputTokens"`
|
||||
OutputTokens int64 `json:"outputTokens"`
|
||||
CachedReadTokens int64 `json:"cachedReadTokens"`
|
||||
ThoughtTokens int64 `json:"thoughtTokens"`
|
||||
}
|
||||
var lastModel string
|
||||
var lastTimestamp string
|
||||
|
||||
scanner := bufio.NewScanner(f)
|
||||
scanner.Buffer(make([]byte, 0, 256*1024), 1024*1024)
|
||||
|
||||
for scanner.Scan() {
|
||||
line := scanner.Bytes()
|
||||
|
||||
// Fast pre-filter.
|
||||
if !bytesContains(line, `"usage"`) && !bytesContains(line, `"inputTokens"`) {
|
||||
continue
|
||||
}
|
||||
|
||||
var entry hermesLine
|
||||
if err := json.Unmarshal(line, &entry); err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
if entry.Usage == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
// Take the latest usage snapshot (cumulative).
|
||||
lastUsage = entry.Usage
|
||||
if entry.Model != "" {
|
||||
lastModel = entry.Model
|
||||
}
|
||||
if entry.Timestamp != "" {
|
||||
lastTimestamp = entry.Timestamp
|
||||
}
|
||||
}
|
||||
|
||||
if lastUsage == nil {
|
||||
return nil
|
||||
}
|
||||
if lastUsage.InputTokens == 0 && lastUsage.OutputTokens == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Parse timestamp for date.
|
||||
var date string
|
||||
if lastTimestamp != "" {
|
||||
if ts, err := time.Parse(time.RFC3339Nano, lastTimestamp); err == nil {
|
||||
date = ts.Local().Format("2006-01-02")
|
||||
} else if ts, err := time.Parse(time.RFC3339, lastTimestamp); err == nil {
|
||||
date = ts.Local().Format("2006-01-02")
|
||||
}
|
||||
}
|
||||
if date == "" {
|
||||
// Fall back to file modification time.
|
||||
if info, err := os.Stat(path); err == nil {
|
||||
date = info.ModTime().Local().Format("2006-01-02")
|
||||
} else {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
model := lastModel
|
||||
if model == "" {
|
||||
model = "unknown"
|
||||
}
|
||||
|
||||
return &Record{
|
||||
Date: date,
|
||||
Provider: "hermes",
|
||||
Model: model,
|
||||
InputTokens: lastUsage.InputTokens,
|
||||
OutputTokens: lastUsage.OutputTokens + lastUsage.ThoughtTokens,
|
||||
CacheReadTokens: lastUsage.CachedReadTokens,
|
||||
}
|
||||
}
|
||||
@@ -1,99 +0,0 @@
|
||||
package usage
|
||||
|
||||
import (
|
||||
"log/slog"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestParseHermesFile(t *testing.T) {
|
||||
tmp := t.TempDir()
|
||||
|
||||
// Hermes session JSONL with usage_update entries (cumulative snapshots)
|
||||
content := `{"type":"session_start","timestamp":"2026-04-10T14:00:00.000Z","model":"claude-sonnet-4-5"}
|
||||
{"type":"usage_update","timestamp":"2026-04-10T14:01:00.000Z","model":"claude-sonnet-4-5","usage":{"inputTokens":1000,"outputTokens":200,"cachedReadTokens":500,"thoughtTokens":50}}
|
||||
{"type":"usage_update","timestamp":"2026-04-10T14:02:00.000Z","model":"claude-sonnet-4-5","usage":{"inputTokens":3000,"outputTokens":600,"cachedReadTokens":1500,"thoughtTokens":100}}
|
||||
`
|
||||
|
||||
filePath := filepath.Join(tmp, "session-001.jsonl")
|
||||
if err := os.WriteFile(filePath, []byte(content), 0o644); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
s := NewScanner(slog.Default())
|
||||
record := s.parseHermesFile(filePath)
|
||||
|
||||
if record == nil {
|
||||
t.Fatal("expected non-nil record")
|
||||
}
|
||||
|
||||
if record.Provider != "hermes" {
|
||||
t.Errorf("provider = %q, want %q", record.Provider, "hermes")
|
||||
}
|
||||
if record.Model != "claude-sonnet-4-5" {
|
||||
t.Errorf("model = %q, want %q", record.Model, "claude-sonnet-4-5")
|
||||
}
|
||||
if record.Date != "2026-04-10" {
|
||||
t.Errorf("date = %q, want %q", record.Date, "2026-04-10")
|
||||
}
|
||||
// Should take the last (cumulative) snapshot
|
||||
if record.InputTokens != 3000 {
|
||||
t.Errorf("input_tokens = %d, want %d", record.InputTokens, 3000)
|
||||
}
|
||||
// output_tokens + thought_tokens
|
||||
if record.OutputTokens != 700 {
|
||||
t.Errorf("output_tokens = %d, want %d (600 + 100)", record.OutputTokens, 700)
|
||||
}
|
||||
if record.CacheReadTokens != 1500 {
|
||||
t.Errorf("cache_read_tokens = %d, want %d", record.CacheReadTokens, 1500)
|
||||
}
|
||||
}
|
||||
|
||||
func TestParseHermesFile_NoUsage(t *testing.T) {
|
||||
tmp := t.TempDir()
|
||||
|
||||
content := `{"type":"session_start","timestamp":"2026-04-10T14:00:00.000Z","model":"test-model"}
|
||||
{"type":"message","timestamp":"2026-04-10T14:01:00.000Z","content":"hello"}
|
||||
`
|
||||
|
||||
filePath := filepath.Join(tmp, "session-empty.jsonl")
|
||||
if err := os.WriteFile(filePath, []byte(content), 0o644); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
s := NewScanner(slog.Default())
|
||||
record := s.parseHermesFile(filePath)
|
||||
|
||||
if record != nil {
|
||||
t.Errorf("expected nil record for no usage data, got %+v", record)
|
||||
}
|
||||
}
|
||||
|
||||
func TestParseHermesFile_SingleUsage(t *testing.T) {
|
||||
tmp := t.TempDir()
|
||||
|
||||
content := `{"type":"usage_update","timestamp":"2026-04-10T14:01:00.000Z","model":"hermes-3","usage":{"inputTokens":500,"outputTokens":100,"cachedReadTokens":0,"thoughtTokens":0}}
|
||||
`
|
||||
|
||||
filePath := filepath.Join(tmp, "session-single.jsonl")
|
||||
if err := os.WriteFile(filePath, []byte(content), 0o644); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
s := NewScanner(slog.Default())
|
||||
record := s.parseHermesFile(filePath)
|
||||
|
||||
if record == nil {
|
||||
t.Fatal("expected non-nil record")
|
||||
}
|
||||
if record.InputTokens != 500 {
|
||||
t.Errorf("input_tokens = %d, want %d", record.InputTokens, 500)
|
||||
}
|
||||
if record.OutputTokens != 100 {
|
||||
t.Errorf("output_tokens = %d, want %d", record.OutputTokens, 100)
|
||||
}
|
||||
if record.Model != "hermes-3" {
|
||||
t.Errorf("model = %q, want %q", record.Model, "hermes-3")
|
||||
}
|
||||
}
|
||||
@@ -1,154 +0,0 @@
|
||||
package usage
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"encoding/json"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
// scanOpenClaw reads OpenClaw JSONL session files from
|
||||
// ~/.openclaw/agents/*/sessions/*.jsonl
|
||||
// and extracts token usage from assistant message entries.
|
||||
func (s *Scanner) scanOpenClaw() []Record {
|
||||
root := openClawSessionRoot()
|
||||
if root == "" {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Glob for session files: agents/*/sessions/*.jsonl
|
||||
pattern := filepath.Join(root, "*", "sessions", "*.jsonl")
|
||||
files, err := filepath.Glob(pattern)
|
||||
if err != nil {
|
||||
s.logger.Debug("openclaw glob error", "error", err)
|
||||
return nil
|
||||
}
|
||||
|
||||
var allRecords []Record
|
||||
for _, f := range files {
|
||||
records := s.parseOpenClawFile(f)
|
||||
allRecords = append(allRecords, records...)
|
||||
}
|
||||
|
||||
return mergeRecords(allRecords)
|
||||
}
|
||||
|
||||
// openClawSessionRoot returns the OpenClaw agents directory.
|
||||
func openClawSessionRoot() string {
|
||||
if openclawHome := os.Getenv("OPENCLAW_HOME"); openclawHome != "" {
|
||||
dir := filepath.Join(openclawHome, "agents")
|
||||
if info, err := os.Stat(dir); err == nil && info.IsDir() {
|
||||
return dir
|
||||
}
|
||||
}
|
||||
|
||||
home, err := os.UserHomeDir()
|
||||
if err != nil {
|
||||
return ""
|
||||
}
|
||||
|
||||
dir := filepath.Join(home, ".openclaw", "agents")
|
||||
if info, err := os.Stat(dir); err == nil && info.IsDir() {
|
||||
return dir
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
// openClawLine represents a line in an OpenClaw JSONL session file.
|
||||
type openClawLine struct {
|
||||
Type string `json:"type"`
|
||||
Timestamp string `json:"timestamp"` // RFC3339
|
||||
Message *struct {
|
||||
Role string `json:"role"`
|
||||
Provider string `json:"provider"`
|
||||
Model string `json:"model"`
|
||||
Usage *struct {
|
||||
Input int64 `json:"input"`
|
||||
Output int64 `json:"output"`
|
||||
CacheRead int64 `json:"cacheRead"`
|
||||
CacheWrite int64 `json:"cacheWrite"`
|
||||
} `json:"usage"`
|
||||
} `json:"message"`
|
||||
}
|
||||
|
||||
// parseOpenClawFile extracts token usage records from an OpenClaw session JSONL file.
|
||||
func (s *Scanner) parseOpenClawFile(path string) []Record {
|
||||
f, err := os.Open(path)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
var records []Record
|
||||
scanner := bufio.NewScanner(f)
|
||||
scanner.Buffer(make([]byte, 0, 256*1024), 1024*1024)
|
||||
|
||||
for scanner.Scan() {
|
||||
line := scanner.Bytes()
|
||||
|
||||
// Fast pre-filter: skip lines that don't contain relevant data.
|
||||
if !bytesContains(line, `"usage"`) {
|
||||
continue
|
||||
}
|
||||
if !bytesContains(line, `"assistant"`) {
|
||||
continue
|
||||
}
|
||||
|
||||
var entry openClawLine
|
||||
if err := json.Unmarshal(line, &entry); err != nil {
|
||||
continue
|
||||
}
|
||||
if entry.Type != "message" || entry.Message == nil || entry.Message.Role != "assistant" || entry.Message.Usage == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
u := entry.Message.Usage
|
||||
if u.Input == 0 && u.Output == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
// Parse timestamp to get date.
|
||||
ts, err := time.Parse(time.RFC3339Nano, entry.Timestamp)
|
||||
if err != nil {
|
||||
ts, err = time.Parse(time.RFC3339, entry.Timestamp)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
model := entry.Message.Model
|
||||
if model == "" {
|
||||
model = "unknown"
|
||||
}
|
||||
|
||||
// Construct provider string: if the session has a provider, use "openclaw/<provider>"
|
||||
// for attribution, but the Record.Provider field should be "openclaw".
|
||||
provider := "openclaw"
|
||||
_ = entry.Message.Provider // available but not used in provider field
|
||||
|
||||
records = append(records, Record{
|
||||
Date: ts.Local().Format("2006-01-02"),
|
||||
Provider: provider,
|
||||
Model: normalizeOpenClawModel(entry.Message.Provider, model),
|
||||
InputTokens: u.Input,
|
||||
OutputTokens: u.Output,
|
||||
CacheReadTokens: u.CacheRead,
|
||||
CacheWriteTokens: u.CacheWrite,
|
||||
})
|
||||
}
|
||||
|
||||
return records
|
||||
}
|
||||
|
||||
// normalizeOpenClawModel returns a model identifier. If the provider is known,
|
||||
// it prefixes the model name for clarity (e.g. "deepseek/deepseek-chat").
|
||||
func normalizeOpenClawModel(provider, model string) string {
|
||||
provider = strings.TrimSpace(provider)
|
||||
model = strings.TrimSpace(model)
|
||||
if provider != "" && !strings.Contains(model, "/") {
|
||||
return provider + "/" + model
|
||||
}
|
||||
return model
|
||||
}
|
||||
@@ -1,93 +0,0 @@
|
||||
package usage
|
||||
|
||||
import (
|
||||
"log/slog"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestParseOpenClawFile(t *testing.T) {
|
||||
tmp := t.TempDir()
|
||||
|
||||
// Real OpenClaw session JSONL with session header, model_change, and assistant messages
|
||||
content := `{"type":"session","version":3,"id":"multica-test","timestamp":"2026-04-11T13:53:05.847Z"}
|
||||
{"type":"model_change","id":"03c18aae","timestamp":"2026-04-11T13:53:05.855Z","provider":"deepseek","modelId":"deepseek-chat"}
|
||||
{"type":"message","id":"162ce1b7","parentId":"c90ecabe","timestamp":"2026-04-11T13:53:09.986Z","message":{"role":"assistant","content":[{"type":"text","text":"I'll start by getting the issue details."}],"api":"openai-completions","provider":"deepseek","model":"deepseek-chat","usage":{"input":133,"output":81,"cacheRead":16448,"cacheWrite":0,"totalTokens":16662}}}
|
||||
{"type":"message","id":"3c063300","parentId":"50e4feb6","timestamp":"2026-04-11T13:53:14.750Z","message":{"role":"assistant","content":[{"type":"text","text":"Let me check the workspace."}],"provider":"deepseek","model":"deepseek-chat","usage":{"input":286,"output":94,"cacheRead":16448,"cacheWrite":0}}}
|
||||
{"type":"message","id":"user001","timestamp":"2026-04-11T13:54:00.000Z","message":{"role":"user","content":[{"type":"text","text":"hello"}]}}
|
||||
`
|
||||
|
||||
filePath := filepath.Join(tmp, "session.jsonl")
|
||||
if err := os.WriteFile(filePath, []byte(content), 0o644); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
s := NewScanner(slog.Default())
|
||||
records := s.parseOpenClawFile(filePath)
|
||||
|
||||
if len(records) != 2 {
|
||||
t.Fatalf("expected 2 records, got %d", len(records))
|
||||
}
|
||||
|
||||
r := records[0]
|
||||
if r.Provider != "openclaw" {
|
||||
t.Errorf("provider = %q, want %q", r.Provider, "openclaw")
|
||||
}
|
||||
if r.Model != "deepseek/deepseek-chat" {
|
||||
t.Errorf("model = %q, want %q", r.Model, "deepseek/deepseek-chat")
|
||||
}
|
||||
if r.InputTokens != 133 {
|
||||
t.Errorf("input_tokens = %d, want %d", r.InputTokens, 133)
|
||||
}
|
||||
if r.OutputTokens != 81 {
|
||||
t.Errorf("output_tokens = %d, want %d", r.OutputTokens, 81)
|
||||
}
|
||||
if r.CacheReadTokens != 16448 {
|
||||
t.Errorf("cache_read_tokens = %d, want %d", r.CacheReadTokens, 16448)
|
||||
}
|
||||
if r.Date != "2026-04-11" {
|
||||
t.Errorf("date = %q, want %q", r.Date, "2026-04-11")
|
||||
}
|
||||
}
|
||||
|
||||
func TestParseOpenClawFile_NoUsage(t *testing.T) {
|
||||
tmp := t.TempDir()
|
||||
|
||||
// Session with no assistant messages containing usage
|
||||
content := `{"type":"session","version":3,"id":"empty-session","timestamp":"2026-04-11T13:53:05.847Z"}
|
||||
{"type":"message","id":"user001","timestamp":"2026-04-11T13:54:00.000Z","message":{"role":"user","content":[{"type":"text","text":"hello"}]}}
|
||||
`
|
||||
|
||||
filePath := filepath.Join(tmp, "session.jsonl")
|
||||
if err := os.WriteFile(filePath, []byte(content), 0o644); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
s := NewScanner(slog.Default())
|
||||
records := s.parseOpenClawFile(filePath)
|
||||
|
||||
if len(records) != 0 {
|
||||
t.Errorf("expected 0 records, got %d", len(records))
|
||||
}
|
||||
}
|
||||
|
||||
func TestNormalizeOpenClawModel(t *testing.T) {
|
||||
tests := []struct {
|
||||
provider string
|
||||
model string
|
||||
want string
|
||||
}{
|
||||
{"deepseek", "deepseek-chat", "deepseek/deepseek-chat"},
|
||||
{"anthropic", "claude-sonnet-4-5", "anthropic/claude-sonnet-4-5"},
|
||||
{"", "gpt-4o", "gpt-4o"},
|
||||
{"openai", "openai/gpt-4o", "openai/gpt-4o"}, // already has /
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
got := normalizeOpenClawModel(tt.provider, tt.model)
|
||||
if got != tt.want {
|
||||
t.Errorf("normalizeOpenClawModel(%q, %q) = %q, want %q", tt.provider, tt.model, got, tt.want)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,122 +0,0 @@
|
||||
package usage
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"time"
|
||||
)
|
||||
|
||||
// scanOpenCode reads OpenCode message JSON files from
|
||||
// ~/.local/share/opencode/storage/message/ses_*/*.json
|
||||
// and extracts token usage from assistant messages.
|
||||
func (s *Scanner) scanOpenCode() []Record {
|
||||
root := openCodeStorageRoot()
|
||||
if root == "" {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Glob for message files: storage/message/ses_*/*.json
|
||||
pattern := filepath.Join(root, "ses_*", "*.json")
|
||||
files, err := filepath.Glob(pattern)
|
||||
if err != nil {
|
||||
s.logger.Debug("opencode glob error", "error", err)
|
||||
return nil
|
||||
}
|
||||
|
||||
var allRecords []Record
|
||||
for _, f := range files {
|
||||
record := s.parseOpenCodeFile(f)
|
||||
if record != nil {
|
||||
allRecords = append(allRecords, *record)
|
||||
}
|
||||
}
|
||||
|
||||
return mergeRecords(allRecords)
|
||||
}
|
||||
|
||||
// openCodeStorageRoot returns the OpenCode message storage directory.
|
||||
func openCodeStorageRoot() string {
|
||||
// Check XDG_DATA_HOME first, then fall back to ~/.local/share
|
||||
dataHome := os.Getenv("XDG_DATA_HOME")
|
||||
if dataHome == "" {
|
||||
home, err := os.UserHomeDir()
|
||||
if err != nil {
|
||||
return ""
|
||||
}
|
||||
dataHome = filepath.Join(home, ".local", "share")
|
||||
}
|
||||
|
||||
dir := filepath.Join(dataHome, "opencode", "storage", "message")
|
||||
if info, err := os.Stat(dir); err == nil && info.IsDir() {
|
||||
return dir
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
// openCodeMessage represents the subset of an OpenCode message JSON file we need.
|
||||
type openCodeMessage struct {
|
||||
Role string `json:"role"`
|
||||
ModelID string `json:"modelID"`
|
||||
ProviderID string `json:"providerID"`
|
||||
Time *struct {
|
||||
Created int64 `json:"created"` // unix milliseconds
|
||||
} `json:"time"`
|
||||
Tokens *struct {
|
||||
Input int64 `json:"input"`
|
||||
Output int64 `json:"output"`
|
||||
Reasoning int64 `json:"reasoning"`
|
||||
Cache *struct {
|
||||
Read int64 `json:"read"`
|
||||
Write int64 `json:"write"`
|
||||
} `json:"cache"`
|
||||
} `json:"tokens"`
|
||||
}
|
||||
|
||||
// parseOpenCodeFile reads a single OpenCode message JSON file and returns a Record
|
||||
// if it contains assistant token usage. Returns nil otherwise.
|
||||
func (s *Scanner) parseOpenCodeFile(path string) *Record {
|
||||
data, err := os.ReadFile(path)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
var msg openCodeMessage
|
||||
if err := json.Unmarshal(data, &msg); err != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Only count assistant messages with token usage.
|
||||
if msg.Role != "assistant" || msg.Tokens == nil || msg.Time == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Skip messages with no meaningful token usage.
|
||||
if msg.Tokens.Input == 0 && msg.Tokens.Output == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
ts := time.UnixMilli(msg.Time.Created)
|
||||
date := ts.Local().Format("2006-01-02")
|
||||
|
||||
model := msg.ModelID
|
||||
if model == "" {
|
||||
model = "unknown"
|
||||
}
|
||||
|
||||
var cacheRead, cacheWrite int64
|
||||
if msg.Tokens.Cache != nil {
|
||||
cacheRead = msg.Tokens.Cache.Read
|
||||
cacheWrite = msg.Tokens.Cache.Write
|
||||
}
|
||||
|
||||
return &Record{
|
||||
Date: date,
|
||||
Provider: "opencode",
|
||||
Model: model,
|
||||
InputTokens: msg.Tokens.Input,
|
||||
OutputTokens: msg.Tokens.Output + msg.Tokens.Reasoning,
|
||||
CacheReadTokens: cacheRead,
|
||||
CacheWriteTokens: cacheWrite,
|
||||
}
|
||||
}
|
||||
@@ -1,141 +0,0 @@
|
||||
package usage
|
||||
|
||||
import (
|
||||
"log/slog"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestParseOpenCodeFile(t *testing.T) {
|
||||
tmp := t.TempDir()
|
||||
|
||||
// Real OpenCode message JSON format with token usage
|
||||
content := `{
|
||||
"id": "msg_test001",
|
||||
"sessionID": "ses_test001",
|
||||
"role": "assistant",
|
||||
"time": {"created": 1768332037518, "completed": 1768332039410},
|
||||
"modelID": "claude-sonnet-4-5",
|
||||
"providerID": "anthropic",
|
||||
"tokens": {"input": 10916, "output": 5, "reasoning": 100, "cache": {"read": 448, "write": 50}}
|
||||
}`
|
||||
|
||||
filePath := filepath.Join(tmp, "msg_test001.json")
|
||||
if err := os.WriteFile(filePath, []byte(content), 0o644); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
s := NewScanner(slog.Default())
|
||||
record := s.parseOpenCodeFile(filePath)
|
||||
|
||||
if record == nil {
|
||||
t.Fatal("expected non-nil record")
|
||||
}
|
||||
|
||||
if record.Provider != "opencode" {
|
||||
t.Errorf("provider = %q, want %q", record.Provider, "opencode")
|
||||
}
|
||||
if record.Model != "claude-sonnet-4-5" {
|
||||
t.Errorf("model = %q, want %q", record.Model, "claude-sonnet-4-5")
|
||||
}
|
||||
if record.InputTokens != 10916 {
|
||||
t.Errorf("input_tokens = %d, want %d", record.InputTokens, 10916)
|
||||
}
|
||||
// output_tokens + reasoning
|
||||
if record.OutputTokens != 105 {
|
||||
t.Errorf("output_tokens = %d, want %d", record.OutputTokens, 105)
|
||||
}
|
||||
if record.CacheReadTokens != 448 {
|
||||
t.Errorf("cache_read_tokens = %d, want %d", record.CacheReadTokens, 448)
|
||||
}
|
||||
if record.CacheWriteTokens != 50 {
|
||||
t.Errorf("cache_write_tokens = %d, want %d", record.CacheWriteTokens, 50)
|
||||
}
|
||||
}
|
||||
|
||||
func TestParseOpenCodeFile_UserMessage(t *testing.T) {
|
||||
tmp := t.TempDir()
|
||||
|
||||
// User messages should be ignored (no token usage to report)
|
||||
content := `{
|
||||
"id": "msg_user001",
|
||||
"sessionID": "ses_test001",
|
||||
"role": "user",
|
||||
"time": {"created": 1768332037000}
|
||||
}`
|
||||
|
||||
filePath := filepath.Join(tmp, "msg_user001.json")
|
||||
if err := os.WriteFile(filePath, []byte(content), 0o644); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
s := NewScanner(slog.Default())
|
||||
record := s.parseOpenCodeFile(filePath)
|
||||
|
||||
if record != nil {
|
||||
t.Errorf("expected nil record for user message, got %+v", record)
|
||||
}
|
||||
}
|
||||
|
||||
func TestParseOpenCodeFile_NoCache(t *testing.T) {
|
||||
tmp := t.TempDir()
|
||||
|
||||
// Message without cache field
|
||||
content := `{
|
||||
"id": "msg_test002",
|
||||
"sessionID": "ses_test002",
|
||||
"role": "assistant",
|
||||
"time": {"created": 1768332037518},
|
||||
"modelID": "gpt-4o",
|
||||
"providerID": "openai",
|
||||
"tokens": {"input": 500, "output": 200, "reasoning": 0}
|
||||
}`
|
||||
|
||||
filePath := filepath.Join(tmp, "msg_test002.json")
|
||||
if err := os.WriteFile(filePath, []byte(content), 0o644); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
s := NewScanner(slog.Default())
|
||||
record := s.parseOpenCodeFile(filePath)
|
||||
|
||||
if record == nil {
|
||||
t.Fatal("expected non-nil record")
|
||||
}
|
||||
if record.CacheReadTokens != 0 {
|
||||
t.Errorf("cache_read_tokens = %d, want 0", record.CacheReadTokens)
|
||||
}
|
||||
if record.CacheWriteTokens != 0 {
|
||||
t.Errorf("cache_write_tokens = %d, want 0", record.CacheWriteTokens)
|
||||
}
|
||||
if record.Model != "gpt-4o" {
|
||||
t.Errorf("model = %q, want %q", record.Model, "gpt-4o")
|
||||
}
|
||||
}
|
||||
|
||||
func TestParseOpenCodeFile_ZeroTokens(t *testing.T) {
|
||||
tmp := t.TempDir()
|
||||
|
||||
// Message with zero tokens should return nil
|
||||
content := `{
|
||||
"id": "msg_test003",
|
||||
"sessionID": "ses_test003",
|
||||
"role": "assistant",
|
||||
"time": {"created": 1768332037518},
|
||||
"modelID": "test-model",
|
||||
"tokens": {"input": 0, "output": 0, "reasoning": 0}
|
||||
}`
|
||||
|
||||
filePath := filepath.Join(tmp, "msg_test003.json")
|
||||
if err := os.WriteFile(filePath, []byte(content), 0o644); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
s := NewScanner(slog.Default())
|
||||
record := s.parseOpenCodeFile(filePath)
|
||||
|
||||
if record != nil {
|
||||
t.Errorf("expected nil record for zero tokens, got %+v", record)
|
||||
}
|
||||
}
|
||||
@@ -1,126 +0,0 @@
|
||||
package usage
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"encoding/json"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"time"
|
||||
|
||||
"github.com/multica-ai/multica/server/pkg/agent"
|
||||
)
|
||||
|
||||
// scanPi reads Pi session JSONL logs produced by the multica daemon and
|
||||
// extracts token usage from assistant `message` events.
|
||||
//
|
||||
// The agent backend writes every run's session file into
|
||||
// ~/.multica/pi-sessions/ (see agent.PiSessionDir). Pi appends events to
|
||||
// the file as it runs; each assistant `message` event carries cumulative
|
||||
// usage for that turn in the shape:
|
||||
//
|
||||
// {"type":"message","timestamp":"...",
|
||||
// "message":{"role":"assistant","model":"...",
|
||||
// "usage":{"input":N,"output":N,"cacheRead":N,"cacheWrite":N,...}}}
|
||||
func (s *Scanner) scanPi() []Record {
|
||||
root, err := agent.PiSessionDir()
|
||||
if err != nil || root == "" {
|
||||
return nil
|
||||
}
|
||||
if info, err := os.Stat(root); err != nil || !info.IsDir() {
|
||||
return nil
|
||||
}
|
||||
|
||||
files, err := filepath.Glob(filepath.Join(root, "*.jsonl"))
|
||||
if err != nil {
|
||||
s.logger.Debug("pi glob error", "error", err)
|
||||
return nil
|
||||
}
|
||||
|
||||
var records []Record
|
||||
for _, f := range files {
|
||||
records = append(records, s.parsePiFile(f)...)
|
||||
}
|
||||
return mergeRecords(records)
|
||||
}
|
||||
|
||||
type piSessionLine struct {
|
||||
Type string `json:"type"`
|
||||
Timestamp string `json:"timestamp"`
|
||||
Message *struct {
|
||||
Role string `json:"role"`
|
||||
Model string `json:"model"`
|
||||
Usage *struct {
|
||||
Input int64 `json:"input"`
|
||||
Output int64 `json:"output"`
|
||||
CacheRead int64 `json:"cacheRead"`
|
||||
CacheWrite int64 `json:"cacheWrite"`
|
||||
} `json:"usage"`
|
||||
} `json:"message"`
|
||||
}
|
||||
|
||||
// parsePiFile walks a single session file and emits one Record per
|
||||
// assistant message with non-zero usage. Each assistant message carries
|
||||
// the cost for that specific turn (not cumulative), so they can be
|
||||
// summed by mergeRecords downstream.
|
||||
func (s *Scanner) parsePiFile(path string) []Record {
|
||||
f, err := os.Open(path)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
scanner := bufio.NewScanner(f)
|
||||
scanner.Buffer(make([]byte, 0, 256*1024), 8*1024*1024)
|
||||
|
||||
var records []Record
|
||||
for scanner.Scan() {
|
||||
line := scanner.Bytes()
|
||||
if !bytesContains(line, `"usage"`) {
|
||||
continue
|
||||
}
|
||||
var entry piSessionLine
|
||||
if err := json.Unmarshal(line, &entry); err != nil {
|
||||
continue
|
||||
}
|
||||
if entry.Type != "message" || entry.Message == nil || entry.Message.Usage == nil {
|
||||
continue
|
||||
}
|
||||
if entry.Message.Role != "assistant" {
|
||||
continue
|
||||
}
|
||||
u := entry.Message.Usage
|
||||
if u.Input == 0 && u.Output == 0 && u.CacheRead == 0 && u.CacheWrite == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
date := ""
|
||||
if entry.Timestamp != "" {
|
||||
if ts, err := time.Parse(time.RFC3339Nano, entry.Timestamp); err == nil {
|
||||
date = ts.Local().Format("2006-01-02")
|
||||
}
|
||||
}
|
||||
if date == "" {
|
||||
info, err := os.Stat(path)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
date = info.ModTime().Local().Format("2006-01-02")
|
||||
}
|
||||
|
||||
model := entry.Message.Model
|
||||
if model == "" {
|
||||
model = "unknown"
|
||||
}
|
||||
|
||||
records = append(records, Record{
|
||||
Date: date,
|
||||
Provider: "pi",
|
||||
Model: model,
|
||||
InputTokens: u.Input,
|
||||
OutputTokens: u.Output,
|
||||
CacheReadTokens: u.CacheRead,
|
||||
CacheWriteTokens: u.CacheWrite,
|
||||
})
|
||||
}
|
||||
return records
|
||||
}
|
||||
@@ -1,82 +0,0 @@
|
||||
package usage
|
||||
|
||||
import (
|
||||
"log/slog"
|
||||
)
|
||||
|
||||
// Record represents aggregated token usage for one (date, provider, model) tuple.
|
||||
type Record struct {
|
||||
Date string `json:"date"` // "2006-01-02"
|
||||
Provider string `json:"provider"` // "claude" or "codex"
|
||||
Model string `json:"model"`
|
||||
InputTokens int64 `json:"input_tokens"`
|
||||
OutputTokens int64 `json:"output_tokens"`
|
||||
CacheReadTokens int64 `json:"cache_read_tokens"`
|
||||
CacheWriteTokens int64 `json:"cache_write_tokens"`
|
||||
}
|
||||
|
||||
// Scanner scans local CLI log files for token usage data.
|
||||
type Scanner struct {
|
||||
logger *slog.Logger
|
||||
}
|
||||
|
||||
// NewScanner creates a new usage scanner.
|
||||
func NewScanner(logger *slog.Logger) *Scanner {
|
||||
return &Scanner{logger: logger}
|
||||
}
|
||||
|
||||
// Scan reads local log files for all supported agent runtimes (Claude Code,
|
||||
// Codex, OpenCode, OpenClaw, Hermes) and returns aggregated usage records
|
||||
// keyed by (date, provider, model). Supports Claude Code, Codex, OpenCode,
|
||||
// OpenClaw, Hermes, and Pi.
|
||||
func (s *Scanner) Scan() []Record {
|
||||
var records []Record
|
||||
|
||||
claudeRecords := s.scanClaude()
|
||||
records = append(records, claudeRecords...)
|
||||
|
||||
codexRecords := s.scanCodex()
|
||||
records = append(records, codexRecords...)
|
||||
|
||||
openCodeRecords := s.scanOpenCode()
|
||||
records = append(records, openCodeRecords...)
|
||||
|
||||
openClawRecords := s.scanOpenClaw()
|
||||
records = append(records, openClawRecords...)
|
||||
|
||||
hermesRecords := s.scanHermes()
|
||||
records = append(records, hermesRecords...)
|
||||
|
||||
piRecords := s.scanPi()
|
||||
records = append(records, piRecords...)
|
||||
|
||||
return records
|
||||
}
|
||||
|
||||
// aggregation key for merging records.
|
||||
type aggKey struct {
|
||||
Date string
|
||||
Provider string
|
||||
Model string
|
||||
}
|
||||
|
||||
func mergeRecords(records []Record) []Record {
|
||||
m := make(map[aggKey]*Record)
|
||||
for _, r := range records {
|
||||
k := aggKey{Date: r.Date, Provider: r.Provider, Model: r.Model}
|
||||
if existing, ok := m[k]; ok {
|
||||
existing.InputTokens += r.InputTokens
|
||||
existing.OutputTokens += r.OutputTokens
|
||||
existing.CacheReadTokens += r.CacheReadTokens
|
||||
existing.CacheWriteTokens += r.CacheWriteTokens
|
||||
} else {
|
||||
copy := r
|
||||
m[k] = ©
|
||||
}
|
||||
}
|
||||
result := make([]Record, 0, len(m))
|
||||
for _, r := range m {
|
||||
result = append(result, *r)
|
||||
}
|
||||
return result
|
||||
}
|
||||
@@ -59,16 +59,6 @@ func runtimeToResponse(rt db.AgentRuntime) AgentRuntimeResponse {
|
||||
// Runtime Usage
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
type RuntimeUsageEntry struct {
|
||||
Date string `json:"date"`
|
||||
Provider string `json:"provider"`
|
||||
Model string `json:"model"`
|
||||
InputTokens int64 `json:"input_tokens"`
|
||||
OutputTokens int64 `json:"output_tokens"`
|
||||
CacheReadTokens int64 `json:"cache_read_tokens"`
|
||||
CacheWriteTokens int64 `json:"cache_write_tokens"`
|
||||
}
|
||||
|
||||
type RuntimeUsageResponse struct {
|
||||
RuntimeID string `json:"runtime_id"`
|
||||
Date string `json:"date"`
|
||||
@@ -80,48 +70,10 @@ type RuntimeUsageResponse struct {
|
||||
CacheWriteTokens int64 `json:"cache_write_tokens"`
|
||||
}
|
||||
|
||||
// ReportRuntimeUsage receives usage data from the daemon.
|
||||
func (h *Handler) ReportRuntimeUsage(w http.ResponseWriter, r *http.Request) {
|
||||
runtimeID := chi.URLParam(r, "runtimeId")
|
||||
if runtimeID == "" {
|
||||
writeError(w, http.StatusBadRequest, "runtimeId is required")
|
||||
return
|
||||
}
|
||||
|
||||
// Verify the caller owns this runtime's workspace.
|
||||
if _, ok := h.requireDaemonRuntimeAccess(w, r, runtimeID); !ok {
|
||||
return
|
||||
}
|
||||
|
||||
var req struct {
|
||||
Entries []RuntimeUsageEntry `json:"entries"`
|
||||
}
|
||||
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
||||
writeError(w, http.StatusBadRequest, "invalid request body")
|
||||
return
|
||||
}
|
||||
|
||||
for _, entry := range req.Entries {
|
||||
date, err := time.Parse("2006-01-02", entry.Date)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
h.Queries.UpsertRuntimeUsage(r.Context(), db.UpsertRuntimeUsageParams{
|
||||
RuntimeID: parseUUID(runtimeID),
|
||||
Date: pgtype.Date{Time: date, Valid: true},
|
||||
Provider: entry.Provider,
|
||||
Model: entry.Model,
|
||||
InputTokens: entry.InputTokens,
|
||||
OutputTokens: entry.OutputTokens,
|
||||
CacheReadTokens: entry.CacheReadTokens,
|
||||
CacheWriteTokens: entry.CacheWriteTokens,
|
||||
})
|
||||
}
|
||||
|
||||
writeJSON(w, http.StatusOK, map[string]string{"status": "ok"})
|
||||
}
|
||||
|
||||
// GetRuntimeUsage returns usage data for a runtime (protected route).
|
||||
// GetRuntimeUsage returns daily token usage for a runtime, aggregated from
|
||||
// per-task usage records captured by the daemon. This is scoped to
|
||||
// Daemon-executed tasks only (i.e. excludes users' local CLI usage of the
|
||||
// same tool).
|
||||
func (h *Handler) GetRuntimeUsage(w http.ResponseWriter, r *http.Request) {
|
||||
runtimeID := chi.URLParam(r, "runtimeId")
|
||||
|
||||
@@ -135,17 +87,11 @@ func (h *Handler) GetRuntimeUsage(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
days := 90
|
||||
if d := r.URL.Query().Get("days"); d != "" {
|
||||
if parsed, err := strconv.Atoi(d); err == nil && parsed > 0 && parsed <= 365 {
|
||||
days = parsed
|
||||
}
|
||||
}
|
||||
since := pgtype.Date{Time: time.Now().AddDate(0, 0, -days), Valid: true}
|
||||
since := parseSinceParam(r, 90)
|
||||
|
||||
rows, err := h.Queries.ListRuntimeUsage(r.Context(), db.ListRuntimeUsageParams{
|
||||
RuntimeID: parseUUID(runtimeID),
|
||||
Date: since,
|
||||
Since: since,
|
||||
})
|
||||
if err != nil {
|
||||
writeError(w, http.StatusInternalServerError, "failed to list usage")
|
||||
|
||||
117
server/internal/handler/runtime_test.go
Normal file
117
server/internal/handler/runtime_test.go
Normal file
@@ -0,0 +1,117 @@
|
||||
package handler
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
// TestGetRuntimeUsage_BucketsByUsageTime ensures a task that was enqueued on
|
||||
// one calendar day but whose tokens were reported the next day (e.g. execution
|
||||
// crossed midnight, or the task sat in the queue) is attributed to the day
|
||||
// tokens were actually produced, not the enqueue day. It also verifies the
|
||||
// ?days=N cutoff covers the full earliest calendar day, not just "now minus N
|
||||
// days" which would clip the morning of that day.
|
||||
func TestGetRuntimeUsage_BucketsByUsageTime(t *testing.T) {
|
||||
if testHandler == nil {
|
||||
t.Skip("database not available")
|
||||
}
|
||||
ctx := context.Background()
|
||||
|
||||
// Pick a runtime bound to the fixture workspace.
|
||||
var runtimeID string
|
||||
if err := testPool.QueryRow(ctx, `
|
||||
SELECT id FROM agent_runtime WHERE workspace_id = $1 LIMIT 1
|
||||
`, testWorkspaceID).Scan(&runtimeID); err != nil {
|
||||
t.Fatalf("fetch runtime: %v", err)
|
||||
}
|
||||
var agentID string
|
||||
if err := testPool.QueryRow(ctx, `
|
||||
SELECT id FROM agent WHERE workspace_id = $1 LIMIT 1
|
||||
`, testWorkspaceID).Scan(&agentID); err != nil {
|
||||
t.Fatalf("fetch agent: %v", err)
|
||||
}
|
||||
|
||||
// Create an issue for the tasks to reference.
|
||||
var issueID string
|
||||
if err := testPool.QueryRow(ctx, `
|
||||
INSERT INTO issue (workspace_id, title, creator_id, creator_type)
|
||||
VALUES ($1, 'runtime usage test', $2, 'member')
|
||||
RETURNING id
|
||||
`, testWorkspaceID, testUserID).Scan(&issueID); err != nil {
|
||||
t.Fatalf("create issue: %v", err)
|
||||
}
|
||||
t.Cleanup(func() {
|
||||
testPool.Exec(ctx, `DELETE FROM issue WHERE id = $1`, issueID)
|
||||
})
|
||||
|
||||
// enqueued yesterday 23:58 UTC, finished today 00:05 UTC — tokens belong to today.
|
||||
now := time.Now().UTC()
|
||||
today := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.UTC)
|
||||
yesterdayLate := today.Add(-2 * time.Minute)
|
||||
todayEarly := today.Add(5 * time.Minute)
|
||||
// Task that ran entirely yesterday around 05:00 — used to verify the
|
||||
// ?days cutoff isn't clipping yesterday's morning.
|
||||
yesterdayMorning := today.Add(-19 * time.Hour)
|
||||
|
||||
insertTaskWithUsage := func(enqueueAt, usageAt time.Time, inputTokens int64) string {
|
||||
var taskID string
|
||||
if err := testPool.QueryRow(ctx, `
|
||||
INSERT INTO agent_task_queue (agent_id, issue_id, runtime_id, status, created_at)
|
||||
VALUES ($1, $2, $3, 'completed', $4)
|
||||
RETURNING id
|
||||
`, agentID, issueID, runtimeID, enqueueAt).Scan(&taskID); err != nil {
|
||||
t.Fatalf("insert task: %v", err)
|
||||
}
|
||||
if _, err := testPool.Exec(ctx, `
|
||||
INSERT INTO task_usage (task_id, provider, model, input_tokens, output_tokens, created_at)
|
||||
VALUES ($1, 'claude', 'claude-3-5-sonnet', $2, 0, $3)
|
||||
`, taskID, inputTokens, usageAt); err != nil {
|
||||
t.Fatalf("insert task_usage: %v", err)
|
||||
}
|
||||
t.Cleanup(func() {
|
||||
testPool.Exec(ctx, `DELETE FROM agent_task_queue WHERE id = $1`, taskID)
|
||||
})
|
||||
return taskID
|
||||
}
|
||||
|
||||
insertTaskWithUsage(yesterdayLate, todayEarly, 1000) // cross-midnight
|
||||
insertTaskWithUsage(yesterdayMorning, yesterdayMorning, 2000) // full-day yesterday
|
||||
|
||||
// Call the handler with ?days=1 at whatever "now" is. That should include
|
||||
// both today and yesterday in full.
|
||||
w := httptest.NewRecorder()
|
||||
req := newRequest("GET", "/api/runtimes/"+runtimeID+"/usage?days=1", nil)
|
||||
req = withURLParam(req, "runtimeId", runtimeID)
|
||||
testHandler.GetRuntimeUsage(w, req)
|
||||
if w.Code != http.StatusOK {
|
||||
t.Fatalf("GetRuntimeUsage: expected 200, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
|
||||
var resp []RuntimeUsageResponse
|
||||
if err := json.NewDecoder(w.Body).Decode(&resp); err != nil {
|
||||
t.Fatalf("decode response: %v", err)
|
||||
}
|
||||
|
||||
byDate := make(map[string]int64)
|
||||
for _, r := range resp {
|
||||
byDate[r.Date] += r.InputTokens
|
||||
}
|
||||
|
||||
todayKey := today.Format("2006-01-02")
|
||||
yesterdayKey := today.Add(-24 * time.Hour).Format("2006-01-02")
|
||||
|
||||
// Cross-midnight task must attribute to today (tu.created_at), not yesterday
|
||||
// (atq.created_at). Before the fix this was 0 on today / 1000 on yesterday.
|
||||
if byDate[todayKey] != 1000 {
|
||||
t.Errorf("cross-midnight task: today bucket expected 1000 input tokens, got %d (full map: %v)", byDate[todayKey], byDate)
|
||||
}
|
||||
// Yesterday's morning task must still be included — this is what breaks
|
||||
// when ?days=N is interpreted as a rolling window instead of calendar days.
|
||||
if byDate[yesterdayKey] != 2000 {
|
||||
t.Errorf("yesterday morning task: yesterday bucket expected 2000 input tokens, got %d (full map: %v)", byDate[yesterdayKey], byDate)
|
||||
}
|
||||
}
|
||||
16
server/migrations/046_drop_runtime_usage.down.sql
Normal file
16
server/migrations/046_drop_runtime_usage.down.sql
Normal file
@@ -0,0 +1,16 @@
|
||||
CREATE TABLE IF NOT EXISTS runtime_usage (
|
||||
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
|
||||
runtime_id UUID NOT NULL REFERENCES agent_runtime(id) ON DELETE CASCADE,
|
||||
date DATE NOT NULL,
|
||||
provider TEXT NOT NULL,
|
||||
model TEXT NOT NULL DEFAULT '',
|
||||
input_tokens BIGINT NOT NULL DEFAULT 0,
|
||||
output_tokens BIGINT NOT NULL DEFAULT 0,
|
||||
cache_read_tokens BIGINT NOT NULL DEFAULT 0,
|
||||
cache_write_tokens BIGINT NOT NULL DEFAULT 0,
|
||||
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
|
||||
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
|
||||
UNIQUE (runtime_id, date, provider, model)
|
||||
);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_runtime_usage_runtime_date ON runtime_usage(runtime_id, date DESC);
|
||||
1
server/migrations/046_drop_runtime_usage.up.sql
Normal file
1
server/migrations/046_drop_runtime_usage.up.sql
Normal file
@@ -0,0 +1 @@
|
||||
DROP TABLE IF EXISTS runtime_usage;
|
||||
@@ -336,20 +336,6 @@ type Project struct {
|
||||
Priority string `json:"priority"`
|
||||
}
|
||||
|
||||
type RuntimeUsage struct {
|
||||
ID pgtype.UUID `json:"id"`
|
||||
RuntimeID pgtype.UUID `json:"runtime_id"`
|
||||
Date pgtype.Date `json:"date"`
|
||||
Provider string `json:"provider"`
|
||||
Model string `json:"model"`
|
||||
InputTokens int64 `json:"input_tokens"`
|
||||
OutputTokens int64 `json:"output_tokens"`
|
||||
CacheReadTokens int64 `json:"cache_read_tokens"`
|
||||
CacheWriteTokens int64 `json:"cache_write_tokens"`
|
||||
CreatedAt pgtype.Timestamptz `json:"created_at"`
|
||||
UpdatedAt pgtype.Timestamptz `json:"updated_at"`
|
||||
}
|
||||
|
||||
type Skill struct {
|
||||
ID pgtype.UUID `json:"id"`
|
||||
WorkspaceID pgtype.UUID `json:"workspace_id"`
|
||||
|
||||
@@ -44,112 +44,29 @@ func (q *Queries) GetRuntimeTaskHourlyActivity(ctx context.Context, runtimeID pg
|
||||
return items, nil
|
||||
}
|
||||
|
||||
const getRuntimeUsageSummary = `-- name: GetRuntimeUsageSummary :many
|
||||
SELECT provider, model,
|
||||
SUM(input_tokens)::bigint AS total_input_tokens,
|
||||
SUM(output_tokens)::bigint AS total_output_tokens,
|
||||
SUM(cache_read_tokens)::bigint AS total_cache_read_tokens,
|
||||
SUM(cache_write_tokens)::bigint AS total_cache_write_tokens
|
||||
FROM runtime_usage
|
||||
WHERE runtime_id = $1
|
||||
GROUP BY provider, model
|
||||
ORDER BY provider, model
|
||||
`
|
||||
|
||||
type GetRuntimeUsageSummaryRow struct {
|
||||
Provider string `json:"provider"`
|
||||
Model string `json:"model"`
|
||||
TotalInputTokens int64 `json:"total_input_tokens"`
|
||||
TotalOutputTokens int64 `json:"total_output_tokens"`
|
||||
TotalCacheReadTokens int64 `json:"total_cache_read_tokens"`
|
||||
TotalCacheWriteTokens int64 `json:"total_cache_write_tokens"`
|
||||
}
|
||||
|
||||
func (q *Queries) GetRuntimeUsageSummary(ctx context.Context, runtimeID pgtype.UUID) ([]GetRuntimeUsageSummaryRow, error) {
|
||||
rows, err := q.db.Query(ctx, getRuntimeUsageSummary, runtimeID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
items := []GetRuntimeUsageSummaryRow{}
|
||||
for rows.Next() {
|
||||
var i GetRuntimeUsageSummaryRow
|
||||
if err := rows.Scan(
|
||||
&i.Provider,
|
||||
&i.Model,
|
||||
&i.TotalInputTokens,
|
||||
&i.TotalOutputTokens,
|
||||
&i.TotalCacheReadTokens,
|
||||
&i.TotalCacheWriteTokens,
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
items = append(items, i)
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return items, nil
|
||||
}
|
||||
|
||||
const listRuntimeUsage = `-- name: ListRuntimeUsage :many
|
||||
SELECT id, runtime_id, date, provider, model, input_tokens, output_tokens, cache_read_tokens, cache_write_tokens, created_at, updated_at FROM runtime_usage
|
||||
WHERE runtime_id = $1
|
||||
AND date >= $2
|
||||
ORDER BY date DESC
|
||||
SELECT
|
||||
DATE(tu.created_at) AS date,
|
||||
tu.provider,
|
||||
tu.model,
|
||||
SUM(tu.input_tokens)::bigint AS input_tokens,
|
||||
SUM(tu.output_tokens)::bigint AS output_tokens,
|
||||
SUM(tu.cache_read_tokens)::bigint AS cache_read_tokens,
|
||||
SUM(tu.cache_write_tokens)::bigint AS cache_write_tokens
|
||||
FROM task_usage tu
|
||||
JOIN agent_task_queue atq ON atq.id = tu.task_id
|
||||
WHERE atq.runtime_id = $1
|
||||
AND tu.created_at >= DATE_TRUNC('day', $2::timestamptz)
|
||||
GROUP BY DATE(tu.created_at), tu.provider, tu.model
|
||||
ORDER BY DATE(tu.created_at) DESC, tu.provider, tu.model
|
||||
`
|
||||
|
||||
type ListRuntimeUsageParams struct {
|
||||
RuntimeID pgtype.UUID `json:"runtime_id"`
|
||||
Date pgtype.Date `json:"date"`
|
||||
RuntimeID pgtype.UUID `json:"runtime_id"`
|
||||
Since pgtype.Timestamptz `json:"since"`
|
||||
}
|
||||
|
||||
func (q *Queries) ListRuntimeUsage(ctx context.Context, arg ListRuntimeUsageParams) ([]RuntimeUsage, error) {
|
||||
rows, err := q.db.Query(ctx, listRuntimeUsage, arg.RuntimeID, arg.Date)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
items := []RuntimeUsage{}
|
||||
for rows.Next() {
|
||||
var i RuntimeUsage
|
||||
if err := rows.Scan(
|
||||
&i.ID,
|
||||
&i.RuntimeID,
|
||||
&i.Date,
|
||||
&i.Provider,
|
||||
&i.Model,
|
||||
&i.InputTokens,
|
||||
&i.OutputTokens,
|
||||
&i.CacheReadTokens,
|
||||
&i.CacheWriteTokens,
|
||||
&i.CreatedAt,
|
||||
&i.UpdatedAt,
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
items = append(items, i)
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return items, nil
|
||||
}
|
||||
|
||||
const upsertRuntimeUsage = `-- name: UpsertRuntimeUsage :exec
|
||||
INSERT INTO runtime_usage (runtime_id, date, provider, model, input_tokens, output_tokens, cache_read_tokens, cache_write_tokens)
|
||||
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
|
||||
ON CONFLICT (runtime_id, date, provider, model)
|
||||
DO UPDATE SET
|
||||
input_tokens = EXCLUDED.input_tokens,
|
||||
output_tokens = EXCLUDED.output_tokens,
|
||||
cache_read_tokens = EXCLUDED.cache_read_tokens,
|
||||
cache_write_tokens = EXCLUDED.cache_write_tokens,
|
||||
updated_at = now()
|
||||
`
|
||||
|
||||
type UpsertRuntimeUsageParams struct {
|
||||
RuntimeID pgtype.UUID `json:"runtime_id"`
|
||||
type ListRuntimeUsageRow struct {
|
||||
Date pgtype.Date `json:"date"`
|
||||
Provider string `json:"provider"`
|
||||
Model string `json:"model"`
|
||||
@@ -159,16 +76,34 @@ type UpsertRuntimeUsageParams struct {
|
||||
CacheWriteTokens int64 `json:"cache_write_tokens"`
|
||||
}
|
||||
|
||||
func (q *Queries) UpsertRuntimeUsage(ctx context.Context, arg UpsertRuntimeUsageParams) error {
|
||||
_, err := q.db.Exec(ctx, upsertRuntimeUsage,
|
||||
arg.RuntimeID,
|
||||
arg.Date,
|
||||
arg.Provider,
|
||||
arg.Model,
|
||||
arg.InputTokens,
|
||||
arg.OutputTokens,
|
||||
arg.CacheReadTokens,
|
||||
arg.CacheWriteTokens,
|
||||
)
|
||||
return err
|
||||
// Bucket by tu.created_at (usage report time, ~= task completion time), not
|
||||
// atq.created_at (task enqueue time), so tasks that queue one day and execute
|
||||
// the next are attributed to the day tokens were actually produced. The since
|
||||
// cutoff is truncated to start-of-day so `days=N` yields full calendar days.
|
||||
func (q *Queries) ListRuntimeUsage(ctx context.Context, arg ListRuntimeUsageParams) ([]ListRuntimeUsageRow, error) {
|
||||
rows, err := q.db.Query(ctx, listRuntimeUsage, arg.RuntimeID, arg.Since)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
items := []ListRuntimeUsageRow{}
|
||||
for rows.Next() {
|
||||
var i ListRuntimeUsageRow
|
||||
if err := rows.Scan(
|
||||
&i.Date,
|
||||
&i.Provider,
|
||||
&i.Model,
|
||||
&i.InputTokens,
|
||||
&i.OutputTokens,
|
||||
&i.CacheReadTokens,
|
||||
&i.CacheWriteTokens,
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
items = append(items, i)
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return items, nil
|
||||
}
|
||||
|
||||
@@ -1,30 +1,22 @@
|
||||
-- name: UpsertRuntimeUsage :exec
|
||||
INSERT INTO runtime_usage (runtime_id, date, provider, model, input_tokens, output_tokens, cache_read_tokens, cache_write_tokens)
|
||||
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
|
||||
ON CONFLICT (runtime_id, date, provider, model)
|
||||
DO UPDATE SET
|
||||
input_tokens = EXCLUDED.input_tokens,
|
||||
output_tokens = EXCLUDED.output_tokens,
|
||||
cache_read_tokens = EXCLUDED.cache_read_tokens,
|
||||
cache_write_tokens = EXCLUDED.cache_write_tokens,
|
||||
updated_at = now();
|
||||
|
||||
-- name: ListRuntimeUsage :many
|
||||
SELECT * FROM runtime_usage
|
||||
WHERE runtime_id = $1
|
||||
AND date >= $2
|
||||
ORDER BY date DESC;
|
||||
|
||||
-- name: GetRuntimeUsageSummary :many
|
||||
SELECT provider, model,
|
||||
SUM(input_tokens)::bigint AS total_input_tokens,
|
||||
SUM(output_tokens)::bigint AS total_output_tokens,
|
||||
SUM(cache_read_tokens)::bigint AS total_cache_read_tokens,
|
||||
SUM(cache_write_tokens)::bigint AS total_cache_write_tokens
|
||||
FROM runtime_usage
|
||||
WHERE runtime_id = $1
|
||||
GROUP BY provider, model
|
||||
ORDER BY provider, model;
|
||||
-- Bucket by tu.created_at (usage report time, ~= task completion time), not
|
||||
-- atq.created_at (task enqueue time), so tasks that queue one day and execute
|
||||
-- the next are attributed to the day tokens were actually produced. The since
|
||||
-- cutoff is truncated to start-of-day so `days=N` yields full calendar days.
|
||||
SELECT
|
||||
DATE(tu.created_at) AS date,
|
||||
tu.provider,
|
||||
tu.model,
|
||||
SUM(tu.input_tokens)::bigint AS input_tokens,
|
||||
SUM(tu.output_tokens)::bigint AS output_tokens,
|
||||
SUM(tu.cache_read_tokens)::bigint AS cache_read_tokens,
|
||||
SUM(tu.cache_write_tokens)::bigint AS cache_write_tokens
|
||||
FROM task_usage tu
|
||||
JOIN agent_task_queue atq ON atq.id = tu.task_id
|
||||
WHERE atq.runtime_id = $1
|
||||
AND tu.created_at >= DATE_TRUNC('day', @since::timestamptz)
|
||||
GROUP BY DATE(tu.created_at), tu.provider, tu.model
|
||||
ORDER BY DATE(tu.created_at) DESC, tu.provider, tu.model;
|
||||
|
||||
-- name: GetRuntimeTaskHourlyActivity :many
|
||||
SELECT EXTRACT(HOUR FROM started_at)::int AS hour, COUNT(*)::int AS count
|
||||
|
||||
Reference in New Issue
Block a user