Compare commits

...

3 Commits

Author SHA1 Message Date
Jiang Bohan
9fe7340211 feat(agent): OpenClaw backend P1 — hardened parsing, lifecycle events, usage variants
Three robustness improvements to the OpenClaw agent backend:

1. Hardened JSON parsing — tryParseOpenclawResult now requires lines to
   start with '{', eliminating the fragile brace-scanning fallback that
   could false-match JSON fragments inside log lines.

2. Lifecycle event handling — new "lifecycle" event type with phase
   tracking (error/failed/cancelled). Structured error objects are now
   parsed (error.name, error.data.message, error.message) matching the
   PaperClip adapter pattern.

3. Usage field name variants — parseOpenclawUsage supports multiple
   naming conventions (input/inputTokens/input_tokens, cacheRead/
   cachedInputTokens/cache_read_input_tokens, etc.) so usage is
   correctly extracted regardless of which OpenClaw version or protocol
   the agent uses. Usage accumulates incrementally across step_finish
   events.

Adds 13 new tests (31 total for the openclaw backend).

Refs MUL-726
2026-04-14 01:47:48 +08:00
Jiang Bohan
584344e5bd Merge remote-tracking branch 'origin/agent/j/2b740415' into agent/j/96e2c59a 2026-04-14 01:43:26 +08:00
Jiang Bohan
4896128723 feat(agent): improve OpenClaw backend with streaming, tool use, and model args
P0 improvements to the OpenClaw agent backend, informed by PaperClip's
adapter architecture:

1. Streaming output — emit MessageText as NDJSON events arrive, instead
   of waiting for the final result blob. Users now see real-time progress.

2. Tool use support — parse and emit MessageToolUse / MessageToolResult
   from streaming events, matching the Claude and OpenCode backends.

3. Model & system prompt — pass --model and --system-prompt to the
   OpenClaw CLI when configured, previously silently ignored.

The implementation supports both the new streaming NDJSON format and the
legacy single-blob result format for backwards compatibility.

Closes MUL-726
2026-04-14 01:37:49 +08:00
2 changed files with 811 additions and 38 deletions

View File

