mirror of
https://github.com/multica-ai/multica.git
synced 2026-06-24 07:59:30 +02:00
Compare commits
1 Commits
fix/header
...
agent/j/2b
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4896128723 |
@@ -38,6 +38,12 @@ func (b *openclawBackend) Execute(ctx context.Context, prompt string, opts ExecO
|
||||
sessionID = fmt.Sprintf("multica-%d", time.Now().UnixNano())
|
||||
}
|
||||
args := []string{"agent", "--local", "--json", "--session-id", sessionID}
|
||||
if opts.Model != "" {
|
||||
args = append(args, "--model", opts.Model)
|
||||
}
|
||||
if opts.SystemPrompt != "" {
|
||||
args = append(args, "--system-prompt", opts.SystemPrompt)
|
||||
}
|
||||
if opts.Timeout > 0 {
|
||||
args = append(args, "--timeout", fmt.Sprintf("%d", int(opts.Timeout.Seconds())))
|
||||
}
|
||||
@@ -129,22 +135,101 @@ type openclawEventResult struct {
|
||||
}
|
||||
|
||||
// processOutput reads the JSON output from openclaw --json stderr and returns
|
||||
// the parsed result. OpenClaw writes its JSON result to stderr, which may also
|
||||
// contain non-JSON log lines. We scan line-by-line so a final result line can
|
||||
// be recognized without waiting for the entire stderr stream to be buffered.
|
||||
// the parsed result. OpenClaw writes its JSON output to stderr, which may also
|
||||
// contain non-JSON log lines. The stream may contain:
|
||||
//
|
||||
// - NDJSON streaming events (type: "text", "tool_use", "tool_result", "error",
|
||||
// "step_start", "step_finish") — emitted in real time as the agent works
|
||||
// - A final result JSON (with payloads + meta) — the legacy single-blob format
|
||||
//
|
||||
// We scan line-by-line, emitting messages as events arrive so streaming
|
||||
// consumers get real-time feedback instead of waiting for the final blob.
|
||||
func (b *openclawBackend) processOutput(r io.Reader, ch chan<- Message) openclawEventResult {
|
||||
scanner := bufio.NewScanner(r)
|
||||
scanner.Buffer(make([]byte, 0, 1024*1024), 10*1024*1024)
|
||||
|
||||
var output strings.Builder
|
||||
var sessionID string
|
||||
var usage TokenUsage
|
||||
finalStatus := "completed"
|
||||
var finalError string
|
||||
gotEvents := false // true if we parsed at least one streaming event or result
|
||||
|
||||
var rawLines []string
|
||||
for scanner.Scan() {
|
||||
line := strings.TrimSpace(scanner.Text())
|
||||
if line == "" {
|
||||
continue
|
||||
}
|
||||
if result, ok := tryParseOpenclawResult(line); ok {
|
||||
return b.buildOpenclawEventResult(result, ch)
|
||||
|
||||
// Try parsing as a streaming NDJSON event first.
|
||||
if event, ok := tryParseOpenclawEvent(line); ok {
|
||||
gotEvents = true
|
||||
if event.SessionID != "" {
|
||||
sessionID = event.SessionID
|
||||
}
|
||||
switch event.Type {
|
||||
case "text":
|
||||
if event.Text != "" {
|
||||
output.WriteString(event.Text)
|
||||
trySend(ch, Message{Type: MessageText, Content: event.Text})
|
||||
}
|
||||
case "tool_use":
|
||||
var input map[string]any
|
||||
if event.Input != nil {
|
||||
_ = json.Unmarshal(event.Input, &input)
|
||||
}
|
||||
trySend(ch, Message{
|
||||
Type: MessageToolUse,
|
||||
Tool: event.Tool,
|
||||
CallID: event.CallID,
|
||||
Input: input,
|
||||
})
|
||||
case "tool_result":
|
||||
trySend(ch, Message{
|
||||
Type: MessageToolResult,
|
||||
Tool: event.Tool,
|
||||
CallID: event.CallID,
|
||||
Output: event.Text,
|
||||
})
|
||||
case "error":
|
||||
errMsg := event.Text
|
||||
if errMsg == "" {
|
||||
errMsg = "unknown openclaw error"
|
||||
}
|
||||
b.cfg.Logger.Warn("openclaw error event", "error", errMsg)
|
||||
trySend(ch, Message{Type: MessageError, Content: errMsg})
|
||||
finalStatus = "failed"
|
||||
finalError = errMsg
|
||||
case "step_start":
|
||||
trySend(ch, Message{Type: MessageStatus, Status: "running"})
|
||||
case "step_finish":
|
||||
if event.Usage != nil {
|
||||
usage.InputTokens += openclawInt64(event.Usage, "input")
|
||||
usage.OutputTokens += openclawInt64(event.Usage, "output")
|
||||
usage.CacheReadTokens += openclawInt64(event.Usage, "cacheRead")
|
||||
usage.CacheWriteTokens += openclawInt64(event.Usage, "cacheWrite")
|
||||
}
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
// Try parsing as a final result blob (legacy format).
|
||||
if result, ok := tryParseOpenclawResult(line); ok {
|
||||
gotEvents = true
|
||||
res := b.buildOpenclawEventResult(result, ch, &output)
|
||||
if res.sessionID != "" {
|
||||
sessionID = res.sessionID
|
||||
}
|
||||
// Prefer usage from the final result if no streaming events reported it.
|
||||
u := res.usage
|
||||
if u.InputTokens > 0 || u.OutputTokens > 0 || u.CacheReadTokens > 0 || u.CacheWriteTokens > 0 {
|
||||
usage = u
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
// Not JSON — treat as log line.
|
||||
b.cfg.Logger.Debug("[openclaw:stderr] " + line)
|
||||
rawLines = append(rawLines, line)
|
||||
}
|
||||
@@ -153,13 +238,42 @@ func (b *openclawBackend) processOutput(r io.Reader, ch chan<- Message) openclaw
|
||||
return openclawEventResult{status: "failed", errMsg: fmt.Sprintf("read stderr: %v", err)}
|
||||
}
|
||||
|
||||
trimmed := strings.TrimSpace(strings.Join(rawLines, "\n"))
|
||||
if trimmed != "" {
|
||||
return openclawEventResult{status: "completed", output: trimmed}
|
||||
// If we got no events at all, fall back to raw output.
|
||||
if !gotEvents {
|
||||
trimmed := strings.TrimSpace(strings.Join(rawLines, "\n"))
|
||||
if trimmed != "" {
|
||||
return openclawEventResult{status: "completed", output: trimmed}
|
||||
}
|
||||
return openclawEventResult{status: "failed", errMsg: "openclaw returned no parseable output"}
|
||||
}
|
||||
|
||||
return openclawEventResult{
|
||||
status: finalStatus,
|
||||
errMsg: finalError,
|
||||
output: output.String(),
|
||||
sessionID: sessionID,
|
||||
usage: usage,
|
||||
}
|
||||
return openclawEventResult{status: "failed", errMsg: "openclaw returned no parseable output"}
|
||||
}
|
||||
|
||||
// tryParseOpenclawEvent attempts to parse a line as a streaming NDJSON event.
|
||||
// Returns the event and true if the line is a valid event with a known type.
|
||||
func tryParseOpenclawEvent(line string) (openclawEvent, bool) {
|
||||
if len(line) == 0 || line[0] != '{' {
|
||||
return openclawEvent{}, false
|
||||
}
|
||||
var event openclawEvent
|
||||
if err := json.Unmarshal([]byte(line), &event); err != nil {
|
||||
return openclawEvent{}, false
|
||||
}
|
||||
if event.Type == "" {
|
||||
return openclawEvent{}, false
|
||||
}
|
||||
return event, true
|
||||
}
|
||||
|
||||
// tryParseOpenclawResult attempts to parse a line as a final result blob
|
||||
// (the legacy format with payloads + meta).
|
||||
func tryParseOpenclawResult(raw string) (openclawResult, bool) {
|
||||
// Try each '{' position until we find valid openclawResult JSON.
|
||||
// Earlier '{' chars may appear in log/error lines (e.g. raw_params={...}).
|
||||
@@ -175,14 +289,13 @@ func tryParseOpenclawResult(raw string) (openclawResult, bool) {
|
||||
return openclawResult{}, false
|
||||
}
|
||||
|
||||
func (b *openclawBackend) buildOpenclawEventResult(result openclawResult, ch chan<- Message) openclawEventResult {
|
||||
var output strings.Builder
|
||||
// buildOpenclawEventResult extracts text and metadata from a final result blob.
|
||||
// Text payloads are appended to the shared output builder and emitted to ch.
|
||||
func (b *openclawBackend) buildOpenclawEventResult(result openclawResult, ch chan<- Message, output *strings.Builder) openclawEventResult {
|
||||
for _, p := range result.Payloads {
|
||||
if p.Text != "" {
|
||||
if output.Len() > 0 {
|
||||
output.WriteString("\n")
|
||||
}
|
||||
output.WriteString(p.Text)
|
||||
trySend(ch, Message{Type: MessageText, Content: p.Text})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -200,10 +313,6 @@ func (b *openclawBackend) buildOpenclawEventResult(result openclawResult, ch cha
|
||||
}
|
||||
}
|
||||
|
||||
if output.Len() > 0 {
|
||||
trySend(ch, Message{Type: MessageText, Content: output.String()})
|
||||
}
|
||||
|
||||
return openclawEventResult{
|
||||
status: "completed",
|
||||
output: output.String(),
|
||||
@@ -231,7 +340,20 @@ func openclawInt64(data map[string]any, key string) int64 {
|
||||
|
||||
// ── JSON types for `openclaw agent --json` output ──
|
||||
|
||||
// openclawResult represents the JSON output from `openclaw agent --json`.
|
||||
// openclawEvent represents a single streaming NDJSON event from openclaw --json.
|
||||
// Event types: "text", "tool_use", "tool_result", "error", "step_start", "step_finish".
|
||||
type openclawEvent struct {
|
||||
Type string `json:"type"`
|
||||
SessionID string `json:"sessionId,omitempty"`
|
||||
Text string `json:"text,omitempty"`
|
||||
Tool string `json:"tool,omitempty"`
|
||||
CallID string `json:"callId,omitempty"`
|
||||
Input json.RawMessage `json:"input,omitempty"`
|
||||
Usage map[string]any `json:"usage,omitempty"`
|
||||
}
|
||||
|
||||
// openclawResult represents the final JSON output from `openclaw agent --json`
|
||||
// (the legacy single-blob format with payloads + meta).
|
||||
type openclawResult struct {
|
||||
Payloads []openclawPayload `json:"payloads"`
|
||||
Meta openclawMeta `json:"meta"`
|
||||
|
||||
@@ -18,7 +18,7 @@ func TestNewReturnsOpenclawBackend(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// ── processOutput tests ──
|
||||
// ── Legacy result format tests (processOutput with final JSON blob) ──
|
||||
|
||||
func TestOpenclawProcessOutputHappyPath(t *testing.T) {
|
||||
t.Parallel()
|
||||
@@ -90,11 +90,24 @@ func TestOpenclawProcessOutputMultiplePayloads(t *testing.T) {
|
||||
|
||||
res := b.processOutput(strings.NewReader(string(data)), ch)
|
||||
|
||||
if res.output != "First\nSecond" {
|
||||
t.Errorf("output: got %q, want %q", res.output, "First\nSecond")
|
||||
if res.output != "FirstSecond" {
|
||||
t.Errorf("output: got %q, want %q", res.output, "FirstSecond")
|
||||
}
|
||||
|
||||
close(ch)
|
||||
var msgs []Message
|
||||
for m := range ch {
|
||||
msgs = append(msgs, m)
|
||||
}
|
||||
if len(msgs) != 2 {
|
||||
t.Fatalf("expected 2 text messages, got %d", len(msgs))
|
||||
}
|
||||
if msgs[0].Content != "First" {
|
||||
t.Errorf("msg[0]: got %q, want %q", msgs[0].Content, "First")
|
||||
}
|
||||
if msgs[1].Content != "Second" {
|
||||
t.Errorf("msg[1]: got %q, want %q", msgs[1].Content, "Second")
|
||||
}
|
||||
}
|
||||
|
||||
func TestOpenclawProcessOutputEmptyPayloads(t *testing.T) {
|
||||
@@ -253,6 +266,222 @@ func TestOpenclawProcessOutputWithBracesInLogLines(t *testing.T) {
|
||||
close(ch)
|
||||
}
|
||||
|
||||
// ── Streaming NDJSON event tests ──
|
||||
|
||||
func TestOpenclawStreamingTextEvents(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
b := &openclawBackend{cfg: Config{Logger: slog.Default()}}
|
||||
ch := make(chan Message, 256)
|
||||
|
||||
lines := []string{
|
||||
`{"type":"text","text":"Hello "}`,
|
||||
`{"type":"text","text":"world"}`,
|
||||
}
|
||||
input := strings.Join(lines, "\n")
|
||||
|
||||
res := b.processOutput(strings.NewReader(input), ch)
|
||||
|
||||
if res.status != "completed" {
|
||||
t.Errorf("status: got %q, want %q", res.status, "completed")
|
||||
}
|
||||
if res.output != "Hello world" {
|
||||
t.Errorf("output: got %q, want %q", res.output, "Hello world")
|
||||
}
|
||||
|
||||
close(ch)
|
||||
var msgs []Message
|
||||
for m := range ch {
|
||||
msgs = append(msgs, m)
|
||||
}
|
||||
if len(msgs) != 2 {
|
||||
t.Fatalf("expected 2 messages, got %d", len(msgs))
|
||||
}
|
||||
if msgs[0].Type != MessageText || msgs[0].Content != "Hello " {
|
||||
t.Errorf("msg[0]: type=%s content=%q", msgs[0].Type, msgs[0].Content)
|
||||
}
|
||||
if msgs[1].Type != MessageText || msgs[1].Content != "world" {
|
||||
t.Errorf("msg[1]: type=%s content=%q", msgs[1].Type, msgs[1].Content)
|
||||
}
|
||||
}
|
||||
|
||||
func TestOpenclawStreamingToolUseEvents(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
b := &openclawBackend{cfg: Config{Logger: slog.Default()}}
|
||||
ch := make(chan Message, 256)
|
||||
|
||||
lines := []string{
|
||||
`{"type":"tool_use","tool":"bash","callId":"call_1","input":{"command":"ls -la"}}`,
|
||||
`{"type":"tool_result","tool":"bash","callId":"call_1","text":"total 42\ndrwxr-xr-x"}`,
|
||||
`{"type":"text","text":"Listed files."}`,
|
||||
}
|
||||
input := strings.Join(lines, "\n")
|
||||
|
||||
res := b.processOutput(strings.NewReader(input), ch)
|
||||
|
||||
if res.status != "completed" {
|
||||
t.Errorf("status: got %q, want %q", res.status, "completed")
|
||||
}
|
||||
|
||||
close(ch)
|
||||
var msgs []Message
|
||||
for m := range ch {
|
||||
msgs = append(msgs, m)
|
||||
}
|
||||
if len(msgs) != 3 {
|
||||
t.Fatalf("expected 3 messages, got %d", len(msgs))
|
||||
}
|
||||
|
||||
// tool_use
|
||||
if msgs[0].Type != MessageToolUse {
|
||||
t.Errorf("msg[0] type: got %s, want tool-use", msgs[0].Type)
|
||||
}
|
||||
if msgs[0].Tool != "bash" {
|
||||
t.Errorf("msg[0] tool: got %q, want %q", msgs[0].Tool, "bash")
|
||||
}
|
||||
if msgs[0].CallID != "call_1" {
|
||||
t.Errorf("msg[0] callID: got %q, want %q", msgs[0].CallID, "call_1")
|
||||
}
|
||||
if msgs[0].Input["command"] != "ls -la" {
|
||||
t.Errorf("msg[0] input: got %v", msgs[0].Input)
|
||||
}
|
||||
|
||||
// tool_result
|
||||
if msgs[1].Type != MessageToolResult {
|
||||
t.Errorf("msg[1] type: got %s, want tool-result", msgs[1].Type)
|
||||
}
|
||||
if msgs[1].CallID != "call_1" {
|
||||
t.Errorf("msg[1] callID: got %q", msgs[1].CallID)
|
||||
}
|
||||
if msgs[1].Output != "total 42\ndrwxr-xr-x" {
|
||||
t.Errorf("msg[1] output: got %q", msgs[1].Output)
|
||||
}
|
||||
|
||||
// text
|
||||
if msgs[2].Type != MessageText || msgs[2].Content != "Listed files." {
|
||||
t.Errorf("msg[2]: type=%s content=%q", msgs[2].Type, msgs[2].Content)
|
||||
}
|
||||
}
|
||||
|
||||
func TestOpenclawStreamingErrorEvent(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
b := &openclawBackend{cfg: Config{Logger: slog.Default()}}
|
||||
ch := make(chan Message, 256)
|
||||
|
||||
lines := []string{
|
||||
`{"type":"text","text":"Starting..."}`,
|
||||
`{"type":"error","text":"model not found: gpt-99"}`,
|
||||
}
|
||||
input := strings.Join(lines, "\n")
|
||||
|
||||
res := b.processOutput(strings.NewReader(input), ch)
|
||||
|
||||
if res.status != "failed" {
|
||||
t.Errorf("status: got %q, want %q", res.status, "failed")
|
||||
}
|
||||
if res.errMsg != "model not found: gpt-99" {
|
||||
t.Errorf("errMsg: got %q", res.errMsg)
|
||||
}
|
||||
|
||||
close(ch)
|
||||
var msgs []Message
|
||||
for m := range ch {
|
||||
msgs = append(msgs, m)
|
||||
}
|
||||
if len(msgs) != 2 {
|
||||
t.Fatalf("expected 2 messages, got %d", len(msgs))
|
||||
}
|
||||
if msgs[1].Type != MessageError {
|
||||
t.Errorf("msg[1] type: got %s, want error", msgs[1].Type)
|
||||
}
|
||||
}
|
||||
|
||||
func TestOpenclawStreamingStepFinishUsage(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
b := &openclawBackend{cfg: Config{Logger: slog.Default()}}
|
||||
ch := make(chan Message, 256)
|
||||
|
||||
lines := []string{
|
||||
`{"type":"step_start"}`,
|
||||
`{"type":"text","text":"Done"}`,
|
||||
`{"type":"step_finish","usage":{"input":200,"output":100,"cacheRead":50,"cacheWrite":25}}`,
|
||||
}
|
||||
input := strings.Join(lines, "\n")
|
||||
|
||||
res := b.processOutput(strings.NewReader(input), ch)
|
||||
|
||||
if res.usage.InputTokens != 200 {
|
||||
t.Errorf("input tokens: got %d, want 200", res.usage.InputTokens)
|
||||
}
|
||||
if res.usage.OutputTokens != 100 {
|
||||
t.Errorf("output tokens: got %d, want 100", res.usage.OutputTokens)
|
||||
}
|
||||
if res.usage.CacheReadTokens != 50 {
|
||||
t.Errorf("cache read: got %d, want 50", res.usage.CacheReadTokens)
|
||||
}
|
||||
if res.usage.CacheWriteTokens != 25 {
|
||||
t.Errorf("cache write: got %d, want 25", res.usage.CacheWriteTokens)
|
||||
}
|
||||
|
||||
close(ch)
|
||||
}
|
||||
|
||||
func TestOpenclawStreamingSessionID(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
b := &openclawBackend{cfg: Config{Logger: slog.Default()}}
|
||||
ch := make(chan Message, 256)
|
||||
|
||||
lines := []string{
|
||||
`{"type":"text","text":"Hi","sessionId":"ses_stream_123"}`,
|
||||
}
|
||||
input := strings.Join(lines, "\n")
|
||||
|
||||
res := b.processOutput(strings.NewReader(input), ch)
|
||||
|
||||
if res.sessionID != "ses_stream_123" {
|
||||
t.Errorf("sessionID: got %q, want %q", res.sessionID, "ses_stream_123")
|
||||
}
|
||||
|
||||
close(ch)
|
||||
}
|
||||
|
||||
func TestOpenclawStreamingMixedWithLogLines(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
b := &openclawBackend{cfg: Config{Logger: slog.Default()}}
|
||||
ch := make(chan Message, 256)
|
||||
|
||||
lines := []string{
|
||||
"[info] initializing agent...",
|
||||
`{"type":"text","text":"Hello"}`,
|
||||
"[debug] tool exec completed",
|
||||
`{"type":"text","text":" world"}`,
|
||||
}
|
||||
input := strings.Join(lines, "\n")
|
||||
|
||||
res := b.processOutput(strings.NewReader(input), ch)
|
||||
|
||||
if res.status != "completed" {
|
||||
t.Errorf("status: got %q, want %q", res.status, "completed")
|
||||
}
|
||||
if res.output != "Hello world" {
|
||||
t.Errorf("output: got %q, want %q", res.output, "Hello world")
|
||||
}
|
||||
|
||||
close(ch)
|
||||
var msgs []Message
|
||||
for m := range ch {
|
||||
msgs = append(msgs, m)
|
||||
}
|
||||
if len(msgs) != 2 {
|
||||
t.Fatalf("expected 2 text messages, got %d", len(msgs))
|
||||
}
|
||||
}
|
||||
|
||||
// ── openclawInt64 tests ──
|
||||
|
||||
func TestOpenclawInt64Float(t *testing.T) {
|
||||
|
||||
Reference in New Issue
Block a user