Files
multica/server/pkg/agent/codebuddy.go
2026-05-30 14:53:32 +08:00

455 lines
13 KiB
Go

package agent
import (
"bufio"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"log/slog"
"os"
"os/exec"
"strings"
"time"
)
// codebuddyBackend implements Backend by spawning the Claude Code CLI
// (codebuddy fork) with --output-format stream-json.
type codebuddyBackend struct {
cfg Config
}
// codebuddyBlockedArgs are flags hardcoded by the daemon that must not be
// overridden by user-configured custom_args. Overriding these would break
// the daemon↔codebuddy communication protocol.
var codebuddyBlockedArgs = map[string]blockedArgMode{
"-p": blockedStandalone, // non-interactive mode
"--output-format": blockedWithValue, // stream-json protocol
"--input-format": blockedWithValue, // stream-json protocol
"--permission-mode": blockedWithValue, // bypassPermissions for autonomous operation
"--mcp-config": blockedWithValue, // set by daemon from agent.mcp_config
// `--effort` is owned by the per-agent thinking_level picker so a
// user-supplied custom_arg cannot silently outvote it.
"--effort": blockedWithValue,
}
func buildCodebuddyArgs(opts ExecOptions, logger *slog.Logger) []string {
args := []string{
"-p",
"--output-format", "stream-json",
"--input-format", "stream-json",
"--verbose",
"--strict-mcp-config",
"--permission-mode", "bypassPermissions",
"--disallowedTools", "AskUserQuestion",
}
if opts.Model != "" {
args = append(args, "--model", opts.Model)
}
if opts.ThinkingLevel != "" {
args = append(args, "--effort", opts.ThinkingLevel)
}
if opts.MaxTurns > 0 {
args = append(args, "--max-turns", fmt.Sprintf("%d", opts.MaxTurns))
}
if opts.SystemPrompt != "" {
args = append(args, "--append-system-prompt", opts.SystemPrompt)
}
if opts.ResumeSessionID != "" {
args = append(args, "--resume", opts.ResumeSessionID)
}
args = append(args, filterCustomArgs(opts.ExtraArgs, codebuddyBlockedArgs, logger)...)
args = append(args, filterCustomArgs(opts.CustomArgs, codebuddyBlockedArgs, logger)...)
return args
}
func (b *codebuddyBackend) Execute(ctx context.Context, prompt string, opts ExecOptions) (*Session, error) {
execPath := b.cfg.ExecutablePath
if execPath == "" {
execPath = "codebuddy"
}
if _, err := exec.LookPath(execPath); err != nil {
return nil, fmt.Errorf("codebuddy executable not found at %q: %w", execPath, err)
}
timeout := opts.Timeout
if timeout == 0 {
timeout = 20 * time.Minute
}
runCtx, cancel := context.WithTimeout(ctx, timeout)
args := buildCodebuddyArgs(opts, b.cfg.Logger)
// If the caller provided an MCP config, write it to a temp file and pass
// --mcp-config <path> so the agent uses a controlled set of MCP servers.
var mcpConfigPath string
var mcpFileCleanup func()
if len(opts.McpConfig) > 0 {
path, err := writeMcpConfigToTemp(opts.McpConfig)
if err != nil {
cancel()
return nil, err
}
mcpConfigPath = path
mcpFileCleanup = func() { os.Remove(mcpConfigPath) }
args = append(args, "--mcp-config", mcpConfigPath)
}
// Clean up the temp file if we return before the goroutine takes ownership.
defer func() {
if mcpFileCleanup != nil {
mcpFileCleanup()
}
}()
cmd := exec.CommandContext(runCtx, execPath, args...)
hideAgentWindow(cmd)
b.cfg.Logger.Info("agent command", "exec", execPath, "args", args)
cmd.WaitDelay = 10 * time.Second
if opts.Cwd != "" {
cmd.Dir = opts.Cwd
}
cmd.Env = buildEnv(b.cfg.Env)
stdout, err := cmd.StdoutPipe()
if err != nil {
cancel()
return nil, fmt.Errorf("codebuddy stdout pipe: %w", err)
}
stdin, err := cmd.StdinPipe()
if err != nil {
cancel()
return nil, fmt.Errorf("codebuddy stdin pipe: %w", err)
}
closeStdin := func() {
if stdin != nil {
_ = stdin.Close()
stdin = nil
}
}
stderrBuf := newStderrTail(newLogWriter(b.cfg.Logger, "[codebuddy:stderr] "), agentStderrTailBytes)
cmd.Stderr = stderrBuf
if err := cmd.Start(); err != nil {
closeStdin()
cancel()
return nil, fmt.Errorf("start codebuddy: %w", err)
}
if err := writeCodebuddyInput(stdin, prompt); err != nil {
closeStdin()
cancel()
_ = cmd.Wait()
return nil, errors.New(withAgentStderr(fmt.Sprintf("write codebuddy input: %v", err), "codebuddy", stderrBuf.Tail()))
}
closeStdin()
b.cfg.Logger.Info("codebuddy started", "pid", cmd.Process.Pid, "cwd", opts.Cwd, "model", opts.Model)
// cmd.Start() succeeded — transfer temp file ownership to the goroutine.
mcpFileCleanup = nil
msgCh := make(chan Message, 256)
resCh := make(chan Result, 1)
go func() {
defer cancel()
defer close(msgCh)
defer close(resCh)
if mcpConfigPath != "" {
defer os.Remove(mcpConfigPath)
}
startTime := time.Now()
var output strings.Builder
var sessionID string
finalStatus := "completed"
var finalError string
usage := make(map[string]TokenUsage)
// Close stdout when the context is cancelled so scanner.Scan() unblocks.
go func() {
<-runCtx.Done()
_ = stdout.Close()
}()
scanner := bufio.NewScanner(stdout)
scanner.Buffer(make([]byte, 0, 1024*1024), 10*1024*1024)
for scanner.Scan() {
line := strings.TrimSpace(scanner.Text())
if line == "" {
continue
}
var msg codebuddySDKMessage
if err := json.Unmarshal([]byte(line), &msg); err != nil {
continue
}
switch msg.Type {
case "assistant":
b.handleAssistant(msg, msgCh, &output, usage)
case "user":
b.handleUser(msg, msgCh)
case "system":
if msg.SessionID != "" {
sessionID = msg.SessionID
}
trySend(msgCh, Message{Type: MessageStatus, Status: "running", SessionID: sessionID})
case "result":
closeStdin()
sessionID = msg.SessionID
if msg.ResultText != "" {
output.Reset()
output.WriteString(msg.ResultText)
}
if resultUsage := codebuddyResultUsage(msg, opts.Model); len(resultUsage) > 0 {
usage = resultUsage
}
if msg.IsError {
finalStatus = "failed"
finalError = msg.ResultText
}
case "log":
if msg.Log != nil {
trySend(msgCh, Message{
Type: MessageLog,
Level: msg.Log.Level,
Content: msg.Log.Message,
})
}
}
}
// Wait for process exit.
exitErr := cmd.Wait()
duration := time.Since(startTime)
if runCtx.Err() == context.DeadlineExceeded {
finalStatus = "timeout"
finalError = fmt.Sprintf("codebuddy timed out after %s", timeout)
} else if runCtx.Err() == context.Canceled {
finalStatus = "aborted"
finalError = "execution cancelled"
} else if exitErr != nil && finalStatus == "completed" {
finalStatus = "failed"
finalError = fmt.Sprintf("codebuddy exited with error: %v", exitErr)
}
if finalError != "" {
finalError = withAgentStderr(finalError, "codebuddy", stderrBuf.Tail())
}
b.cfg.Logger.Info("codebuddy finished", "pid", cmd.Process.Pid, "status", finalStatus, "duration", duration.Round(time.Millisecond).String())
reportedSessionID := resolveSessionID(opts.ResumeSessionID, sessionID, finalStatus == "failed")
if reportedSessionID != sessionID {
b.cfg.Logger.Info("codebuddy resume did not land; clearing fresh session id for daemon fallback",
"requested_resume", opts.ResumeSessionID,
"emitted_session", sessionID,
)
}
resCh <- Result{
Status: finalStatus,
Output: output.String(),
Error: finalError,
DurationMs: duration.Milliseconds(),
SessionID: reportedSessionID,
Usage: usage,
}
}()
return &Session{Messages: msgCh, Result: resCh}, nil
}
func (b *codebuddyBackend) handleAssistant(msg codebuddySDKMessage, ch chan<- Message, output *strings.Builder, usage map[string]TokenUsage) {
var content codebuddyMessageContent
if err := json.Unmarshal(msg.Message, &content); err != nil {
return
}
// Accumulate token usage per model.
if content.Usage != nil && content.Model != "" {
u := usage[content.Model]
u.InputTokens += content.Usage.InputTokens
u.OutputTokens += content.Usage.OutputTokens
u.CacheReadTokens += content.Usage.CacheReadInputTokens
u.CacheWriteTokens += content.Usage.CacheCreationInputTokens
usage[content.Model] = u
}
for _, block := range content.Content {
switch block.Type {
case "text":
if block.Text != "" {
output.WriteString(block.Text)
trySend(ch, Message{Type: MessageText, Content: block.Text})
}
case "thinking":
if block.Text != "" {
trySend(ch, Message{Type: MessageThinking, Content: block.Text})
}
case "tool_use":
var input map[string]any
if block.Input != nil {
_ = json.Unmarshal(block.Input, &input)
}
trySend(ch, Message{
Type: MessageToolUse,
Tool: block.Name,
CallID: block.ID,
Input: input,
})
}
}
}
func (b *codebuddyBackend) handleUser(msg codebuddySDKMessage, ch chan<- Message) {
var content codebuddyMessageContent
if err := json.Unmarshal(msg.Message, &content); err != nil {
return
}
for _, block := range content.Content {
if block.Type == "tool_result" {
resultStr := ""
if block.Content != nil {
resultStr = string(block.Content)
}
trySend(ch, Message{
Type: MessageToolResult,
CallID: block.ToolUseID,
Output: resultStr,
})
}
}
}
func writeCodebuddyInput(w io.Writer, prompt string) error {
payload := map[string]any{
"type": "user",
"message": map[string]any{
"role": "user",
"content": []map[string]string{
{
"type": "text",
"text": prompt,
},
},
},
}
data, err := json.Marshal(payload)
if err != nil {
return fmt.Errorf("marshal codebuddy input: %w", err)
}
data = append(data, '\n')
if _, err := w.Write(data); err != nil {
return err
}
return nil
}
// ── Codebuddy SDK JSON types ──
type codebuddySDKMessage struct {
Type string `json:"type"`
Message json.RawMessage `json:"message,omitempty"`
Subtype string `json:"subtype,omitempty"`
SessionID string `json:"session_id,omitempty"`
Model string `json:"model,omitempty"`
// result fields
ResultText string `json:"result,omitempty"`
IsError bool `json:"is_error,omitempty"`
DurationMs float64 `json:"duration_ms,omitempty"`
NumTurns int `json:"num_turns,omitempty"`
Usage *codebuddyUsage `json:"usage,omitempty"`
ModelUsage map[string]codebuddyResultModelUsage `json:"modelUsage,omitempty"`
// log fields
Log *codebuddyLogEntry `json:"log,omitempty"`
}
type codebuddyLogEntry struct {
Level string `json:"level"`
Message string `json:"message"`
}
type codebuddyMessageContent struct {
Role string `json:"role"`
Model string `json:"model"`
Content []codebuddyContentBlock `json:"content"`
Usage *codebuddyUsage `json:"usage,omitempty"`
}
type codebuddyUsage struct {
InputTokens int64 `json:"input_tokens"`
OutputTokens int64 `json:"output_tokens"`
CacheReadInputTokens int64 `json:"cache_read_input_tokens"`
CacheCreationInputTokens int64 `json:"cache_creation_input_tokens"`
}
type codebuddyResultModelUsage struct {
InputTokens int64 `json:"inputTokens"`
OutputTokens int64 `json:"outputTokens"`
CacheReadInputTokens int64 `json:"cacheReadInputTokens"`
CacheCreationInputTokens int64 `json:"cacheCreationInputTokens"`
}
type codebuddyContentBlock struct {
Type string `json:"type"`
Text string `json:"text,omitempty"`
ID string `json:"id,omitempty"`
Name string `json:"name,omitempty"`
Input json.RawMessage `json:"input,omitempty"`
ToolUseID string `json:"tool_use_id,omitempty"`
Content json.RawMessage `json:"content,omitempty"`
}
func codebuddyResultUsage(msg codebuddySDKMessage, fallbackModel string) map[string]TokenUsage {
if len(msg.ModelUsage) > 0 {
usage := make(map[string]TokenUsage, len(msg.ModelUsage))
for model, u := range msg.ModelUsage {
if model == "" || !codebuddyUsageHasTokens(u.InputTokens, u.OutputTokens, u.CacheReadInputTokens, u.CacheCreationInputTokens) {
continue
}
usage[model] = TokenUsage{
InputTokens: u.InputTokens,
OutputTokens: u.OutputTokens,
CacheReadTokens: u.CacheReadInputTokens,
CacheWriteTokens: u.CacheCreationInputTokens,
}
}
if len(usage) > 0 {
return usage
}
}
model := msg.Model
if model == "" {
model = fallbackModel
}
if msg.Usage == nil || model == "" || !codebuddyUsageHasTokens(
msg.Usage.InputTokens,
msg.Usage.OutputTokens,
msg.Usage.CacheReadInputTokens,
msg.Usage.CacheCreationInputTokens,
) {
return nil
}
return map[string]TokenUsage{
model: {
InputTokens: msg.Usage.InputTokens,
OutputTokens: msg.Usage.OutputTokens,
CacheReadTokens: msg.Usage.CacheReadInputTokens,
CacheWriteTokens: msg.Usage.CacheCreationInputTokens,
},
}
}
func codebuddyUsageHasTokens(input, output, cacheRead, cacheWrite int64) bool {
return input > 0 || output > 0 || cacheRead > 0 || cacheWrite > 0
}