mirror of
https://github.com/multica-ai/multica.git
synced 2026-07-05 13:29:44 +02:00
Active long-running sessions are no longer killed by a fixed wall-clock deadline. Liveness is delegated to the idle watchdog (MULTICA_AGENT_IDLE_WATCHDOG, default 30m) with a larger in-flight-tool budget (MULTICA_AGENT_TOOL_WATCHDOG, default 2h). MULTICA_AGENT_TIMEOUT is an opt-in absolute cap (default 0 = no cap). The server-side 2.5h sweeper is unchanged as a coarse backstop. Fixes #3745.
400 lines
12 KiB
Go
400 lines
12 KiB
Go
package agent
|
|
|
|
import (
|
|
"bufio"
|
|
"context"
|
|
"fmt"
|
|
"io"
|
|
"os/exec"
|
|
"strings"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
)
|
|
|
|
// kiroBlockedArgs are flags hardcoded by the daemon that must not be
|
|
// overridden by user-configured custom_args. `acp` is the protocol subcommand,
|
|
// and --trust-all-tools covers Kiro's CLI-level tool gate while
|
|
// hermesClient handles ACP session/request_permission auto-approval. In Kiro
|
|
// CLI 2.1.1, `-a` is short for --trust-all-tools, not --agent; --agent remains
|
|
// allowed so users can select a custom Kiro agent.
|
|
var kiroBlockedArgs = map[string]blockedArgMode{
|
|
"acp": blockedStandalone,
|
|
"-a": blockedStandalone,
|
|
"--trust-all-tools": blockedStandalone,
|
|
"--trust-tools": blockedWithValue,
|
|
}
|
|
|
|
// kiroBackend implements Backend by spawning `kiro-cli acp` and communicating
|
|
// via the standard ACP JSON-RPC 2.0 transport over stdin/stdout.
|
|
//
|
|
// Kiro CLI advertises loadSession, returns models from session/new, and supports
|
|
// session/set_model, so the existing Hermes/Kimi ACP client can drive it with
|
|
// only provider-specific launch and tool-name normalization.
|
|
type kiroBackend struct {
|
|
cfg Config
|
|
}
|
|
|
|
func (b *kiroBackend) Execute(ctx context.Context, prompt string, opts ExecOptions) (*Session, error) {
|
|
execPath := b.cfg.ExecutablePath
|
|
if execPath == "" {
|
|
execPath = "kiro-cli"
|
|
}
|
|
if _, err := exec.LookPath(execPath); err != nil {
|
|
return nil, fmt.Errorf("kiro executable not found at %q: %w", execPath, err)
|
|
}
|
|
|
|
// Translate the agent's mcp_config (Claude-style object of objects)
|
|
// into the array shape ACP `session/new` and `session/load` expect.
|
|
// Fail closed on malformed JSON so the launch surfaces the real error
|
|
// instead of silently dropping all MCP servers.
|
|
mcpServers, err := buildACPMcpServers(opts.McpConfig, b.cfg.Logger)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("kiro: invalid mcp_config: %w", err)
|
|
}
|
|
|
|
timeout := opts.Timeout
|
|
runCtx, cancel := runContext(ctx, timeout)
|
|
|
|
kiroArgs := append([]string{"acp", "--trust-all-tools"}, filterCustomArgs(opts.CustomArgs, kiroBlockedArgs, b.cfg.Logger)...)
|
|
cmd := exec.CommandContext(runCtx, execPath, kiroArgs...)
|
|
hideAgentWindow(cmd)
|
|
b.cfg.Logger.Info("agent command", "exec", execPath, "args", kiroArgs)
|
|
if opts.Cwd != "" {
|
|
cmd.Dir = opts.Cwd
|
|
}
|
|
cmd.Env = buildEnv(b.cfg.Env)
|
|
|
|
stdout, err := cmd.StdoutPipe()
|
|
if err != nil {
|
|
cancel()
|
|
return nil, fmt.Errorf("kiro stdout pipe: %w", err)
|
|
}
|
|
stdin, err := cmd.StdinPipe()
|
|
if err != nil {
|
|
cancel()
|
|
return nil, fmt.Errorf("kiro stdin pipe: %w", err)
|
|
}
|
|
// StderrPipe + an explicit copier give us a join point
|
|
// (`stderrDone`) that fires before the failure-promotion
|
|
// decision; see the matching comment in hermes.go for why the
|
|
// io.MultiWriter form races with stopReason=end_turn under load.
|
|
providerErr := newACPProviderErrorSniffer("kiro")
|
|
stderr, err := cmd.StderrPipe()
|
|
if err != nil {
|
|
cancel()
|
|
return nil, fmt.Errorf("kiro stderr pipe: %w", err)
|
|
}
|
|
|
|
if err := cmd.Start(); err != nil {
|
|
cancel()
|
|
return nil, fmt.Errorf("start kiro: %w", err)
|
|
}
|
|
|
|
stderrSink := io.MultiWriter(newLogWriter(b.cfg.Logger, "[kiro:stderr] "), providerErr)
|
|
stderrDone := make(chan struct{})
|
|
go func() {
|
|
defer close(stderrDone)
|
|
_, _ = io.Copy(stderrSink, stderr)
|
|
}()
|
|
|
|
b.cfg.Logger.Info("kiro acp started", "pid", cmd.Process.Pid, "cwd", opts.Cwd)
|
|
|
|
msgCh := make(chan Message, 256)
|
|
resCh := make(chan Result, 1)
|
|
|
|
var outputMu sync.Mutex
|
|
var output strings.Builder
|
|
var streamingCurrentTurn atomic.Bool
|
|
|
|
promptDone := make(chan hermesPromptResult, 1)
|
|
|
|
c := &hermesClient{
|
|
cfg: b.cfg,
|
|
stdin: stdin,
|
|
pending: make(map[int]*pendingRPC),
|
|
pendingTools: make(map[string]*pendingToolCall),
|
|
acceptNotification: func(string) bool {
|
|
return streamingCurrentTurn.Load()
|
|
},
|
|
onMessage: func(msg Message) {
|
|
if !streamingCurrentTurn.Load() {
|
|
return
|
|
}
|
|
if msg.Type == MessageToolUse {
|
|
msg.Tool = kiroToolNameFromTitle(msg.Tool)
|
|
}
|
|
if msg.Type == MessageText {
|
|
outputMu.Lock()
|
|
output.WriteString(msg.Content)
|
|
outputMu.Unlock()
|
|
}
|
|
trySend(msgCh, msg)
|
|
},
|
|
onPromptDone: func(result hermesPromptResult) {
|
|
if !streamingCurrentTurn.Load() {
|
|
return
|
|
}
|
|
select {
|
|
case promptDone <- result:
|
|
default:
|
|
}
|
|
},
|
|
}
|
|
|
|
readerDone := make(chan struct{})
|
|
go func() {
|
|
defer close(readerDone)
|
|
scanner := bufio.NewScanner(stdout)
|
|
scanner.Buffer(make([]byte, 0, 1024*1024), 10*1024*1024)
|
|
for scanner.Scan() {
|
|
line := strings.TrimSpace(scanner.Text())
|
|
if line == "" {
|
|
continue
|
|
}
|
|
c.handleLine(line)
|
|
}
|
|
c.closeAllPending(fmt.Errorf("kiro process exited"))
|
|
}()
|
|
|
|
go func() {
|
|
defer cancel()
|
|
defer close(msgCh)
|
|
defer close(resCh)
|
|
defer func() {
|
|
stdin.Close()
|
|
_ = cmd.Wait()
|
|
}()
|
|
|
|
startTime := time.Now()
|
|
finalStatus := "completed"
|
|
var finalError string
|
|
var sessionID string
|
|
|
|
initResult, err := c.request(runCtx, "initialize", map[string]any{
|
|
"protocolVersion": 1,
|
|
"clientInfo": map[string]any{
|
|
"name": "multica-agent-sdk",
|
|
"version": "0.2.0",
|
|
},
|
|
"clientCapabilities": map[string]any{},
|
|
})
|
|
if err != nil {
|
|
finalStatus = "failed"
|
|
finalError = fmt.Sprintf("kiro initialize failed: %v", err)
|
|
resCh <- Result{Status: finalStatus, Error: finalError, DurationMs: time.Since(startTime).Milliseconds()}
|
|
return
|
|
}
|
|
|
|
// Drop MCP entries whose remote transport the runtime didn't
|
|
// advertise. See the matching comment in hermes.go for why
|
|
// unconditionally sending http/sse to a stdio-only ACP runtime
|
|
// tanks the whole session/new.
|
|
mcpServers = filterACPMcpServersByCapability(mcpServers, extractACPMcpCapabilities(initResult), "kiro", b.cfg.Logger)
|
|
|
|
cwd := opts.Cwd
|
|
if cwd == "" {
|
|
cwd = "."
|
|
}
|
|
|
|
if opts.ResumeSessionID != "" {
|
|
result, err := c.request(runCtx, "session/load", map[string]any{
|
|
"cwd": cwd,
|
|
"sessionId": opts.ResumeSessionID,
|
|
"mcpServers": mcpServers,
|
|
})
|
|
if err != nil {
|
|
finalStatus = "failed"
|
|
finalError = fmt.Sprintf("kiro session/load failed: %v", err)
|
|
resCh <- Result{Status: finalStatus, Error: finalError, DurationMs: time.Since(startTime).Milliseconds()}
|
|
return
|
|
}
|
|
// Apply the same defensive resolution kimi/hermes use: if
|
|
// kiro echoes a sessionId in the session/load response, prefer
|
|
// it (the canonical id the backend is committed to). When the
|
|
// response is empty or doesn't include sessionId — kiro's
|
|
// current observed shape — the helper falls back to the
|
|
// requested id, preserving today's behavior. Fixing this here
|
|
// too means a future kiro that DOES return a different id on
|
|
// silent state reset is handled the same way as hermes/kimi.
|
|
var changed bool
|
|
sessionID, changed = resolveResumedSessionID(opts.ResumeSessionID, result)
|
|
if changed {
|
|
b.cfg.Logger.Warn("agent returned a different session id on resume — original was likely lost; continuing with the new id",
|
|
"backend", "kiro",
|
|
"requested", opts.ResumeSessionID,
|
|
"actual", sessionID,
|
|
)
|
|
}
|
|
} else {
|
|
result, err := c.request(runCtx, "session/new", map[string]any{
|
|
"cwd": cwd,
|
|
"mcpServers": mcpServers,
|
|
})
|
|
if err != nil {
|
|
finalStatus = "failed"
|
|
finalError = fmt.Sprintf("kiro session/new failed: %v", err)
|
|
resCh <- Result{Status: finalStatus, Error: finalError, DurationMs: time.Since(startTime).Milliseconds()}
|
|
return
|
|
}
|
|
sessionID = extractACPSessionID(result)
|
|
if sessionID == "" {
|
|
finalStatus = "failed"
|
|
finalError = "kiro session/new returned no session ID"
|
|
resCh <- Result{Status: finalStatus, Error: finalError, DurationMs: time.Since(startTime).Milliseconds()}
|
|
return
|
|
}
|
|
}
|
|
|
|
c.sessionID = sessionID
|
|
b.cfg.Logger.Info("kiro session created", "session_id", sessionID)
|
|
|
|
if opts.Model != "" {
|
|
if _, err := c.request(runCtx, "session/set_model", map[string]any{
|
|
"sessionId": sessionID,
|
|
"modelId": opts.Model,
|
|
}); err != nil {
|
|
b.cfg.Logger.Warn("kiro set_session_model failed", "error", err, "requested_model", opts.Model)
|
|
finalStatus = "failed"
|
|
finalError = fmt.Sprintf("kiro could not switch to model %q: %v", opts.Model, err)
|
|
resCh <- Result{
|
|
Status: finalStatus,
|
|
Error: finalError,
|
|
DurationMs: time.Since(startTime).Milliseconds(),
|
|
SessionID: sessionID,
|
|
}
|
|
return
|
|
}
|
|
b.cfg.Logger.Info("kiro session model set", "model", opts.Model)
|
|
}
|
|
|
|
userText := prompt
|
|
if opts.SystemPrompt != "" {
|
|
userText = opts.SystemPrompt + "\n\n---\n\n" + prompt
|
|
}
|
|
|
|
promptBlocks := []map[string]any{
|
|
{"type": "text", "text": userText},
|
|
}
|
|
// Kiro's published docs use `content`, while Kiro CLI 2.1.1 still
|
|
// requires the standard ACP `prompt` field. Send both so either wire
|
|
// shape can drive the turn.
|
|
// TODO: drop one field once Kiro lands on a single canonical payload.
|
|
streamingCurrentTurn.Store(true)
|
|
_, err = c.request(runCtx, "session/prompt", map[string]any{
|
|
"sessionId": sessionID,
|
|
"content": promptBlocks,
|
|
"prompt": promptBlocks,
|
|
})
|
|
if err != nil {
|
|
if runCtx.Err() == context.DeadlineExceeded {
|
|
finalStatus = "timeout"
|
|
finalError = fmt.Sprintf("kiro timed out after %s", timeout)
|
|
} else if runCtx.Err() == context.Canceled {
|
|
finalStatus = "aborted"
|
|
finalError = "execution cancelled"
|
|
} else {
|
|
finalStatus = "failed"
|
|
finalError = fmt.Sprintf("kiro session/prompt failed: %v", err)
|
|
}
|
|
} else {
|
|
select {
|
|
case pr := <-promptDone:
|
|
if pr.stopReason == "cancelled" {
|
|
finalStatus = "aborted"
|
|
finalError = "kiro cancelled the prompt"
|
|
}
|
|
c.usageMu.Lock()
|
|
c.usage.InputTokens += pr.usage.InputTokens
|
|
c.usage.OutputTokens += pr.usage.OutputTokens
|
|
c.usageMu.Unlock()
|
|
default:
|
|
}
|
|
}
|
|
|
|
duration := time.Since(startTime)
|
|
b.cfg.Logger.Info("kiro finished", "pid", cmd.Process.Pid, "status", finalStatus, "duration", duration.Round(time.Millisecond).String())
|
|
|
|
stdin.Close()
|
|
cancel()
|
|
|
|
<-readerDone
|
|
// Ensure the stderr copier has drained before consulting the
|
|
// provider-error sniffer; see hermes.go for the failure mode.
|
|
<-stderrDone
|
|
|
|
outputMu.Lock()
|
|
finalOutput := output.String()
|
|
outputMu.Unlock()
|
|
|
|
// Promote completed→failed when stderr or the agent text
|
|
// stream show a terminal upstream-LLM failure (HTTP 4xx /
|
|
// rate-limit / expired token). See the helper docs for the
|
|
// full signal set; the key safety property is that transient
|
|
// per-attempt warnings followed by a successful retry stay
|
|
// "completed".
|
|
finalStatus, finalError = promoteACPResultOnProviderError(finalStatus, finalError, finalOutput, providerErr)
|
|
|
|
c.usageMu.Lock()
|
|
u := c.usage
|
|
c.usageMu.Unlock()
|
|
|
|
var usageMap map[string]TokenUsage
|
|
if u.InputTokens > 0 || u.OutputTokens > 0 || u.CacheReadTokens > 0 {
|
|
model := opts.Model
|
|
if model == "" {
|
|
model = "unknown"
|
|
}
|
|
usageMap = map[string]TokenUsage{model: u}
|
|
}
|
|
|
|
resCh <- Result{
|
|
Status: finalStatus,
|
|
Output: finalOutput,
|
|
Error: finalError,
|
|
DurationMs: duration.Milliseconds(),
|
|
SessionID: sessionID,
|
|
Usage: usageMap,
|
|
}
|
|
}()
|
|
|
|
return &Session{Messages: msgCh, Result: resCh}, nil
|
|
}
|
|
|
|
func kiroToolNameFromTitle(title string) string {
|
|
t := strings.TrimSpace(title)
|
|
if t == "" {
|
|
return ""
|
|
}
|
|
|
|
if idx := strings.Index(t, ":"); idx > 0 {
|
|
t = strings.TrimSpace(t[:idx])
|
|
}
|
|
|
|
lower := strings.ToLower(t)
|
|
switch lower {
|
|
case "read", "read file":
|
|
return "read_file"
|
|
case "write", "write file":
|
|
return "write_file"
|
|
case "edit", "patch":
|
|
return "edit_file"
|
|
case "shell", "bash", "terminal", "run command", "run shell command":
|
|
return "terminal"
|
|
case "grep", "search", "find":
|
|
return "search_files"
|
|
case "glob":
|
|
return "glob"
|
|
case "code":
|
|
return "code"
|
|
case "web search":
|
|
return "web_search"
|
|
case "fetch", "web fetch":
|
|
return "web_fetch"
|
|
case "todo", "todo write", "todo list", "todo_list":
|
|
return "todo_write"
|
|
}
|
|
|
|
return strings.ReplaceAll(lower, " ", "_")
|
|
}
|