Compare commits

..

9 Commits

Author SHA1 Message Date
Jiang Bohan
f3355049bc feat(agent): add live log support for Gemini CLI via stream-json
Switch Gemini backend from `-o text` (batch output) to `-o stream-json`
(NDJSON streaming) so tool calls, text, and errors are forwarded to the
UI in real time instead of collected at the end.

Parses all Gemini stream-json event types: init, message, tool_use,
tool_result, error, and result — including per-model token usage from
the result stats.
2026-04-14 14:17:13 +08:00
Naiyuan Qing
dca86acc69 Merge pull request #938 from 1WorldCapture/fix/lyo-7-description-click-focus
fix(views): focus description editor when clicking empty area
2026-04-14 14:06:32 +08:00
Bohan Jiang
c71525e198 Merge pull request #910 from multica-ai/agent/j/openclaw-p0-p1
feat(agent): OpenClaw backend P0+P1 improvements
2026-04-14 14:02:38 +08:00
devv-eve
977dc6479d fix(daemon): prevent task stall when agent process hangs on stdout (#947)
When an agent CLI process hangs (e.g. a tool call blocks on unreachable
I/O), the daemon's scanner blocks indefinitely on stdout, preventing the
Result from ever being sent. This causes tasks to stay in "running"
state permanently with no further events.

Three-layer fix:

1. Agent backends (claude, opencode, openclaw, gemini): add a watchdog
   goroutine that closes the stdout/stderr pipe when the context is
   cancelled, forcing the scanner to unblock. Also set cmd.WaitDelay
   so Go force-closes pipes after 10s if the process doesn't exit.

2. daemon executeAndDrain: add an independent drain timeout (backend
   timeout + 30s buffer) with context-aware select on both the message
   channel and the result channel, so the daemon never blocks forever.

3. daemon ping path: add context-aware select so pings don't deadlock
   if the agent backend stalls.

Closes #925

Co-authored-by: Devv <devv@Devvs-Mac-mini.local>
Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-13 23:00:27 -07:00
Jiayuan Zhang
a97bd3da0b fix(auth): support non-localhost CLI callback for self-hosted VMs (#944)
The CLI auth callback was hardcoded to localhost, breaking self-hosted
setups where the browser runs on a different machine than the CLI.

- CLI: derive callback host from configured app URL; bind to 0.0.0.0
  when the app URL is not localhost so remote browsers can reach it
- Frontend: expand validateCliCallback to accept RFC 1918 private IPs
  (10.x, 172.16-31.x, 192.168.x) in addition to localhost

Closes #923
2026-04-14 13:50:02 +08:00
Jiayuan Zhang
9dfe119f47 fix(daemon): use runtime's owner_id for agent migration on upgrade (#941)
* fix(daemon): prevent duplicate runtime registration on profile switch

The daemon_id included a profile name suffix (e.g. "hostname-staging"),
so switching profiles created a new daemon_id that bypassed the UPSERT
dedup constraint, leaving orphaned runtime records in the database.

Three changes:
- Remove profile suffix from daemon_id — use stable hostname only.
  The unique constraint (workspace_id, daemon_id, provider) already
  prevents collisions within the same workspace.
- Auto-migrate agents from old offline runtimes to the newly registered
  runtime during DaemonRegister (same workspace/provider/owner).
- Add TTL-based GC in the runtime sweeper to delete offline runtimes
  with no active agents after 7 days.

Closes MUL-695

* fix(daemon): address code review issues on PR #906

1. Move gcRuntimes() to the main sweep loop — previously it was inside
   sweepStaleRuntimes() after an early return, so it only ran when new
   runtimes were marked stale. Now it runs every sweep cycle independently.

2. Fix DeleteStaleOfflineRuntimes to exclude runtimes with ANY agent
   reference (not just active ones). The FK agent.runtime_id is ON DELETE
   RESTRICT, so archived agents also block deletion.

3. Scope MigrateAgentsToRuntime to the same machine by matching
   daemon_id LIKE '<current_daemon_id>-%'. This prevents cross-machine
   agent migration when the same user has multiple devices.

* fix(daemon): use runtime's owner_id for agent migration, not caller's

The migration was gated on ownerID.Valid which is only true for PAT/JWT
registrations. Daemon token registrations (the common case for background
daemon restarts) had ownerID as zero, skipping migration entirely.

Fix: use registered.OwnerID (preserved via COALESCE on upsert) instead
of the caller's ownerID. This ensures migration runs even when the daemon
re-registers via daemon token after an upgrade.
2026-04-14 13:42:27 +08:00
Lyon Liang
418049856f merge: resolve conflicts with upstream/main 2026-04-14 12:49:40 +08:00
Lyon Liang
9170b01739 fix(views): focus description editor when clicking empty area 2026-04-14 11:04:58 +08:00
Jiang Bohan
a0d43ca31a feat(agent): OpenClaw backend P0+P1 improvements
Combined P0 and P1 improvements to the OpenClaw agent backend, informed
by PaperClip's adapter architecture:

P0 — User experience:
- Streaming output — emit MessageText as NDJSON events arrive in real
  time, instead of waiting for the final result blob
- Tool use support — parse and emit MessageToolUse/MessageToolResult
  from streaming events, matching Claude and OpenCode backends
- Model & system prompt — pass --model and --system-prompt to the
  OpenClaw CLI when configured

P1 — Robustness:
- Hardened JSON parsing — tryParseOpenclawResult requires lines to
  start with '{', eliminating fragile brace-scanning that could
  false-match JSON fragments in log lines
- Lifecycle event handling — new "lifecycle" event type with phase
  tracking (error/failed/cancelled), plus structured error objects
  (error.name, error.data.message) matching PaperClip's pattern
- Usage field name variants — parseOpenclawUsage supports multiple
  naming conventions (input/inputTokens/input_tokens, cacheRead/
  cachedInputTokens/cache_read_input_tokens, etc.) with incremental
  accumulation across step_finish events

Backwards compatible with the legacy single JSON blob format.
31 tests covering all new functionality.

Closes MUL-726
2026-04-14 01:52:03 +08:00
14 changed files with 1250 additions and 160 deletions

View File

@@ -726,12 +726,34 @@ describe("validateCliCallback", () => {
expect(validateCliCallback("http://127.0.0.1:8080/cb")).toBe(true);
});
it("accepts 10.x.x.x private IPs", () => {
expect(validateCliCallback("http://10.0.0.5:9876/callback")).toBe(true);
expect(validateCliCallback("http://10.255.255.255:1234/cb")).toBe(true);
});
it("accepts 172.16-31.x.x private IPs", () => {
expect(validateCliCallback("http://172.16.0.1:9876/callback")).toBe(true);
expect(validateCliCallback("http://172.31.255.255:1234/cb")).toBe(true);
});
it("rejects 172.x outside 16-31 range", () => {
expect(validateCliCallback("http://172.15.0.1:9876/callback")).toBe(false);
expect(validateCliCallback("http://172.32.0.1:9876/callback")).toBe(false);
});
it("accepts 192.168.x.x private IPs", () => {
expect(validateCliCallback("http://192.168.1.131:41117/callback")).toBe(true);
expect(validateCliCallback("http://192.168.0.1:8080/cb")).toBe(true);
});
it("rejects https:// URLs", () => {
expect(validateCliCallback("https://localhost:9876/callback")).toBe(false);
});
it("rejects non-localhost hosts", () => {
it("rejects public IPs and domains", () => {
expect(validateCliCallback("http://evil.com:9876/callback")).toBe(false);
expect(validateCliCallback("http://8.8.8.8:9876/callback")).toBe(false);
expect(validateCliCallback("http://192.169.1.1:9876/callback")).toBe(false);
});
it("rejects invalid URLs", () => {

View File

@@ -68,14 +68,22 @@ function redirectToCliCallback(url: string, token: string, state: string) {
window.location.href = `${url}${separator}token=${encodeURIComponent(token)}&state=${encodeURIComponent(state)}`;
}
/** Validate that a CLI callback URL points to localhost over HTTP. */
/**
* Validate that a CLI callback URL points to a safe host over HTTP.
* Allows localhost and private/LAN IPs (RFC 1918) to support self-hosted setups
* on local VMs while blocking arbitrary public hosts.
*/
export function validateCliCallback(cliCallback: string): boolean {
try {
const cbUrl = new URL(cliCallback);
if (cbUrl.protocol !== "http:") return false;
if (cbUrl.hostname !== "localhost" && cbUrl.hostname !== "127.0.0.1")
return false;
return true;
const h = cbUrl.hostname;
if (h === "localhost" || h === "127.0.0.1") return true;
// Allow RFC 1918 private IPs: 10.x.x.x, 172.16-31.x.x, 192.168.x.x
if (/^10\./.test(h)) return true;
if (/^172\.(1[6-9]|2\d|3[01])\./.test(h)) return true;
if (/^192\.168\./.test(h)) return true;
return false;
} catch {
return false;
}

View File

@@ -28,6 +28,7 @@
.rich-text-editor.ProseMirror {
color: var(--foreground);
caret-color: var(--foreground);
min-height: 100%;
}
.rich-text-editor.ProseMirror:focus {

View File

@@ -0,0 +1,73 @@
import { describe, it, expect, vi, beforeEach } from "vitest";
import { fireEvent, render, screen } from "@testing-library/react";
const mockFocus = vi.hoisted(() => vi.fn());
vi.mock("@tanstack/react-query", () => ({
useQueryClient: () => ({}),
}));
vi.mock("./extensions", () => ({
createEditorExtensions: () => [],
}));
vi.mock("./extensions/file-upload", () => ({
uploadAndInsertFile: vi.fn(),
}));
vi.mock("./utils/preprocess", () => ({
preprocessMarkdown: (value: string) => value,
}));
vi.mock("./bubble-menu", () => ({
EditorBubbleMenu: () => null,
}));
vi.mock("@tiptap/react", () => ({
useEditor: () => ({
commands: {
focus: mockFocus,
clearContent: vi.fn(),
},
getMarkdown: () => "",
state: {
doc: {
content: {
size: 0,
},
},
},
}),
EditorContent: ({ className }: { className?: string }) => (
<div className={className} data-testid="editor-content">
<div className="ProseMirror rich-text-editor" data-testid="prosemirror" />
</div>
),
}));
import { ContentEditor } from "./content-editor";
describe("ContentEditor", () => {
beforeEach(() => {
vi.clearAllMocks();
});
it("focuses the editor when clicking the empty container area", () => {
render(<ContentEditor placeholder="Add description..." />);
const shell = screen.getByTestId("editor-content").parentElement;
expect(shell).not.toBeNull();
fireEvent.mouseDown(shell!);
expect(mockFocus).toHaveBeenCalledWith("end");
});
it("does not hijack clicks that land inside the ProseMirror node", () => {
render(<ContentEditor placeholder="Add description..." />);
fireEvent.mouseDown(screen.getByTestId("prosemirror"));
expect(mockFocus).not.toHaveBeenCalled();
});
});

View File

@@ -30,6 +30,7 @@ import {
useEffect,
useImperativeHandle,
useRef,
type MouseEvent as ReactMouseEvent,
} from "react";
import { useEditor, EditorContent } from "@tiptap/react";
import { cn } from "@multica/ui/lib/utils";
@@ -222,11 +223,25 @@ const ContentEditor = forwardRef<ContentEditorRef, ContentEditorProps>(
},
}));
const handleContainerMouseDown = (event: ReactMouseEvent<HTMLDivElement>) => {
if (!editable || !editor) return;
const target = event.target as HTMLElement;
if (target.closest(".ProseMirror")) return;
if (target.closest("a, button, input, textarea, [role='button'], [data-node-view-wrapper]")) return;
event.preventDefault();
editor.commands.focus("end");
};
if (!editor) return null;
return (
<div className="relative min-h-full">
<EditorContent editor={editor} />
<div
className="relative flex min-h-full flex-col"
onMouseDown={handleContainerMouseDown}
>
<EditorContent className="flex-1 min-h-full" editor={editor} />
{editable && (
<>
<EditorBubbleMenu editor={editor} />

View File

@@ -158,7 +158,7 @@ function EditorLinkPreview({ editor }: { editor: Editor }) {
const virtualRef = useRef({
getBoundingClientRect: () => new DOMRect(),
contextElement: editor.view.dom,
contextElement: editor.view?.dom,
});
const { refs, floatingStyles, isPositioned, update } = useFloating({
@@ -171,11 +171,20 @@ function EditorLinkPreview({ editor }: { editor: Editor }) {
});
useEffect(() => {
if (typeof editor.on !== "function" || typeof editor.off !== "function") {
return;
}
const check = () => {
const view = editor.view;
if (!editor.isEditable) {
setVisible(false);
return;
}
if (!view?.dom || typeof view.coordsAtPos !== "function") {
setVisible(false);
return;
}
if (!editor.state.selection.empty || !editor.isActive("link")) {
setVisible(false);
return;
@@ -186,11 +195,11 @@ function EditorLinkPreview({ editor }: { editor: Editor }) {
return;
}
const coords = editor.view.coordsAtPos(editor.state.selection.from);
const coords = view.coordsAtPos(editor.state.selection.from);
virtualRef.current = {
getBoundingClientRect: () =>
new DOMRect(coords.left, coords.top, 0, coords.bottom - coords.top),
contextElement: editor.view.dom,
contextElement: view.dom,
};
setHref(linkHref);
@@ -204,11 +213,12 @@ function EditorLinkPreview({ editor }: { editor: Editor }) {
// Close on any ancestor scroll or window resize
useEffect(() => {
if (!visible) return;
const editorDom = editor.view?.dom;
if (!visible || !editorDom) return;
const close = () => {
setVisible(false);
};
const ancestors = getOverflowAncestors(editor.view.dom);
const ancestors = getOverflowAncestors(editorDom);
ancestors.forEach((el) => el.addEventListener("scroll", close, { passive: true }));
window.addEventListener("resize", close);
return () => {

View File

@@ -98,15 +98,28 @@ func runAuthLoginBrowser(cmd *cobra.Command) error {
serverURL := resolveServerURL(cmd)
appURL := resolveAppURL(cmd)
// Determine the callback host from the configured app URL.
// For self-hosted setups where the browser is on a different machine,
// we need to use the server's reachable hostname instead of localhost.
callbackHost := "localhost"
bindAddr := "127.0.0.1"
if parsed, err := url.Parse(appURL); err == nil {
h := parsed.Hostname()
if h != "" && h != "localhost" && h != "127.0.0.1" {
callbackHost = h
bindAddr = "0.0.0.0"
}
}
// Start a local HTTP server on a random port to receive the callback.
listener, err := net.Listen("tcp", "127.0.0.1:0")
listener, err := net.Listen("tcp", bindAddr+":0")
if err != nil {
return fmt.Errorf("failed to start local server: %w", err)
}
defer listener.Close()
port := listener.Addr().(*net.TCPAddr).Port
callbackURL := fmt.Sprintf("http://localhost:%d/callback", port)
callbackURL := fmt.Sprintf("http://%s:%d/callback", callbackHost, port)
// Generate a random state parameter for CSRF protection.
stateBytes := make([]byte, 16)

View File

@@ -525,7 +525,18 @@ func (d *Daemon) handlePing(ctx context.Context, rt Runtime, pingID string) {
}
}()
result := <-session.Result
var result agent.Result
select {
case result = <-session.Result:
case <-pingCtx.Done():
d.logger.Warn("ping timed out waiting for result", "runtime_id", rt.ID, "ping_id", pingID)
d.client.ReportPingResult(ctx, rt.ID, pingID, map[string]any{
"status": "failed",
"error": "ping context cancelled while waiting for result",
"duration_ms": time.Since(start).Milliseconds(),
})
return
}
durationMs := time.Since(start).Milliseconds()
if result.Status == "completed" {
@@ -1078,6 +1089,17 @@ func (d *Daemon) executeAndDrain(ctx context.Context, backend agent.Backend, pro
return agent.Result{}, 0, err
}
// Create an independent drain deadline so we don't block forever if the
// backend's internal timeout fails to produce a Result (e.g. scanner
// stuck on a hung stdout pipe). The extra 30 s gives the backend time
// to clean up after its own timeout fires.
drainTimeout := opts.Timeout + 30*time.Second
if opts.Timeout == 0 {
drainTimeout = 21 * time.Minute
}
drainCtx, drainCancel := context.WithTimeout(ctx, drainTimeout)
defer drainCancel()
var toolCount atomic.Int32
go func() {
var seq atomic.Int32
@@ -1135,77 +1157,92 @@ func (d *Daemon) executeAndDrain(ctx context.Context, backend agent.Backend, pro
}
}()
for msg := range session.Messages {
switch msg.Type {
case agent.MessageToolUse:
n := toolCount.Add(1)
taskLog.Info(fmt.Sprintf("tool #%d: %s", n, msg.Tool))
if msg.CallID != "" {
for {
select {
case msg, ok := <-session.Messages:
if !ok {
goto drainDone
}
switch msg.Type {
case agent.MessageToolUse:
n := toolCount.Add(1)
taskLog.Info(fmt.Sprintf("tool #%d: %s", n, msg.Tool))
if msg.CallID != "" {
mu.Lock()
callIDToTool[msg.CallID] = msg.Tool
mu.Unlock()
}
s := seq.Add(1)
mu.Lock()
callIDToTool[msg.CallID] = msg.Tool
batch = append(batch, TaskMessageData{
Seq: int(s),
Type: "tool_use",
Tool: msg.Tool,
Input: msg.Input,
})
mu.Unlock()
case agent.MessageToolResult:
s := seq.Add(1)
output := msg.Output
if len(output) > 8192 {
output = output[:8192]
}
toolName := msg.Tool
if toolName == "" && msg.CallID != "" {
mu.Lock()
toolName = callIDToTool[msg.CallID]
mu.Unlock()
}
mu.Lock()
batch = append(batch, TaskMessageData{
Seq: int(s),
Type: "tool_result",
Tool: toolName,
Output: output,
})
mu.Unlock()
case agent.MessageThinking:
if msg.Content != "" {
mu.Lock()
pendingThinking.WriteString(msg.Content)
mu.Unlock()
}
case agent.MessageText:
if msg.Content != "" {
taskLog.Debug("agent", "text", truncateLog(msg.Content, 200))
mu.Lock()
pendingText.WriteString(msg.Content)
mu.Unlock()
}
case agent.MessageError:
taskLog.Error("agent error", "content", msg.Content)
s := seq.Add(1)
mu.Lock()
batch = append(batch, TaskMessageData{
Seq: int(s),
Type: "error",
Content: msg.Content,
})
mu.Unlock()
}
s := seq.Add(1)
mu.Lock()
batch = append(batch, TaskMessageData{
Seq: int(s),
Type: "tool_use",
Tool: msg.Tool,
Input: msg.Input,
})
mu.Unlock()
case agent.MessageToolResult:
s := seq.Add(1)
output := msg.Output
if len(output) > 8192 {
output = output[:8192]
}
toolName := msg.Tool
if toolName == "" && msg.CallID != "" {
mu.Lock()
toolName = callIDToTool[msg.CallID]
mu.Unlock()
}
mu.Lock()
batch = append(batch, TaskMessageData{
Seq: int(s),
Type: "tool_result",
Tool: toolName,
Output: output,
})
mu.Unlock()
case agent.MessageThinking:
if msg.Content != "" {
mu.Lock()
pendingThinking.WriteString(msg.Content)
mu.Unlock()
}
case agent.MessageText:
if msg.Content != "" {
taskLog.Debug("agent", "text", truncateLog(msg.Content, 200))
mu.Lock()
pendingText.WriteString(msg.Content)
mu.Unlock()
}
case agent.MessageError:
taskLog.Error("agent error", "content", msg.Content)
s := seq.Add(1)
mu.Lock()
batch = append(batch, TaskMessageData{
Seq: int(s),
Type: "error",
Content: msg.Content,
})
mu.Unlock()
case <-drainCtx.Done():
goto drainDone
}
}
drainDone:
close(done)
flush()
}()
result := <-session.Result
return result, toolCount.Load(), nil
select {
case result := <-session.Result:
return result, toolCount.Load(), nil
case <-drainCtx.Done():
return agent.Result{
Status: "timeout",
Error: "agent did not produce result within drain timeout",
}, toolCount.Load(), nil
}
}
func mergeUsage(a, b map[string]agent.TokenUsage) map[string]agent.TokenUsage {

View File

@@ -37,6 +37,7 @@ func (b *claudeBackend) Execute(ctx context.Context, prompt string, opts ExecOpt
args := buildClaudeArgs(opts)
cmd := exec.CommandContext(runCtx, execPath, args...)
cmd.WaitDelay = 10 * time.Second
if opts.Cwd != "" {
cmd.Dir = opts.Cwd
}
@@ -90,6 +91,12 @@ func (b *claudeBackend) Execute(ctx context.Context, prompt string, opts ExecOpt
var finalError string
usage := make(map[string]TokenUsage)
// Close stdout when the context is cancelled so scanner.Scan() unblocks.
go func() {
<-runCtx.Done()
_ = stdout.Close()
}()
scanner := bufio.NewScanner(stdout)
scanner.Buffer(make([]byte, 0, 1024*1024), 10*1024*1024)

View File

@@ -3,20 +3,15 @@ package agent
import (
"bufio"
"context"
"encoding/json"
"fmt"
"io"
"os/exec"
"strings"
"time"
)
// geminiBackend implements Backend by spawning the Google Gemini CLI
// (`gemini -p <prompt> --yolo -o text`) and collecting its stdout.
//
// This is a minimal v1 implementation — it captures the final text
// response but does not stream tool calls in real time. Follow-ups
// can move to `-o stream-json` and parse Gemini's event schema once
// we have a reliable reproduction of its output format.
// with `--output-format stream-json` and parsing its NDJSON event stream.
type geminiBackend struct {
cfg Config
}
@@ -39,6 +34,7 @@ func (b *geminiBackend) Execute(ctx context.Context, prompt string, opts ExecOpt
args := buildGeminiArgs(prompt, opts)
cmd := exec.CommandContext(runCtx, execPath, args...)
cmd.WaitDelay = 10 * time.Second
if opts.Cwd != "" {
cmd.Dir = opts.Cwd
}
@@ -58,75 +54,196 @@ func (b *geminiBackend) Execute(ctx context.Context, prompt string, opts ExecOpt
b.cfg.Logger.Info("gemini started", "pid", cmd.Process.Pid, "cwd", opts.Cwd, "model", opts.Model)
msgCh := make(chan Message, 16)
msgCh := make(chan Message, 256)
resCh := make(chan Result, 1)
// Close stdout when the context is cancelled so scanner.Scan() unblocks.
go func() {
<-runCtx.Done()
_ = stdout.Close()
}()
go func() {
defer cancel()
defer close(msgCh)
defer close(resCh)
startTime := time.Now()
output, readErr := io.ReadAll(bufio.NewReader(stdout))
var output strings.Builder
var sessionID string
finalStatus := "completed"
var finalError string
usage := make(map[string]TokenUsage)
// Forward the full response as a single text message so the daemon
// can persist it verbatim. Tool streaming is intentionally omitted
// in v1; see the file-level comment.
text := strings.TrimRight(string(output), "\n")
if text != "" {
trySend(msgCh, Message{Type: MessageText, Content: text})
scanner := bufio.NewScanner(stdout)
scanner.Buffer(make([]byte, 0, 1024*1024), 10*1024*1024)
for scanner.Scan() {
line := strings.TrimSpace(scanner.Text())
if line == "" {
continue
}
var evt geminiStreamEvent
if err := json.Unmarshal([]byte(line), &evt); err != nil {
continue
}
switch evt.Type {
case "init":
sessionID = evt.SessionID
trySend(msgCh, Message{Type: MessageStatus, Status: "running"})
case "message":
if evt.Role == "assistant" && evt.Content != "" {
output.WriteString(evt.Content)
trySend(msgCh, Message{Type: MessageText, Content: evt.Content})
}
case "tool_use":
var params map[string]any
if evt.Parameters != nil {
_ = json.Unmarshal(evt.Parameters, &params)
}
trySend(msgCh, Message{
Type: MessageToolUse,
Tool: evt.ToolName,
CallID: evt.ToolID,
Input: params,
})
case "tool_result":
trySend(msgCh, Message{
Type: MessageToolResult,
CallID: evt.ToolID,
Output: evt.Output,
})
case "error":
trySend(msgCh, Message{
Type: MessageError,
Content: evt.Message,
})
case "result":
if evt.Status == "error" && evt.Error != nil {
finalStatus = "failed"
finalError = evt.Error.Message
}
if evt.Stats != nil {
b.accumulateUsage(usage, evt.Stats)
}
}
}
waitErr := cmd.Wait()
durationMs := time.Since(startTime).Milliseconds()
duration := time.Since(startTime)
result := Result{
Status: "completed",
Output: text,
DurationMs: durationMs,
}
// Check context errors first — timeout and cancellation kill the
// process, which can cause readErr/waitErr as side effects. The
// context error is the root cause and determines the correct status.
if runCtx.Err() == context.DeadlineExceeded {
result.Status = "timeout"
result.Error = fmt.Sprintf("gemini timed out after %s", timeout)
finalStatus = "timeout"
finalError = fmt.Sprintf("gemini timed out after %s", timeout)
} else if runCtx.Err() == context.Canceled {
result.Status = "aborted"
result.Error = "execution cancelled"
} else if readErr != nil {
result.Status = "failed"
result.Error = fmt.Sprintf("read stdout: %s", readErr.Error())
} else if waitErr != nil {
result.Status = "failed"
result.Error = waitErr.Error()
finalStatus = "aborted"
finalError = "execution cancelled"
} else if waitErr != nil && finalStatus == "completed" {
finalStatus = "failed"
finalError = fmt.Sprintf("gemini exited with error: %v", waitErr)
}
resCh <- result
b.cfg.Logger.Info("gemini finished", "pid", cmd.Process.Pid, "status", finalStatus, "duration", duration.Round(time.Millisecond).String())
resCh <- Result{
Status: finalStatus,
Output: output.String(),
Error: finalError,
DurationMs: duration.Milliseconds(),
SessionID: sessionID,
Usage: usage,
}
}()
return &Session{Messages: msgCh, Result: resCh}, nil
}
// accumulateUsage extracts per-model token usage from Gemini's result stats.
func (b *geminiBackend) accumulateUsage(usage map[string]TokenUsage, stats *geminiStreamStats) {
for model, m := range stats.Models {
u := usage[model]
u.InputTokens += int64(m.InputTokens)
u.OutputTokens += int64(m.OutputTokens)
u.CacheReadTokens += int64(m.Cached)
usage[model] = u
}
}
// ── Gemini stream-json event types ──
type geminiStreamEvent struct {
Type string `json:"type"`
Timestamp string `json:"timestamp,omitempty"`
SessionID string `json:"session_id,omitempty"`
Model string `json:"model,omitempty"`
// message fields
Role string `json:"role,omitempty"`
Content string `json:"content,omitempty"`
Delta bool `json:"delta,omitempty"`
// tool_use fields
ToolName string `json:"tool_name,omitempty"`
ToolID string `json:"tool_id,omitempty"`
Parameters json.RawMessage `json:"parameters,omitempty"`
// tool_result fields
Status string `json:"status,omitempty"`
Output string `json:"output,omitempty"`
// error fields
Severity string `json:"severity,omitempty"`
Message string `json:"message,omitempty"`
// result fields
Error *geminiStreamError `json:"error,omitempty"`
Stats *geminiStreamStats `json:"stats,omitempty"`
}
type geminiStreamError struct {
Type string `json:"type"`
Message string `json:"message"`
}
type geminiStreamStats struct {
TotalTokens int `json:"total_tokens"`
InputTokens int `json:"input_tokens"`
OutputTokens int `json:"output_tokens"`
DurationMs int `json:"duration_ms"`
ToolCalls int `json:"tool_calls"`
Models map[string]geminiModelStats `json:"models,omitempty"`
}
type geminiModelStats struct {
TotalTokens int `json:"total_tokens"`
InputTokens int `json:"input_tokens"`
OutputTokens int `json:"output_tokens"`
Cached int `json:"cached"`
}
// ── Arg builder ──
// buildGeminiArgs assembles the argv for a one-shot gemini invocation.
//
// Flags:
//
// -p / --prompt non-interactive prompt (the user's task)
// --yolo auto-approve all tool executions (equivalent to
// claude's --permission-mode bypassPermissions)
// -o text plain text output (stream-json is a follow-up)
// -m <model> optional model override (from MULTICA_GEMINI_MODEL)
// --yolo auto-approve all tool executions
// -o stream-json streaming NDJSON output for live events
// -m <model> optional model override
// -r <session> resume a previous session (if provided)
//
// Note: gemini reads stdin and appends it to -p when both are present.
// The daemon does not pipe stdin, so the prompt comes exclusively from -p.
func buildGeminiArgs(prompt string, opts ExecOptions) []string {
args := []string{
"-p", prompt,
"--yolo",
"-o", "text",
"-o", "stream-json",
}
if opts.Model != "" {
args = append(args, "-m", opts.Model)

View File

@@ -11,7 +11,7 @@ func TestBuildGeminiArgsBaseline(t *testing.T) {
expected := []string{
"-p", "write a haiku",
"--yolo",
"-o", "text",
"-o", "stream-json",
}
if len(args) != len(expected) {

View File

@@ -38,12 +38,19 @@ func (b *openclawBackend) Execute(ctx context.Context, prompt string, opts ExecO
sessionID = fmt.Sprintf("multica-%d", time.Now().UnixNano())
}
args := []string{"agent", "--local", "--json", "--session-id", sessionID}
if opts.Model != "" {
args = append(args, "--model", opts.Model)
}
if opts.SystemPrompt != "" {
args = append(args, "--system-prompt", opts.SystemPrompt)
}
if opts.Timeout > 0 {
args = append(args, "--timeout", fmt.Sprintf("%d", int(opts.Timeout.Seconds())))
}
args = append(args, "--message", prompt)
cmd := exec.CommandContext(runCtx, execPath, args...)
cmd.WaitDelay = 10 * time.Second
if opts.Cwd != "" {
cmd.Dir = opts.Cwd
}
@@ -67,6 +74,12 @@ func (b *openclawBackend) Execute(ctx context.Context, prompt string, opts ExecO
msgCh := make(chan Message, 256)
resCh := make(chan Result, 1)
// Close stderr when the context is cancelled so the scanner unblocks.
go func() {
<-runCtx.Done()
_ = stderr.Close()
}()
go func() {
defer cancel()
defer close(msgCh)
@@ -129,22 +142,108 @@ type openclawEventResult struct {
}
// processOutput reads the JSON output from openclaw --json stderr and returns
// the parsed result. OpenClaw writes its JSON result to stderr, which may also
// contain non-JSON log lines. We scan line-by-line so a final result line can
// be recognized without waiting for the entire stderr stream to be buffered.
// the parsed result. OpenClaw writes its JSON output to stderr, which may also
// contain non-JSON log lines. The stream may contain:
//
// - NDJSON streaming events (type: "text", "tool_use", "tool_result", "error",
// "step_start", "step_finish") — emitted in real time as the agent works
// - A final result JSON (with payloads + meta) — the legacy single-blob format
//
// We scan line-by-line, emitting messages as events arrive so streaming
// consumers get real-time feedback instead of waiting for the final blob.
func (b *openclawBackend) processOutput(r io.Reader, ch chan<- Message) openclawEventResult {
scanner := bufio.NewScanner(r)
scanner.Buffer(make([]byte, 0, 1024*1024), 10*1024*1024)
var output strings.Builder
var sessionID string
var usage TokenUsage
finalStatus := "completed"
var finalError string
gotEvents := false // true if we parsed at least one streaming event or result
var rawLines []string
for scanner.Scan() {
line := strings.TrimSpace(scanner.Text())
if line == "" {
continue
}
if result, ok := tryParseOpenclawResult(line); ok {
return b.buildOpenclawEventResult(result, ch)
// Try parsing as a streaming NDJSON event first.
if event, ok := tryParseOpenclawEvent(line); ok {
gotEvents = true
if event.SessionID != "" {
sessionID = event.SessionID
}
switch event.Type {
case "text":
if event.Text != "" {
output.WriteString(event.Text)
trySend(ch, Message{Type: MessageText, Content: event.Text})
}
case "tool_use":
var input map[string]any
if event.Input != nil {
_ = json.Unmarshal(event.Input, &input)
}
trySend(ch, Message{
Type: MessageToolUse,
Tool: event.Tool,
CallID: event.CallID,
Input: input,
})
case "tool_result":
trySend(ch, Message{
Type: MessageToolResult,
Tool: event.Tool,
CallID: event.CallID,
Output: event.Text,
})
case "error":
errMsg := event.errorMessage()
b.cfg.Logger.Warn("openclaw error event", "error", errMsg)
trySend(ch, Message{Type: MessageError, Content: errMsg})
finalStatus = "failed"
finalError = errMsg
case "lifecycle":
phase := event.Phase
if phase == "error" || phase == "failed" || phase == "cancelled" {
errMsg := event.errorMessage()
b.cfg.Logger.Warn("openclaw lifecycle failure", "phase", phase, "error", errMsg)
trySend(ch, Message{Type: MessageError, Content: errMsg})
finalStatus = "failed"
finalError = errMsg
}
case "step_start":
trySend(ch, Message{Type: MessageStatus, Status: "running"})
case "step_finish":
if event.Usage != nil {
u := parseOpenclawUsage(event.Usage)
usage.InputTokens += u.InputTokens
usage.OutputTokens += u.OutputTokens
usage.CacheReadTokens += u.CacheReadTokens
usage.CacheWriteTokens += u.CacheWriteTokens
}
}
continue
}
// Try parsing as a final result blob (legacy format).
if result, ok := tryParseOpenclawResult(line); ok {
gotEvents = true
res := b.buildOpenclawEventResult(result, ch, &output)
if res.sessionID != "" {
sessionID = res.sessionID
}
// Prefer usage from the final result if no streaming events reported it.
u := res.usage
if u.InputTokens > 0 || u.OutputTokens > 0 || u.CacheReadTokens > 0 || u.CacheWriteTokens > 0 {
usage = u
}
continue
}
// Not JSON — treat as log line.
b.cfg.Logger.Debug("[openclaw:stderr] " + line)
rawLines = append(rawLines, line)
}
@@ -153,36 +252,65 @@ func (b *openclawBackend) processOutput(r io.Reader, ch chan<- Message) openclaw
return openclawEventResult{status: "failed", errMsg: fmt.Sprintf("read stderr: %v", err)}
}
trimmed := strings.TrimSpace(strings.Join(rawLines, "\n"))
if trimmed != "" {
return openclawEventResult{status: "completed", output: trimmed}
// If we got no events at all, fall back to raw output.
if !gotEvents {
trimmed := strings.TrimSpace(strings.Join(rawLines, "\n"))
if trimmed != "" {
return openclawEventResult{status: "completed", output: trimmed}
}
return openclawEventResult{status: "failed", errMsg: "openclaw returned no parseable output"}
}
return openclawEventResult{
status: finalStatus,
errMsg: finalError,
output: output.String(),
sessionID: sessionID,
usage: usage,
}
return openclawEventResult{status: "failed", errMsg: "openclaw returned no parseable output"}
}
// tryParseOpenclawEvent attempts to parse a line as a streaming NDJSON event.
// Returns the event and true if the line is a valid event with a known type.
func tryParseOpenclawEvent(line string) (openclawEvent, bool) {
if len(line) == 0 || line[0] != '{' {
return openclawEvent{}, false
}
var event openclawEvent
if err := json.Unmarshal([]byte(line), &event); err != nil {
return openclawEvent{}, false
}
if event.Type == "" {
return openclawEvent{}, false
}
return event, true
}
// tryParseOpenclawResult attempts to parse a line as a final result blob
// (the legacy format with payloads + meta). Lines must start with '{' to be
// considered — we no longer scan for braces at arbitrary positions, which
// avoids false matches on log lines containing JSON fragments.
func tryParseOpenclawResult(raw string) (openclawResult, bool) {
// Try each '{' position until we find valid openclawResult JSON.
// Earlier '{' chars may appear in log/error lines (e.g. raw_params={...}).
var result openclawResult
for i := 0; i < len(raw); i++ {
if raw[i] != '{' {
continue
}
if err := json.Unmarshal([]byte(raw[i:]), &result); err == nil && (result.Payloads != nil || result.Meta.DurationMs > 0) {
return result, true
}
if len(raw) == 0 || raw[0] != '{' {
return openclawResult{}, false
}
return openclawResult{}, false
var result openclawResult
if err := json.Unmarshal([]byte(raw), &result); err != nil {
return openclawResult{}, false
}
if result.Payloads == nil && result.Meta.DurationMs == 0 {
return openclawResult{}, false
}
return result, true
}
func (b *openclawBackend) buildOpenclawEventResult(result openclawResult, ch chan<- Message) openclawEventResult {
var output strings.Builder
// buildOpenclawEventResult extracts text and metadata from a final result blob.
// Text payloads are appended to the shared output builder and emitted to ch.
func (b *openclawBackend) buildOpenclawEventResult(result openclawResult, ch chan<- Message, output *strings.Builder) openclawEventResult {
for _, p := range result.Payloads {
if p.Text != "" {
if output.Len() > 0 {
output.WriteString("\n")
}
output.WriteString(p.Text)
trySend(ch, Message{Type: MessageText, Content: p.Text})
}
}
@@ -193,17 +321,10 @@ func (b *openclawBackend) buildOpenclawEventResult(result openclawResult, ch cha
sessionID = sid
}
if u, ok := result.Meta.AgentMeta["usage"].(map[string]any); ok {
usage.InputTokens = openclawInt64(u, "input")
usage.OutputTokens = openclawInt64(u, "output")
usage.CacheReadTokens = openclawInt64(u, "cacheRead")
usage.CacheWriteTokens = openclawInt64(u, "cacheWrite")
usage = parseOpenclawUsage(u)
}
}
if output.Len() > 0 {
trySend(ch, Message{Type: MessageText, Content: output.String()})
}
return openclawEventResult{
status: "completed",
output: output.String(),
@@ -212,6 +333,33 @@ func (b *openclawBackend) buildOpenclawEventResult(result openclawResult, ch cha
}
}
// parseOpenclawUsage extracts token usage from a map, supporting multiple
// field name conventions used by different OpenClaw versions and PaperClip:
//
// input / inputTokens / input_tokens
// output / outputTokens / output_tokens
// cacheRead / cachedInputTokens / cached_input_tokens / cache_read
// cacheWrite / cacheCreationInputTokens / cache_creation_input_tokens / cache_write
func parseOpenclawUsage(data map[string]any) TokenUsage {
return TokenUsage{
InputTokens: openclawInt64FirstOf(data, "input", "inputTokens", "input_tokens"),
OutputTokens: openclawInt64FirstOf(data, "output", "outputTokens", "output_tokens"),
CacheReadTokens: openclawInt64FirstOf(data, "cacheRead", "cachedInputTokens", "cached_input_tokens", "cache_read", "cache_read_input_tokens"),
CacheWriteTokens: openclawInt64FirstOf(data, "cacheWrite", "cacheCreationInputTokens", "cache_creation_input_tokens", "cache_write"),
}
}
// openclawInt64FirstOf returns the first non-zero int64 value found under any
// of the given keys. This supports field name variants across protocol versions.
func openclawInt64FirstOf(data map[string]any, keys ...string) int64 {
for _, key := range keys {
if v := openclawInt64(data, key); v != 0 {
return v
}
}
return 0
}
// openclawInt64 safely extracts an int64 from a JSON-decoded map value (which
// may be float64 due to Go's JSON number handling).
func openclawInt64(data map[string]any, key string) int64 {
@@ -231,7 +379,73 @@ func openclawInt64(data map[string]any, key string) int64 {
// ── JSON types for `openclaw agent --json` output ──
// openclawResult represents the JSON output from `openclaw agent --json`.
// openclawEvent represents a single streaming NDJSON event from openclaw --json.
//
// Event types:
// - "text" — text output (text field)
// - "tool_use" — tool invocation (tool, callId, input)
// - "tool_result" — tool output (tool, callId, text)
// - "error" — error (text, or structured error object)
// - "lifecycle" — phase changes (phase: "error"/"failed"/"cancelled")
// - "step_start" — agent step begins
// - "step_finish" — agent step ends (usage)
type openclawEvent struct {
Type string `json:"type"`
SessionID string `json:"sessionId,omitempty"`
Text string `json:"text,omitempty"`
Tool string `json:"tool,omitempty"`
CallID string `json:"callId,omitempty"`
Input json.RawMessage `json:"input,omitempty"`
Usage map[string]any `json:"usage,omitempty"`
Phase string `json:"phase,omitempty"` // lifecycle event phase
Error *openclawError `json:"error,omitempty"` // structured error object
Message string `json:"message,omitempty"` // alternative error message field
}
// errorMessage extracts a human-readable error message from the event,
// checking multiple fields: structured error object, text, message, or fallback.
func (e openclawEvent) errorMessage() string {
if e.Error != nil {
if msg := e.Error.message(); msg != "" {
return msg
}
}
if e.Text != "" {
return e.Text
}
if e.Message != "" {
return e.Message
}
return "unknown openclaw error"
}
// openclawError represents a structured error in an openclaw event,
// compatible with PaperClip's error format (name + data.message).
type openclawError struct {
Name string `json:"name,omitempty"`
Data *openclawErrorData `json:"data,omitempty"`
Message string `json:"message,omitempty"`
}
func (e *openclawError) message() string {
if e.Data != nil && e.Data.Message != "" {
return e.Data.Message
}
if e.Message != "" {
return e.Message
}
if e.Name != "" {
return e.Name
}
return ""
}
type openclawErrorData struct {
Message string `json:"message,omitempty"`
}
// openclawResult represents the final JSON output from `openclaw agent --json`
// (the legacy single-blob format with payloads + meta).
type openclawResult struct {
Payloads []openclawPayload `json:"payloads"`
Meta openclawMeta `json:"meta"`

View File

@@ -18,7 +18,7 @@ func TestNewReturnsOpenclawBackend(t *testing.T) {
}
}
// ── processOutput tests ──
// ── Legacy result format tests (processOutput with final JSON blob) ──
func TestOpenclawProcessOutputHappyPath(t *testing.T) {
t.Parallel()
@@ -90,11 +90,24 @@ func TestOpenclawProcessOutputMultiplePayloads(t *testing.T) {
res := b.processOutput(strings.NewReader(string(data)), ch)
if res.output != "First\nSecond" {
t.Errorf("output: got %q, want %q", res.output, "First\nSecond")
if res.output != "FirstSecond" {
t.Errorf("output: got %q, want %q", res.output, "FirstSecond")
}
close(ch)
var msgs []Message
for m := range ch {
msgs = append(msgs, m)
}
if len(msgs) != 2 {
t.Fatalf("expected 2 text messages, got %d", len(msgs))
}
if msgs[0].Content != "First" {
t.Errorf("msg[0]: got %q, want %q", msgs[0].Content, "First")
}
if msgs[1].Content != "Second" {
t.Errorf("msg[1]: got %q, want %q", msgs[1].Content, "Second")
}
}
func TestOpenclawProcessOutputEmptyPayloads(t *testing.T) {
@@ -238,7 +251,8 @@ func TestOpenclawProcessOutputWithBracesInLogLines(t *testing.T) {
Meta: openclawMeta{DurationMs: 500},
}
data, _ := json.Marshal(result)
// Simulate error line containing braces before the real JSON (the exact bug scenario)
// Log line with braces should NOT be parsed as JSON — only lines starting
// with '{' are considered. The result blob on its own line is still parsed.
input := `[tools] exec failed: complex interpreter invocation detected. raw_params={"command":"echo hello"}` + "\n" + string(data)
res := b.processOutput(strings.NewReader(input), ch)
@@ -253,6 +267,558 @@ func TestOpenclawProcessOutputWithBracesInLogLines(t *testing.T) {
close(ch)
}
func TestOpenclawResultBlobWithLeadingPrefixRejected(t *testing.T) {
t.Parallel()
b := &openclawBackend{cfg: Config{Logger: slog.Default()}}
ch := make(chan Message, 256)
// A line with a prefix before the JSON should NOT be parsed as a result.
// This tests that the hardened parser rejects non-'{'-starting lines.
result := openclawResult{
Payloads: []openclawPayload{{Text: "Should not match"}},
Meta: openclawMeta{DurationMs: 500},
}
data, _ := json.Marshal(result)
input := "some prefix " + string(data)
res := b.processOutput(strings.NewReader(input), ch)
// Should fall back to raw output since the JSON has a prefix.
if res.status != "completed" {
t.Errorf("status: got %q, want %q", res.status, "completed")
}
if res.output != input {
t.Errorf("output: got %q, want raw input back", res.output)
}
close(ch)
}
// ── Streaming NDJSON event tests ──
func TestOpenclawStreamingTextEvents(t *testing.T) {
t.Parallel()
b := &openclawBackend{cfg: Config{Logger: slog.Default()}}
ch := make(chan Message, 256)
lines := []string{
`{"type":"text","text":"Hello "}`,
`{"type":"text","text":"world"}`,
}
input := strings.Join(lines, "\n")
res := b.processOutput(strings.NewReader(input), ch)
if res.status != "completed" {
t.Errorf("status: got %q, want %q", res.status, "completed")
}
if res.output != "Hello world" {
t.Errorf("output: got %q, want %q", res.output, "Hello world")
}
close(ch)
var msgs []Message
for m := range ch {
msgs = append(msgs, m)
}
if len(msgs) != 2 {
t.Fatalf("expected 2 messages, got %d", len(msgs))
}
if msgs[0].Type != MessageText || msgs[0].Content != "Hello " {
t.Errorf("msg[0]: type=%s content=%q", msgs[0].Type, msgs[0].Content)
}
if msgs[1].Type != MessageText || msgs[1].Content != "world" {
t.Errorf("msg[1]: type=%s content=%q", msgs[1].Type, msgs[1].Content)
}
}
func TestOpenclawStreamingToolUseEvents(t *testing.T) {
t.Parallel()
b := &openclawBackend{cfg: Config{Logger: slog.Default()}}
ch := make(chan Message, 256)
lines := []string{
`{"type":"tool_use","tool":"bash","callId":"call_1","input":{"command":"ls -la"}}`,
`{"type":"tool_result","tool":"bash","callId":"call_1","text":"total 42\ndrwxr-xr-x"}`,
`{"type":"text","text":"Listed files."}`,
}
input := strings.Join(lines, "\n")
res := b.processOutput(strings.NewReader(input), ch)
if res.status != "completed" {
t.Errorf("status: got %q, want %q", res.status, "completed")
}
close(ch)
var msgs []Message
for m := range ch {
msgs = append(msgs, m)
}
if len(msgs) != 3 {
t.Fatalf("expected 3 messages, got %d", len(msgs))
}
// tool_use
if msgs[0].Type != MessageToolUse {
t.Errorf("msg[0] type: got %s, want tool-use", msgs[0].Type)
}
if msgs[0].Tool != "bash" {
t.Errorf("msg[0] tool: got %q, want %q", msgs[0].Tool, "bash")
}
if msgs[0].CallID != "call_1" {
t.Errorf("msg[0] callID: got %q, want %q", msgs[0].CallID, "call_1")
}
if msgs[0].Input["command"] != "ls -la" {
t.Errorf("msg[0] input: got %v", msgs[0].Input)
}
// tool_result
if msgs[1].Type != MessageToolResult {
t.Errorf("msg[1] type: got %s, want tool-result", msgs[1].Type)
}
if msgs[1].CallID != "call_1" {
t.Errorf("msg[1] callID: got %q", msgs[1].CallID)
}
if msgs[1].Output != "total 42\ndrwxr-xr-x" {
t.Errorf("msg[1] output: got %q", msgs[1].Output)
}
// text
if msgs[2].Type != MessageText || msgs[2].Content != "Listed files." {
t.Errorf("msg[2]: type=%s content=%q", msgs[2].Type, msgs[2].Content)
}
}
func TestOpenclawStreamingErrorEvent(t *testing.T) {
t.Parallel()
b := &openclawBackend{cfg: Config{Logger: slog.Default()}}
ch := make(chan Message, 256)
lines := []string{
`{"type":"text","text":"Starting..."}`,
`{"type":"error","text":"model not found: gpt-99"}`,
}
input := strings.Join(lines, "\n")
res := b.processOutput(strings.NewReader(input), ch)
if res.status != "failed" {
t.Errorf("status: got %q, want %q", res.status, "failed")
}
if res.errMsg != "model not found: gpt-99" {
t.Errorf("errMsg: got %q", res.errMsg)
}
close(ch)
var msgs []Message
for m := range ch {
msgs = append(msgs, m)
}
if len(msgs) != 2 {
t.Fatalf("expected 2 messages, got %d", len(msgs))
}
if msgs[1].Type != MessageError {
t.Errorf("msg[1] type: got %s, want error", msgs[1].Type)
}
}
func TestOpenclawStreamingStepFinishUsage(t *testing.T) {
t.Parallel()
b := &openclawBackend{cfg: Config{Logger: slog.Default()}}
ch := make(chan Message, 256)
lines := []string{
`{"type":"step_start"}`,
`{"type":"text","text":"Done"}`,
`{"type":"step_finish","usage":{"input":200,"output":100,"cacheRead":50,"cacheWrite":25}}`,
}
input := strings.Join(lines, "\n")
res := b.processOutput(strings.NewReader(input), ch)
if res.usage.InputTokens != 200 {
t.Errorf("input tokens: got %d, want 200", res.usage.InputTokens)
}
if res.usage.OutputTokens != 100 {
t.Errorf("output tokens: got %d, want 100", res.usage.OutputTokens)
}
if res.usage.CacheReadTokens != 50 {
t.Errorf("cache read: got %d, want 50", res.usage.CacheReadTokens)
}
if res.usage.CacheWriteTokens != 25 {
t.Errorf("cache write: got %d, want 25", res.usage.CacheWriteTokens)
}
close(ch)
}
func TestOpenclawStreamingSessionID(t *testing.T) {
t.Parallel()
b := &openclawBackend{cfg: Config{Logger: slog.Default()}}
ch := make(chan Message, 256)
lines := []string{
`{"type":"text","text":"Hi","sessionId":"ses_stream_123"}`,
}
input := strings.Join(lines, "\n")
res := b.processOutput(strings.NewReader(input), ch)
if res.sessionID != "ses_stream_123" {
t.Errorf("sessionID: got %q, want %q", res.sessionID, "ses_stream_123")
}
close(ch)
}
func TestOpenclawStreamingMixedWithLogLines(t *testing.T) {
t.Parallel()
b := &openclawBackend{cfg: Config{Logger: slog.Default()}}
ch := make(chan Message, 256)
lines := []string{
"[info] initializing agent...",
`{"type":"text","text":"Hello"}`,
"[debug] tool exec completed",
`{"type":"text","text":" world"}`,
}
input := strings.Join(lines, "\n")
res := b.processOutput(strings.NewReader(input), ch)
if res.status != "completed" {
t.Errorf("status: got %q, want %q", res.status, "completed")
}
if res.output != "Hello world" {
t.Errorf("output: got %q, want %q", res.output, "Hello world")
}
close(ch)
var msgs []Message
for m := range ch {
msgs = append(msgs, m)
}
if len(msgs) != 2 {
t.Fatalf("expected 2 text messages, got %d", len(msgs))
}
}
// ── Lifecycle event tests ──
func TestOpenclawLifecycleErrorPhase(t *testing.T) {
t.Parallel()
b := &openclawBackend{cfg: Config{Logger: slog.Default()}}
ch := make(chan Message, 256)
lines := []string{
`{"type":"text","text":"Working..."}`,
`{"type":"lifecycle","phase":"error","text":"agent crashed unexpectedly"}`,
}
input := strings.Join(lines, "\n")
res := b.processOutput(strings.NewReader(input), ch)
if res.status != "failed" {
t.Errorf("status: got %q, want %q", res.status, "failed")
}
if res.errMsg != "agent crashed unexpectedly" {
t.Errorf("errMsg: got %q", res.errMsg)
}
close(ch)
var msgs []Message
for m := range ch {
msgs = append(msgs, m)
}
if len(msgs) != 2 {
t.Fatalf("expected 2 messages, got %d", len(msgs))
}
if msgs[1].Type != MessageError {
t.Errorf("msg[1] type: got %s, want error", msgs[1].Type)
}
}
func TestOpenclawLifecycleFailedPhase(t *testing.T) {
t.Parallel()
b := &openclawBackend{cfg: Config{Logger: slog.Default()}}
ch := make(chan Message, 256)
lines := []string{
`{"type":"lifecycle","phase":"failed","message":"timeout exceeded"}`,
}
input := strings.Join(lines, "\n")
res := b.processOutput(strings.NewReader(input), ch)
if res.status != "failed" {
t.Errorf("status: got %q, want %q", res.status, "failed")
}
if res.errMsg != "timeout exceeded" {
t.Errorf("errMsg: got %q, want %q", res.errMsg, "timeout exceeded")
}
close(ch)
}
func TestOpenclawLifecycleCancelledPhase(t *testing.T) {
t.Parallel()
b := &openclawBackend{cfg: Config{Logger: slog.Default()}}
ch := make(chan Message, 256)
lines := []string{
`{"type":"lifecycle","phase":"cancelled"}`,
}
input := strings.Join(lines, "\n")
res := b.processOutput(strings.NewReader(input), ch)
if res.status != "failed" {
t.Errorf("status: got %q, want %q", res.status, "failed")
}
// With no text/message/error, should get the default.
if res.errMsg != "unknown openclaw error" {
t.Errorf("errMsg: got %q", res.errMsg)
}
close(ch)
}
func TestOpenclawLifecycleRunningPhaseIgnored(t *testing.T) {
t.Parallel()
b := &openclawBackend{cfg: Config{Logger: slog.Default()}}
ch := make(chan Message, 256)
lines := []string{
`{"type":"lifecycle","phase":"running"}`,
`{"type":"text","text":"Hello"}`,
}
input := strings.Join(lines, "\n")
res := b.processOutput(strings.NewReader(input), ch)
if res.status != "completed" {
t.Errorf("status: got %q, want %q", res.status, "completed")
}
close(ch)
}
// ── Structured error tests ──
func TestOpenclawStructuredErrorObject(t *testing.T) {
t.Parallel()
b := &openclawBackend{cfg: Config{Logger: slog.Default()}}
ch := make(chan Message, 256)
lines := []string{
`{"type":"error","error":{"name":"ModelNotFoundError","data":{"message":"model gpt-99 not available"}}}`,
}
input := strings.Join(lines, "\n")
res := b.processOutput(strings.NewReader(input), ch)
if res.status != "failed" {
t.Errorf("status: got %q, want %q", res.status, "failed")
}
if res.errMsg != "model gpt-99 not available" {
t.Errorf("errMsg: got %q, want %q", res.errMsg, "model gpt-99 not available")
}
close(ch)
}
func TestOpenclawStructuredErrorNameOnly(t *testing.T) {
t.Parallel()
b := &openclawBackend{cfg: Config{Logger: slog.Default()}}
ch := make(chan Message, 256)
lines := []string{
`{"type":"error","error":{"name":"AuthenticationError"}}`,
}
input := strings.Join(lines, "\n")
res := b.processOutput(strings.NewReader(input), ch)
if res.errMsg != "AuthenticationError" {
t.Errorf("errMsg: got %q, want %q", res.errMsg, "AuthenticationError")
}
close(ch)
}
func TestOpenclawStructuredErrorMessageField(t *testing.T) {
t.Parallel()
b := &openclawBackend{cfg: Config{Logger: slog.Default()}}
ch := make(chan Message, 256)
lines := []string{
`{"type":"error","error":{"message":"rate limit exceeded"}}`,
}
input := strings.Join(lines, "\n")
res := b.processOutput(strings.NewReader(input), ch)
if res.errMsg != "rate limit exceeded" {
t.Errorf("errMsg: got %q, want %q", res.errMsg, "rate limit exceeded")
}
close(ch)
}
// ── Usage field name variant tests ──
func TestOpenclawUsageAlternativeFieldNames(t *testing.T) {
t.Parallel()
// Test PaperClip-style field names (inputTokens, outputTokens, etc.)
data := map[string]any{
"inputTokens": float64(500),
"outputTokens": float64(200),
"cachedInputTokens": float64(100),
}
usage := parseOpenclawUsage(data)
if usage.InputTokens != 500 {
t.Errorf("InputTokens: got %d, want 500", usage.InputTokens)
}
if usage.OutputTokens != 200 {
t.Errorf("OutputTokens: got %d, want 200", usage.OutputTokens)
}
if usage.CacheReadTokens != 100 {
t.Errorf("CacheReadTokens: got %d, want 100", usage.CacheReadTokens)
}
}
func TestOpenclawUsageSnakeCaseFieldNames(t *testing.T) {
t.Parallel()
// Test snake_case field names (Anthropic API style)
data := map[string]any{
"input_tokens": float64(300),
"output_tokens": float64(150),
"cache_read_input_tokens": float64(80),
"cache_creation_input_tokens": float64(40),
}
usage := parseOpenclawUsage(data)
if usage.InputTokens != 300 {
t.Errorf("InputTokens: got %d, want 300", usage.InputTokens)
}
if usage.OutputTokens != 150 {
t.Errorf("OutputTokens: got %d, want 150", usage.OutputTokens)
}
if usage.CacheReadTokens != 80 {
t.Errorf("CacheReadTokens: got %d, want 80", usage.CacheReadTokens)
}
if usage.CacheWriteTokens != 40 {
t.Errorf("CacheWriteTokens: got %d, want 40", usage.CacheWriteTokens)
}
}
func TestOpenclawUsageOriginalFieldNames(t *testing.T) {
t.Parallel()
// Test the original short field names (input, output, cacheRead, cacheWrite)
data := map[string]any{
"input": float64(100),
"output": float64(50),
"cacheRead": float64(10),
"cacheWrite": float64(5),
}
usage := parseOpenclawUsage(data)
if usage.InputTokens != 100 {
t.Errorf("InputTokens: got %d, want 100", usage.InputTokens)
}
if usage.OutputTokens != 50 {
t.Errorf("OutputTokens: got %d, want 50", usage.OutputTokens)
}
if usage.CacheReadTokens != 10 {
t.Errorf("CacheReadTokens: got %d, want 10", usage.CacheReadTokens)
}
if usage.CacheWriteTokens != 5 {
t.Errorf("CacheWriteTokens: got %d, want 5", usage.CacheWriteTokens)
}
}
func TestOpenclawUsageAccumulationAcrossSteps(t *testing.T) {
t.Parallel()
b := &openclawBackend{cfg: Config{Logger: slog.Default()}}
ch := make(chan Message, 256)
lines := []string{
`{"type":"step_finish","usage":{"inputTokens":100,"outputTokens":50}}`,
`{"type":"step_finish","usage":{"inputTokens":200,"outputTokens":80,"cachedInputTokens":60}}`,
}
input := strings.Join(lines, "\n")
res := b.processOutput(strings.NewReader(input), ch)
if res.usage.InputTokens != 300 {
t.Errorf("InputTokens: got %d, want 300", res.usage.InputTokens)
}
if res.usage.OutputTokens != 130 {
t.Errorf("OutputTokens: got %d, want 130", res.usage.OutputTokens)
}
if res.usage.CacheReadTokens != 60 {
t.Errorf("CacheReadTokens: got %d, want 60", res.usage.CacheReadTokens)
}
close(ch)
}
func TestOpenclawUsageFinalResultAlternativeFields(t *testing.T) {
t.Parallel()
b := &openclawBackend{cfg: Config{Logger: slog.Default()}}
ch := make(chan Message, 256)
result := openclawResult{
Payloads: []openclawPayload{{Text: "Done"}},
Meta: openclawMeta{
DurationMs: 1000,
AgentMeta: map[string]any{
"usage": map[string]any{
"inputTokens": float64(400),
"outputTokens": float64(180),
"cachedInputTokens": float64(90),
},
},
},
}
data, _ := json.Marshal(result)
res := b.processOutput(strings.NewReader(string(data)), ch)
if res.usage.InputTokens != 400 {
t.Errorf("InputTokens: got %d, want 400", res.usage.InputTokens)
}
if res.usage.OutputTokens != 180 {
t.Errorf("OutputTokens: got %d, want 180", res.usage.OutputTokens)
}
if res.usage.CacheReadTokens != 90 {
t.Errorf("CacheReadTokens: got %d, want 90", res.usage.CacheReadTokens)
}
close(ch)
}
// ── openclawInt64 tests ──
func TestOpenclawInt64Float(t *testing.T) {

View File

@@ -48,6 +48,7 @@ func (b *opencodeBackend) Execute(ctx context.Context, prompt string, opts ExecO
args = append(args, prompt)
cmd := exec.CommandContext(runCtx, execPath, args...)
cmd.WaitDelay = 10 * time.Second
if opts.Cwd != "" {
cmd.Dir = opts.Cwd
}
@@ -74,6 +75,12 @@ func (b *opencodeBackend) Execute(ctx context.Context, prompt string, opts ExecO
msgCh := make(chan Message, 256)
resCh := make(chan Result, 1)
// Close stdout when the context is cancelled so the scanner unblocks.
go func() {
<-runCtx.Done()
_ = stdout.Close()
}()
go func() {
defer cancel()
defer close(msgCh)