@@ -38,6 +38,12 @@ func (b *openclawBackend) Execute(ctx context.Context, prompt string, opts ExecO
sessionID = fmt.Sprintf("multica-%d", time.Now().UnixNano())
}
args := []string{"agent", "--local", "--json", "--session-id", sessionID}
if opts.Model != "" {
args = append(args, "--model", opts.Model)
}
if opts.SystemPrompt != "" {
args = append(args, "--system-prompt", opts.SystemPrompt)
}
if opts.Timeout > 0 {
args = append(args, "--timeout", fmt.Sprintf("%d", int(opts.Timeout.Seconds())))
}
@@ -129,22 +135,108 @@ type openclawEventResult struct {
}
// processOutput reads the JSON output from openclaw --json stderr and returns
// the parsed result. OpenClaw writes its JSON result to stderr, which may also
// contain non-JSON log lines. We scan line-by-line so a final result line can
// be recognized without waiting for the entire stderr stream to be buffered.
// the parsed result. OpenClaw writes its JSON output to stderr, which may also
// contain non-JSON log lines. The stream may contain:
//
// - NDJSON streaming events (type: "text", "tool_use", "tool_result", "error",
// "step_start", "step_finish") — emitted in real time as the agent works
// - A final result JSON (with payloads + meta) — the legacy single-blob format
//
// We scan line-by-line, emitting messages as events arrive so streaming
// consumers get real-time feedback instead of waiting for the final blob.
func (b *openclawBackend) processOutput(r io.Reader, ch chan<- Message) openclawEventResult {
scanner := bufio.NewScanner(r)
scanner.Buffer(make([]byte, 0, 1024*1024), 10*1024*1024)
var output strings.Builder
var sessionID string
var usage TokenUsage
finalStatus := "completed"
var finalError string
gotEvents := false // true if we parsed at least one streaming event or result
var rawLines []string
for scanner.Scan() {
line := strings.TrimSpace(scanner.Text())
if line == "" {
continue
}
if result, ok := tryParseOpenclawResult(line); ok {
return b.buildOpenclawEventResult(result, ch)
// Try parsing as a streaming NDJSON event first.
if event, ok := tryParseOpenclawEvent(line); ok {
gotEvents = true
if event.SessionID != "" {
sessionID = event.SessionID
}
switch event.Type {
case "text":
if event.Text != "" {
output.WriteString(event.Text)
trySend(ch, Message{Type: MessageText, Content: event.Text})
}
case "tool_use":
var input map[string]any
if event.Input != nil {
_ = json.Unmarshal(event.Input, &input)
}
trySend(ch, Message{
Type: MessageToolUse,
Tool: event.Tool,
CallID: event.CallID,
Input: input,
})
case "tool_result":
trySend(ch, Message{
Type: MessageToolResult,
Tool: event.Tool,
CallID: event.CallID,
Output: event.Text,
})
case "error":
errMsg := event.errorMessage()
b.cfg.Logger.Warn("openclaw error event", "error", errMsg)
trySend(ch, Message{Type: MessageError, Content: errMsg})
finalStatus = "failed"
finalError = errMsg
case "lifecycle":
phase := event.Phase
if phase == "error" || phase == "failed" || phase == "cancelled" {
errMsg := event.errorMessage()
b.cfg.Logger.Warn("openclaw lifecycle failure", "phase", phase, "error", errMsg)
trySend(ch, Message{Type: MessageError, Content: errMsg})
finalStatus = "failed"
finalError = errMsg
}
case "step_start":
trySend(ch, Message{Type: MessageStatus, Status: "running"})
case "step_finish":
if event.Usage != nil {
u := parseOpenclawUsage(event.Usage)
usage.InputTokens += u.InputTokens
usage.OutputTokens += u.OutputTokens
usage.CacheReadTokens += u.CacheReadTokens
usage.CacheWriteTokens += u.CacheWriteTokens
}
}
continue
}
// Try parsing as a final result blob (legacy format).
if result, ok := tryParseOpenclawResult(line); ok {
gotEvents = true
res := b.buildOpenclawEventResult(result, ch, &output)
if res.sessionID != "" {
sessionID = res.sessionID
}
// Prefer usage from the final result if no streaming events reported it.
u := res.usage
if u.InputTokens > 0 || u.OutputTokens > 0 || u.CacheReadTokens > 0 || u.CacheWriteTokens > 0 {
usage = u
}
continue
}
// Not JSON — treat as log line.
b.cfg.Logger.Debug("[openclaw:stderr] " + line)
rawLines = append(rawLines, line)
}
@@ -153,36 +245,65 @@ func (b *openclawBackend) processOutput(r io.Reader, ch chan<- Message) openclaw
return openclawEventResult{status: "failed", errMsg: fmt.Sprintf("read stderr: %v", err)}
}
trimmed := strings.TrimSpace(strings.Join(rawLines, "\n"))
if trimmed != "" {
return openclawEventResult{status: "completed", output: trimmed}
// If we got no events at all, fall back to raw output.
if !gotEvents {
trimmed := strings.TrimSpace(strings.Join(rawLines, "\n"))
if trimmed != "" {
return openclawEventResult{status: "completed", output: trimmed}
}
return openclawEventResult{status: "failed", errMsg: "openclaw returned no parseable output"}
}
return openclawEventResult{
status: finalStatus,
errMsg: finalError,
output: output.String(),
sessionID: sessionID,
usage: usage,
}
return openclawEventResult{status: "failed", errMsg: "openclaw returned no parseable output"}
}
// tryParseOpenclawEvent attempts to parse a line as a streaming NDJSON event.
// Returns the event and true if the line is a valid event with a known type.
func tryParseOpenclawEvent(line string) (openclawEvent, bool) {
if len(line) == 0 || line[0] != '{' {
return openclawEvent{}, false
}
var event openclawEvent
if err := json.Unmarshal([]byte(line), &event); err != nil {
return openclawEvent{}, false
}
if event.Type == "" {
return openclawEvent{}, false
}
return event, true
}
// tryParseOpenclawResult attempts to parse a line as a final result blob
// (the legacy format with payloads + meta). Lines must start with '{' to be
// considered — we no longer scan for braces at arbitrary positions, which
// avoids false matches on log lines containing JSON fragments.
func tryParseOpenclawResult(raw string) (openclawResult, bool) {
// Try each '{' position until we find valid openclawResult JSON.
// Earlier '{' chars may appear in log/error lines (e.g. raw_params={...}).
var result openclawResult
for i := 0; i < len(raw); i++ {
if raw[i] != '{' {
continue
}
if err := json.Unmarshal([]byte(raw[i:]), &result); err == nil && (result.Payloads != nil || result.Meta.DurationMs > 0) {
return result, true
}
if len(raw) == 0 || raw[0] != '{' {
return openclawResult{}, false
}
return openclawResult{}, false
var result openclawResult
if err := json.Unmarshal([]byte(raw), &result); err != nil {
return openclawResult{}, false
}
if result.Payloads == nil && result.Meta.DurationMs == 0 {
return openclawResult{}, false
}
return result, true
}
func (b *openclawBackend) buildOpenclawEventResult(result openclawResult, ch chan<- Message) openclawEventResult {
var output strings.Builder
// buildOpenclawEventResult extracts text and metadata from a final result blob.
// Text payloads are appended to the shared output builder and emitted to ch.
func (b *openclawBackend) buildOpenclawEventResult(result openclawResult, ch chan<- Message, output *strings.Builder) openclawEventResult {
for _, p := range result.Payloads {
if p.Text != "" {
if output.Len() > 0 {
output.WriteString("\n")
}
output.WriteString(p.Text)
trySend(ch, Message{Type: MessageText, Content: p.Text})
}
}
@@ -193,17 +314,10 @@ func (b *openclawBackend) buildOpenclawEventResult(result openclawResult, ch cha
sessionID = sid
}
if u, ok := result.Meta.AgentMeta["usage"].(map[string]any); ok {
usage.InputTokens = openclawInt64(u, "input")
usage.OutputTokens = openclawInt64(u, "output")
usage.CacheReadTokens = openclawInt64(u, "cacheRead")
usage.CacheWriteTokens = openclawInt64(u, "cacheWrite")
usage = parseOpenclawUsage(u)
}
}
if output.Len() > 0 {
trySend(ch, Message{Type: MessageText, Content: output.String()})
}
return openclawEventResult{
status: "completed",
output: output.String(),
@@ -212,6 +326,33 @@ func (b *openclawBackend) buildOpenclawEventResult(result openclawResult, ch cha
}
}
// parseOpenclawUsage extracts token usage from a map, supporting multiple
// field name conventions used by different OpenClaw versions and PaperClip:
//
// input / inputTokens / input_tokens
// output / outputTokens / output_tokens
// cacheRead / cachedInputTokens / cached_input_tokens / cache_read
// cacheWrite / cacheCreationInputTokens / cache_creation_input_tokens / cache_write
func parseOpenclawUsage(data map[string]any) TokenUsage {
return TokenUsage{
InputTokens: openclawInt64FirstOf(data, "input", "inputTokens", "input_tokens"),
OutputTokens: openclawInt64FirstOf(data, "output", "outputTokens", "output_tokens"),
CacheReadTokens: openclawInt64FirstOf(data, "cacheRead", "cachedInputTokens", "cached_input_tokens", "cache_read", "cache_read_input_tokens"),
CacheWriteTokens: openclawInt64FirstOf(data, "cacheWrite", "cacheCreationInputTokens", "cache_creation_input_tokens", "cache_write"),
}
}
// openclawInt64FirstOf returns the first non-zero int64 value found under any
// of the given keys. This supports field name variants across protocol versions.
func openclawInt64FirstOf(data map[string]any, keys ...string) int64 {
for _, key := range keys {
if v := openclawInt64(data, key); v != 0 {
return v
}
}
return 0
}
// openclawInt64 safely extracts an int64 from a JSON-decoded map value (which
// may be float64 due to Go's JSON number handling).
func openclawInt64(data map[string]any, key string) int64 {
@@ -231,7 +372,73 @@ func openclawInt64(data map[string]any, key string) int64 {
// ── JSON types for `openclaw agent --json` output ──
// openclawResult represents the JSON output from `openclaw agent --json`.
// openclawEvent represents a single streaming NDJSON event from openclaw --json.
//
// Event types:
// - "text" — text output (text field)
// - "tool_use" — tool invocation (tool, callId, input)
// - "tool_result" — tool output (tool, callId, text)
// - "error" — error (text, or structured error object)
// - "lifecycle" — phase changes (phase: "error"/"failed"/"cancelled")
// - "step_start" — agent step begins
// - "step_finish" — agent step ends (usage)
type openclawEvent struct {
Type string `json:"type"`
SessionID string `json:"sessionId,omitempty"`
Text string `json:"text,omitempty"`
Tool string `json:"tool,omitempty"`
CallID string `json:"callId,omitempty"`
Input json.RawMessage `json:"input,omitempty"`
Usage map[string]any `json:"usage,omitempty"`
Phase string `json:"phase,omitempty"` // lifecycle event phase
Error *openclawError `json:"error,omitempty"` // structured error object
Message string `json:"message,omitempty"` // alternative error message field
}
// errorMessage extracts a human-readable error message from the event,
// checking multiple fields: structured error object, text, message, or fallback.
func (e openclawEvent) errorMessage() string {
if e.Error != nil {
if msg := e.Error.message(); msg != "" {
return msg
}
}
if e.Text != "" {
return e.Text
}
if e.Message != "" {
return e.Message
}
return "unknown openclaw error"
}
// openclawError represents a structured error in an openclaw event,
// compatible with PaperClip's error format (name + data.message).
type openclawError struct {
Name string `json:"name,omitempty"`
Data *openclawErrorData `json:"data,omitempty"`
Message string `json:"message,omitempty"`
}
func (e *openclawError) message() string {
if e.Data != nil && e.Data.Message != "" {
return e.Data.Message
}
if e.Message != "" {
return e.Message
}
if e.Name != "" {
return e.Name
}
return ""
}
type openclawErrorData struct {
Message string `json:"message,omitempty"`
}
// openclawResult represents the final JSON output from `openclaw agent --json`
// (the legacy single-blob format with payloads + meta).
type openclawResult struct {
Payloads []openclawPayload `json:"payloads"`
Meta openclawMeta `json:"meta"`

View File

@@ -18,7 +18,7 @@ func TestNewReturnsOpenclawBackend(t *testing.T) {
}
}
// ── processOutput tests ──
// ── Legacy result format tests (processOutput with final JSON blob) ──
func TestOpenclawProcessOutputHappyPath(t *testing.T) {
t.Parallel()
@@ -90,11 +90,24 @@ func TestOpenclawProcessOutputMultiplePayloads(t *testing.T) {
res := b.processOutput(strings.NewReader(string(data)), ch)
if res.output != "First\nSecond" {
t.Errorf("output: got %q, want %q", res.output, "First\nSecond")
if res.output != "FirstSecond" {
t.Errorf("output: got %q, want %q", res.output, "FirstSecond")
}
close(ch)
var msgs []Message
for m := range ch {
msgs = append(msgs, m)
}
if len(msgs) != 2 {
t.Fatalf("expected 2 text messages, got %d", len(msgs))
}
if msgs[0].Content != "First" {
t.Errorf("msg[0]: got %q, want %q", msgs[0].Content, "First")
}
if msgs[1].Content != "Second" {
t.Errorf("msg[1]: got %q, want %q", msgs[1].Content, "Second")
}
}
func TestOpenclawProcessOutputEmptyPayloads(t *testing.T) {
@@ -238,7 +251,8 @@ func TestOpenclawProcessOutputWithBracesInLogLines(t *testing.T) {
Meta: openclawMeta{DurationMs: 500},
}
data, _ := json.Marshal(result)
// Simulate error line containing braces before the real JSON (the exact bug scenario)
// Log line with braces should NOT be parsed as JSON — only lines starting
// with '{' are considered. The result blob on its own line is still parsed.
input := `[tools] exec failed: complex interpreter invocation detected. raw_params={"command":"echo hello"}` + "\n" + string(data)
res := b.processOutput(strings.NewReader(input), ch)
@@ -253,6 +267,558 @@ func TestOpenclawProcessOutputWithBracesInLogLines(t *testing.T) {
close(ch)
}
func TestOpenclawResultBlobWithLeadingPrefixRejected(t *testing.T) {
t.Parallel()
b := &openclawBackend{cfg: Config{Logger: slog.Default()}}
ch := make(chan Message, 256)
// A line with a prefix before the JSON should NOT be parsed as a result.
// This tests that the hardened parser rejects non-'{'-starting lines.
result := openclawResult{
Payloads: []openclawPayload{{Text: "Should not match"}},
Meta: openclawMeta{DurationMs: 500},
}
data, _ := json.Marshal(result)
input := "some prefix " + string(data)
res := b.processOutput(strings.NewReader(input), ch)
// Should fall back to raw output since the JSON has a prefix.
if res.status != "completed" {
t.Errorf("status: got %q, want %q", res.status, "completed")
}
if res.output != input {
t.Errorf("output: got %q, want raw input back", res.output)
}
close(ch)
}
// ── Streaming NDJSON event tests ──
func TestOpenclawStreamingTextEvents(t *testing.T) {
t.Parallel()
b := &openclawBackend{cfg: Config{Logger: slog.Default()}}
ch := make(chan Message, 256)
lines := []string{
`{"type":"text","text":"Hello "}`,
`{"type":"text","text":"world"}`,
}
input := strings.Join(lines, "\n")
res := b.processOutput(strings.NewReader(input), ch)
if res.status != "completed" {
t.Errorf("status: got %q, want %q", res.status, "completed")
}
if res.output != "Hello world" {
t.Errorf("output: got %q, want %q", res.output, "Hello world")
}
close(ch)
var msgs []Message
for m := range ch {
msgs = append(msgs, m)
}
if len(msgs) != 2 {
t.Fatalf("expected 2 messages, got %d", len(msgs))
}
if msgs[0].Type != MessageText || msgs[0].Content != "Hello " {
t.Errorf("msg[0]: type=%s content=%q", msgs[0].Type, msgs[0].Content)
}
if msgs[1].Type != MessageText || msgs[1].Content != "world" {
t.Errorf("msg[1]: type=%s content=%q", msgs[1].Type, msgs[1].Content)
}
}
func TestOpenclawStreamingToolUseEvents(t *testing.T) {
t.Parallel()
b := &openclawBackend{cfg: Config{Logger: slog.Default()}}
ch := make(chan Message, 256)
lines := []string{
`{"type":"tool_use","tool":"bash","callId":"call_1","input":{"command":"ls -la"}}`,
`{"type":"tool_result","tool":"bash","callId":"call_1","text":"total 42\ndrwxr-xr-x"}`,
`{"type":"text","text":"Listed files."}`,
}
input := strings.Join(lines, "\n")
res := b.processOutput(strings.NewReader(input), ch)
if res.status != "completed" {
t.Errorf("status: got %q, want %q", res.status, "completed")
}
close(ch)
var msgs []Message
for m := range ch {
msgs = append(msgs, m)
}
if len(msgs) != 3 {
t.Fatalf("expected 3 messages, got %d", len(msgs))
}
// tool_use
if msgs[0].Type != MessageToolUse {
t.Errorf("msg[0] type: got %s, want tool-use", msgs[0].Type)
}
if msgs[0].Tool != "bash" {
t.Errorf("msg[0] tool: got %q, want %q", msgs[0].Tool, "bash")
}
if msgs[0].CallID != "call_1" {
t.Errorf("msg[0] callID: got %q, want %q", msgs[0].CallID, "call_1")
}
if msgs[0].Input["command"] != "ls -la" {
t.Errorf("msg[0] input: got %v", msgs[0].Input)
}
// tool_result
if msgs[1].Type != MessageToolResult {
t.Errorf("msg[1] type: got %s, want tool-result", msgs[1].Type)
}
if msgs[1].CallID != "call_1" {
t.Errorf("msg[1] callID: got %q", msgs[1].CallID)
}
if msgs[1].Output != "total 42\ndrwxr-xr-x" {
t.Errorf("msg[1] output: got %q", msgs[1].Output)
}
// text
if msgs[2].Type != MessageText || msgs[2].Content != "Listed files." {
t.Errorf("msg[2]: type=%s content=%q", msgs[2].Type, msgs[2].Content)
}
}
func TestOpenclawStreamingErrorEvent(t *testing.T) {
t.Parallel()
b := &openclawBackend{cfg: Config{Logger: slog.Default()}}
ch := make(chan Message, 256)
lines := []string{
`{"type":"text","text":"Starting..."}`,
`{"type":"error","text":"model not found: gpt-99"}`,
}
input := strings.Join(lines, "\n")
res := b.processOutput(strings.NewReader(input), ch)
if res.status != "failed" {
t.Errorf("status: got %q, want %q", res.status, "failed")
}
if res.errMsg != "model not found: gpt-99" {
t.Errorf("errMsg: got %q", res.errMsg)
}
close(ch)
var msgs []Message
for m := range ch {
msgs = append(msgs, m)
}
if len(msgs) != 2 {
t.Fatalf("expected 2 messages, got %d", len(msgs))
}
if msgs[1].Type != MessageError {
t.Errorf("msg[1] type: got %s, want error", msgs[1].Type)
}
}
func TestOpenclawStreamingStepFinishUsage(t *testing.T) {
t.Parallel()
b := &openclawBackend{cfg: Config{Logger: slog.Default()}}
ch := make(chan Message, 256)
lines := []string{
`{"type":"step_start"}`,
`{"type":"text","text":"Done"}`,
`{"type":"step_finish","usage":{"input":200,"output":100,"cacheRead":50,"cacheWrite":25}}`,
}
input := strings.Join(lines, "\n")
res := b.processOutput(strings.NewReader(input), ch)
if res.usage.InputTokens != 200 {
t.Errorf("input tokens: got %d, want 200", res.usage.InputTokens)
}
if res.usage.OutputTokens != 100 {
t.Errorf("output tokens: got %d, want 100", res.usage.OutputTokens)
}
if res.usage.CacheReadTokens != 50 {
t.Errorf("cache read: got %d, want 50", res.usage.CacheReadTokens)
}
if res.usage.CacheWriteTokens != 25 {
t.Errorf("cache write: got %d, want 25", res.usage.CacheWriteTokens)
}
close(ch)
}
func TestOpenclawStreamingSessionID(t *testing.T) {
t.Parallel()
b := &openclawBackend{cfg: Config{Logger: slog.Default()}}
ch := make(chan Message, 256)
lines := []string{
`{"type":"text","text":"Hi","sessionId":"ses_stream_123"}`,
}
input := strings.Join(lines, "\n")
res := b.processOutput(strings.NewReader(input), ch)
if res.sessionID != "ses_stream_123" {
t.Errorf("sessionID: got %q, want %q", res.sessionID, "ses_stream_123")
}
close(ch)
}
func TestOpenclawStreamingMixedWithLogLines(t *testing.T) {
t.Parallel()
b := &openclawBackend{cfg: Config{Logger: slog.Default()}}
ch := make(chan Message, 256)
lines := []string{
"[info] initializing agent...",
`{"type":"text","text":"Hello"}`,
"[debug] tool exec completed",
`{"type":"text","text":" world"}`,
}
input := strings.Join(lines, "\n")
res := b.processOutput(strings.NewReader(input), ch)
if res.status != "completed" {
t.Errorf("status: got %q, want %q", res.status, "completed")
}
if res.output != "Hello world" {
t.Errorf("output: got %q, want %q", res.output, "Hello world")
}
close(ch)
var msgs []Message
for m := range ch {
msgs = append(msgs, m)
}
if len(msgs) != 2 {
t.Fatalf("expected 2 text messages, got %d", len(msgs))
}
}
// ── Lifecycle event tests ──
func TestOpenclawLifecycleErrorPhase(t *testing.T) {
t.Parallel()
b := &openclawBackend{cfg: Config{Logger: slog.Default()}}
ch := make(chan Message, 256)
lines := []string{
`{"type":"text","text":"Working..."}`,
`{"type":"lifecycle","phase":"error","text":"agent crashed unexpectedly"}`,
}
input := strings.Join(lines, "\n")
res := b.processOutput(strings.NewReader(input), ch)
if res.status != "failed" {
t.Errorf("status: got %q, want %q", res.status, "failed")
}
if res.errMsg != "agent crashed unexpectedly" {
t.Errorf("errMsg: got %q", res.errMsg)
}
close(ch)
var msgs []Message
for m := range ch {
msgs = append(msgs, m)
}
if len(msgs) != 2 {
t.Fatalf("expected 2 messages, got %d", len(msgs))
}
if msgs[1].Type != MessageError {
t.Errorf("msg[1] type: got %s, want error", msgs[1].Type)
}
}
func TestOpenclawLifecycleFailedPhase(t *testing.T) {
t.Parallel()
b := &openclawBackend{cfg: Config{Logger: slog.Default()}}
ch := make(chan Message, 256)
lines := []string{
`{"type":"lifecycle","phase":"failed","message":"timeout exceeded"}`,
}
input := strings.Join(lines, "\n")
res := b.processOutput(strings.NewReader(input), ch)
if res.status != "failed" {
t.Errorf("status: got %q, want %q", res.status, "failed")
}
if res.errMsg != "timeout exceeded" {
t.Errorf("errMsg: got %q, want %q", res.errMsg, "timeout exceeded")
}
close(ch)
}
func TestOpenclawLifecycleCancelledPhase(t *testing.T) {
t.Parallel()
b := &openclawBackend{cfg: Config{Logger: slog.Default()}}
ch := make(chan Message, 256)
lines := []string{
`{"type":"lifecycle","phase":"cancelled"}`,
}
input := strings.Join(lines, "\n")
res := b.processOutput(strings.NewReader(input), ch)
if res.status != "failed" {
t.Errorf("status: got %q, want %q", res.status, "failed")
}
// With no text/message/error, should get the default.
if res.errMsg != "unknown openclaw error" {
t.Errorf("errMsg: got %q", res.errMsg)
}
close(ch)
}
func TestOpenclawLifecycleRunningPhaseIgnored(t *testing.T) {
t.Parallel()
b := &openclawBackend{cfg: Config{Logger: slog.Default()}}
ch := make(chan Message, 256)
lines := []string{
`{"type":"lifecycle","phase":"running"}`,
`{"type":"text","text":"Hello"}`,
}
input := strings.Join(lines, "\n")
res := b.processOutput(strings.NewReader(input), ch)
if res.status != "completed" {
t.Errorf("status: got %q, want %q", res.status, "completed")
}
close(ch)
}
// ── Structured error tests ──
func TestOpenclawStructuredErrorObject(t *testing.T) {
t.Parallel()
b := &openclawBackend{cfg: Config{Logger: slog.Default()}}
ch := make(chan Message, 256)
lines := []string{
`{"type":"error","error":{"name":"ModelNotFoundError","data":{"message":"model gpt-99 not available"}}}`,
}
input := strings.Join(lines, "\n")
res := b.processOutput(strings.NewReader(input), ch)
if res.status != "failed" {
t.Errorf("status: got %q, want %q", res.status, "failed")
}
if res.errMsg != "model gpt-99 not available" {
t.Errorf("errMsg: got %q, want %q", res.errMsg, "model gpt-99 not available")
}
close(ch)
}
func TestOpenclawStructuredErrorNameOnly(t *testing.T) {
t.Parallel()
b := &openclawBackend{cfg: Config{Logger: slog.Default()}}
ch := make(chan Message, 256)
lines := []string{
`{"type":"error","error":{"name":"AuthenticationError"}}`,
}
input := strings.Join(lines, "\n")
res := b.processOutput(strings.NewReader(input), ch)
if res.errMsg != "AuthenticationError" {
t.Errorf("errMsg: got %q, want %q", res.errMsg, "AuthenticationError")
}
close(ch)
}
func TestOpenclawStructuredErrorMessageField(t *testing.T) {
t.Parallel()
b := &openclawBackend{cfg: Config{Logger: slog.Default()}}
ch := make(chan Message, 256)
lines := []string{
`{"type":"error","error":{"message":"rate limit exceeded"}}`,
}
input := strings.Join(lines, "\n")
res := b.processOutput(strings.NewReader(input), ch)
if res.errMsg != "rate limit exceeded" {
t.Errorf("errMsg: got %q, want %q", res.errMsg, "rate limit exceeded")
}
close(ch)
}
// ── Usage field name variant tests ──
func TestOpenclawUsageAlternativeFieldNames(t *testing.T) {
t.Parallel()
// Test PaperClip-style field names (inputTokens, outputTokens, etc.)
data := map[string]any{
"inputTokens": float64(500),
"outputTokens": float64(200),
"cachedInputTokens": float64(100),
}
usage := parseOpenclawUsage(data)
if usage.InputTokens != 500 {
t.Errorf("InputTokens: got %d, want 500", usage.InputTokens)
}
if usage.OutputTokens != 200 {
t.Errorf("OutputTokens: got %d, want 200", usage.OutputTokens)
}
if usage.CacheReadTokens != 100 {
t.Errorf("CacheReadTokens: got %d, want 100", usage.CacheReadTokens)
}
}
func TestOpenclawUsageSnakeCaseFieldNames(t *testing.T) {
t.Parallel()
// Test snake_case field names (Anthropic API style)
data := map[string]any{
"input_tokens": float64(300),
"output_tokens": float64(150),
"cache_read_input_tokens": float64(80),
"cache_creation_input_tokens": float64(40),
}
usage := parseOpenclawUsage(data)
if usage.InputTokens != 300 {
t.Errorf("InputTokens: got %d, want 300", usage.InputTokens)
}
if usage.OutputTokens != 150 {
t.Errorf("OutputTokens: got %d, want 150", usage.OutputTokens)
}
if usage.CacheReadTokens != 80 {
t.Errorf("CacheReadTokens: got %d, want 80", usage.CacheReadTokens)
}
if usage.CacheWriteTokens != 40 {
t.Errorf("CacheWriteTokens: got %d, want 40", usage.CacheWriteTokens)
}
}
func TestOpenclawUsageOriginalFieldNames(t *testing.T) {
t.Parallel()
// Test the original short field names (input, output, cacheRead, cacheWrite)
data := map[string]any{
"input": float64(100),
"output": float64(50),
"cacheRead": float64(10),
"cacheWrite": float64(5),
}
usage := parseOpenclawUsage(data)
if usage.InputTokens != 100 {
t.Errorf("InputTokens: got %d, want 100", usage.InputTokens)
}
if usage.OutputTokens != 50 {
t.Errorf("OutputTokens: got %d, want 50", usage.OutputTokens)
}
if usage.CacheReadTokens != 10 {
t.Errorf("CacheReadTokens: got %d, want 10", usage.CacheReadTokens)
}
if usage.CacheWriteTokens != 5 {
t.Errorf("CacheWriteTokens: got %d, want 5", usage.CacheWriteTokens)
}
}
func TestOpenclawUsageAccumulationAcrossSteps(t *testing.T) {
t.Parallel()
b := &openclawBackend{cfg: Config{Logger: slog.Default()}}
ch := make(chan Message, 256)
lines := []string{
`{"type":"step_finish","usage":{"inputTokens":100,"outputTokens":50}}`,
`{"type":"step_finish","usage":{"inputTokens":200,"outputTokens":80,"cachedInputTokens":60}}`,
}
input := strings.Join(lines, "\n")
res := b.processOutput(strings.NewReader(input), ch)
if res.usage.InputTokens != 300 {
t.Errorf("InputTokens: got %d, want 300", res.usage.InputTokens)
}
if res.usage.OutputTokens != 130 {
t.Errorf("OutputTokens: got %d, want 130", res.usage.OutputTokens)
}
if res.usage.CacheReadTokens != 60 {
t.Errorf("CacheReadTokens: got %d, want 60", res.usage.CacheReadTokens)
}
close(ch)
}
func TestOpenclawUsageFinalResultAlternativeFields(t *testing.T) {
t.Parallel()
b := &openclawBackend{cfg: Config{Logger: slog.Default()}}
ch := make(chan Message, 256)
result := openclawResult{
Payloads: []openclawPayload{{Text: "Done"}},
Meta: openclawMeta{
DurationMs: 1000,
AgentMeta: map[string]any{
"usage": map[string]any{
"inputTokens": float64(400),
"outputTokens": float64(180),
"cachedInputTokens": float64(90),
},
},
},
}
data, _ := json.Marshal(result)
res := b.processOutput(strings.NewReader(string(data)), ch)
if res.usage.InputTokens != 400 {
t.Errorf("InputTokens: got %d, want 400", res.usage.InputTokens)
}
if res.usage.OutputTokens != 180 {
t.Errorf("OutputTokens: got %d, want 180", res.usage.OutputTokens)
}
if res.usage.CacheReadTokens != 90 {
t.Errorf("CacheReadTokens: got %d, want 90", res.usage.CacheReadTokens)
}
close(ch)
}
// ── openclawInt64 tests ──
func TestOpenclawInt64Float(t *testing.T) {