mirror of
https://github.com/multica-ai/multica.git
synced 2026-07-05 13:29:44 +02:00
The Pi backend hardcoded `--tools read,bash,edit,write,grep,find,ls` in buildPiArgs. Pi's SDK treats --tools as a restrictive allowlist: only the listed tools pass through `_refreshToolRegistry()`, silently filtering out any user-installed extension tools registered via `pi.registerTool()`. Omitting --tools makes Pi's `allowedToolNames` undefined, so the `isAllowedTool()` filter becomes a no-op and all tools — built-in and extension — are available. This matches Pi's standalone behavior. Users who want to restrict tools can still pass --tools via custom_args (it is not in piBlockedArgs). Closes #2379
403 lines
11 KiB
Go
403 lines
11 KiB
Go
package agent
|
|
|
|
import (
|
|
"bufio"
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"log/slog"
|
|
"os"
|
|
"os/exec"
|
|
"path/filepath"
|
|
"strings"
|
|
"time"
|
|
)
|
|
|
|
// piBackend implements Backend by spawning the Pi CLI in non-interactive
|
|
// JSON mode (`pi -p --mode json --session <path>`) and parsing its event
|
|
// stream on stdout.
|
|
type piBackend struct {
|
|
cfg Config
|
|
}
|
|
|
|
func (b *piBackend) Execute(ctx context.Context, prompt string, opts ExecOptions) (*Session, error) {
|
|
execPath := b.cfg.ExecutablePath
|
|
if execPath == "" {
|
|
execPath = "pi"
|
|
}
|
|
if _, err := exec.LookPath(execPath); err != nil {
|
|
return nil, fmt.Errorf("pi executable not found at %q: %w", execPath, err)
|
|
}
|
|
|
|
timeout := opts.Timeout
|
|
if timeout == 0 {
|
|
timeout = 20 * time.Minute
|
|
}
|
|
|
|
// Pi's --session flag expects a file path where events are appended.
|
|
// The path doubles as our opaque session identifier: we return it as
|
|
// SessionID and expect it back as ResumeSessionID on the next turn.
|
|
sessionPath := opts.ResumeSessionID
|
|
if sessionPath == "" {
|
|
p, err := newPiSessionPath()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("pi session path: %w", err)
|
|
}
|
|
sessionPath = p
|
|
}
|
|
if err := ensurePiSessionFile(sessionPath); err != nil {
|
|
return nil, fmt.Errorf("pi session file: %w", err)
|
|
}
|
|
|
|
runCtx, cancel := context.WithTimeout(ctx, timeout)
|
|
|
|
args := buildPiArgs(prompt, sessionPath, opts, b.cfg.Logger)
|
|
|
|
cmd := exec.CommandContext(runCtx, execPath, args...)
|
|
hideAgentWindow(cmd)
|
|
b.cfg.Logger.Info("agent command", "exec", execPath, "args", args)
|
|
cmd.WaitDelay = 10 * time.Second
|
|
if opts.Cwd != "" {
|
|
cmd.Dir = opts.Cwd
|
|
}
|
|
cmd.Env = buildEnv(b.cfg.Env)
|
|
|
|
stdout, err := cmd.StdoutPipe()
|
|
if err != nil {
|
|
cancel()
|
|
return nil, fmt.Errorf("pi stdout pipe: %w", err)
|
|
}
|
|
cmd.Stderr = newLogWriter(b.cfg.Logger, "[pi:stderr] ")
|
|
|
|
if err := cmd.Start(); err != nil {
|
|
cancel()
|
|
return nil, fmt.Errorf("start pi: %w", err)
|
|
}
|
|
|
|
b.cfg.Logger.Info("pi started", "pid", cmd.Process.Pid, "cwd", opts.Cwd, "model", opts.Model)
|
|
|
|
msgCh := make(chan Message, 256)
|
|
resCh := make(chan Result, 1)
|
|
|
|
// Close stdout when the context is cancelled so scanner.Scan() unblocks.
|
|
go func() {
|
|
<-runCtx.Done()
|
|
_ = stdout.Close()
|
|
}()
|
|
|
|
go func() {
|
|
defer cancel()
|
|
defer close(msgCh)
|
|
defer close(resCh)
|
|
|
|
startTime := time.Now()
|
|
var output strings.Builder
|
|
finalStatus := "completed"
|
|
var finalError string
|
|
usage := make(map[string]TokenUsage)
|
|
|
|
scanner := bufio.NewScanner(stdout)
|
|
// Pi message_update events can be large (they embed the full message
|
|
// partial on each delta), so give the scanner generous headroom.
|
|
scanner.Buffer(make([]byte, 0, 1024*1024), 32*1024*1024)
|
|
|
|
for scanner.Scan() {
|
|
line := strings.TrimSpace(scanner.Text())
|
|
if line == "" {
|
|
continue
|
|
}
|
|
var evt piStreamEvent
|
|
if err := json.Unmarshal([]byte(line), &evt); err != nil {
|
|
continue
|
|
}
|
|
|
|
switch evt.Type {
|
|
case "agent_start":
|
|
trySend(msgCh, Message{Type: MessageStatus, Status: "running"})
|
|
|
|
case "message_update":
|
|
if evt.AssistantMessageEvent == nil {
|
|
continue
|
|
}
|
|
switch evt.AssistantMessageEvent.Type {
|
|
case "text_delta":
|
|
if d := evt.AssistantMessageEvent.Delta; d != "" {
|
|
output.WriteString(d)
|
|
trySend(msgCh, Message{Type: MessageText, Content: d})
|
|
}
|
|
case "thinking_delta":
|
|
if d := evt.AssistantMessageEvent.Delta; d != "" {
|
|
trySend(msgCh, Message{Type: MessageThinking, Content: d})
|
|
}
|
|
}
|
|
|
|
case "tool_execution_start":
|
|
var params map[string]any
|
|
if len(evt.Args) > 0 {
|
|
_ = json.Unmarshal(evt.Args, ¶ms)
|
|
}
|
|
trySend(msgCh, Message{
|
|
Type: MessageToolUse,
|
|
Tool: evt.ToolName,
|
|
CallID: evt.ToolCallID,
|
|
Input: params,
|
|
})
|
|
|
|
case "tool_execution_end":
|
|
trySend(msgCh, Message{
|
|
Type: MessageToolResult,
|
|
CallID: evt.ToolCallID,
|
|
Output: decodePiResult(evt.Result),
|
|
})
|
|
|
|
case "turn_end":
|
|
if msg := decodePiMessage(evt.Message); msg != nil && msg.Usage != nil {
|
|
model := msg.Model
|
|
if model == "" {
|
|
model = opts.Model
|
|
}
|
|
if model == "" {
|
|
model = "unknown"
|
|
}
|
|
u := usage[model]
|
|
u.InputTokens += msg.Usage.Input
|
|
u.OutputTokens += msg.Usage.Output
|
|
u.CacheReadTokens += msg.Usage.CacheRead
|
|
u.CacheWriteTokens += msg.Usage.CacheWrite
|
|
usage[model] = u
|
|
}
|
|
|
|
case "error":
|
|
errText := decodePiString(evt.Message)
|
|
trySend(msgCh, Message{Type: MessageError, Content: errText})
|
|
if finalStatus == "completed" {
|
|
finalStatus = "failed"
|
|
finalError = errText
|
|
}
|
|
|
|
case "auto_retry_end":
|
|
if !evt.Success && finalStatus == "completed" {
|
|
finalStatus = "failed"
|
|
if evt.FinalError != "" {
|
|
finalError = evt.FinalError
|
|
} else {
|
|
finalError = "pi exhausted automatic retries"
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
waitErr := cmd.Wait()
|
|
duration := time.Since(startTime)
|
|
|
|
if runCtx.Err() == context.DeadlineExceeded {
|
|
finalStatus = "timeout"
|
|
finalError = fmt.Sprintf("pi timed out after %s", timeout)
|
|
} else if runCtx.Err() == context.Canceled {
|
|
finalStatus = "aborted"
|
|
finalError = "execution cancelled"
|
|
} else if waitErr != nil && finalStatus == "completed" {
|
|
finalStatus = "failed"
|
|
finalError = fmt.Sprintf("pi exited with error: %v", waitErr)
|
|
}
|
|
|
|
b.cfg.Logger.Info("pi finished", "pid", cmd.Process.Pid, "status", finalStatus, "duration", duration.Round(time.Millisecond).String())
|
|
|
|
resCh <- Result{
|
|
Status: finalStatus,
|
|
Output: output.String(),
|
|
Error: finalError,
|
|
DurationMs: duration.Milliseconds(),
|
|
SessionID: sessionPath,
|
|
Usage: usage,
|
|
}
|
|
}()
|
|
|
|
return &Session{Messages: msgCh, Result: resCh}, nil
|
|
}
|
|
|
|
// ── Pi event types ──
|
|
|
|
// piStreamEvent is the union of fields we consume from Pi's JSON event
|
|
// stream. Fields that can be either string or object across event types
|
|
// (e.g. `message`, `result`) are held as json.RawMessage and decoded on
|
|
// demand by the switch arms.
|
|
type piStreamEvent struct {
|
|
Type string `json:"type"`
|
|
|
|
// message_update
|
|
AssistantMessageEvent *piAssistantMessageEvent `json:"assistantMessageEvent,omitempty"`
|
|
|
|
// tool_execution_start / tool_execution_end
|
|
ToolCallID string `json:"toolCallId,omitempty"`
|
|
ToolName string `json:"toolName,omitempty"`
|
|
Args json.RawMessage `json:"args,omitempty"`
|
|
Result json.RawMessage `json:"result,omitempty"`
|
|
IsError bool `json:"isError,omitempty"`
|
|
|
|
// error: Message is a string. turn_end: Message is an object.
|
|
Message json.RawMessage `json:"message,omitempty"`
|
|
|
|
// auto_retry_end
|
|
Success bool `json:"success,omitempty"`
|
|
FinalError string `json:"finalError,omitempty"`
|
|
}
|
|
|
|
type piAssistantMessageEvent struct {
|
|
Type string `json:"type"`
|
|
Delta string `json:"delta,omitempty"`
|
|
}
|
|
|
|
type piMessage struct {
|
|
Role string `json:"role,omitempty"`
|
|
Model string `json:"model,omitempty"`
|
|
Usage *piUsage `json:"usage,omitempty"`
|
|
}
|
|
|
|
type piUsage struct {
|
|
Input int64 `json:"input"`
|
|
Output int64 `json:"output"`
|
|
CacheRead int64 `json:"cacheRead"`
|
|
CacheWrite int64 `json:"cacheWrite"`
|
|
TotalTokens int64 `json:"totalTokens"`
|
|
}
|
|
|
|
func decodePiMessage(raw json.RawMessage) *piMessage {
|
|
if len(raw) == 0 {
|
|
return nil
|
|
}
|
|
var m piMessage
|
|
if err := json.Unmarshal(raw, &m); err != nil {
|
|
return nil
|
|
}
|
|
return &m
|
|
}
|
|
|
|
func decodePiString(raw json.RawMessage) string {
|
|
if len(raw) == 0 {
|
|
return ""
|
|
}
|
|
var s string
|
|
if err := json.Unmarshal(raw, &s); err == nil {
|
|
return s
|
|
}
|
|
return strings.Trim(string(raw), `"`)
|
|
}
|
|
|
|
func decodePiResult(raw json.RawMessage) string {
|
|
if len(raw) == 0 {
|
|
return ""
|
|
}
|
|
var s string
|
|
if err := json.Unmarshal(raw, &s); err == nil {
|
|
return s
|
|
}
|
|
return string(raw)
|
|
}
|
|
|
|
// ── Arg builder ──
|
|
|
|
// piBlockedArgs are flags hardcoded by the daemon that must not be
|
|
// overridden by user-configured custom_args. Overriding these would
|
|
// break the daemon↔Pi communication protocol.
|
|
var piBlockedArgs = map[string]blockedArgMode{
|
|
"-p": blockedStandalone, // non-interactive mode
|
|
"--print": blockedStandalone, // alias for -p
|
|
"--mode": blockedWithValue, // "json" event stream protocol
|
|
"--session": blockedWithValue, // daemon manages the session path
|
|
}
|
|
|
|
// buildPiArgs assembles the argv for a one-shot Pi invocation.
|
|
//
|
|
// Flags:
|
|
//
|
|
// -p non-interactive mode (prompt is positional)
|
|
// --mode json emit one JSON event per line on stdout
|
|
// --session <path> session log file (created upfront, reused on resume)
|
|
// --provider <name> provider, when Model is "provider/id"
|
|
// --model <id> model identifier
|
|
// --append-system-prompt <s> extra system instructions
|
|
//
|
|
// Custom args appended before the positional prompt. The prompt is a
|
|
// positional argument and must be last.
|
|
func buildPiArgs(prompt, sessionPath string, opts ExecOptions, logger *slog.Logger) []string {
|
|
args := []string{
|
|
"-p",
|
|
"--mode", "json",
|
|
}
|
|
if sessionPath != "" {
|
|
args = append(args, "--session", sessionPath)
|
|
}
|
|
if opts.Model != "" {
|
|
provider, model := splitPiModel(opts.Model)
|
|
if provider != "" {
|
|
args = append(args, "--provider", provider)
|
|
}
|
|
if model != "" {
|
|
args = append(args, "--model", model)
|
|
}
|
|
}
|
|
// Note: we intentionally do NOT pass --tools here. Omitting it lets
|
|
// Pi use its full tool registry, including user-installed extension
|
|
// tools. Passing --tools acts as a restrictive allowlist that
|
|
// silently filters out extension-registered tools (#2379).
|
|
// Users who want to restrict tools can do so via custom_args.
|
|
if opts.SystemPrompt != "" {
|
|
args = append(args, "--append-system-prompt", opts.SystemPrompt)
|
|
}
|
|
args = append(args, filterCustomArgs(opts.CustomArgs, piBlockedArgs, logger)...)
|
|
args = append(args, prompt)
|
|
return args
|
|
}
|
|
|
|
// splitPiModel parses a "provider/model" string into its parts. Plain
|
|
// "model" strings pass through as (provider="", model="model").
|
|
func splitPiModel(s string) (provider, model string) {
|
|
s = strings.TrimSpace(s)
|
|
if i := strings.Index(s, "/"); i >= 0 {
|
|
return strings.TrimSpace(s[:i]), strings.TrimSpace(s[i+1:])
|
|
}
|
|
return "", s
|
|
}
|
|
|
|
// ── Session path ──
|
|
|
|
// piSessionDir returns the directory where Pi session JSONL files live.
|
|
// Exported via a helper so the usage scanner (package usage) can point at
|
|
// the same location without duplicating the path construction.
|
|
func piSessionDir() (string, error) {
|
|
home, err := os.UserHomeDir()
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
return filepath.Join(home, ".multica", "pi-sessions"), nil
|
|
}
|
|
|
|
func newPiSessionPath() (string, error) {
|
|
dir, err := piSessionDir()
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
name := fmt.Sprintf("%s.jsonl", time.Now().UTC().Format("20060102T150405.000000000"))
|
|
return filepath.Join(dir, name), nil
|
|
}
|
|
|
|
// ensurePiSessionFile creates an empty session file if one does not yet
|
|
// exist at path. Pi refuses to start when --session points at a missing
|
|
// file; paths that already exist (a resumed session) are left untouched.
|
|
func ensurePiSessionFile(path string) error {
|
|
if err := os.MkdirAll(filepath.Dir(path), 0o755); err != nil {
|
|
return err
|
|
}
|
|
f, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE, 0o644)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return f.Close()
|
|
}
|
|
|
|
// PiSessionDir exposes piSessionDir to other packages in this module.
|
|
func PiSessionDir() (string, error) {
|
|
return piSessionDir()
|
|
}
|