mirror of
https://github.com/multica-ai/multica.git
synced 2026-06-28 18:09:14 +02:00
Compare commits
2 Commits
agent/lamb
...
agent/j/d7
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
61e6242054 | ||
|
|
5ce18907a6 |
@@ -354,7 +354,7 @@ function SingleAgentLiveCard({ task, items, issueId, agentName }: SingleAgentLiv
|
||||
)}
|
||||
>
|
||||
<div className="overflow-hidden">
|
||||
{items.length > 0 && (
|
||||
{items.length > 0 ? (
|
||||
<div
|
||||
ref={scrollRef}
|
||||
onScroll={handleScroll}
|
||||
@@ -380,6 +380,12 @@ function SingleAgentLiveCard({ task, items, issueId, agentName }: SingleAgentLiv
|
||||
</button>
|
||||
)}
|
||||
</div>
|
||||
) : (
|
||||
<div className="border-t border-info/10 px-3 py-3">
|
||||
<p className="text-xs text-muted-foreground">
|
||||
Live log is not available for this agent provider. Results will appear when the task completes.
|
||||
</p>
|
||||
</div>
|
||||
)}
|
||||
</div>
|
||||
</div>
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
package agent
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
@@ -11,7 +10,7 @@ import (
|
||||
"time"
|
||||
)
|
||||
|
||||
// openclawBackend implements Backend by spawning `openclaw agent -p <prompt>
|
||||
// openclawBackend implements Backend by spawning `openclaw agent --message <prompt>
|
||||
// --output-format stream-json --yes` and reading streaming NDJSON events from
|
||||
// stdout — similar to the opencode backend.
|
||||
type openclawBackend struct {
|
||||
@@ -33,20 +32,15 @@ func (b *openclawBackend) Execute(ctx context.Context, prompt string, opts ExecO
|
||||
}
|
||||
runCtx, cancel := context.WithTimeout(ctx, timeout)
|
||||
|
||||
args := []string{"agent", "--output-format", "stream-json", "--yes"}
|
||||
if opts.Model != "" {
|
||||
args = append(args, "--model", opts.Model)
|
||||
sessionID := opts.ResumeSessionID
|
||||
if sessionID == "" {
|
||||
sessionID = fmt.Sprintf("multica-%d", time.Now().UnixNano())
|
||||
}
|
||||
if opts.SystemPrompt != "" {
|
||||
args = append(args, "--system-prompt", opts.SystemPrompt)
|
||||
args := []string{"agent", "--local", "--json", "--session-id", sessionID}
|
||||
if opts.Timeout > 0 {
|
||||
args = append(args, "--timeout", fmt.Sprintf("%d", int(opts.Timeout.Seconds())))
|
||||
}
|
||||
if opts.MaxTurns > 0 {
|
||||
args = append(args, "--max-turns", fmt.Sprintf("%d", opts.MaxTurns))
|
||||
}
|
||||
if opts.ResumeSessionID != "" {
|
||||
args = append(args, "--session", opts.ResumeSessionID)
|
||||
}
|
||||
args = append(args, "-p", prompt)
|
||||
args = append(args, "--message", prompt)
|
||||
|
||||
cmd := exec.CommandContext(runCtx, execPath, args...)
|
||||
if opts.Cwd != "" {
|
||||
@@ -54,12 +48,13 @@ func (b *openclawBackend) Execute(ctx context.Context, prompt string, opts ExecO
|
||||
}
|
||||
cmd.Env = buildEnv(b.cfg.Env)
|
||||
|
||||
stdout, err := cmd.StdoutPipe()
|
||||
// openclaw writes its --json output to stderr, not stdout.
|
||||
stderr, err := cmd.StderrPipe()
|
||||
if err != nil {
|
||||
cancel()
|
||||
return nil, fmt.Errorf("openclaw stdout pipe: %w", err)
|
||||
return nil, fmt.Errorf("openclaw stderr pipe: %w", err)
|
||||
}
|
||||
cmd.Stderr = newLogWriter(b.cfg.Logger, "[openclaw:stderr] ")
|
||||
cmd.Stdout = newLogWriter(b.cfg.Logger, "[openclaw:stdout] ")
|
||||
|
||||
if err := cmd.Start(); err != nil {
|
||||
cancel()
|
||||
@@ -77,7 +72,7 @@ func (b *openclawBackend) Execute(ctx context.Context, prompt string, opts ExecO
|
||||
defer close(resCh)
|
||||
|
||||
startTime := time.Now()
|
||||
scanResult := b.processEvents(stdout, msgCh)
|
||||
scanResult := b.processOutput(stderr, msgCh)
|
||||
|
||||
// Wait for process exit.
|
||||
exitErr := cmd.Wait()
|
||||
@@ -132,86 +127,75 @@ type openclawEventResult struct {
|
||||
usage TokenUsage
|
||||
}
|
||||
|
||||
// processEvents reads NDJSON lines from r, dispatches events to ch, and returns
|
||||
// the accumulated result.
|
||||
func (b *openclawBackend) processEvents(r io.Reader, ch chan<- Message) openclawEventResult {
|
||||
var output strings.Builder
|
||||
var sessionID string
|
||||
var usage TokenUsage
|
||||
finalStatus := "completed"
|
||||
var finalError string
|
||||
// processOutput reads the JSON output from openclaw --json stderr and returns
|
||||
// the parsed result. OpenClaw writes its JSON result to stderr, which may also
|
||||
// contain non-JSON log lines. We extract the JSON object by finding the first '{'.
|
||||
func (b *openclawBackend) processOutput(r io.Reader, ch chan<- Message) openclawEventResult {
|
||||
data, err := io.ReadAll(r)
|
||||
if err != nil {
|
||||
return openclawEventResult{status: "failed", errMsg: fmt.Sprintf("read stderr: %v", err)}
|
||||
}
|
||||
|
||||
scanner := bufio.NewScanner(r)
|
||||
scanner.Buffer(make([]byte, 0, 1024*1024), 10*1024*1024)
|
||||
|
||||
for scanner.Scan() {
|
||||
line := strings.TrimSpace(scanner.Text())
|
||||
if line == "" {
|
||||
continue
|
||||
}
|
||||
|
||||
var event openclawEvent
|
||||
if err := json.Unmarshal([]byte(line), &event); err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
if event.SessionID != "" {
|
||||
sessionID = event.SessionID
|
||||
}
|
||||
|
||||
switch event.Type {
|
||||
case "text":
|
||||
b.handleOCTextEvent(event, ch, &output)
|
||||
case "thinking":
|
||||
b.handleOCThinkingEvent(event, ch)
|
||||
case "tool_call":
|
||||
b.handleOCToolCallEvent(event, ch)
|
||||
case "error":
|
||||
// NOTE: error events unconditionally set finalStatus to "failed" and
|
||||
// it stays sticky — subsequent text or result events won't revert it.
|
||||
// This is intentional: once an error fires, the session is considered
|
||||
// failed regardless of later events.
|
||||
b.handleOCErrorEvent(event, ch, &finalStatus, &finalError)
|
||||
case "step_start":
|
||||
trySend(ch, Message{Type: MessageStatus, Status: "running"})
|
||||
case "step_end":
|
||||
// Accumulate token usage from step_end events if present.
|
||||
if event.Data != nil {
|
||||
usage.InputTokens += openclawInt64(event.Data, "inputTokens")
|
||||
usage.OutputTokens += openclawInt64(event.Data, "outputTokens")
|
||||
usage.CacheReadTokens += openclawInt64(event.Data, "cacheReadTokens")
|
||||
usage.CacheWriteTokens += openclawInt64(event.Data, "cacheWriteTokens")
|
||||
}
|
||||
case "result":
|
||||
// The result event only updates status on explicit failure. A
|
||||
// "completed" result is a no-op because finalStatus defaults to
|
||||
// "completed". Any unrecognized status (e.g. "partial") is also
|
||||
// treated as success — update this if OpenClaw adds new statuses.
|
||||
if event.Data != nil {
|
||||
if s, ok := event.Data["status"].(string); ok && s != "" {
|
||||
if s == "error" || s == "failed" {
|
||||
finalStatus = "failed"
|
||||
if msg, ok := event.Data["error"].(string); ok {
|
||||
finalError = msg
|
||||
}
|
||||
}
|
||||
}
|
||||
// Log non-JSON lines and find the JSON object
|
||||
raw := string(data)
|
||||
jsonStart := strings.Index(raw, "{")
|
||||
if jsonStart > 0 {
|
||||
for _, line := range strings.Split(raw[:jsonStart], "\n") {
|
||||
line = strings.TrimSpace(line)
|
||||
if line != "" {
|
||||
b.cfg.Logger.Debug("[openclaw:stderr] " + line)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Check for scanner errors (e.g. broken pipe, read errors).
|
||||
if scanErr := scanner.Err(); scanErr != nil {
|
||||
b.cfg.Logger.Warn("openclaw stdout scanner error", "error", scanErr)
|
||||
if finalStatus == "completed" {
|
||||
finalStatus = "failed"
|
||||
finalError = fmt.Sprintf("stdout read error: %v", scanErr)
|
||||
var result openclawResult
|
||||
jsonData := raw
|
||||
if jsonStart >= 0 {
|
||||
jsonData = raw[jsonStart:]
|
||||
}
|
||||
if err := json.Unmarshal([]byte(jsonData), &result); err != nil {
|
||||
// If we can't parse JSON, return raw output as-is
|
||||
trimmed := strings.TrimSpace(raw)
|
||||
if trimmed != "" {
|
||||
b.cfg.Logger.Debug("[openclaw:stderr] " + trimmed)
|
||||
return openclawEventResult{status: "completed", output: trimmed}
|
||||
}
|
||||
return openclawEventResult{status: "failed", errMsg: "openclaw returned no parseable output"}
|
||||
}
|
||||
|
||||
// Extract text from payloads
|
||||
var output strings.Builder
|
||||
for _, p := range result.Payloads {
|
||||
if p.Text != "" {
|
||||
if output.Len() > 0 {
|
||||
output.WriteString("\n")
|
||||
}
|
||||
output.WriteString(p.Text)
|
||||
}
|
||||
}
|
||||
|
||||
// Extract session ID and usage from meta
|
||||
var sessionID string
|
||||
var usage TokenUsage
|
||||
if result.Meta.AgentMeta != nil {
|
||||
if sid, ok := result.Meta.AgentMeta["sessionId"].(string); ok {
|
||||
sessionID = sid
|
||||
}
|
||||
if u, ok := result.Meta.AgentMeta["usage"].(map[string]any); ok {
|
||||
usage.InputTokens = openclawInt64(u, "input")
|
||||
usage.OutputTokens = openclawInt64(u, "output")
|
||||
usage.CacheReadTokens = openclawInt64(u, "cacheRead")
|
||||
usage.CacheWriteTokens = openclawInt64(u, "cacheWrite")
|
||||
}
|
||||
}
|
||||
|
||||
// Send final text as a message
|
||||
if output.Len() > 0 {
|
||||
trySend(ch, Message{Type: MessageText, Content: output.String()})
|
||||
}
|
||||
|
||||
return openclawEventResult{
|
||||
status: finalStatus,
|
||||
errMsg: finalError,
|
||||
status: "completed",
|
||||
output: output.String(),
|
||||
sessionID: sessionID,
|
||||
usage: usage,
|
||||
@@ -235,118 +219,19 @@ func openclawInt64(data map[string]any, key string) int64 {
|
||||
}
|
||||
}
|
||||
|
||||
func (b *openclawBackend) handleOCTextEvent(event openclawEvent, ch chan<- Message, output *strings.Builder) {
|
||||
text := openclawExtractText(event.Data)
|
||||
if text != "" {
|
||||
output.WriteString(text)
|
||||
trySend(ch, Message{Type: MessageText, Content: text})
|
||||
}
|
||||
// ── JSON types for `openclaw agent --json` output ──
|
||||
|
||||
// openclawResult represents the JSON output from `openclaw agent --json`.
|
||||
type openclawResult struct {
|
||||
Payloads []openclawPayload `json:"payloads"`
|
||||
Meta openclawMeta `json:"meta"`
|
||||
}
|
||||
|
||||
func (b *openclawBackend) handleOCThinkingEvent(event openclawEvent, ch chan<- Message) {
|
||||
text := openclawExtractText(event.Data)
|
||||
if text != "" {
|
||||
trySend(ch, Message{Type: MessageThinking, Content: text})
|
||||
}
|
||||
type openclawPayload struct {
|
||||
Text string `json:"text"`
|
||||
}
|
||||
|
||||
// handleOCToolCallEvent processes "tool_call" events from OpenClaw. A single
|
||||
// tool_call event may contain both the call and result when the tool has
|
||||
// completed (status == "completed").
|
||||
func (b *openclawBackend) handleOCToolCallEvent(event openclawEvent, ch chan<- Message) {
|
||||
if event.Data == nil {
|
||||
return
|
||||
}
|
||||
|
||||
name, _ := event.Data["name"].(string)
|
||||
callID, _ := event.Data["callId"].(string)
|
||||
|
||||
// Extract input.
|
||||
var input map[string]any
|
||||
if raw, ok := event.Data["input"]; ok {
|
||||
if m, ok := raw.(map[string]any); ok {
|
||||
input = m
|
||||
}
|
||||
}
|
||||
|
||||
// Emit the tool-use message.
|
||||
trySend(ch, Message{
|
||||
Type: MessageToolUse,
|
||||
Tool: name,
|
||||
CallID: callID,
|
||||
Input: input,
|
||||
})
|
||||
|
||||
// If the tool has completed, also emit a tool-result message.
|
||||
status, _ := event.Data["status"].(string)
|
||||
if status == "completed" {
|
||||
outputStr := extractToolOutput(event.Data["output"])
|
||||
trySend(ch, Message{
|
||||
Type: MessageToolResult,
|
||||
Tool: name,
|
||||
CallID: callID,
|
||||
Output: outputStr,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func (b *openclawBackend) handleOCErrorEvent(event openclawEvent, ch chan<- Message, finalStatus, finalError *string) {
|
||||
errMsg := ""
|
||||
if event.Data != nil {
|
||||
if msg, ok := event.Data["message"].(string); ok {
|
||||
errMsg = msg
|
||||
}
|
||||
if errMsg == "" {
|
||||
if code, ok := event.Data["code"].(string); ok {
|
||||
errMsg = code
|
||||
}
|
||||
}
|
||||
}
|
||||
if errMsg == "" {
|
||||
errMsg = "unknown openclaw error"
|
||||
}
|
||||
|
||||
b.cfg.Logger.Warn("openclaw error event", "error", errMsg)
|
||||
trySend(ch, Message{Type: MessageError, Content: errMsg})
|
||||
|
||||
*finalStatus = "failed"
|
||||
*finalError = errMsg
|
||||
}
|
||||
|
||||
// openclawExtractText extracts text content from an openclaw event data map.
|
||||
// Supports both flat {"text": "..."} and nested {"content": {"text": "..."}} layouts.
|
||||
func openclawExtractText(data map[string]any) string {
|
||||
if data == nil {
|
||||
return ""
|
||||
}
|
||||
// Try "text" field directly.
|
||||
if text, ok := data["text"].(string); ok {
|
||||
return text
|
||||
}
|
||||
// Try nested "content.text".
|
||||
if content, ok := data["content"].(map[string]any); ok {
|
||||
if text, ok := content["text"].(string); ok {
|
||||
return text
|
||||
}
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
// ── JSON types for `openclaw agent --output-format stream-json` stdout events ──
|
||||
|
||||
// openclawEvent represents a single NDJSON line from OpenClaw's stream-json output.
|
||||
//
|
||||
// Event types:
|
||||
//
|
||||
// "step_start" — agent step begins
|
||||
// "text" — text output from agent
|
||||
// "thinking" — model reasoning/thinking
|
||||
// "tool_call" — tool invocation with call and result
|
||||
// "error" — error from openclaw
|
||||
// "step_end" — agent step completes
|
||||
// "result" — final result with status
|
||||
type openclawEvent struct {
|
||||
Type string `json:"type"`
|
||||
SessionID string `json:"sessionId,omitempty"`
|
||||
Data map[string]any `json:"data,omitempty"`
|
||||
type openclawMeta struct {
|
||||
DurationMs int64 `json:"durationMs"`
|
||||
AgentMeta map[string]any `json:"agentMeta"`
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user