Compare commits

...

1 Commits

Author SHA1 Message Date
Jiang Bohan
4896128723 feat(agent): improve OpenClaw backend with streaming, tool use, and model args
P0 improvements to the OpenClaw agent backend, informed by PaperClip's
adapter architecture:

1. Streaming output — emit MessageText as NDJSON events arrive, instead
   of waiting for the final result blob. Users now see real-time progress.

2. Tool use support — parse and emit MessageToolUse / MessageToolResult
   from streaming events, matching the Claude and OpenCode backends.

3. Model & system prompt — pass --model and --system-prompt to the
   OpenClaw CLI when configured, previously silently ignored.

The implementation supports both the new streaming NDJSON format and the
legacy single-blob result format for backwards compatibility.

Closes MUL-726
2026-04-14 01:37:49 +08:00
2 changed files with 373 additions and 22 deletions

View File

@@ -38,6 +38,12 @@ func (b *openclawBackend) Execute(ctx context.Context, prompt string, opts ExecO
sessionID = fmt.Sprintf("multica-%d", time.Now().UnixNano())
}
args := []string{"agent", "--local", "--json", "--session-id", sessionID}
if opts.Model != "" {
args = append(args, "--model", opts.Model)
}
if opts.SystemPrompt != "" {
args = append(args, "--system-prompt", opts.SystemPrompt)
}
if opts.Timeout > 0 {
args = append(args, "--timeout", fmt.Sprintf("%d", int(opts.Timeout.Seconds())))
}
@@ -129,22 +135,101 @@ type openclawEventResult struct {
}
// processOutput reads the JSON output from openclaw --json stderr and returns
// the parsed result. OpenClaw writes its JSON result to stderr, which may also
// contain non-JSON log lines. We scan line-by-line so a final result line can
// be recognized without waiting for the entire stderr stream to be buffered.
// the parsed result. OpenClaw writes its JSON output to stderr, which may also
// contain non-JSON log lines. The stream may contain:
//
// - NDJSON streaming events (type: "text", "tool_use", "tool_result", "error",
// "step_start", "step_finish") — emitted in real time as the agent works
// - A final result JSON (with payloads + meta) — the legacy single-blob format
//
// We scan line-by-line, emitting messages as events arrive so streaming
// consumers get real-time feedback instead of waiting for the final blob.
func (b *openclawBackend) processOutput(r io.Reader, ch chan<- Message) openclawEventResult {
scanner := bufio.NewScanner(r)
scanner.Buffer(make([]byte, 0, 1024*1024), 10*1024*1024)
var output strings.Builder
var sessionID string
var usage TokenUsage
finalStatus := "completed"
var finalError string
gotEvents := false // true if we parsed at least one streaming event or result
var rawLines []string
for scanner.Scan() {
line := strings.TrimSpace(scanner.Text())
if line == "" {
continue
}
if result, ok := tryParseOpenclawResult(line); ok {
return b.buildOpenclawEventResult(result, ch)
// Try parsing as a streaming NDJSON event first.
if event, ok := tryParseOpenclawEvent(line); ok {
gotEvents = true
if event.SessionID != "" {
sessionID = event.SessionID
}
switch event.Type {
case "text":
if event.Text != "" {
output.WriteString(event.Text)
trySend(ch, Message{Type: MessageText, Content: event.Text})
}
case "tool_use":
var input map[string]any
if event.Input != nil {
_ = json.Unmarshal(event.Input, &input)
}
trySend(ch, Message{
Type: MessageToolUse,
Tool: event.Tool,
CallID: event.CallID,
Input: input,
})
case "tool_result":
trySend(ch, Message{
Type: MessageToolResult,
Tool: event.Tool,
CallID: event.CallID,
Output: event.Text,
})
case "error":
errMsg := event.Text
if errMsg == "" {
errMsg = "unknown openclaw error"
}
b.cfg.Logger.Warn("openclaw error event", "error", errMsg)
trySend(ch, Message{Type: MessageError, Content: errMsg})
finalStatus = "failed"
finalError = errMsg
case "step_start":
trySend(ch, Message{Type: MessageStatus, Status: "running"})
case "step_finish":
if event.Usage != nil {
usage.InputTokens += openclawInt64(event.Usage, "input")
usage.OutputTokens += openclawInt64(event.Usage, "output")
usage.CacheReadTokens += openclawInt64(event.Usage, "cacheRead")
usage.CacheWriteTokens += openclawInt64(event.Usage, "cacheWrite")
}
}
continue
}
// Try parsing as a final result blob (legacy format).
if result, ok := tryParseOpenclawResult(line); ok {
gotEvents = true
res := b.buildOpenclawEventResult(result, ch, &output)
if res.sessionID != "" {
sessionID = res.sessionID
}
// Prefer usage from the final result if no streaming events reported it.
u := res.usage
if u.InputTokens > 0 || u.OutputTokens > 0 || u.CacheReadTokens > 0 || u.CacheWriteTokens > 0 {
usage = u
}
continue
}
// Not JSON — treat as log line.
b.cfg.Logger.Debug("[openclaw:stderr] " + line)
rawLines = append(rawLines, line)
}
@@ -153,13 +238,42 @@ func (b *openclawBackend) processOutput(r io.Reader, ch chan<- Message) openclaw
return openclawEventResult{status: "failed", errMsg: fmt.Sprintf("read stderr: %v", err)}
}
trimmed := strings.TrimSpace(strings.Join(rawLines, "\n"))
if trimmed != "" {
return openclawEventResult{status: "completed", output: trimmed}
// If we got no events at all, fall back to raw output.
if !gotEvents {
trimmed := strings.TrimSpace(strings.Join(rawLines, "\n"))
if trimmed != "" {
return openclawEventResult{status: "completed", output: trimmed}
}
return openclawEventResult{status: "failed", errMsg: "openclaw returned no parseable output"}
}
return openclawEventResult{
status: finalStatus,
errMsg: finalError,
output: output.String(),
sessionID: sessionID,
usage: usage,
}
return openclawEventResult{status: "failed", errMsg: "openclaw returned no parseable output"}
}
// tryParseOpenclawEvent attempts to parse a line as a streaming NDJSON event.
// Returns the event and true if the line is a valid event with a known type.
func tryParseOpenclawEvent(line string) (openclawEvent, bool) {
if len(line) == 0 || line[0] != '{' {
return openclawEvent{}, false
}
var event openclawEvent
if err := json.Unmarshal([]byte(line), &event); err != nil {
return openclawEvent{}, false
}
if event.Type == "" {
return openclawEvent{}, false
}
return event, true
}
// tryParseOpenclawResult attempts to parse a line as a final result blob
// (the legacy format with payloads + meta).
func tryParseOpenclawResult(raw string) (openclawResult, bool) {
// Try each '{' position until we find valid openclawResult JSON.
// Earlier '{' chars may appear in log/error lines (e.g. raw_params={...}).
@@ -175,14 +289,13 @@ func tryParseOpenclawResult(raw string) (openclawResult, bool) {
return openclawResult{}, false
}
func (b *openclawBackend) buildOpenclawEventResult(result openclawResult, ch chan<- Message) openclawEventResult {
var output strings.Builder
// buildOpenclawEventResult extracts text and metadata from a final result blob.
// Text payloads are appended to the shared output builder and emitted to ch.
func (b *openclawBackend) buildOpenclawEventResult(result openclawResult, ch chan<- Message, output *strings.Builder) openclawEventResult {
for _, p := range result.Payloads {
if p.Text != "" {
if output.Len() > 0 {
output.WriteString("\n")
}
output.WriteString(p.Text)
trySend(ch, Message{Type: MessageText, Content: p.Text})
}
}
@@ -200,10 +313,6 @@ func (b *openclawBackend) buildOpenclawEventResult(result openclawResult, ch cha
}
}
if output.Len() > 0 {
trySend(ch, Message{Type: MessageText, Content: output.String()})
}
return openclawEventResult{
status: "completed",
output: output.String(),
@@ -231,7 +340,20 @@ func openclawInt64(data map[string]any, key string) int64 {
// ── JSON types for `openclaw agent --json` output ──
// openclawResult represents the JSON output from `openclaw agent --json`.
// openclawEvent represents a single streaming NDJSON event from openclaw --json.
// Event types: "text", "tool_use", "tool_result", "error", "step_start", "step_finish".
type openclawEvent struct {
Type string `json:"type"`
SessionID string `json:"sessionId,omitempty"`
Text string `json:"text,omitempty"`
Tool string `json:"tool,omitempty"`
CallID string `json:"callId,omitempty"`
Input json.RawMessage `json:"input,omitempty"`
Usage map[string]any `json:"usage,omitempty"`
}
// openclawResult represents the final JSON output from `openclaw agent --json`
// (the legacy single-blob format with payloads + meta).
type openclawResult struct {
Payloads []openclawPayload `json:"payloads"`
Meta openclawMeta `json:"meta"`

View File

@@ -18,7 +18,7 @@ func TestNewReturnsOpenclawBackend(t *testing.T) {
}
}
// ── processOutput tests ──
// ── Legacy result format tests (processOutput with final JSON blob) ──
func TestOpenclawProcessOutputHappyPath(t *testing.T) {
t.Parallel()
@@ -90,11 +90,24 @@ func TestOpenclawProcessOutputMultiplePayloads(t *testing.T) {
res := b.processOutput(strings.NewReader(string(data)), ch)
if res.output != "First\nSecond" {
t.Errorf("output: got %q, want %q", res.output, "First\nSecond")
if res.output != "FirstSecond" {
t.Errorf("output: got %q, want %q", res.output, "FirstSecond")
}
close(ch)
var msgs []Message
for m := range ch {
msgs = append(msgs, m)
}
if len(msgs) != 2 {
t.Fatalf("expected 2 text messages, got %d", len(msgs))
}
if msgs[0].Content != "First" {
t.Errorf("msg[0]: got %q, want %q", msgs[0].Content, "First")
}
if msgs[1].Content != "Second" {
t.Errorf("msg[1]: got %q, want %q", msgs[1].Content, "Second")
}
}
func TestOpenclawProcessOutputEmptyPayloads(t *testing.T) {
@@ -253,6 +266,222 @@ func TestOpenclawProcessOutputWithBracesInLogLines(t *testing.T) {
close(ch)
}
// ── Streaming NDJSON event tests ──
func TestOpenclawStreamingTextEvents(t *testing.T) {
t.Parallel()
b := &openclawBackend{cfg: Config{Logger: slog.Default()}}
ch := make(chan Message, 256)
lines := []string{
`{"type":"text","text":"Hello "}`,
`{"type":"text","text":"world"}`,
}
input := strings.Join(lines, "\n")
res := b.processOutput(strings.NewReader(input), ch)
if res.status != "completed" {
t.Errorf("status: got %q, want %q", res.status, "completed")
}
if res.output != "Hello world" {
t.Errorf("output: got %q, want %q", res.output, "Hello world")
}
close(ch)
var msgs []Message
for m := range ch {
msgs = append(msgs, m)
}
if len(msgs) != 2 {
t.Fatalf("expected 2 messages, got %d", len(msgs))
}
if msgs[0].Type != MessageText || msgs[0].Content != "Hello " {
t.Errorf("msg[0]: type=%s content=%q", msgs[0].Type, msgs[0].Content)
}
if msgs[1].Type != MessageText || msgs[1].Content != "world" {
t.Errorf("msg[1]: type=%s content=%q", msgs[1].Type, msgs[1].Content)
}
}
func TestOpenclawStreamingToolUseEvents(t *testing.T) {
t.Parallel()
b := &openclawBackend{cfg: Config{Logger: slog.Default()}}
ch := make(chan Message, 256)
lines := []string{
`{"type":"tool_use","tool":"bash","callId":"call_1","input":{"command":"ls -la"}}`,
`{"type":"tool_result","tool":"bash","callId":"call_1","text":"total 42\ndrwxr-xr-x"}`,
`{"type":"text","text":"Listed files."}`,
}
input := strings.Join(lines, "\n")
res := b.processOutput(strings.NewReader(input), ch)
if res.status != "completed" {
t.Errorf("status: got %q, want %q", res.status, "completed")
}
close(ch)
var msgs []Message
for m := range ch {
msgs = append(msgs, m)
}
if len(msgs) != 3 {
t.Fatalf("expected 3 messages, got %d", len(msgs))
}
// tool_use
if msgs[0].Type != MessageToolUse {
t.Errorf("msg[0] type: got %s, want tool-use", msgs[0].Type)
}
if msgs[0].Tool != "bash" {
t.Errorf("msg[0] tool: got %q, want %q", msgs[0].Tool, "bash")
}
if msgs[0].CallID != "call_1" {
t.Errorf("msg[0] callID: got %q, want %q", msgs[0].CallID, "call_1")
}
if msgs[0].Input["command"] != "ls -la" {
t.Errorf("msg[0] input: got %v", msgs[0].Input)
}
// tool_result
if msgs[1].Type != MessageToolResult {
t.Errorf("msg[1] type: got %s, want tool-result", msgs[1].Type)
}
if msgs[1].CallID != "call_1" {
t.Errorf("msg[1] callID: got %q", msgs[1].CallID)
}
if msgs[1].Output != "total 42\ndrwxr-xr-x" {
t.Errorf("msg[1] output: got %q", msgs[1].Output)
}
// text
if msgs[2].Type != MessageText || msgs[2].Content != "Listed files." {
t.Errorf("msg[2]: type=%s content=%q", msgs[2].Type, msgs[2].Content)
}
}
func TestOpenclawStreamingErrorEvent(t *testing.T) {
t.Parallel()
b := &openclawBackend{cfg: Config{Logger: slog.Default()}}
ch := make(chan Message, 256)
lines := []string{
`{"type":"text","text":"Starting..."}`,
`{"type":"error","text":"model not found: gpt-99"}`,
}
input := strings.Join(lines, "\n")
res := b.processOutput(strings.NewReader(input), ch)
if res.status != "failed" {
t.Errorf("status: got %q, want %q", res.status, "failed")
}
if res.errMsg != "model not found: gpt-99" {
t.Errorf("errMsg: got %q", res.errMsg)
}
close(ch)
var msgs []Message
for m := range ch {
msgs = append(msgs, m)
}
if len(msgs) != 2 {
t.Fatalf("expected 2 messages, got %d", len(msgs))
}
if msgs[1].Type != MessageError {
t.Errorf("msg[1] type: got %s, want error", msgs[1].Type)
}
}
func TestOpenclawStreamingStepFinishUsage(t *testing.T) {
t.Parallel()
b := &openclawBackend{cfg: Config{Logger: slog.Default()}}
ch := make(chan Message, 256)
lines := []string{
`{"type":"step_start"}`,
`{"type":"text","text":"Done"}`,
`{"type":"step_finish","usage":{"input":200,"output":100,"cacheRead":50,"cacheWrite":25}}`,
}
input := strings.Join(lines, "\n")
res := b.processOutput(strings.NewReader(input), ch)
if res.usage.InputTokens != 200 {
t.Errorf("input tokens: got %d, want 200", res.usage.InputTokens)
}
if res.usage.OutputTokens != 100 {
t.Errorf("output tokens: got %d, want 100", res.usage.OutputTokens)
}
if res.usage.CacheReadTokens != 50 {
t.Errorf("cache read: got %d, want 50", res.usage.CacheReadTokens)
}
if res.usage.CacheWriteTokens != 25 {
t.Errorf("cache write: got %d, want 25", res.usage.CacheWriteTokens)
}
close(ch)
}
func TestOpenclawStreamingSessionID(t *testing.T) {
t.Parallel()
b := &openclawBackend{cfg: Config{Logger: slog.Default()}}
ch := make(chan Message, 256)
lines := []string{
`{"type":"text","text":"Hi","sessionId":"ses_stream_123"}`,
}
input := strings.Join(lines, "\n")
res := b.processOutput(strings.NewReader(input), ch)
if res.sessionID != "ses_stream_123" {
t.Errorf("sessionID: got %q, want %q", res.sessionID, "ses_stream_123")
}
close(ch)
}
func TestOpenclawStreamingMixedWithLogLines(t *testing.T) {
t.Parallel()
b := &openclawBackend{cfg: Config{Logger: slog.Default()}}
ch := make(chan Message, 256)
lines := []string{
"[info] initializing agent...",
`{"type":"text","text":"Hello"}`,
"[debug] tool exec completed",
`{"type":"text","text":" world"}`,
}
input := strings.Join(lines, "\n")
res := b.processOutput(strings.NewReader(input), ch)
if res.status != "completed" {
t.Errorf("status: got %q, want %q", res.status, "completed")
}
if res.output != "Hello world" {
t.Errorf("output: got %q, want %q", res.output, "Hello world")
}
close(ch)
var msgs []Message
for m := range ch {
msgs = append(msgs, m)
}
if len(msgs) != 2 {
t.Fatalf("expected 2 text messages, got %d", len(msgs))
}
}
// ── openclawInt64 tests ──
func TestOpenclawInt64Float(t *testing.T) {