Files
multica/server/pkg/agent/kiro.go
Bohan Jiang 3708fb0f07 fix(daemon): inactivity-based agent run timeout, no wall-clock guillotine (MUL-3064)
Active long-running sessions are no longer killed by a fixed wall-clock deadline. Liveness is delegated to the idle watchdog (MULTICA_AGENT_IDLE_WATCHDOG, default 30m) with a larger in-flight-tool budget (MULTICA_AGENT_TOOL_WATCHDOG, default 2h). MULTICA_AGENT_TIMEOUT is an opt-in absolute cap (default 0 = no cap). The server-side 2.5h sweeper is unchanged as a coarse backstop.

Fixes #3745.
2026-06-05 15:06:07 +08:00

400 lines
12 KiB
Go

package agent
import (
"bufio"
"context"
"fmt"
"io"
"os/exec"
"strings"
"sync"
"sync/atomic"
"time"
)
// kiroBlockedArgs lists the kiro-cli arguments the daemon always sets itself,
// so a user's custom_args can never override them:
//
//   - "acp" is the protocol subcommand the daemon launches.
//   - "--trust-all-tools" (and "-a", which is its short form in Kiro CLI
//     2.1.1) lifts Kiro's CLI-level tool gate; ACP session/request_permission
//     prompts are auto-approved separately by hermesClient.
//   - "--trust-tools" is registered as blockedWithValue (presumably so the
//     flag's value argument is dropped along with it — confirm against
//     filterCustomArgs).
//
// "-a" is NOT --agent in Kiro CLI 2.1.1, so --agent is deliberately left
// allowed: users may still select a custom Kiro agent.
var kiroBlockedArgs = map[string]blockedArgMode{
	"--trust-tools":     blockedWithValue,
	"--trust-all-tools": blockedStandalone,
	"-a":                blockedStandalone,
	"acp":               blockedStandalone,
}
// kiroBackend is the Backend for Kiro. Each Execute call spawns a fresh
// `kiro-cli acp` process and speaks the standard ACP JSON-RPC 2.0 transport
// to it over stdin/stdout.
//
// Kiro CLI advertises loadSession, reports its models in the session/new
// response, and accepts session/set_model. That lets the shared Hermes/Kimi
// ACP client drive it as-is; only the launch arguments and the tool-name
// normalization are Kiro-specific.
type kiroBackend struct {
	// cfg supplies the executable path, environment and logger used when
	// launching kiro-cli.
	cfg Config
}
// Execute launches `kiro-cli acp` and drives a single ACP prompt turn over
// JSON-RPC: initialize → session/load (when opts.ResumeSessionID is set) or
// session/new → optional session/set_model → session/prompt.
//
// It returns a Session whose Messages channel streams the turn's messages and
// whose Result channel receives exactly one Result; both channels are closed
// once the kiro process has been cleaned up. The returned error only covers
// launch-time problems (missing executable, invalid mcp_config, pipe or start
// failures). Everything after the process starts is reported through
// Result.Status / Result.Error.
func (b *kiroBackend) Execute(ctx context.Context, prompt string, opts ExecOptions) (*Session, error) {
	execPath := b.cfg.ExecutablePath
	if execPath == "" {
		execPath = "kiro-cli"
	}
	if _, err := exec.LookPath(execPath); err != nil {
		return nil, fmt.Errorf("kiro executable not found at %q: %w", execPath, err)
	}

	// Translate the agent's mcp_config (Claude-style object of objects)
	// into the array shape ACP `session/new` and `session/load` expect.
	// Fail closed on malformed JSON so the launch surfaces the real error
	// instead of silently dropping all MCP servers.
	mcpServers, err := buildACPMcpServers(opts.McpConfig, b.cfg.Logger)
	if err != nil {
		return nil, fmt.Errorf("kiro: invalid mcp_config: %w", err)
	}

	timeout := opts.Timeout
	runCtx, cancel := runContext(ctx, timeout)

	kiroArgs := append([]string{"acp", "--trust-all-tools"}, filterCustomArgs(opts.CustomArgs, kiroBlockedArgs, b.cfg.Logger)...)
	cmd := exec.CommandContext(runCtx, execPath, kiroArgs...)
	hideAgentWindow(cmd)
	b.cfg.Logger.Info("agent command", "exec", execPath, "args", kiroArgs)
	if opts.Cwd != "" {
		cmd.Dir = opts.Cwd
	}
	cmd.Env = buildEnv(b.cfg.Env)

	stdout, err := cmd.StdoutPipe()
	if err != nil {
		cancel()
		return nil, fmt.Errorf("kiro stdout pipe: %w", err)
	}
	stdin, err := cmd.StdinPipe()
	if err != nil {
		cancel()
		return nil, fmt.Errorf("kiro stdin pipe: %w", err)
	}

	// StderrPipe + an explicit copier give us a join point
	// (`stderrDone`) that fires before the failure-promotion
	// decision; see the matching comment in hermes.go for why the
	// io.MultiWriter form races with stopReason=end_turn under load.
	providerErr := newACPProviderErrorSniffer("kiro")
	stderr, err := cmd.StderrPipe()
	if err != nil {
		cancel()
		return nil, fmt.Errorf("kiro stderr pipe: %w", err)
	}

	if err := cmd.Start(); err != nil {
		cancel()
		return nil, fmt.Errorf("start kiro: %w", err)
	}

	stderrSink := io.MultiWriter(newLogWriter(b.cfg.Logger, "[kiro:stderr] "), providerErr)
	stderrDone := make(chan struct{})
	go func() {
		defer close(stderrDone)
		_, _ = io.Copy(stderrSink, stderr)
	}()

	b.cfg.Logger.Info("kiro acp started", "pid", cmd.Process.Pid, "cwd", opts.Cwd)

	msgCh := make(chan Message, 256)
	resCh := make(chan Result, 1)

	var outputMu sync.Mutex
	var output strings.Builder

	// Notifications and prompt completions are only forwarded once the
	// session/prompt turn has actually started.
	var streamingCurrentTurn atomic.Bool
	promptDone := make(chan hermesPromptResult, 1)

	c := &hermesClient{
		cfg:          b.cfg,
		stdin:        stdin,
		pending:      make(map[int]*pendingRPC),
		pendingTools: make(map[string]*pendingToolCall),
		acceptNotification: func(string) bool {
			return streamingCurrentTurn.Load()
		},
		onMessage: func(msg Message) {
			if !streamingCurrentTurn.Load() {
				return
			}
			if msg.Type == MessageToolUse {
				msg.Tool = kiroToolNameFromTitle(msg.Tool)
			}
			if msg.Type == MessageText {
				outputMu.Lock()
				output.WriteString(msg.Content)
				outputMu.Unlock()
			}
			trySend(msgCh, msg)
		},
		onPromptDone: func(result hermesPromptResult) {
			if !streamingCurrentTurn.Load() {
				return
			}
			select {
			case promptDone <- result:
			default:
			}
		},
	}

	readerDone := make(chan struct{})
	go func() {
		defer close(readerDone)
		scanner := bufio.NewScanner(stdout)
		scanner.Buffer(make([]byte, 0, 1024*1024), 10*1024*1024)
		for scanner.Scan() {
			line := strings.TrimSpace(scanner.Text())
			if line == "" {
				continue
			}
			c.handleLine(line)
		}
		// A scanner error (e.g. a single JSON-RPC line larger than the
		// 10 MiB buffer) ends the read loop while kiro may still be
		// running; report the real cause to any waiting request instead
		// of claiming the process exited.
		if err := scanner.Err(); err != nil {
			c.closeAllPending(fmt.Errorf("kiro stdout read failed: %w", err))
			return
		}
		c.closeAllPending(fmt.Errorf("kiro process exited"))
	}()

	go func() {
		defer cancel()
		defer close(msgCh)
		defer close(resCh)
		defer func() {
			stdin.Close()
			// Cancel before waiting so every exit path terminates kiro the
			// same way the success path below does. Without this, the
			// early error returns would block in Wait for as long as kiro
			// ignores stdin EOF, delaying the close of msgCh/resCh.
			cancel()
			_ = cmd.Wait()
		}()

		startTime := time.Now()
		finalStatus := "completed"
		var finalError string
		var sessionID string

		initResult, err := c.request(runCtx, "initialize", map[string]any{
			"protocolVersion": 1,
			"clientInfo": map[string]any{
				"name":    "multica-agent-sdk",
				"version": "0.2.0",
			},
			"clientCapabilities": map[string]any{},
		})
		if err != nil {
			finalStatus = "failed"
			finalError = fmt.Sprintf("kiro initialize failed: %v", err)
			resCh <- Result{Status: finalStatus, Error: finalError, DurationMs: time.Since(startTime).Milliseconds()}
			return
		}

		// Drop MCP entries whose remote transport the runtime didn't
		// advertise. See the matching comment in hermes.go for why
		// unconditionally sending http/sse to a stdio-only ACP runtime
		// tanks the whole session/new.
		mcpServers = filterACPMcpServersByCapability(mcpServers, extractACPMcpCapabilities(initResult), "kiro", b.cfg.Logger)

		cwd := opts.Cwd
		if cwd == "" {
			cwd = "."
		}

		if opts.ResumeSessionID != "" {
			result, err := c.request(runCtx, "session/load", map[string]any{
				"cwd":        cwd,
				"sessionId":  opts.ResumeSessionID,
				"mcpServers": mcpServers,
			})
			if err != nil {
				finalStatus = "failed"
				finalError = fmt.Sprintf("kiro session/load failed: %v", err)
				resCh <- Result{Status: finalStatus, Error: finalError, DurationMs: time.Since(startTime).Milliseconds()}
				return
			}
			// Apply the same defensive resolution kimi/hermes use: if
			// kiro echoes a sessionId in the session/load response, prefer
			// it (the canonical id the backend is committed to). When the
			// response is empty or doesn't include sessionId — kiro's
			// current observed shape — the helper falls back to the
			// requested id, preserving today's behavior. Fixing this here
			// too means a future kiro that DOES return a different id on
			// silent state reset is handled the same way as hermes/kimi.
			var changed bool
			sessionID, changed = resolveResumedSessionID(opts.ResumeSessionID, result)
			if changed {
				b.cfg.Logger.Warn("agent returned a different session id on resume — original was likely lost; continuing with the new id",
					"backend", "kiro",
					"requested", opts.ResumeSessionID,
					"actual", sessionID,
				)
			}
		} else {
			result, err := c.request(runCtx, "session/new", map[string]any{
				"cwd":        cwd,
				"mcpServers": mcpServers,
			})
			if err != nil {
				finalStatus = "failed"
				finalError = fmt.Sprintf("kiro session/new failed: %v", err)
				resCh <- Result{Status: finalStatus, Error: finalError, DurationMs: time.Since(startTime).Milliseconds()}
				return
			}
			sessionID = extractACPSessionID(result)
			if sessionID == "" {
				finalStatus = "failed"
				finalError = "kiro session/new returned no session ID"
				resCh <- Result{Status: finalStatus, Error: finalError, DurationMs: time.Since(startTime).Milliseconds()}
				return
			}
		}

		c.sessionID = sessionID
		b.cfg.Logger.Info("kiro session created", "session_id", sessionID)

		if opts.Model != "" {
			if _, err := c.request(runCtx, "session/set_model", map[string]any{
				"sessionId": sessionID,
				"modelId":   opts.Model,
			}); err != nil {
				b.cfg.Logger.Warn("kiro set_session_model failed", "error", err, "requested_model", opts.Model)
				finalStatus = "failed"
				finalError = fmt.Sprintf("kiro could not switch to model %q: %v", opts.Model, err)
				resCh <- Result{
					Status:     finalStatus,
					Error:      finalError,
					DurationMs: time.Since(startTime).Milliseconds(),
					SessionID:  sessionID,
				}
				return
			}
			b.cfg.Logger.Info("kiro session model set", "model", opts.Model)
		}

		userText := prompt
		if opts.SystemPrompt != "" {
			userText = opts.SystemPrompt + "\n\n---\n\n" + prompt
		}
		promptBlocks := []map[string]any{
			{"type": "text", "text": userText},
		}

		// Kiro's published docs use `content`, while Kiro CLI 2.1.1 still
		// requires the standard ACP `prompt` field. Send both so either wire
		// shape can drive the turn.
		// TODO: drop one field once Kiro lands on a single canonical payload.
		streamingCurrentTurn.Store(true)
		_, err = c.request(runCtx, "session/prompt", map[string]any{
			"sessionId": sessionID,
			"content":   promptBlocks,
			"prompt":    promptBlocks,
		})
		if err != nil {
			switch runCtx.Err() {
			case context.DeadlineExceeded:
				finalStatus = "timeout"
				if timeout > 0 {
					finalError = fmt.Sprintf("kiro timed out after %s", timeout)
				} else {
					// No absolute cap was configured, so the deadline came
					// from the caller's context; "after 0s" would mislead.
					finalError = "kiro timed out"
				}
			case context.Canceled:
				finalStatus = "aborted"
				finalError = "execution cancelled"
			default:
				finalStatus = "failed"
				finalError = fmt.Sprintf("kiro session/prompt failed: %v", err)
			}
		} else {
			select {
			case pr := <-promptDone:
				if pr.stopReason == "cancelled" {
					finalStatus = "aborted"
					finalError = "kiro cancelled the prompt"
				}
				c.usageMu.Lock()
				c.usage.InputTokens += pr.usage.InputTokens
				c.usage.OutputTokens += pr.usage.OutputTokens
				c.usageMu.Unlock()
			default:
			}
		}

		duration := time.Since(startTime)
		b.cfg.Logger.Info("kiro finished", "pid", cmd.Process.Pid, "status", finalStatus, "duration", duration.Round(time.Millisecond).String())

		stdin.Close()
		cancel()
		<-readerDone
		// Ensure the stderr copier has drained before consulting the
		// provider-error sniffer; see hermes.go for the failure mode.
		<-stderrDone

		outputMu.Lock()
		finalOutput := output.String()
		outputMu.Unlock()

		// Promote completed→failed when stderr or the agent text
		// stream show a terminal upstream-LLM failure (HTTP 4xx /
		// rate-limit / expired token). See the helper docs for the
		// full signal set; the key safety property is that transient
		// per-attempt warnings followed by a successful retry stay
		// "completed".
		finalStatus, finalError = promoteACPResultOnProviderError(finalStatus, finalError, finalOutput, providerErr)

		c.usageMu.Lock()
		u := c.usage
		c.usageMu.Unlock()

		var usageMap map[string]TokenUsage
		if u.InputTokens > 0 || u.OutputTokens > 0 || u.CacheReadTokens > 0 {
			model := opts.Model
			if model == "" {
				model = "unknown"
			}
			usageMap = map[string]TokenUsage{model: u}
		}

		resCh <- Result{
			Status:     finalStatus,
			Output:     finalOutput,
			Error:      finalError,
			DurationMs: duration.Milliseconds(),
			SessionID:  sessionID,
			Usage:      usageMap,
		}
	}()

	return &Session{Messages: msgCh, Result: resCh}, nil
}
// kiroToolCanonicalNames maps Kiro tool-call labels (already trimmed and
// lowercased) to the normalized tool names used by the rest of the agent
// layer.
var kiroToolCanonicalNames = map[string]string{
	"read":      "read_file",
	"read file": "read_file",

	"write":      "write_file",
	"write file": "write_file",

	"edit":  "edit_file",
	"patch": "edit_file",

	"shell":             "terminal",
	"bash":              "terminal",
	"terminal":          "terminal",
	"run command":       "terminal",
	"run shell command": "terminal",

	"grep":   "search_files",
	"search": "search_files",
	"find":   "search_files",

	"glob": "glob",
	"code": "code",

	"web search": "web_search",

	"fetch":     "web_fetch",
	"web fetch": "web_fetch",

	"todo":       "todo_write",
	"todo write": "todo_write",
	"todo list":  "todo_write",
	"todo_list":  "todo_write",
}

// kiroToolNameFromTitle turns a Kiro tool-call title into a normalized tool
// name. Anything after the first ':' (such as a path or command) is dropped,
// the remaining label is lowercased and looked up in kiroToolCanonicalNames.
// Unknown labels fall back to the lowercased label with spaces replaced by
// underscores. A blank title yields "".
func kiroToolNameFromTitle(title string) string {
	label := strings.TrimSpace(title)
	if label == "" {
		return ""
	}
	// Only strip the suffix when a non-empty label precedes the colon; a
	// title that begins with ':' is kept whole.
	if head, _, found := strings.Cut(label, ":"); found && head != "" {
		label = strings.TrimSpace(head)
	}
	key := strings.ToLower(label)
	if name, known := kiroToolCanonicalNames[key]; known {
		return name
	}
	return strings.ReplaceAll(key, " ", "_")
}