mirror of
https://github.com/multica-ai/multica.git
synced 2026-07-05 13:29:44 +02:00
Active long-running sessions are no longer killed by a fixed wall-clock deadline. Liveness is delegated to the idle watchdog (MULTICA_AGENT_IDLE_WATCHDOG, default 30m) with a larger in-flight-tool budget (MULTICA_AGENT_TOOL_WATCHDOG, default 2h). MULTICA_AGENT_TIMEOUT is an opt-in absolute cap (default 0 = no cap). The server-side 2.5h sweeper is unchanged as a coarse backstop. Fixes #3745.
421 lines
13 KiB
Go
421 lines
13 KiB
Go
package agent
|
|
|
|
import (
|
|
"bufio"
|
|
"context"
|
|
"fmt"
|
|
"io"
|
|
"os/exec"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
// kimiBlockedArgs are flags hardcoded by the daemon that must not be
|
|
// overridden by user-configured custom_args. `acp` is the protocol
|
|
// subcommand that drives the ACP JSON-RPC transport for Kimi Code CLI;
|
|
// overriding it would break the daemon↔Kimi communication contract.
|
|
var kimiBlockedArgs = map[string]blockedArgMode{
|
|
"acp": blockedStandalone,
|
|
}
|
|
|
|
// kimiBackend implements Backend by spawning `kimi acp` and communicating
|
|
// via the ACP (Agent Client Protocol) JSON-RPC 2.0 over stdin/stdout.
|
|
//
|
|
// Kimi Code CLI (https://github.com/MoonshotAI/kimi-cli) supports ACP out of
|
|
// the box via the `kimi acp` subcommand. We reuse the existing hermesClient
|
|
// ACP transport since both runtimes speak the same protocol — only the
|
|
// binary, env, and tool-name extraction differ.
|
|
type kimiBackend struct {
|
|
cfg Config
|
|
}
|
|
|
|
func (b *kimiBackend) Execute(ctx context.Context, prompt string, opts ExecOptions) (*Session, error) {
|
|
execPath := b.cfg.ExecutablePath
|
|
if execPath == "" {
|
|
execPath = "kimi"
|
|
}
|
|
if _, err := exec.LookPath(execPath); err != nil {
|
|
return nil, fmt.Errorf("kimi executable not found at %q: %w", execPath, err)
|
|
}
|
|
|
|
// Translate the agent's mcp_config (Claude-style object of objects)
|
|
// into the array shape ACP `session/new` expects. Fail closed on
|
|
// malformed JSON so the launch surfaces the real error instead of
|
|
// silently dropping all MCP servers.
|
|
mcpServers, err := buildACPMcpServers(opts.McpConfig, b.cfg.Logger)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("kimi: invalid mcp_config: %w", err)
|
|
}
|
|
|
|
timeout := opts.Timeout
|
|
runCtx, cancel := runContext(ctx, timeout)
|
|
|
|
// `kimi acp` ignores --yolo / --auto-approve (they're flags on the
|
|
// root `kimi` command, not on the `acp` subcommand). Instead, the
|
|
// daemon auto-approves in hermesClient.handleAgentRequest by replying
|
|
// "approve_for_session" to every session/request_permission request.
|
|
kimiArgs := append([]string{"acp"}, filterCustomArgs(opts.CustomArgs, kimiBlockedArgs, b.cfg.Logger)...)
|
|
cmd := exec.CommandContext(runCtx, execPath, kimiArgs...)
|
|
hideAgentWindow(cmd)
|
|
b.cfg.Logger.Info("agent command", "exec", execPath, "args", kimiArgs)
|
|
if opts.Cwd != "" {
|
|
cmd.Dir = opts.Cwd
|
|
}
|
|
cmd.Env = buildEnv(b.cfg.Env)
|
|
|
|
stdout, err := cmd.StdoutPipe()
|
|
if err != nil {
|
|
cancel()
|
|
return nil, fmt.Errorf("kimi stdout pipe: %w", err)
|
|
}
|
|
stdin, err := cmd.StdinPipe()
|
|
if err != nil {
|
|
cancel()
|
|
return nil, fmt.Errorf("kimi stdin pipe: %w", err)
|
|
}
|
|
// Forward stderr to the daemon log *and* sniff provider-level
|
|
// errors out of it so we can surface them in the task result.
|
|
// Kimi's session/prompt still reports stopReason=end_turn when
|
|
// the underlying HTTP call to api.kimi.com returns 4xx/5xx, so
|
|
// without this the daemon reports a misleading "empty output"
|
|
// and the actionable error (expired token, rate limit, upstream
|
|
// 5xx, …) stays buried in the daemon log.
|
|
//
|
|
// StderrPipe + an explicit copier give us a join point
|
|
// (`stderrDone`) that fires before the failure-promotion
|
|
// decision; see the matching comment in hermes.go for why the
|
|
// io.MultiWriter form races with stopReason=end_turn under load.
|
|
providerErr := newACPProviderErrorSniffer("kimi")
|
|
stderr, err := cmd.StderrPipe()
|
|
if err != nil {
|
|
cancel()
|
|
return nil, fmt.Errorf("kimi stderr pipe: %w", err)
|
|
}
|
|
|
|
if err := cmd.Start(); err != nil {
|
|
cancel()
|
|
return nil, fmt.Errorf("start kimi: %w", err)
|
|
}
|
|
|
|
stderrSink := io.MultiWriter(newLogWriter(b.cfg.Logger, "[kimi:stderr] "), providerErr)
|
|
stderrDone := make(chan struct{})
|
|
go func() {
|
|
defer close(stderrDone)
|
|
_, _ = io.Copy(stderrSink, stderr)
|
|
}()
|
|
|
|
b.cfg.Logger.Info("kimi acp started", "pid", cmd.Process.Pid, "cwd", opts.Cwd)
|
|
|
|
msgCh := make(chan Message, 256)
|
|
resCh := make(chan Result, 1)
|
|
|
|
var outputMu sync.Mutex
|
|
var output strings.Builder
|
|
|
|
promptDone := make(chan hermesPromptResult, 1)
|
|
|
|
// Reuse the hermesClient ACP transport — Kimi speaks the same protocol.
|
|
c := &hermesClient{
|
|
cfg: b.cfg,
|
|
stdin: stdin,
|
|
pending: make(map[int]*pendingRPC),
|
|
pendingTools: make(map[string]*pendingToolCall),
|
|
onMessage: func(msg Message) {
|
|
// hermesClient.handleToolCallStart has already mapped
|
|
// the raw ACP title via hermesToolNameFromTitle — which
|
|
// covers lowercase hermes-style titles ("read:", "patch
|
|
// (replace)", …) but not capitalised kimi-style ones
|
|
// ("Read file: …", "Run command: …"). Re-normalise so
|
|
// the UI sees consistent snake_case identifiers across
|
|
// both backends. No-op when the name is already normal
|
|
// form (e.g. already mapped to "read_file").
|
|
if msg.Type == MessageToolUse {
|
|
msg.Tool = kimiToolNameFromTitle(msg.Tool)
|
|
}
|
|
if msg.Type == MessageText {
|
|
outputMu.Lock()
|
|
output.WriteString(msg.Content)
|
|
outputMu.Unlock()
|
|
}
|
|
trySend(msgCh, msg)
|
|
},
|
|
onPromptDone: func(result hermesPromptResult) {
|
|
select {
|
|
case promptDone <- result:
|
|
default:
|
|
}
|
|
},
|
|
}
|
|
|
|
// Start reading stdout in background.
|
|
readerDone := make(chan struct{})
|
|
go func() {
|
|
defer close(readerDone)
|
|
scanner := bufio.NewScanner(stdout)
|
|
scanner.Buffer(make([]byte, 0, 1024*1024), 10*1024*1024)
|
|
for scanner.Scan() {
|
|
line := strings.TrimSpace(scanner.Text())
|
|
if line == "" {
|
|
continue
|
|
}
|
|
c.handleLine(line)
|
|
}
|
|
c.closeAllPending(fmt.Errorf("kimi process exited"))
|
|
}()
|
|
|
|
// Drive the ACP session lifecycle in a goroutine.
|
|
go func() {
|
|
defer cancel()
|
|
defer close(msgCh)
|
|
defer close(resCh)
|
|
defer func() {
|
|
stdin.Close()
|
|
_ = cmd.Wait()
|
|
}()
|
|
|
|
startTime := time.Now()
|
|
finalStatus := "completed"
|
|
var finalError string
|
|
var sessionID string
|
|
|
|
// 1. Initialize handshake.
|
|
initResult, err := c.request(runCtx, "initialize", map[string]any{
|
|
"protocolVersion": 1,
|
|
"clientInfo": map[string]any{
|
|
"name": "multica-agent-sdk",
|
|
"version": "0.2.0",
|
|
},
|
|
"clientCapabilities": map[string]any{},
|
|
})
|
|
if err != nil {
|
|
finalStatus = "failed"
|
|
finalError = fmt.Sprintf("kimi initialize failed: %v", err)
|
|
resCh <- Result{Status: finalStatus, Error: finalError, DurationMs: time.Since(startTime).Milliseconds()}
|
|
return
|
|
}
|
|
|
|
// Drop MCP entries whose remote transport the runtime didn't
|
|
// advertise. See the matching comment in hermes.go for the why —
|
|
// shipping an http/sse entry to a stdio-only runtime tanks the
|
|
// whole session/new.
|
|
mcpServers = filterACPMcpServersByCapability(mcpServers, extractACPMcpCapabilities(initResult), "kimi", b.cfg.Logger)
|
|
|
|
// 2. Create or resume a session.
|
|
cwd := opts.Cwd
|
|
if cwd == "" {
|
|
cwd = "."
|
|
}
|
|
|
|
if opts.ResumeSessionID != "" {
|
|
// Per ACP Session Setup, session/resume accepts mcpServers and
|
|
// the runtime re-connects them as part of the resume. Without
|
|
// this, a resumed Kimi task lost access to MCP tools that a
|
|
// fresh task on the same agent would have.
|
|
result, err := c.request(runCtx, "session/resume", map[string]any{
|
|
"cwd": cwd,
|
|
"sessionId": opts.ResumeSessionID,
|
|
"mcpServers": mcpServers,
|
|
})
|
|
if err != nil {
|
|
finalStatus = "failed"
|
|
finalError = fmt.Sprintf("kimi session/resume failed: %v", err)
|
|
resCh <- Result{Status: finalStatus, Error: finalError, DurationMs: time.Since(startTime).Milliseconds()}
|
|
return
|
|
}
|
|
var changed bool
|
|
sessionID, changed = resolveResumedSessionID(opts.ResumeSessionID, result)
|
|
if changed {
|
|
b.cfg.Logger.Warn("agent returned a different session id on resume — original was likely lost; continuing with the new id",
|
|
"backend", "kimi",
|
|
"requested", opts.ResumeSessionID,
|
|
"actual", sessionID,
|
|
)
|
|
}
|
|
} else {
|
|
result, err := c.request(runCtx, "session/new", map[string]any{
|
|
"cwd": cwd,
|
|
"mcpServers": mcpServers,
|
|
})
|
|
if err != nil {
|
|
finalStatus = "failed"
|
|
finalError = fmt.Sprintf("kimi session/new failed: %v", err)
|
|
resCh <- Result{Status: finalStatus, Error: finalError, DurationMs: time.Since(startTime).Milliseconds()}
|
|
return
|
|
}
|
|
sessionID = extractACPSessionID(result)
|
|
if sessionID == "" {
|
|
finalStatus = "failed"
|
|
finalError = "kimi session/new returned no session ID"
|
|
resCh <- Result{Status: finalStatus, Error: finalError, DurationMs: time.Since(startTime).Milliseconds()}
|
|
return
|
|
}
|
|
}
|
|
|
|
c.sessionID = sessionID
|
|
b.cfg.Logger.Info("kimi session created", "session_id", sessionID)
|
|
|
|
// 3. If the caller picked a model (via agent.model from the
|
|
// UI dropdown), ask kimi to switch the session to it before
|
|
// we send any prompt. Kimi's ACP server exposes
|
|
// `session/set_model` and advertises available models via
|
|
// the `models.availableModels` block returned by
|
|
// `session/new` — we pass the chosen modelId through
|
|
// verbatim. This MUST fail the task on error: silently
|
|
// falling back to kimi's default model would let the user
|
|
// believe their pick was honoured while the task actually
|
|
// ran on something else.
|
|
if opts.Model != "" {
|
|
if _, err := c.request(runCtx, "session/set_model", map[string]any{
|
|
"sessionId": sessionID,
|
|
"modelId": opts.Model,
|
|
}); err != nil {
|
|
b.cfg.Logger.Warn("kimi set_session_model failed", "error", err, "requested_model", opts.Model)
|
|
finalStatus = "failed"
|
|
finalError = fmt.Sprintf("kimi could not switch to model %q: %v", opts.Model, err)
|
|
resCh <- Result{
|
|
Status: finalStatus,
|
|
Error: finalError,
|
|
DurationMs: time.Since(startTime).Milliseconds(),
|
|
SessionID: sessionID,
|
|
}
|
|
return
|
|
}
|
|
b.cfg.Logger.Info("kimi session model set", "model", opts.Model)
|
|
}
|
|
|
|
// 4. Build the prompt content. If we have a system prompt, prepend it.
|
|
userText := prompt
|
|
if opts.SystemPrompt != "" {
|
|
userText = opts.SystemPrompt + "\n\n---\n\n" + prompt
|
|
}
|
|
|
|
// 5. Send the prompt and wait for PromptResponse.
|
|
_, err = c.request(runCtx, "session/prompt", map[string]any{
|
|
"sessionId": sessionID,
|
|
"prompt": []map[string]any{
|
|
{"type": "text", "text": userText},
|
|
},
|
|
})
|
|
if err != nil {
|
|
if runCtx.Err() == context.DeadlineExceeded {
|
|
finalStatus = "timeout"
|
|
finalError = fmt.Sprintf("kimi timed out after %s", timeout)
|
|
} else if runCtx.Err() == context.Canceled {
|
|
finalStatus = "aborted"
|
|
finalError = "execution cancelled"
|
|
} else {
|
|
finalStatus = "failed"
|
|
finalError = fmt.Sprintf("kimi session/prompt failed: %v", err)
|
|
}
|
|
} else {
|
|
select {
|
|
case pr := <-promptDone:
|
|
if pr.stopReason == "cancelled" {
|
|
finalStatus = "aborted"
|
|
finalError = "kimi cancelled the prompt"
|
|
}
|
|
c.usageMu.Lock()
|
|
c.usage.InputTokens += pr.usage.InputTokens
|
|
c.usage.OutputTokens += pr.usage.OutputTokens
|
|
c.usageMu.Unlock()
|
|
default:
|
|
}
|
|
}
|
|
|
|
duration := time.Since(startTime)
|
|
b.cfg.Logger.Info("kimi finished", "pid", cmd.Process.Pid, "status", finalStatus, "duration", duration.Round(time.Millisecond).String())
|
|
|
|
stdin.Close()
|
|
cancel()
|
|
|
|
<-readerDone
|
|
// Ensure the stderr copier has drained before consulting the
|
|
// provider-error sniffer; see hermes.go for the failure mode.
|
|
<-stderrDone
|
|
|
|
outputMu.Lock()
|
|
finalOutput := output.String()
|
|
outputMu.Unlock()
|
|
|
|
// Promote completed→failed when stderr or the agent text
|
|
// stream show a terminal upstream-LLM failure (HTTP 4xx /
|
|
// rate-limit / expired token). See the helper docs for the
|
|
// full signal set; the key safety property is that transient
|
|
// per-attempt warnings followed by a successful retry stay
|
|
// "completed".
|
|
finalStatus, finalError = promoteACPResultOnProviderError(finalStatus, finalError, finalOutput, providerErr)
|
|
|
|
c.usageMu.Lock()
|
|
u := c.usage
|
|
c.usageMu.Unlock()
|
|
|
|
var usageMap map[string]TokenUsage
|
|
if u.InputTokens > 0 || u.OutputTokens > 0 || u.CacheReadTokens > 0 {
|
|
model := opts.Model
|
|
if model == "" {
|
|
model = "unknown"
|
|
}
|
|
usageMap = map[string]TokenUsage{model: u}
|
|
}
|
|
|
|
resCh <- Result{
|
|
Status: finalStatus,
|
|
Output: finalOutput,
|
|
Error: finalError,
|
|
DurationMs: duration.Milliseconds(),
|
|
SessionID: sessionID,
|
|
Usage: usageMap,
|
|
}
|
|
}()
|
|
|
|
return &Session{Messages: msgCh, Result: resCh}, nil
|
|
}
|
|
|
|
// kimiToolNameFromTitle normalises tool names emitted by Kimi's ACP
// server into the snake_case identifiers the Multica UI expects.
//
// Kimi follows the ACP spec, where `title` is a short human-readable label
// such as "Read file: /path/to/foo.go" or "Run command: ls". Upstream,
// hermesToolNameFromTitle handles hermes' lowercase convention ("read:",
// "patch (replace)") but not kimi's capitalised format. This function is
// therefore called on the name hermes already mapped, and fixes up anything
// that slipped through.
//
// Normalisation:
//   - keep only the text before the first colon, dropping the argument
//     detail (a colon at position 0 is left alone so the name never
//     collapses to "");
//   - lowercase and collapse every run of whitespace (spaces, tabs,
//     newlines) into one separator, so "Run  command" or "Read\tfile"
//     still match the known aliases and the fallback never produces
//     doubled underscores;
//   - map known aliases to their canonical name; otherwise join the words
//     with "_" to give the UI a stable identifier.
//
// Empty or whitespace-only input returns "". Names already in snake_case
// (e.g. "read_file") pass through unchanged.
func kimiToolNameFromTitle(title string) string {
	t := strings.TrimSpace(title)

	// ACP titles often look like "Tool Name: argument detail"; only the
	// tool name is wanted.
	if name, _, found := strings.Cut(t, ":"); found && name != "" {
		t = name
	}

	words := strings.Fields(strings.ToLower(t))
	if len(words) == 0 {
		return ""
	}

	switch strings.Join(words, " ") {
	case "read", "read file":
		return "read_file"
	case "write", "write file":
		return "write_file"
	case "edit", "patch":
		return "edit_file"
	case "shell", "bash", "terminal", "run command", "run shell command":
		return "terminal"
	case "search", "grep", "find":
		return "search_files"
	case "glob":
		return "glob"
	case "web search":
		return "web_search"
	case "fetch", "web fetch":
		return "web_fetch"
	case "todo", "todo write":
		return "todo_write"
	}

	// Fallback: snake_case the title so the UI gets a stable identifier.
	return strings.Join(words, "_")
}
|