mirror of
https://github.com/multica-ai/multica.git
synced 2026-06-22 06:59:19 +02:00
Compare commits
9 Commits
agent/lamb
...
agent/j/40
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f3355049bc | ||
|
|
dca86acc69 | ||
|
|
c71525e198 | ||
|
|
977dc6479d | ||
|
|
a97bd3da0b | ||
|
|
9dfe119f47 | ||
|
|
418049856f | ||
|
|
9170b01739 | ||
|
|
a0d43ca31a |
@@ -726,12 +726,34 @@ describe("validateCliCallback", () => {
|
||||
expect(validateCliCallback("http://127.0.0.1:8080/cb")).toBe(true);
|
||||
});
|
||||
|
||||
it("accepts 10.x.x.x private IPs", () => {
|
||||
expect(validateCliCallback("http://10.0.0.5:9876/callback")).toBe(true);
|
||||
expect(validateCliCallback("http://10.255.255.255:1234/cb")).toBe(true);
|
||||
});
|
||||
|
||||
it("accepts 172.16-31.x.x private IPs", () => {
|
||||
expect(validateCliCallback("http://172.16.0.1:9876/callback")).toBe(true);
|
||||
expect(validateCliCallback("http://172.31.255.255:1234/cb")).toBe(true);
|
||||
});
|
||||
|
||||
it("rejects 172.x outside 16-31 range", () => {
|
||||
expect(validateCliCallback("http://172.15.0.1:9876/callback")).toBe(false);
|
||||
expect(validateCliCallback("http://172.32.0.1:9876/callback")).toBe(false);
|
||||
});
|
||||
|
||||
it("accepts 192.168.x.x private IPs", () => {
|
||||
expect(validateCliCallback("http://192.168.1.131:41117/callback")).toBe(true);
|
||||
expect(validateCliCallback("http://192.168.0.1:8080/cb")).toBe(true);
|
||||
});
|
||||
|
||||
it("rejects https:// URLs", () => {
|
||||
expect(validateCliCallback("https://localhost:9876/callback")).toBe(false);
|
||||
});
|
||||
|
||||
it("rejects non-localhost hosts", () => {
|
||||
it("rejects public IPs and domains", () => {
|
||||
expect(validateCliCallback("http://evil.com:9876/callback")).toBe(false);
|
||||
expect(validateCliCallback("http://8.8.8.8:9876/callback")).toBe(false);
|
||||
expect(validateCliCallback("http://192.169.1.1:9876/callback")).toBe(false);
|
||||
});
|
||||
|
||||
it("rejects invalid URLs", () => {
|
||||
|
||||
@@ -68,14 +68,22 @@ function redirectToCliCallback(url: string, token: string, state: string) {
|
||||
window.location.href = `${url}${separator}token=${encodeURIComponent(token)}&state=${encodeURIComponent(state)}`;
|
||||
}
|
||||
|
||||
/** Validate that a CLI callback URL points to localhost over HTTP. */
|
||||
/**
|
||||
* Validate that a CLI callback URL points to a safe host over HTTP.
|
||||
* Allows localhost and private/LAN IPs (RFC 1918) to support self-hosted setups
|
||||
* on local VMs while blocking arbitrary public hosts.
|
||||
*/
|
||||
export function validateCliCallback(cliCallback: string): boolean {
|
||||
try {
|
||||
const cbUrl = new URL(cliCallback);
|
||||
if (cbUrl.protocol !== "http:") return false;
|
||||
if (cbUrl.hostname !== "localhost" && cbUrl.hostname !== "127.0.0.1")
|
||||
return false;
|
||||
return true;
|
||||
const h = cbUrl.hostname;
|
||||
if (h === "localhost" || h === "127.0.0.1") return true;
|
||||
// Allow RFC 1918 private IPs: 10.x.x.x, 172.16-31.x.x, 192.168.x.x
|
||||
if (/^10\./.test(h)) return true;
|
||||
if (/^172\.(1[6-9]|2\d|3[01])\./.test(h)) return true;
|
||||
if (/^192\.168\./.test(h)) return true;
|
||||
return false;
|
||||
} catch {
|
||||
return false;
|
||||
}
|
||||
|
||||
@@ -28,6 +28,7 @@
|
||||
.rich-text-editor.ProseMirror {
|
||||
color: var(--foreground);
|
||||
caret-color: var(--foreground);
|
||||
min-height: 100%;
|
||||
}
|
||||
|
||||
.rich-text-editor.ProseMirror:focus {
|
||||
|
||||
73
packages/views/editor/content-editor.test.tsx
Normal file
73
packages/views/editor/content-editor.test.tsx
Normal file
@@ -0,0 +1,73 @@
|
||||
import { describe, it, expect, vi, beforeEach } from "vitest";
|
||||
import { fireEvent, render, screen } from "@testing-library/react";
|
||||
|
||||
const mockFocus = vi.hoisted(() => vi.fn());
|
||||
|
||||
vi.mock("@tanstack/react-query", () => ({
|
||||
useQueryClient: () => ({}),
|
||||
}));
|
||||
|
||||
vi.mock("./extensions", () => ({
|
||||
createEditorExtensions: () => [],
|
||||
}));
|
||||
|
||||
vi.mock("./extensions/file-upload", () => ({
|
||||
uploadAndInsertFile: vi.fn(),
|
||||
}));
|
||||
|
||||
vi.mock("./utils/preprocess", () => ({
|
||||
preprocessMarkdown: (value: string) => value,
|
||||
}));
|
||||
|
||||
vi.mock("./bubble-menu", () => ({
|
||||
EditorBubbleMenu: () => null,
|
||||
}));
|
||||
|
||||
vi.mock("@tiptap/react", () => ({
|
||||
useEditor: () => ({
|
||||
commands: {
|
||||
focus: mockFocus,
|
||||
clearContent: vi.fn(),
|
||||
},
|
||||
getMarkdown: () => "",
|
||||
state: {
|
||||
doc: {
|
||||
content: {
|
||||
size: 0,
|
||||
},
|
||||
},
|
||||
},
|
||||
}),
|
||||
EditorContent: ({ className }: { className?: string }) => (
|
||||
<div className={className} data-testid="editor-content">
|
||||
<div className="ProseMirror rich-text-editor" data-testid="prosemirror" />
|
||||
</div>
|
||||
),
|
||||
}));
|
||||
|
||||
import { ContentEditor } from "./content-editor";
|
||||
|
||||
describe("ContentEditor", () => {
|
||||
beforeEach(() => {
|
||||
vi.clearAllMocks();
|
||||
});
|
||||
|
||||
it("focuses the editor when clicking the empty container area", () => {
|
||||
render(<ContentEditor placeholder="Add description..." />);
|
||||
|
||||
const shell = screen.getByTestId("editor-content").parentElement;
|
||||
expect(shell).not.toBeNull();
|
||||
|
||||
fireEvent.mouseDown(shell!);
|
||||
|
||||
expect(mockFocus).toHaveBeenCalledWith("end");
|
||||
});
|
||||
|
||||
it("does not hijack clicks that land inside the ProseMirror node", () => {
|
||||
render(<ContentEditor placeholder="Add description..." />);
|
||||
|
||||
fireEvent.mouseDown(screen.getByTestId("prosemirror"));
|
||||
|
||||
expect(mockFocus).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
@@ -30,6 +30,7 @@ import {
|
||||
useEffect,
|
||||
useImperativeHandle,
|
||||
useRef,
|
||||
type MouseEvent as ReactMouseEvent,
|
||||
} from "react";
|
||||
import { useEditor, EditorContent } from "@tiptap/react";
|
||||
import { cn } from "@multica/ui/lib/utils";
|
||||
@@ -222,11 +223,25 @@ const ContentEditor = forwardRef<ContentEditorRef, ContentEditorProps>(
|
||||
},
|
||||
}));
|
||||
|
||||
const handleContainerMouseDown = (event: ReactMouseEvent<HTMLDivElement>) => {
|
||||
if (!editable || !editor) return;
|
||||
|
||||
const target = event.target as HTMLElement;
|
||||
if (target.closest(".ProseMirror")) return;
|
||||
if (target.closest("a, button, input, textarea, [role='button'], [data-node-view-wrapper]")) return;
|
||||
|
||||
event.preventDefault();
|
||||
editor.commands.focus("end");
|
||||
};
|
||||
|
||||
if (!editor) return null;
|
||||
|
||||
return (
|
||||
<div className="relative min-h-full">
|
||||
<EditorContent editor={editor} />
|
||||
<div
|
||||
className="relative flex min-h-full flex-col"
|
||||
onMouseDown={handleContainerMouseDown}
|
||||
>
|
||||
<EditorContent className="flex-1 min-h-full" editor={editor} />
|
||||
{editable && (
|
||||
<>
|
||||
<EditorBubbleMenu editor={editor} />
|
||||
|
||||
@@ -158,7 +158,7 @@ function EditorLinkPreview({ editor }: { editor: Editor }) {
|
||||
|
||||
const virtualRef = useRef({
|
||||
getBoundingClientRect: () => new DOMRect(),
|
||||
contextElement: editor.view.dom,
|
||||
contextElement: editor.view?.dom,
|
||||
});
|
||||
|
||||
const { refs, floatingStyles, isPositioned, update } = useFloating({
|
||||
@@ -171,11 +171,20 @@ function EditorLinkPreview({ editor }: { editor: Editor }) {
|
||||
});
|
||||
|
||||
useEffect(() => {
|
||||
if (typeof editor.on !== "function" || typeof editor.off !== "function") {
|
||||
return;
|
||||
}
|
||||
|
||||
const check = () => {
|
||||
const view = editor.view;
|
||||
if (!editor.isEditable) {
|
||||
setVisible(false);
|
||||
return;
|
||||
}
|
||||
if (!view?.dom || typeof view.coordsAtPos !== "function") {
|
||||
setVisible(false);
|
||||
return;
|
||||
}
|
||||
if (!editor.state.selection.empty || !editor.isActive("link")) {
|
||||
setVisible(false);
|
||||
return;
|
||||
@@ -186,11 +195,11 @@ function EditorLinkPreview({ editor }: { editor: Editor }) {
|
||||
return;
|
||||
}
|
||||
|
||||
const coords = editor.view.coordsAtPos(editor.state.selection.from);
|
||||
const coords = view.coordsAtPos(editor.state.selection.from);
|
||||
virtualRef.current = {
|
||||
getBoundingClientRect: () =>
|
||||
new DOMRect(coords.left, coords.top, 0, coords.bottom - coords.top),
|
||||
contextElement: editor.view.dom,
|
||||
contextElement: view.dom,
|
||||
};
|
||||
|
||||
setHref(linkHref);
|
||||
@@ -204,11 +213,12 @@ function EditorLinkPreview({ editor }: { editor: Editor }) {
|
||||
|
||||
// Close on any ancestor scroll or window resize
|
||||
useEffect(() => {
|
||||
if (!visible) return;
|
||||
const editorDom = editor.view?.dom;
|
||||
if (!visible || !editorDom) return;
|
||||
const close = () => {
|
||||
setVisible(false);
|
||||
};
|
||||
const ancestors = getOverflowAncestors(editor.view.dom);
|
||||
const ancestors = getOverflowAncestors(editorDom);
|
||||
ancestors.forEach((el) => el.addEventListener("scroll", close, { passive: true }));
|
||||
window.addEventListener("resize", close);
|
||||
return () => {
|
||||
|
||||
@@ -98,15 +98,28 @@ func runAuthLoginBrowser(cmd *cobra.Command) error {
|
||||
serverURL := resolveServerURL(cmd)
|
||||
appURL := resolveAppURL(cmd)
|
||||
|
||||
// Determine the callback host from the configured app URL.
|
||||
// For self-hosted setups where the browser is on a different machine,
|
||||
// we need to use the server's reachable hostname instead of localhost.
|
||||
callbackHost := "localhost"
|
||||
bindAddr := "127.0.0.1"
|
||||
if parsed, err := url.Parse(appURL); err == nil {
|
||||
h := parsed.Hostname()
|
||||
if h != "" && h != "localhost" && h != "127.0.0.1" {
|
||||
callbackHost = h
|
||||
bindAddr = "0.0.0.0"
|
||||
}
|
||||
}
|
||||
|
||||
// Start a local HTTP server on a random port to receive the callback.
|
||||
listener, err := net.Listen("tcp", "127.0.0.1:0")
|
||||
listener, err := net.Listen("tcp", bindAddr+":0")
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to start local server: %w", err)
|
||||
}
|
||||
defer listener.Close()
|
||||
|
||||
port := listener.Addr().(*net.TCPAddr).Port
|
||||
callbackURL := fmt.Sprintf("http://localhost:%d/callback", port)
|
||||
callbackURL := fmt.Sprintf("http://%s:%d/callback", callbackHost, port)
|
||||
|
||||
// Generate a random state parameter for CSRF protection.
|
||||
stateBytes := make([]byte, 16)
|
||||
|
||||
@@ -525,7 +525,18 @@ func (d *Daemon) handlePing(ctx context.Context, rt Runtime, pingID string) {
|
||||
}
|
||||
}()
|
||||
|
||||
result := <-session.Result
|
||||
var result agent.Result
|
||||
select {
|
||||
case result = <-session.Result:
|
||||
case <-pingCtx.Done():
|
||||
d.logger.Warn("ping timed out waiting for result", "runtime_id", rt.ID, "ping_id", pingID)
|
||||
d.client.ReportPingResult(ctx, rt.ID, pingID, map[string]any{
|
||||
"status": "failed",
|
||||
"error": "ping context cancelled while waiting for result",
|
||||
"duration_ms": time.Since(start).Milliseconds(),
|
||||
})
|
||||
return
|
||||
}
|
||||
durationMs := time.Since(start).Milliseconds()
|
||||
|
||||
if result.Status == "completed" {
|
||||
@@ -1078,6 +1089,17 @@ func (d *Daemon) executeAndDrain(ctx context.Context, backend agent.Backend, pro
|
||||
return agent.Result{}, 0, err
|
||||
}
|
||||
|
||||
// Create an independent drain deadline so we don't block forever if the
|
||||
// backend's internal timeout fails to produce a Result (e.g. scanner
|
||||
// stuck on a hung stdout pipe). The extra 30 s gives the backend time
|
||||
// to clean up after its own timeout fires.
|
||||
drainTimeout := opts.Timeout + 30*time.Second
|
||||
if opts.Timeout == 0 {
|
||||
drainTimeout = 21 * time.Minute
|
||||
}
|
||||
drainCtx, drainCancel := context.WithTimeout(ctx, drainTimeout)
|
||||
defer drainCancel()
|
||||
|
||||
var toolCount atomic.Int32
|
||||
go func() {
|
||||
var seq atomic.Int32
|
||||
@@ -1135,77 +1157,92 @@ func (d *Daemon) executeAndDrain(ctx context.Context, backend agent.Backend, pro
|
||||
}
|
||||
}()
|
||||
|
||||
for msg := range session.Messages {
|
||||
switch msg.Type {
|
||||
case agent.MessageToolUse:
|
||||
n := toolCount.Add(1)
|
||||
taskLog.Info(fmt.Sprintf("tool #%d: %s", n, msg.Tool))
|
||||
if msg.CallID != "" {
|
||||
for {
|
||||
select {
|
||||
case msg, ok := <-session.Messages:
|
||||
if !ok {
|
||||
goto drainDone
|
||||
}
|
||||
switch msg.Type {
|
||||
case agent.MessageToolUse:
|
||||
n := toolCount.Add(1)
|
||||
taskLog.Info(fmt.Sprintf("tool #%d: %s", n, msg.Tool))
|
||||
if msg.CallID != "" {
|
||||
mu.Lock()
|
||||
callIDToTool[msg.CallID] = msg.Tool
|
||||
mu.Unlock()
|
||||
}
|
||||
s := seq.Add(1)
|
||||
mu.Lock()
|
||||
callIDToTool[msg.CallID] = msg.Tool
|
||||
batch = append(batch, TaskMessageData{
|
||||
Seq: int(s),
|
||||
Type: "tool_use",
|
||||
Tool: msg.Tool,
|
||||
Input: msg.Input,
|
||||
})
|
||||
mu.Unlock()
|
||||
case agent.MessageToolResult:
|
||||
s := seq.Add(1)
|
||||
output := msg.Output
|
||||
if len(output) > 8192 {
|
||||
output = output[:8192]
|
||||
}
|
||||
toolName := msg.Tool
|
||||
if toolName == "" && msg.CallID != "" {
|
||||
mu.Lock()
|
||||
toolName = callIDToTool[msg.CallID]
|
||||
mu.Unlock()
|
||||
}
|
||||
mu.Lock()
|
||||
batch = append(batch, TaskMessageData{
|
||||
Seq: int(s),
|
||||
Type: "tool_result",
|
||||
Tool: toolName,
|
||||
Output: output,
|
||||
})
|
||||
mu.Unlock()
|
||||
case agent.MessageThinking:
|
||||
if msg.Content != "" {
|
||||
mu.Lock()
|
||||
pendingThinking.WriteString(msg.Content)
|
||||
mu.Unlock()
|
||||
}
|
||||
case agent.MessageText:
|
||||
if msg.Content != "" {
|
||||
taskLog.Debug("agent", "text", truncateLog(msg.Content, 200))
|
||||
mu.Lock()
|
||||
pendingText.WriteString(msg.Content)
|
||||
mu.Unlock()
|
||||
}
|
||||
case agent.MessageError:
|
||||
taskLog.Error("agent error", "content", msg.Content)
|
||||
s := seq.Add(1)
|
||||
mu.Lock()
|
||||
batch = append(batch, TaskMessageData{
|
||||
Seq: int(s),
|
||||
Type: "error",
|
||||
Content: msg.Content,
|
||||
})
|
||||
mu.Unlock()
|
||||
}
|
||||
s := seq.Add(1)
|
||||
mu.Lock()
|
||||
batch = append(batch, TaskMessageData{
|
||||
Seq: int(s),
|
||||
Type: "tool_use",
|
||||
Tool: msg.Tool,
|
||||
Input: msg.Input,
|
||||
})
|
||||
mu.Unlock()
|
||||
case agent.MessageToolResult:
|
||||
s := seq.Add(1)
|
||||
output := msg.Output
|
||||
if len(output) > 8192 {
|
||||
output = output[:8192]
|
||||
}
|
||||
toolName := msg.Tool
|
||||
if toolName == "" && msg.CallID != "" {
|
||||
mu.Lock()
|
||||
toolName = callIDToTool[msg.CallID]
|
||||
mu.Unlock()
|
||||
}
|
||||
mu.Lock()
|
||||
batch = append(batch, TaskMessageData{
|
||||
Seq: int(s),
|
||||
Type: "tool_result",
|
||||
Tool: toolName,
|
||||
Output: output,
|
||||
})
|
||||
mu.Unlock()
|
||||
case agent.MessageThinking:
|
||||
if msg.Content != "" {
|
||||
mu.Lock()
|
||||
pendingThinking.WriteString(msg.Content)
|
||||
mu.Unlock()
|
||||
}
|
||||
case agent.MessageText:
|
||||
if msg.Content != "" {
|
||||
taskLog.Debug("agent", "text", truncateLog(msg.Content, 200))
|
||||
mu.Lock()
|
||||
pendingText.WriteString(msg.Content)
|
||||
mu.Unlock()
|
||||
}
|
||||
case agent.MessageError:
|
||||
taskLog.Error("agent error", "content", msg.Content)
|
||||
s := seq.Add(1)
|
||||
mu.Lock()
|
||||
batch = append(batch, TaskMessageData{
|
||||
Seq: int(s),
|
||||
Type: "error",
|
||||
Content: msg.Content,
|
||||
})
|
||||
mu.Unlock()
|
||||
case <-drainCtx.Done():
|
||||
goto drainDone
|
||||
}
|
||||
}
|
||||
|
||||
drainDone:
|
||||
close(done)
|
||||
flush()
|
||||
}()
|
||||
|
||||
result := <-session.Result
|
||||
return result, toolCount.Load(), nil
|
||||
select {
|
||||
case result := <-session.Result:
|
||||
return result, toolCount.Load(), nil
|
||||
case <-drainCtx.Done():
|
||||
return agent.Result{
|
||||
Status: "timeout",
|
||||
Error: "agent did not produce result within drain timeout",
|
||||
}, toolCount.Load(), nil
|
||||
}
|
||||
}
|
||||
|
||||
func mergeUsage(a, b map[string]agent.TokenUsage) map[string]agent.TokenUsage {
|
||||
|
||||
@@ -37,6 +37,7 @@ func (b *claudeBackend) Execute(ctx context.Context, prompt string, opts ExecOpt
|
||||
args := buildClaudeArgs(opts)
|
||||
|
||||
cmd := exec.CommandContext(runCtx, execPath, args...)
|
||||
cmd.WaitDelay = 10 * time.Second
|
||||
if opts.Cwd != "" {
|
||||
cmd.Dir = opts.Cwd
|
||||
}
|
||||
@@ -90,6 +91,12 @@ func (b *claudeBackend) Execute(ctx context.Context, prompt string, opts ExecOpt
|
||||
var finalError string
|
||||
usage := make(map[string]TokenUsage)
|
||||
|
||||
// Close stdout when the context is cancelled so scanner.Scan() unblocks.
|
||||
go func() {
|
||||
<-runCtx.Done()
|
||||
_ = stdout.Close()
|
||||
}()
|
||||
|
||||
scanner := bufio.NewScanner(stdout)
|
||||
scanner.Buffer(make([]byte, 0, 1024*1024), 10*1024*1024)
|
||||
|
||||
|
||||
@@ -3,20 +3,15 @@ package agent
|
||||
import (
|
||||
"bufio"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"os/exec"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
// geminiBackend implements Backend by spawning the Google Gemini CLI
|
||||
// (`gemini -p <prompt> --yolo -o text`) and collecting its stdout.
|
||||
//
|
||||
// This is a minimal v1 implementation — it captures the final text
|
||||
// response but does not stream tool calls in real time. Follow-ups
|
||||
// can move to `-o stream-json` and parse Gemini's event schema once
|
||||
// we have a reliable reproduction of its output format.
|
||||
// with `--output-format stream-json` and parsing its NDJSON event stream.
|
||||
type geminiBackend struct {
|
||||
cfg Config
|
||||
}
|
||||
@@ -39,6 +34,7 @@ func (b *geminiBackend) Execute(ctx context.Context, prompt string, opts ExecOpt
|
||||
args := buildGeminiArgs(prompt, opts)
|
||||
|
||||
cmd := exec.CommandContext(runCtx, execPath, args...)
|
||||
cmd.WaitDelay = 10 * time.Second
|
||||
if opts.Cwd != "" {
|
||||
cmd.Dir = opts.Cwd
|
||||
}
|
||||
@@ -58,75 +54,196 @@ func (b *geminiBackend) Execute(ctx context.Context, prompt string, opts ExecOpt
|
||||
|
||||
b.cfg.Logger.Info("gemini started", "pid", cmd.Process.Pid, "cwd", opts.Cwd, "model", opts.Model)
|
||||
|
||||
msgCh := make(chan Message, 16)
|
||||
msgCh := make(chan Message, 256)
|
||||
resCh := make(chan Result, 1)
|
||||
|
||||
// Close stdout when the context is cancelled so scanner.Scan() unblocks.
|
||||
go func() {
|
||||
<-runCtx.Done()
|
||||
_ = stdout.Close()
|
||||
}()
|
||||
|
||||
go func() {
|
||||
defer cancel()
|
||||
defer close(msgCh)
|
||||
defer close(resCh)
|
||||
|
||||
startTime := time.Now()
|
||||
output, readErr := io.ReadAll(bufio.NewReader(stdout))
|
||||
var output strings.Builder
|
||||
var sessionID string
|
||||
finalStatus := "completed"
|
||||
var finalError string
|
||||
usage := make(map[string]TokenUsage)
|
||||
|
||||
// Forward the full response as a single text message so the daemon
|
||||
// can persist it verbatim. Tool streaming is intentionally omitted
|
||||
// in v1; see the file-level comment.
|
||||
text := strings.TrimRight(string(output), "\n")
|
||||
if text != "" {
|
||||
trySend(msgCh, Message{Type: MessageText, Content: text})
|
||||
scanner := bufio.NewScanner(stdout)
|
||||
scanner.Buffer(make([]byte, 0, 1024*1024), 10*1024*1024)
|
||||
|
||||
for scanner.Scan() {
|
||||
line := strings.TrimSpace(scanner.Text())
|
||||
if line == "" {
|
||||
continue
|
||||
}
|
||||
|
||||
var evt geminiStreamEvent
|
||||
if err := json.Unmarshal([]byte(line), &evt); err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
switch evt.Type {
|
||||
case "init":
|
||||
sessionID = evt.SessionID
|
||||
trySend(msgCh, Message{Type: MessageStatus, Status: "running"})
|
||||
|
||||
case "message":
|
||||
if evt.Role == "assistant" && evt.Content != "" {
|
||||
output.WriteString(evt.Content)
|
||||
trySend(msgCh, Message{Type: MessageText, Content: evt.Content})
|
||||
}
|
||||
|
||||
case "tool_use":
|
||||
var params map[string]any
|
||||
if evt.Parameters != nil {
|
||||
_ = json.Unmarshal(evt.Parameters, ¶ms)
|
||||
}
|
||||
trySend(msgCh, Message{
|
||||
Type: MessageToolUse,
|
||||
Tool: evt.ToolName,
|
||||
CallID: evt.ToolID,
|
||||
Input: params,
|
||||
})
|
||||
|
||||
case "tool_result":
|
||||
trySend(msgCh, Message{
|
||||
Type: MessageToolResult,
|
||||
CallID: evt.ToolID,
|
||||
Output: evt.Output,
|
||||
})
|
||||
|
||||
case "error":
|
||||
trySend(msgCh, Message{
|
||||
Type: MessageError,
|
||||
Content: evt.Message,
|
||||
})
|
||||
|
||||
case "result":
|
||||
if evt.Status == "error" && evt.Error != nil {
|
||||
finalStatus = "failed"
|
||||
finalError = evt.Error.Message
|
||||
}
|
||||
if evt.Stats != nil {
|
||||
b.accumulateUsage(usage, evt.Stats)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
waitErr := cmd.Wait()
|
||||
durationMs := time.Since(startTime).Milliseconds()
|
||||
duration := time.Since(startTime)
|
||||
|
||||
result := Result{
|
||||
Status: "completed",
|
||||
Output: text,
|
||||
DurationMs: durationMs,
|
||||
}
|
||||
|
||||
// Check context errors first — timeout and cancellation kill the
|
||||
// process, which can cause readErr/waitErr as side effects. The
|
||||
// context error is the root cause and determines the correct status.
|
||||
if runCtx.Err() == context.DeadlineExceeded {
|
||||
result.Status = "timeout"
|
||||
result.Error = fmt.Sprintf("gemini timed out after %s", timeout)
|
||||
finalStatus = "timeout"
|
||||
finalError = fmt.Sprintf("gemini timed out after %s", timeout)
|
||||
} else if runCtx.Err() == context.Canceled {
|
||||
result.Status = "aborted"
|
||||
result.Error = "execution cancelled"
|
||||
} else if readErr != nil {
|
||||
result.Status = "failed"
|
||||
result.Error = fmt.Sprintf("read stdout: %s", readErr.Error())
|
||||
} else if waitErr != nil {
|
||||
result.Status = "failed"
|
||||
result.Error = waitErr.Error()
|
||||
finalStatus = "aborted"
|
||||
finalError = "execution cancelled"
|
||||
} else if waitErr != nil && finalStatus == "completed" {
|
||||
finalStatus = "failed"
|
||||
finalError = fmt.Sprintf("gemini exited with error: %v", waitErr)
|
||||
}
|
||||
|
||||
resCh <- result
|
||||
b.cfg.Logger.Info("gemini finished", "pid", cmd.Process.Pid, "status", finalStatus, "duration", duration.Round(time.Millisecond).String())
|
||||
|
||||
resCh <- Result{
|
||||
Status: finalStatus,
|
||||
Output: output.String(),
|
||||
Error: finalError,
|
||||
DurationMs: duration.Milliseconds(),
|
||||
SessionID: sessionID,
|
||||
Usage: usage,
|
||||
}
|
||||
}()
|
||||
|
||||
return &Session{Messages: msgCh, Result: resCh}, nil
|
||||
}
|
||||
|
||||
// accumulateUsage extracts per-model token usage from Gemini's result stats.
|
||||
func (b *geminiBackend) accumulateUsage(usage map[string]TokenUsage, stats *geminiStreamStats) {
|
||||
for model, m := range stats.Models {
|
||||
u := usage[model]
|
||||
u.InputTokens += int64(m.InputTokens)
|
||||
u.OutputTokens += int64(m.OutputTokens)
|
||||
u.CacheReadTokens += int64(m.Cached)
|
||||
usage[model] = u
|
||||
}
|
||||
}
|
||||
|
||||
// ── Gemini stream-json event types ──
|
||||
|
||||
type geminiStreamEvent struct {
|
||||
Type string `json:"type"`
|
||||
Timestamp string `json:"timestamp,omitempty"`
|
||||
SessionID string `json:"session_id,omitempty"`
|
||||
Model string `json:"model,omitempty"`
|
||||
|
||||
// message fields
|
||||
Role string `json:"role,omitempty"`
|
||||
Content string `json:"content,omitempty"`
|
||||
Delta bool `json:"delta,omitempty"`
|
||||
|
||||
// tool_use fields
|
||||
ToolName string `json:"tool_name,omitempty"`
|
||||
ToolID string `json:"tool_id,omitempty"`
|
||||
Parameters json.RawMessage `json:"parameters,omitempty"`
|
||||
|
||||
// tool_result fields
|
||||
Status string `json:"status,omitempty"`
|
||||
Output string `json:"output,omitempty"`
|
||||
|
||||
// error fields
|
||||
Severity string `json:"severity,omitempty"`
|
||||
Message string `json:"message,omitempty"`
|
||||
|
||||
// result fields
|
||||
Error *geminiStreamError `json:"error,omitempty"`
|
||||
Stats *geminiStreamStats `json:"stats,omitempty"`
|
||||
}
|
||||
|
||||
type geminiStreamError struct {
|
||||
Type string `json:"type"`
|
||||
Message string `json:"message"`
|
||||
}
|
||||
|
||||
type geminiStreamStats struct {
|
||||
TotalTokens int `json:"total_tokens"`
|
||||
InputTokens int `json:"input_tokens"`
|
||||
OutputTokens int `json:"output_tokens"`
|
||||
DurationMs int `json:"duration_ms"`
|
||||
ToolCalls int `json:"tool_calls"`
|
||||
Models map[string]geminiModelStats `json:"models,omitempty"`
|
||||
}
|
||||
|
||||
type geminiModelStats struct {
|
||||
TotalTokens int `json:"total_tokens"`
|
||||
InputTokens int `json:"input_tokens"`
|
||||
OutputTokens int `json:"output_tokens"`
|
||||
Cached int `json:"cached"`
|
||||
}
|
||||
|
||||
// ── Arg builder ──
|
||||
|
||||
// buildGeminiArgs assembles the argv for a one-shot gemini invocation.
|
||||
//
|
||||
// Flags:
|
||||
//
|
||||
// -p / --prompt non-interactive prompt (the user's task)
|
||||
// --yolo auto-approve all tool executions (equivalent to
|
||||
// claude's --permission-mode bypassPermissions)
|
||||
// -o text plain text output (stream-json is a follow-up)
|
||||
// -m <model> optional model override (from MULTICA_GEMINI_MODEL)
|
||||
// --yolo auto-approve all tool executions
|
||||
// -o stream-json streaming NDJSON output for live events
|
||||
// -m <model> optional model override
|
||||
// -r <session> resume a previous session (if provided)
|
||||
//
|
||||
// Note: gemini reads stdin and appends it to -p when both are present.
|
||||
// The daemon does not pipe stdin, so the prompt comes exclusively from -p.
|
||||
func buildGeminiArgs(prompt string, opts ExecOptions) []string {
|
||||
args := []string{
|
||||
"-p", prompt,
|
||||
"--yolo",
|
||||
"-o", "text",
|
||||
"-o", "stream-json",
|
||||
}
|
||||
if opts.Model != "" {
|
||||
args = append(args, "-m", opts.Model)
|
||||
|
||||
@@ -11,7 +11,7 @@ func TestBuildGeminiArgsBaseline(t *testing.T) {
|
||||
expected := []string{
|
||||
"-p", "write a haiku",
|
||||
"--yolo",
|
||||
"-o", "text",
|
||||
"-o", "stream-json",
|
||||
}
|
||||
|
||||
if len(args) != len(expected) {
|
||||
|
||||
@@ -38,12 +38,19 @@ func (b *openclawBackend) Execute(ctx context.Context, prompt string, opts ExecO
|
||||
sessionID = fmt.Sprintf("multica-%d", time.Now().UnixNano())
|
||||
}
|
||||
args := []string{"agent", "--local", "--json", "--session-id", sessionID}
|
||||
if opts.Model != "" {
|
||||
args = append(args, "--model", opts.Model)
|
||||
}
|
||||
if opts.SystemPrompt != "" {
|
||||
args = append(args, "--system-prompt", opts.SystemPrompt)
|
||||
}
|
||||
if opts.Timeout > 0 {
|
||||
args = append(args, "--timeout", fmt.Sprintf("%d", int(opts.Timeout.Seconds())))
|
||||
}
|
||||
args = append(args, "--message", prompt)
|
||||
|
||||
cmd := exec.CommandContext(runCtx, execPath, args...)
|
||||
cmd.WaitDelay = 10 * time.Second
|
||||
if opts.Cwd != "" {
|
||||
cmd.Dir = opts.Cwd
|
||||
}
|
||||
@@ -67,6 +74,12 @@ func (b *openclawBackend) Execute(ctx context.Context, prompt string, opts ExecO
|
||||
msgCh := make(chan Message, 256)
|
||||
resCh := make(chan Result, 1)
|
||||
|
||||
// Close stderr when the context is cancelled so the scanner unblocks.
|
||||
go func() {
|
||||
<-runCtx.Done()
|
||||
_ = stderr.Close()
|
||||
}()
|
||||
|
||||
go func() {
|
||||
defer cancel()
|
||||
defer close(msgCh)
|
||||
@@ -129,22 +142,108 @@ type openclawEventResult struct {
|
||||
}
|
||||
|
||||
// processOutput reads the JSON output from openclaw --json stderr and returns
|
||||
// the parsed result. OpenClaw writes its JSON result to stderr, which may also
|
||||
// contain non-JSON log lines. We scan line-by-line so a final result line can
|
||||
// be recognized without waiting for the entire stderr stream to be buffered.
|
||||
// the parsed result. OpenClaw writes its JSON output to stderr, which may also
|
||||
// contain non-JSON log lines. The stream may contain:
|
||||
//
|
||||
// - NDJSON streaming events (type: "text", "tool_use", "tool_result", "error",
|
||||
// "step_start", "step_finish") — emitted in real time as the agent works
|
||||
// - A final result JSON (with payloads + meta) — the legacy single-blob format
|
||||
//
|
||||
// We scan line-by-line, emitting messages as events arrive so streaming
|
||||
// consumers get real-time feedback instead of waiting for the final blob.
|
||||
func (b *openclawBackend) processOutput(r io.Reader, ch chan<- Message) openclawEventResult {
|
||||
scanner := bufio.NewScanner(r)
|
||||
scanner.Buffer(make([]byte, 0, 1024*1024), 10*1024*1024)
|
||||
|
||||
var output strings.Builder
|
||||
var sessionID string
|
||||
var usage TokenUsage
|
||||
finalStatus := "completed"
|
||||
var finalError string
|
||||
gotEvents := false // true if we parsed at least one streaming event or result
|
||||
|
||||
var rawLines []string
|
||||
for scanner.Scan() {
|
||||
line := strings.TrimSpace(scanner.Text())
|
||||
if line == "" {
|
||||
continue
|
||||
}
|
||||
if result, ok := tryParseOpenclawResult(line); ok {
|
||||
return b.buildOpenclawEventResult(result, ch)
|
||||
|
||||
// Try parsing as a streaming NDJSON event first.
|
||||
if event, ok := tryParseOpenclawEvent(line); ok {
|
||||
gotEvents = true
|
||||
if event.SessionID != "" {
|
||||
sessionID = event.SessionID
|
||||
}
|
||||
switch event.Type {
|
||||
case "text":
|
||||
if event.Text != "" {
|
||||
output.WriteString(event.Text)
|
||||
trySend(ch, Message{Type: MessageText, Content: event.Text})
|
||||
}
|
||||
case "tool_use":
|
||||
var input map[string]any
|
||||
if event.Input != nil {
|
||||
_ = json.Unmarshal(event.Input, &input)
|
||||
}
|
||||
trySend(ch, Message{
|
||||
Type: MessageToolUse,
|
||||
Tool: event.Tool,
|
||||
CallID: event.CallID,
|
||||
Input: input,
|
||||
})
|
||||
case "tool_result":
|
||||
trySend(ch, Message{
|
||||
Type: MessageToolResult,
|
||||
Tool: event.Tool,
|
||||
CallID: event.CallID,
|
||||
Output: event.Text,
|
||||
})
|
||||
case "error":
|
||||
errMsg := event.errorMessage()
|
||||
b.cfg.Logger.Warn("openclaw error event", "error", errMsg)
|
||||
trySend(ch, Message{Type: MessageError, Content: errMsg})
|
||||
finalStatus = "failed"
|
||||
finalError = errMsg
|
||||
case "lifecycle":
|
||||
phase := event.Phase
|
||||
if phase == "error" || phase == "failed" || phase == "cancelled" {
|
||||
errMsg := event.errorMessage()
|
||||
b.cfg.Logger.Warn("openclaw lifecycle failure", "phase", phase, "error", errMsg)
|
||||
trySend(ch, Message{Type: MessageError, Content: errMsg})
|
||||
finalStatus = "failed"
|
||||
finalError = errMsg
|
||||
}
|
||||
case "step_start":
|
||||
trySend(ch, Message{Type: MessageStatus, Status: "running"})
|
||||
case "step_finish":
|
||||
if event.Usage != nil {
|
||||
u := parseOpenclawUsage(event.Usage)
|
||||
usage.InputTokens += u.InputTokens
|
||||
usage.OutputTokens += u.OutputTokens
|
||||
usage.CacheReadTokens += u.CacheReadTokens
|
||||
usage.CacheWriteTokens += u.CacheWriteTokens
|
||||
}
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
// Try parsing as a final result blob (legacy format).
|
||||
if result, ok := tryParseOpenclawResult(line); ok {
|
||||
gotEvents = true
|
||||
res := b.buildOpenclawEventResult(result, ch, &output)
|
||||
if res.sessionID != "" {
|
||||
sessionID = res.sessionID
|
||||
}
|
||||
// Prefer usage from the final result if no streaming events reported it.
|
||||
u := res.usage
|
||||
if u.InputTokens > 0 || u.OutputTokens > 0 || u.CacheReadTokens > 0 || u.CacheWriteTokens > 0 {
|
||||
usage = u
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
// Not JSON — treat as log line.
|
||||
b.cfg.Logger.Debug("[openclaw:stderr] " + line)
|
||||
rawLines = append(rawLines, line)
|
||||
}
|
||||
@@ -153,36 +252,65 @@ func (b *openclawBackend) processOutput(r io.Reader, ch chan<- Message) openclaw
|
||||
return openclawEventResult{status: "failed", errMsg: fmt.Sprintf("read stderr: %v", err)}
|
||||
}
|
||||
|
||||
trimmed := strings.TrimSpace(strings.Join(rawLines, "\n"))
|
||||
if trimmed != "" {
|
||||
return openclawEventResult{status: "completed", output: trimmed}
|
||||
// If we got no events at all, fall back to raw output.
|
||||
if !gotEvents {
|
||||
trimmed := strings.TrimSpace(strings.Join(rawLines, "\n"))
|
||||
if trimmed != "" {
|
||||
return openclawEventResult{status: "completed", output: trimmed}
|
||||
}
|
||||
return openclawEventResult{status: "failed", errMsg: "openclaw returned no parseable output"}
|
||||
}
|
||||
|
||||
return openclawEventResult{
|
||||
status: finalStatus,
|
||||
errMsg: finalError,
|
||||
output: output.String(),
|
||||
sessionID: sessionID,
|
||||
usage: usage,
|
||||
}
|
||||
return openclawEventResult{status: "failed", errMsg: "openclaw returned no parseable output"}
|
||||
}
|
||||
|
||||
// tryParseOpenclawEvent attempts to parse a line as a streaming NDJSON event.
|
||||
// Returns the event and true if the line is a valid event with a known type.
|
||||
func tryParseOpenclawEvent(line string) (openclawEvent, bool) {
|
||||
if len(line) == 0 || line[0] != '{' {
|
||||
return openclawEvent{}, false
|
||||
}
|
||||
var event openclawEvent
|
||||
if err := json.Unmarshal([]byte(line), &event); err != nil {
|
||||
return openclawEvent{}, false
|
||||
}
|
||||
if event.Type == "" {
|
||||
return openclawEvent{}, false
|
||||
}
|
||||
return event, true
|
||||
}
|
||||
|
||||
// tryParseOpenclawResult attempts to parse a line as a final result blob
|
||||
// (the legacy format with payloads + meta). Lines must start with '{' to be
|
||||
// considered — we no longer scan for braces at arbitrary positions, which
|
||||
// avoids false matches on log lines containing JSON fragments.
|
||||
func tryParseOpenclawResult(raw string) (openclawResult, bool) {
|
||||
// Try each '{' position until we find valid openclawResult JSON.
|
||||
// Earlier '{' chars may appear in log/error lines (e.g. raw_params={...}).
|
||||
var result openclawResult
|
||||
for i := 0; i < len(raw); i++ {
|
||||
if raw[i] != '{' {
|
||||
continue
|
||||
}
|
||||
if err := json.Unmarshal([]byte(raw[i:]), &result); err == nil && (result.Payloads != nil || result.Meta.DurationMs > 0) {
|
||||
return result, true
|
||||
}
|
||||
if len(raw) == 0 || raw[0] != '{' {
|
||||
return openclawResult{}, false
|
||||
}
|
||||
return openclawResult{}, false
|
||||
var result openclawResult
|
||||
if err := json.Unmarshal([]byte(raw), &result); err != nil {
|
||||
return openclawResult{}, false
|
||||
}
|
||||
if result.Payloads == nil && result.Meta.DurationMs == 0 {
|
||||
return openclawResult{}, false
|
||||
}
|
||||
return result, true
|
||||
}
|
||||
|
||||
func (b *openclawBackend) buildOpenclawEventResult(result openclawResult, ch chan<- Message) openclawEventResult {
|
||||
var output strings.Builder
|
||||
// buildOpenclawEventResult extracts text and metadata from a final result blob.
|
||||
// Text payloads are appended to the shared output builder and emitted to ch.
|
||||
func (b *openclawBackend) buildOpenclawEventResult(result openclawResult, ch chan<- Message, output *strings.Builder) openclawEventResult {
|
||||
for _, p := range result.Payloads {
|
||||
if p.Text != "" {
|
||||
if output.Len() > 0 {
|
||||
output.WriteString("\n")
|
||||
}
|
||||
output.WriteString(p.Text)
|
||||
trySend(ch, Message{Type: MessageText, Content: p.Text})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -193,17 +321,10 @@ func (b *openclawBackend) buildOpenclawEventResult(result openclawResult, ch cha
|
||||
sessionID = sid
|
||||
}
|
||||
if u, ok := result.Meta.AgentMeta["usage"].(map[string]any); ok {
|
||||
usage.InputTokens = openclawInt64(u, "input")
|
||||
usage.OutputTokens = openclawInt64(u, "output")
|
||||
usage.CacheReadTokens = openclawInt64(u, "cacheRead")
|
||||
usage.CacheWriteTokens = openclawInt64(u, "cacheWrite")
|
||||
usage = parseOpenclawUsage(u)
|
||||
}
|
||||
}
|
||||
|
||||
if output.Len() > 0 {
|
||||
trySend(ch, Message{Type: MessageText, Content: output.String()})
|
||||
}
|
||||
|
||||
return openclawEventResult{
|
||||
status: "completed",
|
||||
output: output.String(),
|
||||
@@ -212,6 +333,33 @@ func (b *openclawBackend) buildOpenclawEventResult(result openclawResult, ch cha
|
||||
}
|
||||
}
|
||||
|
||||
// parseOpenclawUsage extracts token usage from a map, supporting multiple
|
||||
// field name conventions used by different OpenClaw versions and PaperClip:
|
||||
//
|
||||
// input / inputTokens / input_tokens
|
||||
// output / outputTokens / output_tokens
|
||||
// cacheRead / cachedInputTokens / cached_input_tokens / cache_read
|
||||
// cacheWrite / cacheCreationInputTokens / cache_creation_input_tokens / cache_write
|
||||
func parseOpenclawUsage(data map[string]any) TokenUsage {
|
||||
return TokenUsage{
|
||||
InputTokens: openclawInt64FirstOf(data, "input", "inputTokens", "input_tokens"),
|
||||
OutputTokens: openclawInt64FirstOf(data, "output", "outputTokens", "output_tokens"),
|
||||
CacheReadTokens: openclawInt64FirstOf(data, "cacheRead", "cachedInputTokens", "cached_input_tokens", "cache_read", "cache_read_input_tokens"),
|
||||
CacheWriteTokens: openclawInt64FirstOf(data, "cacheWrite", "cacheCreationInputTokens", "cache_creation_input_tokens", "cache_write"),
|
||||
}
|
||||
}
|
||||
|
||||
// openclawInt64FirstOf returns the first non-zero int64 value found under any
|
||||
// of the given keys. This supports field name variants across protocol versions.
|
||||
func openclawInt64FirstOf(data map[string]any, keys ...string) int64 {
|
||||
for _, key := range keys {
|
||||
if v := openclawInt64(data, key); v != 0 {
|
||||
return v
|
||||
}
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
// openclawInt64 safely extracts an int64 from a JSON-decoded map value (which
|
||||
// may be float64 due to Go's JSON number handling).
|
||||
func openclawInt64(data map[string]any, key string) int64 {
|
||||
@@ -231,7 +379,73 @@ func openclawInt64(data map[string]any, key string) int64 {
|
||||
|
||||
// ── JSON types for `openclaw agent --json` output ──
|
||||
|
||||
// openclawResult represents the JSON output from `openclaw agent --json`.
|
||||
// openclawEvent represents a single streaming NDJSON event from openclaw --json.
|
||||
//
|
||||
// Event types:
|
||||
// - "text" — text output (text field)
|
||||
// - "tool_use" — tool invocation (tool, callId, input)
|
||||
// - "tool_result" — tool output (tool, callId, text)
|
||||
// - "error" — error (text, or structured error object)
|
||||
// - "lifecycle" — phase changes (phase: "error"/"failed"/"cancelled")
|
||||
// - "step_start" — agent step begins
|
||||
// - "step_finish" — agent step ends (usage)
|
||||
type openclawEvent struct {
|
||||
Type string `json:"type"`
|
||||
SessionID string `json:"sessionId,omitempty"`
|
||||
Text string `json:"text,omitempty"`
|
||||
Tool string `json:"tool,omitempty"`
|
||||
CallID string `json:"callId,omitempty"`
|
||||
Input json.RawMessage `json:"input,omitempty"`
|
||||
Usage map[string]any `json:"usage,omitempty"`
|
||||
Phase string `json:"phase,omitempty"` // lifecycle event phase
|
||||
Error *openclawError `json:"error,omitempty"` // structured error object
|
||||
Message string `json:"message,omitempty"` // alternative error message field
|
||||
}
|
||||
|
||||
// errorMessage extracts a human-readable error message from the event,
|
||||
// checking multiple fields: structured error object, text, message, or fallback.
|
||||
func (e openclawEvent) errorMessage() string {
|
||||
if e.Error != nil {
|
||||
if msg := e.Error.message(); msg != "" {
|
||||
return msg
|
||||
}
|
||||
}
|
||||
if e.Text != "" {
|
||||
return e.Text
|
||||
}
|
||||
if e.Message != "" {
|
||||
return e.Message
|
||||
}
|
||||
return "unknown openclaw error"
|
||||
}
|
||||
|
||||
// openclawError represents a structured error in an openclaw event,
|
||||
// compatible with PaperClip's error format (name + data.message).
|
||||
type openclawError struct {
|
||||
Name string `json:"name,omitempty"`
|
||||
Data *openclawErrorData `json:"data,omitempty"`
|
||||
Message string `json:"message,omitempty"`
|
||||
}
|
||||
|
||||
func (e *openclawError) message() string {
|
||||
if e.Data != nil && e.Data.Message != "" {
|
||||
return e.Data.Message
|
||||
}
|
||||
if e.Message != "" {
|
||||
return e.Message
|
||||
}
|
||||
if e.Name != "" {
|
||||
return e.Name
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
type openclawErrorData struct {
|
||||
Message string `json:"message,omitempty"`
|
||||
}
|
||||
|
||||
// openclawResult represents the final JSON output from `openclaw agent --json`
|
||||
// (the legacy single-blob format with payloads + meta).
|
||||
type openclawResult struct {
|
||||
Payloads []openclawPayload `json:"payloads"`
|
||||
Meta openclawMeta `json:"meta"`
|
||||
|
||||
@@ -18,7 +18,7 @@ func TestNewReturnsOpenclawBackend(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// ── processOutput tests ──
|
||||
// ── Legacy result format tests (processOutput with final JSON blob) ──
|
||||
|
||||
func TestOpenclawProcessOutputHappyPath(t *testing.T) {
|
||||
t.Parallel()
|
||||
@@ -90,11 +90,24 @@ func TestOpenclawProcessOutputMultiplePayloads(t *testing.T) {
|
||||
|
||||
res := b.processOutput(strings.NewReader(string(data)), ch)
|
||||
|
||||
if res.output != "First\nSecond" {
|
||||
t.Errorf("output: got %q, want %q", res.output, "First\nSecond")
|
||||
if res.output != "FirstSecond" {
|
||||
t.Errorf("output: got %q, want %q", res.output, "FirstSecond")
|
||||
}
|
||||
|
||||
close(ch)
|
||||
var msgs []Message
|
||||
for m := range ch {
|
||||
msgs = append(msgs, m)
|
||||
}
|
||||
if len(msgs) != 2 {
|
||||
t.Fatalf("expected 2 text messages, got %d", len(msgs))
|
||||
}
|
||||
if msgs[0].Content != "First" {
|
||||
t.Errorf("msg[0]: got %q, want %q", msgs[0].Content, "First")
|
||||
}
|
||||
if msgs[1].Content != "Second" {
|
||||
t.Errorf("msg[1]: got %q, want %q", msgs[1].Content, "Second")
|
||||
}
|
||||
}
|
||||
|
||||
func TestOpenclawProcessOutputEmptyPayloads(t *testing.T) {
|
||||
@@ -238,7 +251,8 @@ func TestOpenclawProcessOutputWithBracesInLogLines(t *testing.T) {
|
||||
Meta: openclawMeta{DurationMs: 500},
|
||||
}
|
||||
data, _ := json.Marshal(result)
|
||||
// Simulate error line containing braces before the real JSON (the exact bug scenario)
|
||||
// Log line with braces should NOT be parsed as JSON — only lines starting
|
||||
// with '{' are considered. The result blob on its own line is still parsed.
|
||||
input := `[tools] exec failed: complex interpreter invocation detected. raw_params={"command":"echo hello"}` + "\n" + string(data)
|
||||
|
||||
res := b.processOutput(strings.NewReader(input), ch)
|
||||
@@ -253,6 +267,558 @@ func TestOpenclawProcessOutputWithBracesInLogLines(t *testing.T) {
|
||||
close(ch)
|
||||
}
|
||||
|
||||
func TestOpenclawResultBlobWithLeadingPrefixRejected(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
b := &openclawBackend{cfg: Config{Logger: slog.Default()}}
|
||||
ch := make(chan Message, 256)
|
||||
|
||||
// A line with a prefix before the JSON should NOT be parsed as a result.
|
||||
// This tests that the hardened parser rejects non-'{'-starting lines.
|
||||
result := openclawResult{
|
||||
Payloads: []openclawPayload{{Text: "Should not match"}},
|
||||
Meta: openclawMeta{DurationMs: 500},
|
||||
}
|
||||
data, _ := json.Marshal(result)
|
||||
input := "some prefix " + string(data)
|
||||
|
||||
res := b.processOutput(strings.NewReader(input), ch)
|
||||
|
||||
// Should fall back to raw output since the JSON has a prefix.
|
||||
if res.status != "completed" {
|
||||
t.Errorf("status: got %q, want %q", res.status, "completed")
|
||||
}
|
||||
if res.output != input {
|
||||
t.Errorf("output: got %q, want raw input back", res.output)
|
||||
}
|
||||
|
||||
close(ch)
|
||||
}
|
||||
|
||||
// ── Streaming NDJSON event tests ──
|
||||
|
||||
func TestOpenclawStreamingTextEvents(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
b := &openclawBackend{cfg: Config{Logger: slog.Default()}}
|
||||
ch := make(chan Message, 256)
|
||||
|
||||
lines := []string{
|
||||
`{"type":"text","text":"Hello "}`,
|
||||
`{"type":"text","text":"world"}`,
|
||||
}
|
||||
input := strings.Join(lines, "\n")
|
||||
|
||||
res := b.processOutput(strings.NewReader(input), ch)
|
||||
|
||||
if res.status != "completed" {
|
||||
t.Errorf("status: got %q, want %q", res.status, "completed")
|
||||
}
|
||||
if res.output != "Hello world" {
|
||||
t.Errorf("output: got %q, want %q", res.output, "Hello world")
|
||||
}
|
||||
|
||||
close(ch)
|
||||
var msgs []Message
|
||||
for m := range ch {
|
||||
msgs = append(msgs, m)
|
||||
}
|
||||
if len(msgs) != 2 {
|
||||
t.Fatalf("expected 2 messages, got %d", len(msgs))
|
||||
}
|
||||
if msgs[0].Type != MessageText || msgs[0].Content != "Hello " {
|
||||
t.Errorf("msg[0]: type=%s content=%q", msgs[0].Type, msgs[0].Content)
|
||||
}
|
||||
if msgs[1].Type != MessageText || msgs[1].Content != "world" {
|
||||
t.Errorf("msg[1]: type=%s content=%q", msgs[1].Type, msgs[1].Content)
|
||||
}
|
||||
}
|
||||
|
||||
func TestOpenclawStreamingToolUseEvents(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
b := &openclawBackend{cfg: Config{Logger: slog.Default()}}
|
||||
ch := make(chan Message, 256)
|
||||
|
||||
lines := []string{
|
||||
`{"type":"tool_use","tool":"bash","callId":"call_1","input":{"command":"ls -la"}}`,
|
||||
`{"type":"tool_result","tool":"bash","callId":"call_1","text":"total 42\ndrwxr-xr-x"}`,
|
||||
`{"type":"text","text":"Listed files."}`,
|
||||
}
|
||||
input := strings.Join(lines, "\n")
|
||||
|
||||
res := b.processOutput(strings.NewReader(input), ch)
|
||||
|
||||
if res.status != "completed" {
|
||||
t.Errorf("status: got %q, want %q", res.status, "completed")
|
||||
}
|
||||
|
||||
close(ch)
|
||||
var msgs []Message
|
||||
for m := range ch {
|
||||
msgs = append(msgs, m)
|
||||
}
|
||||
if len(msgs) != 3 {
|
||||
t.Fatalf("expected 3 messages, got %d", len(msgs))
|
||||
}
|
||||
|
||||
// tool_use
|
||||
if msgs[0].Type != MessageToolUse {
|
||||
t.Errorf("msg[0] type: got %s, want tool-use", msgs[0].Type)
|
||||
}
|
||||
if msgs[0].Tool != "bash" {
|
||||
t.Errorf("msg[0] tool: got %q, want %q", msgs[0].Tool, "bash")
|
||||
}
|
||||
if msgs[0].CallID != "call_1" {
|
||||
t.Errorf("msg[0] callID: got %q, want %q", msgs[0].CallID, "call_1")
|
||||
}
|
||||
if msgs[0].Input["command"] != "ls -la" {
|
||||
t.Errorf("msg[0] input: got %v", msgs[0].Input)
|
||||
}
|
||||
|
||||
// tool_result
|
||||
if msgs[1].Type != MessageToolResult {
|
||||
t.Errorf("msg[1] type: got %s, want tool-result", msgs[1].Type)
|
||||
}
|
||||
if msgs[1].CallID != "call_1" {
|
||||
t.Errorf("msg[1] callID: got %q", msgs[1].CallID)
|
||||
}
|
||||
if msgs[1].Output != "total 42\ndrwxr-xr-x" {
|
||||
t.Errorf("msg[1] output: got %q", msgs[1].Output)
|
||||
}
|
||||
|
||||
// text
|
||||
if msgs[2].Type != MessageText || msgs[2].Content != "Listed files." {
|
||||
t.Errorf("msg[2]: type=%s content=%q", msgs[2].Type, msgs[2].Content)
|
||||
}
|
||||
}
|
||||
|
||||
func TestOpenclawStreamingErrorEvent(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
b := &openclawBackend{cfg: Config{Logger: slog.Default()}}
|
||||
ch := make(chan Message, 256)
|
||||
|
||||
lines := []string{
|
||||
`{"type":"text","text":"Starting..."}`,
|
||||
`{"type":"error","text":"model not found: gpt-99"}`,
|
||||
}
|
||||
input := strings.Join(lines, "\n")
|
||||
|
||||
res := b.processOutput(strings.NewReader(input), ch)
|
||||
|
||||
if res.status != "failed" {
|
||||
t.Errorf("status: got %q, want %q", res.status, "failed")
|
||||
}
|
||||
if res.errMsg != "model not found: gpt-99" {
|
||||
t.Errorf("errMsg: got %q", res.errMsg)
|
||||
}
|
||||
|
||||
close(ch)
|
||||
var msgs []Message
|
||||
for m := range ch {
|
||||
msgs = append(msgs, m)
|
||||
}
|
||||
if len(msgs) != 2 {
|
||||
t.Fatalf("expected 2 messages, got %d", len(msgs))
|
||||
}
|
||||
if msgs[1].Type != MessageError {
|
||||
t.Errorf("msg[1] type: got %s, want error", msgs[1].Type)
|
||||
}
|
||||
}
|
||||
|
||||
func TestOpenclawStreamingStepFinishUsage(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
b := &openclawBackend{cfg: Config{Logger: slog.Default()}}
|
||||
ch := make(chan Message, 256)
|
||||
|
||||
lines := []string{
|
||||
`{"type":"step_start"}`,
|
||||
`{"type":"text","text":"Done"}`,
|
||||
`{"type":"step_finish","usage":{"input":200,"output":100,"cacheRead":50,"cacheWrite":25}}`,
|
||||
}
|
||||
input := strings.Join(lines, "\n")
|
||||
|
||||
res := b.processOutput(strings.NewReader(input), ch)
|
||||
|
||||
if res.usage.InputTokens != 200 {
|
||||
t.Errorf("input tokens: got %d, want 200", res.usage.InputTokens)
|
||||
}
|
||||
if res.usage.OutputTokens != 100 {
|
||||
t.Errorf("output tokens: got %d, want 100", res.usage.OutputTokens)
|
||||
}
|
||||
if res.usage.CacheReadTokens != 50 {
|
||||
t.Errorf("cache read: got %d, want 50", res.usage.CacheReadTokens)
|
||||
}
|
||||
if res.usage.CacheWriteTokens != 25 {
|
||||
t.Errorf("cache write: got %d, want 25", res.usage.CacheWriteTokens)
|
||||
}
|
||||
|
||||
close(ch)
|
||||
}
|
||||
|
||||
func TestOpenclawStreamingSessionID(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
b := &openclawBackend{cfg: Config{Logger: slog.Default()}}
|
||||
ch := make(chan Message, 256)
|
||||
|
||||
lines := []string{
|
||||
`{"type":"text","text":"Hi","sessionId":"ses_stream_123"}`,
|
||||
}
|
||||
input := strings.Join(lines, "\n")
|
||||
|
||||
res := b.processOutput(strings.NewReader(input), ch)
|
||||
|
||||
if res.sessionID != "ses_stream_123" {
|
||||
t.Errorf("sessionID: got %q, want %q", res.sessionID, "ses_stream_123")
|
||||
}
|
||||
|
||||
close(ch)
|
||||
}
|
||||
|
||||
func TestOpenclawStreamingMixedWithLogLines(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
b := &openclawBackend{cfg: Config{Logger: slog.Default()}}
|
||||
ch := make(chan Message, 256)
|
||||
|
||||
lines := []string{
|
||||
"[info] initializing agent...",
|
||||
`{"type":"text","text":"Hello"}`,
|
||||
"[debug] tool exec completed",
|
||||
`{"type":"text","text":" world"}`,
|
||||
}
|
||||
input := strings.Join(lines, "\n")
|
||||
|
||||
res := b.processOutput(strings.NewReader(input), ch)
|
||||
|
||||
if res.status != "completed" {
|
||||
t.Errorf("status: got %q, want %q", res.status, "completed")
|
||||
}
|
||||
if res.output != "Hello world" {
|
||||
t.Errorf("output: got %q, want %q", res.output, "Hello world")
|
||||
}
|
||||
|
||||
close(ch)
|
||||
var msgs []Message
|
||||
for m := range ch {
|
||||
msgs = append(msgs, m)
|
||||
}
|
||||
if len(msgs) != 2 {
|
||||
t.Fatalf("expected 2 text messages, got %d", len(msgs))
|
||||
}
|
||||
}
|
||||
|
||||
// ── Lifecycle event tests ──
|
||||
|
||||
func TestOpenclawLifecycleErrorPhase(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
b := &openclawBackend{cfg: Config{Logger: slog.Default()}}
|
||||
ch := make(chan Message, 256)
|
||||
|
||||
lines := []string{
|
||||
`{"type":"text","text":"Working..."}`,
|
||||
`{"type":"lifecycle","phase":"error","text":"agent crashed unexpectedly"}`,
|
||||
}
|
||||
input := strings.Join(lines, "\n")
|
||||
|
||||
res := b.processOutput(strings.NewReader(input), ch)
|
||||
|
||||
if res.status != "failed" {
|
||||
t.Errorf("status: got %q, want %q", res.status, "failed")
|
||||
}
|
||||
if res.errMsg != "agent crashed unexpectedly" {
|
||||
t.Errorf("errMsg: got %q", res.errMsg)
|
||||
}
|
||||
|
||||
close(ch)
|
||||
var msgs []Message
|
||||
for m := range ch {
|
||||
msgs = append(msgs, m)
|
||||
}
|
||||
if len(msgs) != 2 {
|
||||
t.Fatalf("expected 2 messages, got %d", len(msgs))
|
||||
}
|
||||
if msgs[1].Type != MessageError {
|
||||
t.Errorf("msg[1] type: got %s, want error", msgs[1].Type)
|
||||
}
|
||||
}
|
||||
|
||||
func TestOpenclawLifecycleFailedPhase(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
b := &openclawBackend{cfg: Config{Logger: slog.Default()}}
|
||||
ch := make(chan Message, 256)
|
||||
|
||||
lines := []string{
|
||||
`{"type":"lifecycle","phase":"failed","message":"timeout exceeded"}`,
|
||||
}
|
||||
input := strings.Join(lines, "\n")
|
||||
|
||||
res := b.processOutput(strings.NewReader(input), ch)
|
||||
|
||||
if res.status != "failed" {
|
||||
t.Errorf("status: got %q, want %q", res.status, "failed")
|
||||
}
|
||||
if res.errMsg != "timeout exceeded" {
|
||||
t.Errorf("errMsg: got %q, want %q", res.errMsg, "timeout exceeded")
|
||||
}
|
||||
|
||||
close(ch)
|
||||
}
|
||||
|
||||
func TestOpenclawLifecycleCancelledPhase(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
b := &openclawBackend{cfg: Config{Logger: slog.Default()}}
|
||||
ch := make(chan Message, 256)
|
||||
|
||||
lines := []string{
|
||||
`{"type":"lifecycle","phase":"cancelled"}`,
|
||||
}
|
||||
input := strings.Join(lines, "\n")
|
||||
|
||||
res := b.processOutput(strings.NewReader(input), ch)
|
||||
|
||||
if res.status != "failed" {
|
||||
t.Errorf("status: got %q, want %q", res.status, "failed")
|
||||
}
|
||||
// With no text/message/error, should get the default.
|
||||
if res.errMsg != "unknown openclaw error" {
|
||||
t.Errorf("errMsg: got %q", res.errMsg)
|
||||
}
|
||||
|
||||
close(ch)
|
||||
}
|
||||
|
||||
func TestOpenclawLifecycleRunningPhaseIgnored(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
b := &openclawBackend{cfg: Config{Logger: slog.Default()}}
|
||||
ch := make(chan Message, 256)
|
||||
|
||||
lines := []string{
|
||||
`{"type":"lifecycle","phase":"running"}`,
|
||||
`{"type":"text","text":"Hello"}`,
|
||||
}
|
||||
input := strings.Join(lines, "\n")
|
||||
|
||||
res := b.processOutput(strings.NewReader(input), ch)
|
||||
|
||||
if res.status != "completed" {
|
||||
t.Errorf("status: got %q, want %q", res.status, "completed")
|
||||
}
|
||||
|
||||
close(ch)
|
||||
}
|
||||
|
||||
// ── Structured error tests ──
|
||||
|
||||
func TestOpenclawStructuredErrorObject(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
b := &openclawBackend{cfg: Config{Logger: slog.Default()}}
|
||||
ch := make(chan Message, 256)
|
||||
|
||||
lines := []string{
|
||||
`{"type":"error","error":{"name":"ModelNotFoundError","data":{"message":"model gpt-99 not available"}}}`,
|
||||
}
|
||||
input := strings.Join(lines, "\n")
|
||||
|
||||
res := b.processOutput(strings.NewReader(input), ch)
|
||||
|
||||
if res.status != "failed" {
|
||||
t.Errorf("status: got %q, want %q", res.status, "failed")
|
||||
}
|
||||
if res.errMsg != "model gpt-99 not available" {
|
||||
t.Errorf("errMsg: got %q, want %q", res.errMsg, "model gpt-99 not available")
|
||||
}
|
||||
|
||||
close(ch)
|
||||
}
|
||||
|
||||
func TestOpenclawStructuredErrorNameOnly(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
b := &openclawBackend{cfg: Config{Logger: slog.Default()}}
|
||||
ch := make(chan Message, 256)
|
||||
|
||||
lines := []string{
|
||||
`{"type":"error","error":{"name":"AuthenticationError"}}`,
|
||||
}
|
||||
input := strings.Join(lines, "\n")
|
||||
|
||||
res := b.processOutput(strings.NewReader(input), ch)
|
||||
|
||||
if res.errMsg != "AuthenticationError" {
|
||||
t.Errorf("errMsg: got %q, want %q", res.errMsg, "AuthenticationError")
|
||||
}
|
||||
|
||||
close(ch)
|
||||
}
|
||||
|
||||
func TestOpenclawStructuredErrorMessageField(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
b := &openclawBackend{cfg: Config{Logger: slog.Default()}}
|
||||
ch := make(chan Message, 256)
|
||||
|
||||
lines := []string{
|
||||
`{"type":"error","error":{"message":"rate limit exceeded"}}`,
|
||||
}
|
||||
input := strings.Join(lines, "\n")
|
||||
|
||||
res := b.processOutput(strings.NewReader(input), ch)
|
||||
|
||||
if res.errMsg != "rate limit exceeded" {
|
||||
t.Errorf("errMsg: got %q, want %q", res.errMsg, "rate limit exceeded")
|
||||
}
|
||||
|
||||
close(ch)
|
||||
}
|
||||
|
||||
// ── Usage field name variant tests ──
|
||||
|
||||
func TestOpenclawUsageAlternativeFieldNames(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
// Test PaperClip-style field names (inputTokens, outputTokens, etc.)
|
||||
data := map[string]any{
|
||||
"inputTokens": float64(500),
|
||||
"outputTokens": float64(200),
|
||||
"cachedInputTokens": float64(100),
|
||||
}
|
||||
usage := parseOpenclawUsage(data)
|
||||
|
||||
if usage.InputTokens != 500 {
|
||||
t.Errorf("InputTokens: got %d, want 500", usage.InputTokens)
|
||||
}
|
||||
if usage.OutputTokens != 200 {
|
||||
t.Errorf("OutputTokens: got %d, want 200", usage.OutputTokens)
|
||||
}
|
||||
if usage.CacheReadTokens != 100 {
|
||||
t.Errorf("CacheReadTokens: got %d, want 100", usage.CacheReadTokens)
|
||||
}
|
||||
}
|
||||
|
||||
func TestOpenclawUsageSnakeCaseFieldNames(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
// Test snake_case field names (Anthropic API style)
|
||||
data := map[string]any{
|
||||
"input_tokens": float64(300),
|
||||
"output_tokens": float64(150),
|
||||
"cache_read_input_tokens": float64(80),
|
||||
"cache_creation_input_tokens": float64(40),
|
||||
}
|
||||
usage := parseOpenclawUsage(data)
|
||||
|
||||
if usage.InputTokens != 300 {
|
||||
t.Errorf("InputTokens: got %d, want 300", usage.InputTokens)
|
||||
}
|
||||
if usage.OutputTokens != 150 {
|
||||
t.Errorf("OutputTokens: got %d, want 150", usage.OutputTokens)
|
||||
}
|
||||
if usage.CacheReadTokens != 80 {
|
||||
t.Errorf("CacheReadTokens: got %d, want 80", usage.CacheReadTokens)
|
||||
}
|
||||
if usage.CacheWriteTokens != 40 {
|
||||
t.Errorf("CacheWriteTokens: got %d, want 40", usage.CacheWriteTokens)
|
||||
}
|
||||
}
|
||||
|
||||
func TestOpenclawUsageOriginalFieldNames(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
// Test the original short field names (input, output, cacheRead, cacheWrite)
|
||||
data := map[string]any{
|
||||
"input": float64(100),
|
||||
"output": float64(50),
|
||||
"cacheRead": float64(10),
|
||||
"cacheWrite": float64(5),
|
||||
}
|
||||
usage := parseOpenclawUsage(data)
|
||||
|
||||
if usage.InputTokens != 100 {
|
||||
t.Errorf("InputTokens: got %d, want 100", usage.InputTokens)
|
||||
}
|
||||
if usage.OutputTokens != 50 {
|
||||
t.Errorf("OutputTokens: got %d, want 50", usage.OutputTokens)
|
||||
}
|
||||
if usage.CacheReadTokens != 10 {
|
||||
t.Errorf("CacheReadTokens: got %d, want 10", usage.CacheReadTokens)
|
||||
}
|
||||
if usage.CacheWriteTokens != 5 {
|
||||
t.Errorf("CacheWriteTokens: got %d, want 5", usage.CacheWriteTokens)
|
||||
}
|
||||
}
|
||||
|
||||
func TestOpenclawUsageAccumulationAcrossSteps(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
b := &openclawBackend{cfg: Config{Logger: slog.Default()}}
|
||||
ch := make(chan Message, 256)
|
||||
|
||||
lines := []string{
|
||||
`{"type":"step_finish","usage":{"inputTokens":100,"outputTokens":50}}`,
|
||||
`{"type":"step_finish","usage":{"inputTokens":200,"outputTokens":80,"cachedInputTokens":60}}`,
|
||||
}
|
||||
input := strings.Join(lines, "\n")
|
||||
|
||||
res := b.processOutput(strings.NewReader(input), ch)
|
||||
|
||||
if res.usage.InputTokens != 300 {
|
||||
t.Errorf("InputTokens: got %d, want 300", res.usage.InputTokens)
|
||||
}
|
||||
if res.usage.OutputTokens != 130 {
|
||||
t.Errorf("OutputTokens: got %d, want 130", res.usage.OutputTokens)
|
||||
}
|
||||
if res.usage.CacheReadTokens != 60 {
|
||||
t.Errorf("CacheReadTokens: got %d, want 60", res.usage.CacheReadTokens)
|
||||
}
|
||||
|
||||
close(ch)
|
||||
}
|
||||
|
||||
func TestOpenclawUsageFinalResultAlternativeFields(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
b := &openclawBackend{cfg: Config{Logger: slog.Default()}}
|
||||
ch := make(chan Message, 256)
|
||||
|
||||
result := openclawResult{
|
||||
Payloads: []openclawPayload{{Text: "Done"}},
|
||||
Meta: openclawMeta{
|
||||
DurationMs: 1000,
|
||||
AgentMeta: map[string]any{
|
||||
"usage": map[string]any{
|
||||
"inputTokens": float64(400),
|
||||
"outputTokens": float64(180),
|
||||
"cachedInputTokens": float64(90),
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
data, _ := json.Marshal(result)
|
||||
|
||||
res := b.processOutput(strings.NewReader(string(data)), ch)
|
||||
|
||||
if res.usage.InputTokens != 400 {
|
||||
t.Errorf("InputTokens: got %d, want 400", res.usage.InputTokens)
|
||||
}
|
||||
if res.usage.OutputTokens != 180 {
|
||||
t.Errorf("OutputTokens: got %d, want 180", res.usage.OutputTokens)
|
||||
}
|
||||
if res.usage.CacheReadTokens != 90 {
|
||||
t.Errorf("CacheReadTokens: got %d, want 90", res.usage.CacheReadTokens)
|
||||
}
|
||||
|
||||
close(ch)
|
||||
}
|
||||
|
||||
// ── openclawInt64 tests ──
|
||||
|
||||
func TestOpenclawInt64Float(t *testing.T) {
|
||||
|
||||
@@ -48,6 +48,7 @@ func (b *opencodeBackend) Execute(ctx context.Context, prompt string, opts ExecO
|
||||
args = append(args, prompt)
|
||||
|
||||
cmd := exec.CommandContext(runCtx, execPath, args...)
|
||||
cmd.WaitDelay = 10 * time.Second
|
||||
if opts.Cwd != "" {
|
||||
cmd.Dir = opts.Cwd
|
||||
}
|
||||
@@ -74,6 +75,12 @@ func (b *opencodeBackend) Execute(ctx context.Context, prompt string, opts ExecO
|
||||
msgCh := make(chan Message, 256)
|
||||
resCh := make(chan Result, 1)
|
||||
|
||||
// Close stdout when the context is cancelled so the scanner unblocks.
|
||||
go func() {
|
||||
<-runCtx.Done()
|
||||
_ = stdout.Close()
|
||||
}()
|
||||
|
||||
go func() {
|
||||
defer cancel()
|
||||
defer close(msgCh)
|
||||
|
||||
Reference in New Issue
Block a user