mirror of
https://github.com/multica-ai/multica.git
synced 2026-06-28 18:09:14 +02:00
Compare commits
3 Commits
agent/lamb
...
agent/j/96
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9fe7340211 | ||
|
|
584344e5bd | ||
|
|
4896128723 |
@@ -38,6 +38,12 @@ func (b *openclawBackend) Execute(ctx context.Context, prompt string, opts ExecO
|
||||
sessionID = fmt.Sprintf("multica-%d", time.Now().UnixNano())
|
||||
}
|
||||
args := []string{"agent", "--local", "--json", "--session-id", sessionID}
|
||||
if opts.Model != "" {
|
||||
args = append(args, "--model", opts.Model)
|
||||
}
|
||||
if opts.SystemPrompt != "" {
|
||||
args = append(args, "--system-prompt", opts.SystemPrompt)
|
||||
}
|
||||
if opts.Timeout > 0 {
|
||||
args = append(args, "--timeout", fmt.Sprintf("%d", int(opts.Timeout.Seconds())))
|
||||
}
|
||||
@@ -129,22 +135,108 @@ type openclawEventResult struct {
|
||||
}
|
||||
|
||||
// processOutput reads the JSON output from openclaw --json stderr and returns
|
||||
// the parsed result. OpenClaw writes its JSON result to stderr, which may also
|
||||
// contain non-JSON log lines. We scan line-by-line so a final result line can
|
||||
// be recognized without waiting for the entire stderr stream to be buffered.
|
||||
// the parsed result. OpenClaw writes its JSON output to stderr, which may also
|
||||
// contain non-JSON log lines. The stream may contain:
|
||||
//
|
||||
// - NDJSON streaming events (type: "text", "tool_use", "tool_result", "error",
|
||||
// "step_start", "step_finish") — emitted in real time as the agent works
|
||||
// - A final result JSON (with payloads + meta) — the legacy single-blob format
|
||||
//
|
||||
// We scan line-by-line, emitting messages as events arrive so streaming
|
||||
// consumers get real-time feedback instead of waiting for the final blob.
|
||||
func (b *openclawBackend) processOutput(r io.Reader, ch chan<- Message) openclawEventResult {
|
||||
scanner := bufio.NewScanner(r)
|
||||
scanner.Buffer(make([]byte, 0, 1024*1024), 10*1024*1024)
|
||||
|
||||
var output strings.Builder
|
||||
var sessionID string
|
||||
var usage TokenUsage
|
||||
finalStatus := "completed"
|
||||
var finalError string
|
||||
gotEvents := false // true if we parsed at least one streaming event or result
|
||||
|
||||
var rawLines []string
|
||||
for scanner.Scan() {
|
||||
line := strings.TrimSpace(scanner.Text())
|
||||
if line == "" {
|
||||
continue
|
||||
}
|
||||
if result, ok := tryParseOpenclawResult(line); ok {
|
||||
return b.buildOpenclawEventResult(result, ch)
|
||||
|
||||
// Try parsing as a streaming NDJSON event first.
|
||||
if event, ok := tryParseOpenclawEvent(line); ok {
|
||||
gotEvents = true
|
||||
if event.SessionID != "" {
|
||||
sessionID = event.SessionID
|
||||
}
|
||||
switch event.Type {
|
||||
case "text":
|
||||
if event.Text != "" {
|
||||
output.WriteString(event.Text)
|
||||
trySend(ch, Message{Type: MessageText, Content: event.Text})
|
||||
}
|
||||
case "tool_use":
|
||||
var input map[string]any
|
||||
if event.Input != nil {
|
||||
_ = json.Unmarshal(event.Input, &input)
|
||||
}
|
||||
trySend(ch, Message{
|
||||
Type: MessageToolUse,
|
||||
Tool: event.Tool,
|
||||
CallID: event.CallID,
|
||||
Input: input,
|
||||
})
|
||||
case "tool_result":
|
||||
trySend(ch, Message{
|
||||
Type: MessageToolResult,
|
||||
Tool: event.Tool,
|
||||
CallID: event.CallID,
|
||||
Output: event.Text,
|
||||
})
|
||||
case "error":
|
||||
errMsg := event.errorMessage()
|
||||
b.cfg.Logger.Warn("openclaw error event", "error", errMsg)
|
||||
trySend(ch, Message{Type: MessageError, Content: errMsg})
|
||||
finalStatus = "failed"
|
||||
finalError = errMsg
|
||||
case "lifecycle":
|
||||
phase := event.Phase
|
||||
if phase == "error" || phase == "failed" || phase == "cancelled" {
|
||||
errMsg := event.errorMessage()
|
||||
b.cfg.Logger.Warn("openclaw lifecycle failure", "phase", phase, "error", errMsg)
|
||||
trySend(ch, Message{Type: MessageError, Content: errMsg})
|
||||
finalStatus = "failed"
|
||||
finalError = errMsg
|
||||
}
|
||||
case "step_start":
|
||||
trySend(ch, Message{Type: MessageStatus, Status: "running"})
|
||||
case "step_finish":
|
||||
if event.Usage != nil {
|
||||
u := parseOpenclawUsage(event.Usage)
|
||||
usage.InputTokens += u.InputTokens
|
||||
usage.OutputTokens += u.OutputTokens
|
||||
usage.CacheReadTokens += u.CacheReadTokens
|
||||
usage.CacheWriteTokens += u.CacheWriteTokens
|
||||
}
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
// Try parsing as a final result blob (legacy format).
|
||||
if result, ok := tryParseOpenclawResult(line); ok {
|
||||
gotEvents = true
|
||||
res := b.buildOpenclawEventResult(result, ch, &output)
|
||||
if res.sessionID != "" {
|
||||
sessionID = res.sessionID
|
||||
}
|
||||
// Prefer usage from the final result if no streaming events reported it.
|
||||
u := res.usage
|
||||
if u.InputTokens > 0 || u.OutputTokens > 0 || u.CacheReadTokens > 0 || u.CacheWriteTokens > 0 {
|
||||
usage = u
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
// Not JSON — treat as log line.
|
||||
b.cfg.Logger.Debug("[openclaw:stderr] " + line)
|
||||
rawLines = append(rawLines, line)
|
||||
}
|
||||
@@ -153,36 +245,65 @@ func (b *openclawBackend) processOutput(r io.Reader, ch chan<- Message) openclaw
|
||||
return openclawEventResult{status: "failed", errMsg: fmt.Sprintf("read stderr: %v", err)}
|
||||
}
|
||||
|
||||
trimmed := strings.TrimSpace(strings.Join(rawLines, "\n"))
|
||||
if trimmed != "" {
|
||||
return openclawEventResult{status: "completed", output: trimmed}
|
||||
// If we got no events at all, fall back to raw output.
|
||||
if !gotEvents {
|
||||
trimmed := strings.TrimSpace(strings.Join(rawLines, "\n"))
|
||||
if trimmed != "" {
|
||||
return openclawEventResult{status: "completed", output: trimmed}
|
||||
}
|
||||
return openclawEventResult{status: "failed", errMsg: "openclaw returned no parseable output"}
|
||||
}
|
||||
|
||||
return openclawEventResult{
|
||||
status: finalStatus,
|
||||
errMsg: finalError,
|
||||
output: output.String(),
|
||||
sessionID: sessionID,
|
||||
usage: usage,
|
||||
}
|
||||
return openclawEventResult{status: "failed", errMsg: "openclaw returned no parseable output"}
|
||||
}
|
||||
|
||||
// tryParseOpenclawEvent attempts to parse a line as a streaming NDJSON event.
|
||||
// Returns the event and true if the line is a valid event with a known type.
|
||||
func tryParseOpenclawEvent(line string) (openclawEvent, bool) {
|
||||
if len(line) == 0 || line[0] != '{' {
|
||||
return openclawEvent{}, false
|
||||
}
|
||||
var event openclawEvent
|
||||
if err := json.Unmarshal([]byte(line), &event); err != nil {
|
||||
return openclawEvent{}, false
|
||||
}
|
||||
if event.Type == "" {
|
||||
return openclawEvent{}, false
|
||||
}
|
||||
return event, true
|
||||
}
|
||||
|
||||
// tryParseOpenclawResult attempts to parse a line as a final result blob
|
||||
// (the legacy format with payloads + meta). Lines must start with '{' to be
|
||||
// considered — we no longer scan for braces at arbitrary positions, which
|
||||
// avoids false matches on log lines containing JSON fragments.
|
||||
func tryParseOpenclawResult(raw string) (openclawResult, bool) {
|
||||
// Try each '{' position until we find valid openclawResult JSON.
|
||||
// Earlier '{' chars may appear in log/error lines (e.g. raw_params={...}).
|
||||
var result openclawResult
|
||||
for i := 0; i < len(raw); i++ {
|
||||
if raw[i] != '{' {
|
||||
continue
|
||||
}
|
||||
if err := json.Unmarshal([]byte(raw[i:]), &result); err == nil && (result.Payloads != nil || result.Meta.DurationMs > 0) {
|
||||
return result, true
|
||||
}
|
||||
if len(raw) == 0 || raw[0] != '{' {
|
||||
return openclawResult{}, false
|
||||
}
|
||||
return openclawResult{}, false
|
||||
var result openclawResult
|
||||
if err := json.Unmarshal([]byte(raw), &result); err != nil {
|
||||
return openclawResult{}, false
|
||||
}
|
||||
if result.Payloads == nil && result.Meta.DurationMs == 0 {
|
||||
return openclawResult{}, false
|
||||
}
|
||||
return result, true
|
||||
}
|
||||
|
||||
func (b *openclawBackend) buildOpenclawEventResult(result openclawResult, ch chan<- Message) openclawEventResult {
|
||||
var output strings.Builder
|
||||
// buildOpenclawEventResult extracts text and metadata from a final result blob.
|
||||
// Text payloads are appended to the shared output builder and emitted to ch.
|
||||
func (b *openclawBackend) buildOpenclawEventResult(result openclawResult, ch chan<- Message, output *strings.Builder) openclawEventResult {
|
||||
for _, p := range result.Payloads {
|
||||
if p.Text != "" {
|
||||
if output.Len() > 0 {
|
||||
output.WriteString("\n")
|
||||
}
|
||||
output.WriteString(p.Text)
|
||||
trySend(ch, Message{Type: MessageText, Content: p.Text})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -193,17 +314,10 @@ func (b *openclawBackend) buildOpenclawEventResult(result openclawResult, ch cha
|
||||
sessionID = sid
|
||||
}
|
||||
if u, ok := result.Meta.AgentMeta["usage"].(map[string]any); ok {
|
||||
usage.InputTokens = openclawInt64(u, "input")
|
||||
usage.OutputTokens = openclawInt64(u, "output")
|
||||
usage.CacheReadTokens = openclawInt64(u, "cacheRead")
|
||||
usage.CacheWriteTokens = openclawInt64(u, "cacheWrite")
|
||||
usage = parseOpenclawUsage(u)
|
||||
}
|
||||
}
|
||||
|
||||
if output.Len() > 0 {
|
||||
trySend(ch, Message{Type: MessageText, Content: output.String()})
|
||||
}
|
||||
|
||||
return openclawEventResult{
|
||||
status: "completed",
|
||||
output: output.String(),
|
||||
@@ -212,6 +326,33 @@ func (b *openclawBackend) buildOpenclawEventResult(result openclawResult, ch cha
|
||||
}
|
||||
}
|
||||
|
||||
// parseOpenclawUsage extracts token usage from a map, supporting multiple
|
||||
// field name conventions used by different OpenClaw versions and PaperClip:
|
||||
//
|
||||
// input / inputTokens / input_tokens
|
||||
// output / outputTokens / output_tokens
|
||||
// cacheRead / cachedInputTokens / cached_input_tokens / cache_read
|
||||
// cacheWrite / cacheCreationInputTokens / cache_creation_input_tokens / cache_write
|
||||
func parseOpenclawUsage(data map[string]any) TokenUsage {
|
||||
return TokenUsage{
|
||||
InputTokens: openclawInt64FirstOf(data, "input", "inputTokens", "input_tokens"),
|
||||
OutputTokens: openclawInt64FirstOf(data, "output", "outputTokens", "output_tokens"),
|
||||
CacheReadTokens: openclawInt64FirstOf(data, "cacheRead", "cachedInputTokens", "cached_input_tokens", "cache_read", "cache_read_input_tokens"),
|
||||
CacheWriteTokens: openclawInt64FirstOf(data, "cacheWrite", "cacheCreationInputTokens", "cache_creation_input_tokens", "cache_write"),
|
||||
}
|
||||
}
|
||||
|
||||
// openclawInt64FirstOf returns the first non-zero int64 value found under any
|
||||
// of the given keys. This supports field name variants across protocol versions.
|
||||
func openclawInt64FirstOf(data map[string]any, keys ...string) int64 {
|
||||
for _, key := range keys {
|
||||
if v := openclawInt64(data, key); v != 0 {
|
||||
return v
|
||||
}
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
// openclawInt64 safely extracts an int64 from a JSON-decoded map value (which
|
||||
// may be float64 due to Go's JSON number handling).
|
||||
func openclawInt64(data map[string]any, key string) int64 {
|
||||
@@ -231,7 +372,73 @@ func openclawInt64(data map[string]any, key string) int64 {
|
||||
|
||||
// ── JSON types for `openclaw agent --json` output ──
|
||||
|
||||
// openclawResult represents the JSON output from `openclaw agent --json`.
|
||||
// openclawEvent represents a single streaming NDJSON event from openclaw --json.
|
||||
//
|
||||
// Event types:
|
||||
// - "text" — text output (text field)
|
||||
// - "tool_use" — tool invocation (tool, callId, input)
|
||||
// - "tool_result" — tool output (tool, callId, text)
|
||||
// - "error" — error (text, or structured error object)
|
||||
// - "lifecycle" — phase changes (phase: "error"/"failed"/"cancelled")
|
||||
// - "step_start" — agent step begins
|
||||
// - "step_finish" — agent step ends (usage)
|
||||
type openclawEvent struct {
|
||||
Type string `json:"type"`
|
||||
SessionID string `json:"sessionId,omitempty"`
|
||||
Text string `json:"text,omitempty"`
|
||||
Tool string `json:"tool,omitempty"`
|
||||
CallID string `json:"callId,omitempty"`
|
||||
Input json.RawMessage `json:"input,omitempty"`
|
||||
Usage map[string]any `json:"usage,omitempty"`
|
||||
Phase string `json:"phase,omitempty"` // lifecycle event phase
|
||||
Error *openclawError `json:"error,omitempty"` // structured error object
|
||||
Message string `json:"message,omitempty"` // alternative error message field
|
||||
}
|
||||
|
||||
// errorMessage extracts a human-readable error message from the event,
|
||||
// checking multiple fields: structured error object, text, message, or fallback.
|
||||
func (e openclawEvent) errorMessage() string {
|
||||
if e.Error != nil {
|
||||
if msg := e.Error.message(); msg != "" {
|
||||
return msg
|
||||
}
|
||||
}
|
||||
if e.Text != "" {
|
||||
return e.Text
|
||||
}
|
||||
if e.Message != "" {
|
||||
return e.Message
|
||||
}
|
||||
return "unknown openclaw error"
|
||||
}
|
||||
|
||||
// openclawError represents a structured error in an openclaw event,
|
||||
// compatible with PaperClip's error format (name + data.message).
|
||||
type openclawError struct {
|
||||
Name string `json:"name,omitempty"`
|
||||
Data *openclawErrorData `json:"data,omitempty"`
|
||||
Message string `json:"message,omitempty"`
|
||||
}
|
||||
|
||||
func (e *openclawError) message() string {
|
||||
if e.Data != nil && e.Data.Message != "" {
|
||||
return e.Data.Message
|
||||
}
|
||||
if e.Message != "" {
|
||||
return e.Message
|
||||
}
|
||||
if e.Name != "" {
|
||||
return e.Name
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
type openclawErrorData struct {
|
||||
Message string `json:"message,omitempty"`
|
||||
}
|
||||
|
||||
// openclawResult represents the final JSON output from `openclaw agent --json`
|
||||
// (the legacy single-blob format with payloads + meta).
|
||||
type openclawResult struct {
|
||||
Payloads []openclawPayload `json:"payloads"`
|
||||
Meta openclawMeta `json:"meta"`
|
||||
|
||||
@@ -18,7 +18,7 @@ func TestNewReturnsOpenclawBackend(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// ── processOutput tests ──
|
||||
// ── Legacy result format tests (processOutput with final JSON blob) ──
|
||||
|
||||
func TestOpenclawProcessOutputHappyPath(t *testing.T) {
|
||||
t.Parallel()
|
||||
@@ -90,11 +90,24 @@ func TestOpenclawProcessOutputMultiplePayloads(t *testing.T) {
|
||||
|
||||
res := b.processOutput(strings.NewReader(string(data)), ch)
|
||||
|
||||
if res.output != "First\nSecond" {
|
||||
t.Errorf("output: got %q, want %q", res.output, "First\nSecond")
|
||||
if res.output != "FirstSecond" {
|
||||
t.Errorf("output: got %q, want %q", res.output, "FirstSecond")
|
||||
}
|
||||
|
||||
close(ch)
|
||||
var msgs []Message
|
||||
for m := range ch {
|
||||
msgs = append(msgs, m)
|
||||
}
|
||||
if len(msgs) != 2 {
|
||||
t.Fatalf("expected 2 text messages, got %d", len(msgs))
|
||||
}
|
||||
if msgs[0].Content != "First" {
|
||||
t.Errorf("msg[0]: got %q, want %q", msgs[0].Content, "First")
|
||||
}
|
||||
if msgs[1].Content != "Second" {
|
||||
t.Errorf("msg[1]: got %q, want %q", msgs[1].Content, "Second")
|
||||
}
|
||||
}
|
||||
|
||||
func TestOpenclawProcessOutputEmptyPayloads(t *testing.T) {
|
||||
@@ -238,7 +251,8 @@ func TestOpenclawProcessOutputWithBracesInLogLines(t *testing.T) {
|
||||
Meta: openclawMeta{DurationMs: 500},
|
||||
}
|
||||
data, _ := json.Marshal(result)
|
||||
// Simulate error line containing braces before the real JSON (the exact bug scenario)
|
||||
// Log line with braces should NOT be parsed as JSON — only lines starting
|
||||
// with '{' are considered. The result blob on its own line is still parsed.
|
||||
input := `[tools] exec failed: complex interpreter invocation detected. raw_params={"command":"echo hello"}` + "\n" + string(data)
|
||||
|
||||
res := b.processOutput(strings.NewReader(input), ch)
|
||||
@@ -253,6 +267,558 @@ func TestOpenclawProcessOutputWithBracesInLogLines(t *testing.T) {
|
||||
close(ch)
|
||||
}
|
||||
|
||||
func TestOpenclawResultBlobWithLeadingPrefixRejected(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
b := &openclawBackend{cfg: Config{Logger: slog.Default()}}
|
||||
ch := make(chan Message, 256)
|
||||
|
||||
// A line with a prefix before the JSON should NOT be parsed as a result.
|
||||
// This tests that the hardened parser rejects non-'{'-starting lines.
|
||||
result := openclawResult{
|
||||
Payloads: []openclawPayload{{Text: "Should not match"}},
|
||||
Meta: openclawMeta{DurationMs: 500},
|
||||
}
|
||||
data, _ := json.Marshal(result)
|
||||
input := "some prefix " + string(data)
|
||||
|
||||
res := b.processOutput(strings.NewReader(input), ch)
|
||||
|
||||
// Should fall back to raw output since the JSON has a prefix.
|
||||
if res.status != "completed" {
|
||||
t.Errorf("status: got %q, want %q", res.status, "completed")
|
||||
}
|
||||
if res.output != input {
|
||||
t.Errorf("output: got %q, want raw input back", res.output)
|
||||
}
|
||||
|
||||
close(ch)
|
||||
}
|
||||
|
||||
// ── Streaming NDJSON event tests ──
|
||||
|
||||
func TestOpenclawStreamingTextEvents(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
b := &openclawBackend{cfg: Config{Logger: slog.Default()}}
|
||||
ch := make(chan Message, 256)
|
||||
|
||||
lines := []string{
|
||||
`{"type":"text","text":"Hello "}`,
|
||||
`{"type":"text","text":"world"}`,
|
||||
}
|
||||
input := strings.Join(lines, "\n")
|
||||
|
||||
res := b.processOutput(strings.NewReader(input), ch)
|
||||
|
||||
if res.status != "completed" {
|
||||
t.Errorf("status: got %q, want %q", res.status, "completed")
|
||||
}
|
||||
if res.output != "Hello world" {
|
||||
t.Errorf("output: got %q, want %q", res.output, "Hello world")
|
||||
}
|
||||
|
||||
close(ch)
|
||||
var msgs []Message
|
||||
for m := range ch {
|
||||
msgs = append(msgs, m)
|
||||
}
|
||||
if len(msgs) != 2 {
|
||||
t.Fatalf("expected 2 messages, got %d", len(msgs))
|
||||
}
|
||||
if msgs[0].Type != MessageText || msgs[0].Content != "Hello " {
|
||||
t.Errorf("msg[0]: type=%s content=%q", msgs[0].Type, msgs[0].Content)
|
||||
}
|
||||
if msgs[1].Type != MessageText || msgs[1].Content != "world" {
|
||||
t.Errorf("msg[1]: type=%s content=%q", msgs[1].Type, msgs[1].Content)
|
||||
}
|
||||
}
|
||||
|
||||
func TestOpenclawStreamingToolUseEvents(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
b := &openclawBackend{cfg: Config{Logger: slog.Default()}}
|
||||
ch := make(chan Message, 256)
|
||||
|
||||
lines := []string{
|
||||
`{"type":"tool_use","tool":"bash","callId":"call_1","input":{"command":"ls -la"}}`,
|
||||
`{"type":"tool_result","tool":"bash","callId":"call_1","text":"total 42\ndrwxr-xr-x"}`,
|
||||
`{"type":"text","text":"Listed files."}`,
|
||||
}
|
||||
input := strings.Join(lines, "\n")
|
||||
|
||||
res := b.processOutput(strings.NewReader(input), ch)
|
||||
|
||||
if res.status != "completed" {
|
||||
t.Errorf("status: got %q, want %q", res.status, "completed")
|
||||
}
|
||||
|
||||
close(ch)
|
||||
var msgs []Message
|
||||
for m := range ch {
|
||||
msgs = append(msgs, m)
|
||||
}
|
||||
if len(msgs) != 3 {
|
||||
t.Fatalf("expected 3 messages, got %d", len(msgs))
|
||||
}
|
||||
|
||||
// tool_use
|
||||
if msgs[0].Type != MessageToolUse {
|
||||
t.Errorf("msg[0] type: got %s, want tool-use", msgs[0].Type)
|
||||
}
|
||||
if msgs[0].Tool != "bash" {
|
||||
t.Errorf("msg[0] tool: got %q, want %q", msgs[0].Tool, "bash")
|
||||
}
|
||||
if msgs[0].CallID != "call_1" {
|
||||
t.Errorf("msg[0] callID: got %q, want %q", msgs[0].CallID, "call_1")
|
||||
}
|
||||
if msgs[0].Input["command"] != "ls -la" {
|
||||
t.Errorf("msg[0] input: got %v", msgs[0].Input)
|
||||
}
|
||||
|
||||
// tool_result
|
||||
if msgs[1].Type != MessageToolResult {
|
||||
t.Errorf("msg[1] type: got %s, want tool-result", msgs[1].Type)
|
||||
}
|
||||
if msgs[1].CallID != "call_1" {
|
||||
t.Errorf("msg[1] callID: got %q", msgs[1].CallID)
|
||||
}
|
||||
if msgs[1].Output != "total 42\ndrwxr-xr-x" {
|
||||
t.Errorf("msg[1] output: got %q", msgs[1].Output)
|
||||
}
|
||||
|
||||
// text
|
||||
if msgs[2].Type != MessageText || msgs[2].Content != "Listed files." {
|
||||
t.Errorf("msg[2]: type=%s content=%q", msgs[2].Type, msgs[2].Content)
|
||||
}
|
||||
}
|
||||
|
||||
func TestOpenclawStreamingErrorEvent(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
b := &openclawBackend{cfg: Config{Logger: slog.Default()}}
|
||||
ch := make(chan Message, 256)
|
||||
|
||||
lines := []string{
|
||||
`{"type":"text","text":"Starting..."}`,
|
||||
`{"type":"error","text":"model not found: gpt-99"}`,
|
||||
}
|
||||
input := strings.Join(lines, "\n")
|
||||
|
||||
res := b.processOutput(strings.NewReader(input), ch)
|
||||
|
||||
if res.status != "failed" {
|
||||
t.Errorf("status: got %q, want %q", res.status, "failed")
|
||||
}
|
||||
if res.errMsg != "model not found: gpt-99" {
|
||||
t.Errorf("errMsg: got %q", res.errMsg)
|
||||
}
|
||||
|
||||
close(ch)
|
||||
var msgs []Message
|
||||
for m := range ch {
|
||||
msgs = append(msgs, m)
|
||||
}
|
||||
if len(msgs) != 2 {
|
||||
t.Fatalf("expected 2 messages, got %d", len(msgs))
|
||||
}
|
||||
if msgs[1].Type != MessageError {
|
||||
t.Errorf("msg[1] type: got %s, want error", msgs[1].Type)
|
||||
}
|
||||
}
|
||||
|
||||
func TestOpenclawStreamingStepFinishUsage(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
b := &openclawBackend{cfg: Config{Logger: slog.Default()}}
|
||||
ch := make(chan Message, 256)
|
||||
|
||||
lines := []string{
|
||||
`{"type":"step_start"}`,
|
||||
`{"type":"text","text":"Done"}`,
|
||||
`{"type":"step_finish","usage":{"input":200,"output":100,"cacheRead":50,"cacheWrite":25}}`,
|
||||
}
|
||||
input := strings.Join(lines, "\n")
|
||||
|
||||
res := b.processOutput(strings.NewReader(input), ch)
|
||||
|
||||
if res.usage.InputTokens != 200 {
|
||||
t.Errorf("input tokens: got %d, want 200", res.usage.InputTokens)
|
||||
}
|
||||
if res.usage.OutputTokens != 100 {
|
||||
t.Errorf("output tokens: got %d, want 100", res.usage.OutputTokens)
|
||||
}
|
||||
if res.usage.CacheReadTokens != 50 {
|
||||
t.Errorf("cache read: got %d, want 50", res.usage.CacheReadTokens)
|
||||
}
|
||||
if res.usage.CacheWriteTokens != 25 {
|
||||
t.Errorf("cache write: got %d, want 25", res.usage.CacheWriteTokens)
|
||||
}
|
||||
|
||||
close(ch)
|
||||
}
|
||||
|
||||
func TestOpenclawStreamingSessionID(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
b := &openclawBackend{cfg: Config{Logger: slog.Default()}}
|
||||
ch := make(chan Message, 256)
|
||||
|
||||
lines := []string{
|
||||
`{"type":"text","text":"Hi","sessionId":"ses_stream_123"}`,
|
||||
}
|
||||
input := strings.Join(lines, "\n")
|
||||
|
||||
res := b.processOutput(strings.NewReader(input), ch)
|
||||
|
||||
if res.sessionID != "ses_stream_123" {
|
||||
t.Errorf("sessionID: got %q, want %q", res.sessionID, "ses_stream_123")
|
||||
}
|
||||
|
||||
close(ch)
|
||||
}
|
||||
|
||||
func TestOpenclawStreamingMixedWithLogLines(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
b := &openclawBackend{cfg: Config{Logger: slog.Default()}}
|
||||
ch := make(chan Message, 256)
|
||||
|
||||
lines := []string{
|
||||
"[info] initializing agent...",
|
||||
`{"type":"text","text":"Hello"}`,
|
||||
"[debug] tool exec completed",
|
||||
`{"type":"text","text":" world"}`,
|
||||
}
|
||||
input := strings.Join(lines, "\n")
|
||||
|
||||
res := b.processOutput(strings.NewReader(input), ch)
|
||||
|
||||
if res.status != "completed" {
|
||||
t.Errorf("status: got %q, want %q", res.status, "completed")
|
||||
}
|
||||
if res.output != "Hello world" {
|
||||
t.Errorf("output: got %q, want %q", res.output, "Hello world")
|
||||
}
|
||||
|
||||
close(ch)
|
||||
var msgs []Message
|
||||
for m := range ch {
|
||||
msgs = append(msgs, m)
|
||||
}
|
||||
if len(msgs) != 2 {
|
||||
t.Fatalf("expected 2 text messages, got %d", len(msgs))
|
||||
}
|
||||
}
|
||||
|
||||
// ── Lifecycle event tests ──
|
||||
|
||||
func TestOpenclawLifecycleErrorPhase(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
b := &openclawBackend{cfg: Config{Logger: slog.Default()}}
|
||||
ch := make(chan Message, 256)
|
||||
|
||||
lines := []string{
|
||||
`{"type":"text","text":"Working..."}`,
|
||||
`{"type":"lifecycle","phase":"error","text":"agent crashed unexpectedly"}`,
|
||||
}
|
||||
input := strings.Join(lines, "\n")
|
||||
|
||||
res := b.processOutput(strings.NewReader(input), ch)
|
||||
|
||||
if res.status != "failed" {
|
||||
t.Errorf("status: got %q, want %q", res.status, "failed")
|
||||
}
|
||||
if res.errMsg != "agent crashed unexpectedly" {
|
||||
t.Errorf("errMsg: got %q", res.errMsg)
|
||||
}
|
||||
|
||||
close(ch)
|
||||
var msgs []Message
|
||||
for m := range ch {
|
||||
msgs = append(msgs, m)
|
||||
}
|
||||
if len(msgs) != 2 {
|
||||
t.Fatalf("expected 2 messages, got %d", len(msgs))
|
||||
}
|
||||
if msgs[1].Type != MessageError {
|
||||
t.Errorf("msg[1] type: got %s, want error", msgs[1].Type)
|
||||
}
|
||||
}
|
||||
|
||||
func TestOpenclawLifecycleFailedPhase(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
b := &openclawBackend{cfg: Config{Logger: slog.Default()}}
|
||||
ch := make(chan Message, 256)
|
||||
|
||||
lines := []string{
|
||||
`{"type":"lifecycle","phase":"failed","message":"timeout exceeded"}`,
|
||||
}
|
||||
input := strings.Join(lines, "\n")
|
||||
|
||||
res := b.processOutput(strings.NewReader(input), ch)
|
||||
|
||||
if res.status != "failed" {
|
||||
t.Errorf("status: got %q, want %q", res.status, "failed")
|
||||
}
|
||||
if res.errMsg != "timeout exceeded" {
|
||||
t.Errorf("errMsg: got %q, want %q", res.errMsg, "timeout exceeded")
|
||||
}
|
||||
|
||||
close(ch)
|
||||
}
|
||||
|
||||
func TestOpenclawLifecycleCancelledPhase(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
b := &openclawBackend{cfg: Config{Logger: slog.Default()}}
|
||||
ch := make(chan Message, 256)
|
||||
|
||||
lines := []string{
|
||||
`{"type":"lifecycle","phase":"cancelled"}`,
|
||||
}
|
||||
input := strings.Join(lines, "\n")
|
||||
|
||||
res := b.processOutput(strings.NewReader(input), ch)
|
||||
|
||||
if res.status != "failed" {
|
||||
t.Errorf("status: got %q, want %q", res.status, "failed")
|
||||
}
|
||||
// With no text/message/error, should get the default.
|
||||
if res.errMsg != "unknown openclaw error" {
|
||||
t.Errorf("errMsg: got %q", res.errMsg)
|
||||
}
|
||||
|
||||
close(ch)
|
||||
}
|
||||
|
||||
func TestOpenclawLifecycleRunningPhaseIgnored(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
b := &openclawBackend{cfg: Config{Logger: slog.Default()}}
|
||||
ch := make(chan Message, 256)
|
||||
|
||||
lines := []string{
|
||||
`{"type":"lifecycle","phase":"running"}`,
|
||||
`{"type":"text","text":"Hello"}`,
|
||||
}
|
||||
input := strings.Join(lines, "\n")
|
||||
|
||||
res := b.processOutput(strings.NewReader(input), ch)
|
||||
|
||||
if res.status != "completed" {
|
||||
t.Errorf("status: got %q, want %q", res.status, "completed")
|
||||
}
|
||||
|
||||
close(ch)
|
||||
}
|
||||
|
||||
// ── Structured error tests ──
|
||||
|
||||
func TestOpenclawStructuredErrorObject(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
b := &openclawBackend{cfg: Config{Logger: slog.Default()}}
|
||||
ch := make(chan Message, 256)
|
||||
|
||||
lines := []string{
|
||||
`{"type":"error","error":{"name":"ModelNotFoundError","data":{"message":"model gpt-99 not available"}}}`,
|
||||
}
|
||||
input := strings.Join(lines, "\n")
|
||||
|
||||
res := b.processOutput(strings.NewReader(input), ch)
|
||||
|
||||
if res.status != "failed" {
|
||||
t.Errorf("status: got %q, want %q", res.status, "failed")
|
||||
}
|
||||
if res.errMsg != "model gpt-99 not available" {
|
||||
t.Errorf("errMsg: got %q, want %q", res.errMsg, "model gpt-99 not available")
|
||||
}
|
||||
|
||||
close(ch)
|
||||
}
|
||||
|
||||
func TestOpenclawStructuredErrorNameOnly(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
b := &openclawBackend{cfg: Config{Logger: slog.Default()}}
|
||||
ch := make(chan Message, 256)
|
||||
|
||||
lines := []string{
|
||||
`{"type":"error","error":{"name":"AuthenticationError"}}`,
|
||||
}
|
||||
input := strings.Join(lines, "\n")
|
||||
|
||||
res := b.processOutput(strings.NewReader(input), ch)
|
||||
|
||||
if res.errMsg != "AuthenticationError" {
|
||||
t.Errorf("errMsg: got %q, want %q", res.errMsg, "AuthenticationError")
|
||||
}
|
||||
|
||||
close(ch)
|
||||
}
|
||||
|
||||
func TestOpenclawStructuredErrorMessageField(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
b := &openclawBackend{cfg: Config{Logger: slog.Default()}}
|
||||
ch := make(chan Message, 256)
|
||||
|
||||
lines := []string{
|
||||
`{"type":"error","error":{"message":"rate limit exceeded"}}`,
|
||||
}
|
||||
input := strings.Join(lines, "\n")
|
||||
|
||||
res := b.processOutput(strings.NewReader(input), ch)
|
||||
|
||||
if res.errMsg != "rate limit exceeded" {
|
||||
t.Errorf("errMsg: got %q, want %q", res.errMsg, "rate limit exceeded")
|
||||
}
|
||||
|
||||
close(ch)
|
||||
}
|
||||
|
||||
// ── Usage field name variant tests ──
|
||||
|
||||
func TestOpenclawUsageAlternativeFieldNames(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
// Test PaperClip-style field names (inputTokens, outputTokens, etc.)
|
||||
data := map[string]any{
|
||||
"inputTokens": float64(500),
|
||||
"outputTokens": float64(200),
|
||||
"cachedInputTokens": float64(100),
|
||||
}
|
||||
usage := parseOpenclawUsage(data)
|
||||
|
||||
if usage.InputTokens != 500 {
|
||||
t.Errorf("InputTokens: got %d, want 500", usage.InputTokens)
|
||||
}
|
||||
if usage.OutputTokens != 200 {
|
||||
t.Errorf("OutputTokens: got %d, want 200", usage.OutputTokens)
|
||||
}
|
||||
if usage.CacheReadTokens != 100 {
|
||||
t.Errorf("CacheReadTokens: got %d, want 100", usage.CacheReadTokens)
|
||||
}
|
||||
}
|
||||
|
||||
func TestOpenclawUsageSnakeCaseFieldNames(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
// Test snake_case field names (Anthropic API style)
|
||||
data := map[string]any{
|
||||
"input_tokens": float64(300),
|
||||
"output_tokens": float64(150),
|
||||
"cache_read_input_tokens": float64(80),
|
||||
"cache_creation_input_tokens": float64(40),
|
||||
}
|
||||
usage := parseOpenclawUsage(data)
|
||||
|
||||
if usage.InputTokens != 300 {
|
||||
t.Errorf("InputTokens: got %d, want 300", usage.InputTokens)
|
||||
}
|
||||
if usage.OutputTokens != 150 {
|
||||
t.Errorf("OutputTokens: got %d, want 150", usage.OutputTokens)
|
||||
}
|
||||
if usage.CacheReadTokens != 80 {
|
||||
t.Errorf("CacheReadTokens: got %d, want 80", usage.CacheReadTokens)
|
||||
}
|
||||
if usage.CacheWriteTokens != 40 {
|
||||
t.Errorf("CacheWriteTokens: got %d, want 40", usage.CacheWriteTokens)
|
||||
}
|
||||
}
|
||||
|
||||
func TestOpenclawUsageOriginalFieldNames(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
// Test the original short field names (input, output, cacheRead, cacheWrite)
|
||||
data := map[string]any{
|
||||
"input": float64(100),
|
||||
"output": float64(50),
|
||||
"cacheRead": float64(10),
|
||||
"cacheWrite": float64(5),
|
||||
}
|
||||
usage := parseOpenclawUsage(data)
|
||||
|
||||
if usage.InputTokens != 100 {
|
||||
t.Errorf("InputTokens: got %d, want 100", usage.InputTokens)
|
||||
}
|
||||
if usage.OutputTokens != 50 {
|
||||
t.Errorf("OutputTokens: got %d, want 50", usage.OutputTokens)
|
||||
}
|
||||
if usage.CacheReadTokens != 10 {
|
||||
t.Errorf("CacheReadTokens: got %d, want 10", usage.CacheReadTokens)
|
||||
}
|
||||
if usage.CacheWriteTokens != 5 {
|
||||
t.Errorf("CacheWriteTokens: got %d, want 5", usage.CacheWriteTokens)
|
||||
}
|
||||
}
|
||||
|
||||
func TestOpenclawUsageAccumulationAcrossSteps(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
b := &openclawBackend{cfg: Config{Logger: slog.Default()}}
|
||||
ch := make(chan Message, 256)
|
||||
|
||||
lines := []string{
|
||||
`{"type":"step_finish","usage":{"inputTokens":100,"outputTokens":50}}`,
|
||||
`{"type":"step_finish","usage":{"inputTokens":200,"outputTokens":80,"cachedInputTokens":60}}`,
|
||||
}
|
||||
input := strings.Join(lines, "\n")
|
||||
|
||||
res := b.processOutput(strings.NewReader(input), ch)
|
||||
|
||||
if res.usage.InputTokens != 300 {
|
||||
t.Errorf("InputTokens: got %d, want 300", res.usage.InputTokens)
|
||||
}
|
||||
if res.usage.OutputTokens != 130 {
|
||||
t.Errorf("OutputTokens: got %d, want 130", res.usage.OutputTokens)
|
||||
}
|
||||
if res.usage.CacheReadTokens != 60 {
|
||||
t.Errorf("CacheReadTokens: got %d, want 60", res.usage.CacheReadTokens)
|
||||
}
|
||||
|
||||
close(ch)
|
||||
}
|
||||
|
||||
func TestOpenclawUsageFinalResultAlternativeFields(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
b := &openclawBackend{cfg: Config{Logger: slog.Default()}}
|
||||
ch := make(chan Message, 256)
|
||||
|
||||
result := openclawResult{
|
||||
Payloads: []openclawPayload{{Text: "Done"}},
|
||||
Meta: openclawMeta{
|
||||
DurationMs: 1000,
|
||||
AgentMeta: map[string]any{
|
||||
"usage": map[string]any{
|
||||
"inputTokens": float64(400),
|
||||
"outputTokens": float64(180),
|
||||
"cachedInputTokens": float64(90),
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
data, _ := json.Marshal(result)
|
||||
|
||||
res := b.processOutput(strings.NewReader(string(data)), ch)
|
||||
|
||||
if res.usage.InputTokens != 400 {
|
||||
t.Errorf("InputTokens: got %d, want 400", res.usage.InputTokens)
|
||||
}
|
||||
if res.usage.OutputTokens != 180 {
|
||||
t.Errorf("OutputTokens: got %d, want 180", res.usage.OutputTokens)
|
||||
}
|
||||
if res.usage.CacheReadTokens != 90 {
|
||||
t.Errorf("CacheReadTokens: got %d, want 90", res.usage.CacheReadTokens)
|
||||
}
|
||||
|
||||
close(ch)
|
||||
}
|
||||
|
||||
// ── openclawInt64 tests ──
|
||||
|
||||
func TestOpenclawInt64Float(t *testing.T) {
|
||||
|
||||
Reference in New Issue
Block a user