mirror of
https://github.com/multica-ai/multica.git
synced 2026-06-30 10:59:31 +02:00
Compare commits
2 Commits
agent/lamb
...
agent/j/1b
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
78c9514499 | ||
|
|
0a49fef396 |
@@ -2,14 +2,17 @@ package agent
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"log/slog"
|
||||
"os"
|
||||
"os/exec"
|
||||
"path/filepath"
|
||||
"regexp"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
@@ -42,6 +45,15 @@ func (b *hermesBackend) Execute(ctx context.Context, prompt string, opts ExecOpt
|
||||
return nil, fmt.Errorf("hermes executable not found at %q: %w", execPath, err)
|
||||
}
|
||||
|
||||
// Translate the agent's mcp_config (Claude-style object of objects)
|
||||
// into the array shape ACP `session/new` expects. Fail closed on
|
||||
// malformed JSON so the launch surfaces the real error instead of
|
||||
// silently dropping all MCP servers.
|
||||
mcpServers, err := buildACPMcpServers(opts.McpConfig, b.cfg.Logger)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("hermes: invalid mcp_config: %w", err)
|
||||
}
|
||||
|
||||
timeout := opts.Timeout
|
||||
if timeout == 0 {
|
||||
timeout = 20 * time.Minute
|
||||
@@ -194,7 +206,7 @@ func (b *hermesBackend) Execute(ctx context.Context, prompt string, opts ExecOpt
|
||||
effectiveModel := strings.TrimSpace(opts.Model)
|
||||
|
||||
// 1. Initialize handshake.
|
||||
_, err := c.request(runCtx, "initialize", map[string]any{
|
||||
initResult, err := c.request(runCtx, "initialize", map[string]any{
|
||||
"protocolVersion": 1,
|
||||
"clientInfo": map[string]any{
|
||||
"name": "multica-agent-sdk",
|
||||
@@ -209,6 +221,13 @@ func (b *hermesBackend) Execute(ctx context.Context, prompt string, opts ExecOpt
|
||||
return
|
||||
}
|
||||
|
||||
// Drop MCP entries whose remote transport the runtime didn't
|
||||
// advertise. ACP requires the client to honour
|
||||
// agentCapabilities.mcpCapabilities; sending an http/sse entry to
|
||||
// a runtime that says it only supports stdio reliably rejects the
|
||||
// whole session/new request.
|
||||
mcpServers = filterACPMcpServersByCapability(mcpServers, extractACPMcpCapabilities(initResult), "hermes", b.cfg.Logger)
|
||||
|
||||
// 2. Create or resume a session.
|
||||
cwd := opts.Cwd
|
||||
if cwd == "" {
|
||||
@@ -216,9 +235,14 @@ func (b *hermesBackend) Execute(ctx context.Context, prompt string, opts ExecOpt
|
||||
}
|
||||
|
||||
if opts.ResumeSessionID != "" {
|
||||
// Per ACP Session Setup, session/resume accepts mcpServers and
|
||||
// the runtime re-connects them as part of the resume. Without
|
||||
// this, a resumed Hermes task lost access to MCP tools that a
|
||||
// fresh task on the same agent would have.
|
||||
result, err := c.request(runCtx, "session/resume", map[string]any{
|
||||
"cwd": cwd,
|
||||
"sessionId": opts.ResumeSessionID,
|
||||
"cwd": cwd,
|
||||
"sessionId": opts.ResumeSessionID,
|
||||
"mcpServers": mcpServers,
|
||||
})
|
||||
if err != nil {
|
||||
finalStatus = "failed"
|
||||
@@ -239,7 +263,7 @@ func (b *hermesBackend) Execute(ctx context.Context, prompt string, opts ExecOpt
|
||||
effectiveModel = extractACPCurrentModelID(result)
|
||||
}
|
||||
} else {
|
||||
result, err := c.request(runCtx, "session/new", buildHermesSessionParams(cwd, opts.Model))
|
||||
result, err := c.request(runCtx, "session/new", buildHermesSessionParams(cwd, opts.Model, mcpServers))
|
||||
if err != nil {
|
||||
finalStatus = "failed"
|
||||
finalError = fmt.Sprintf("hermes session/new failed: %v", err)
|
||||
@@ -1193,10 +1217,17 @@ func resolveResumedSessionID(requested string, response json.RawMessage) (string
|
||||
// buildHermesSessionParams constructs the params map for the ACP `session/new`
|
||||
// request. The `model` field is only included when non-empty so Hermes falls
|
||||
// back to its default only when no explicit model was configured.
|
||||
func buildHermesSessionParams(cwd, model string) map[string]any {
|
||||
//
|
||||
// mcpServers should be the ACP-shaped array produced by buildACPMcpServers
|
||||
// from the agent's mcp_config; a nil slice is normalised to an empty array
|
||||
// so the wire request always carries the field (ACP requires it).
|
||||
func buildHermesSessionParams(cwd, model string, mcpServers []any) map[string]any {
|
||||
if mcpServers == nil {
|
||||
mcpServers = []any{}
|
||||
}
|
||||
params := map[string]any{
|
||||
"cwd": cwd,
|
||||
"mcpServers": []any{},
|
||||
"mcpServers": mcpServers,
|
||||
}
|
||||
if model != "" {
|
||||
params["model"] = model
|
||||
@@ -1204,6 +1235,229 @@ func buildHermesSessionParams(cwd, model string) map[string]any {
|
||||
return params
|
||||
}
|
||||
|
||||
// buildACPMcpServers translates an agent's Claude-style mcp_config
|
||||
// (`{"mcpServers": {"<name>": {...}}}`) into the array shape that ACP's
|
||||
// `session/new` and `session/load` requests expect.
|
||||
//
|
||||
// Each Claude-style entry maps to one of:
|
||||
//
|
||||
// - Stdio: `{name, command, args, env: [{name,value}, ...]}` —
|
||||
// when the entry has a `command` field. No `type` field is emitted;
|
||||
// ACP treats untagged entries as stdio.
|
||||
// - HTTP / SSE: `{type, name, url, headers: [{name,value}, ...]}` —
|
||||
// when the entry has a `url` field. `type` defaults to "http"; Claude's
|
||||
// "sse" and "streamable-http" / "http_streamable" aliases are accepted.
|
||||
//
|
||||
// Empty / null input returns an empty slice — the launch proceeds with no
|
||||
// MCP servers (the existing default for ACP backends). Malformed top-level
|
||||
// JSON returns an error so the launch fails closed, mirroring codex's
|
||||
// `renderCodexMcpServersBlock` contract. Individual entries that have
|
||||
// neither `command` nor `url` are skipped with a warning rather than
|
||||
// failing the whole launch, so a single bad entry can't kill the agent.
|
||||
//
|
||||
// Output entries are sorted by name and each entry's env / headers are
|
||||
// sorted by key, so the wire request is deterministic across reruns —
|
||||
// useful for tests, log diffs, and reproducibility.
|
||||
func buildACPMcpServers(raw json.RawMessage, logger *slog.Logger) ([]any, error) {
|
||||
trimmed := bytes.TrimSpace(raw)
|
||||
if len(trimmed) == 0 || bytes.Equal(trimmed, []byte("null")) {
|
||||
return []any{}, nil
|
||||
}
|
||||
var parsed struct {
|
||||
McpServers map[string]json.RawMessage `json:"mcpServers"`
|
||||
}
|
||||
if err := json.Unmarshal(trimmed, &parsed); err != nil {
|
||||
return nil, fmt.Errorf("parse mcp_config json: %w", err)
|
||||
}
|
||||
if len(parsed.McpServers) == 0 {
|
||||
return []any{}, nil
|
||||
}
|
||||
|
||||
names := make([]string, 0, len(parsed.McpServers))
|
||||
for name := range parsed.McpServers {
|
||||
names = append(names, name)
|
||||
}
|
||||
sort.Strings(names)
|
||||
|
||||
out := make([]any, 0, len(names))
|
||||
for _, name := range names {
|
||||
entry, err := convertACPMcpServer(name, parsed.McpServers[name])
|
||||
if err != nil {
|
||||
if logger != nil {
|
||||
logger.Warn("skipping invalid mcp_config entry", "name", name, "error", err)
|
||||
}
|
||||
continue
|
||||
}
|
||||
out = append(out, entry)
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
// convertACPMcpServer converts a single Claude-style entry into the ACP
|
||||
// McpServer wire shape. Returns an error for entries that can't be
|
||||
// classified (no command and no url).
|
||||
func convertACPMcpServer(name string, raw json.RawMessage) (map[string]any, error) {
|
||||
var entry struct {
|
||||
Type string `json:"type"`
|
||||
Command string `json:"command"`
|
||||
Args []string `json:"args"`
|
||||
Env map[string]string `json:"env"`
|
||||
URL string `json:"url"`
|
||||
Headers map[string]string `json:"headers"`
|
||||
}
|
||||
if err := json.Unmarshal(raw, &entry); err != nil {
|
||||
return nil, fmt.Errorf("parse entry: %w", err)
|
||||
}
|
||||
|
||||
command := strings.TrimSpace(entry.Command)
|
||||
url := strings.TrimSpace(entry.URL)
|
||||
|
||||
if command != "" {
|
||||
args := entry.Args
|
||||
if args == nil {
|
||||
args = []string{}
|
||||
}
|
||||
envArr := make([]map[string]any, 0, len(entry.Env))
|
||||
for _, k := range sortedStringMapKeys(entry.Env) {
|
||||
envArr = append(envArr, map[string]any{
|
||||
"name": k,
|
||||
"value": entry.Env[k],
|
||||
})
|
||||
}
|
||||
return map[string]any{
|
||||
"name": name,
|
||||
"command": command,
|
||||
"args": args,
|
||||
"env": envArr,
|
||||
}, nil
|
||||
}
|
||||
|
||||
if url != "" {
|
||||
t := strings.ToLower(strings.TrimSpace(entry.Type))
|
||||
switch t {
|
||||
case "sse":
|
||||
t = "sse"
|
||||
case "", "http", "streamable-http", "http_streamable":
|
||||
t = "http"
|
||||
default:
|
||||
// Unknown remote transport — degrade to "http" rather than fail.
|
||||
// ACP servers that don't recognise the type will reject the
|
||||
// session/new request and surface a real error to the user.
|
||||
t = "http"
|
||||
}
|
||||
headerArr := make([]map[string]any, 0, len(entry.Headers))
|
||||
for _, k := range sortedStringMapKeys(entry.Headers) {
|
||||
headerArr = append(headerArr, map[string]any{
|
||||
"name": k,
|
||||
"value": entry.Headers[k],
|
||||
})
|
||||
}
|
||||
return map[string]any{
|
||||
"type": t,
|
||||
"name": name,
|
||||
"url": url,
|
||||
"headers": headerArr,
|
||||
}, nil
|
||||
}
|
||||
|
||||
return nil, fmt.Errorf("entry has neither command nor url")
|
||||
}
|
||||
|
||||
func sortedStringMapKeys(m map[string]string) []string {
|
||||
keys := make([]string, 0, len(m))
|
||||
for k := range m {
|
||||
keys = append(keys, k)
|
||||
}
|
||||
sort.Strings(keys)
|
||||
return keys
|
||||
}
|
||||
|
||||
// acpMcpTransportCapabilities reports which remote MCP transports the ACP
|
||||
// runtime advertised in its `initialize` response. Stdio is always
|
||||
// supported (it's the baseline transport and the spec does not gate it),
|
||||
// so it's not represented here.
|
||||
type acpMcpTransportCapabilities struct {
|
||||
HTTP bool
|
||||
SSE bool
|
||||
}
|
||||
|
||||
// extractACPMcpCapabilities reads `agentCapabilities.mcpCapabilities.http`
|
||||
// and `.sse` out of an ACP `initialize` response. Missing or false fields
|
||||
// stay false, matching the spec default: the runtime must opt-in to
|
||||
// remote MCP transports. Unparseable responses degrade to "neither
|
||||
// supported" so we fail closed on remote entries.
|
||||
//
|
||||
// See https://agentclientprotocol.com/protocol/initialization — clients
|
||||
// MUST NOT send `mcpServers` entries with a type the agent did not
|
||||
// advertise support for.
|
||||
func extractACPMcpCapabilities(result json.RawMessage) acpMcpTransportCapabilities {
|
||||
var r struct {
|
||||
AgentCapabilities struct {
|
||||
McpCapabilities struct {
|
||||
HTTP bool `json:"http"`
|
||||
SSE bool `json:"sse"`
|
||||
} `json:"mcpCapabilities"`
|
||||
} `json:"agentCapabilities"`
|
||||
}
|
||||
if err := json.Unmarshal(result, &r); err != nil {
|
||||
return acpMcpTransportCapabilities{}
|
||||
}
|
||||
return acpMcpTransportCapabilities{
|
||||
HTTP: r.AgentCapabilities.McpCapabilities.HTTP,
|
||||
SSE: r.AgentCapabilities.McpCapabilities.SSE,
|
||||
}
|
||||
}
|
||||
|
||||
// filterACPMcpServersByCapability drops remote MCP entries whose transport
|
||||
// the runtime didn't advertise in its initialize response. Stdio entries
|
||||
// (no `type` field) always pass through.
|
||||
//
|
||||
// Sending an http/sse entry to a runtime that doesn't support it is a
|
||||
// protocol violation per the ACP spec, and Hermes / Kimi observed in
|
||||
// practice reject the whole session/new request with a JSON-RPC error.
|
||||
// Dropping the offending entries with a warning lets the rest of the
|
||||
// session start and surfaces the problem in the daemon log instead of
|
||||
// tanking every task on that agent.
|
||||
func filterACPMcpServersByCapability(
|
||||
servers []any,
|
||||
caps acpMcpTransportCapabilities,
|
||||
backend string,
|
||||
logger *slog.Logger,
|
||||
) []any {
|
||||
if len(servers) == 0 {
|
||||
return servers
|
||||
}
|
||||
filtered := make([]any, 0, len(servers))
|
||||
for _, raw := range servers {
|
||||
entry, ok := raw.(map[string]any)
|
||||
if !ok {
|
||||
filtered = append(filtered, raw)
|
||||
continue
|
||||
}
|
||||
transport, _ := entry["type"].(string)
|
||||
switch transport {
|
||||
case "http":
|
||||
if !caps.HTTP {
|
||||
if logger != nil {
|
||||
logger.Warn("dropping http MCP server: runtime did not advertise mcpCapabilities.http",
|
||||
"backend", backend, "name", entry["name"])
|
||||
}
|
||||
continue
|
||||
}
|
||||
case "sse":
|
||||
if !caps.SSE {
|
||||
if logger != nil {
|
||||
logger.Warn("dropping sse MCP server: runtime did not advertise mcpCapabilities.sse",
|
||||
"backend", backend, "name", entry["name"])
|
||||
}
|
||||
continue
|
||||
}
|
||||
}
|
||||
filtered = append(filtered, entry)
|
||||
}
|
||||
return filtered
|
||||
}
|
||||
|
||||
// hermesToolNameFromTitle extracts a tool name from the ACP tool call title.
|
||||
// Hermes ACP titles look like "terminal: ls -la", "read: /path/to/file", etc.
|
||||
// Some titles have no colon (e.g. "execute code").
|
||||
|
||||
@@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"log/slog"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"sync"
|
||||
@@ -149,7 +150,7 @@ func TestResolveResumedSessionIDEmptyResponse(t *testing.T) {
|
||||
|
||||
func TestBuildHermesSessionParamsIncludesModel(t *testing.T) {
|
||||
t.Parallel()
|
||||
params := buildHermesSessionParams("/tmp/work", "gpt-4o")
|
||||
params := buildHermesSessionParams("/tmp/work", "gpt-4o", nil)
|
||||
if params["cwd"] != "/tmp/work" {
|
||||
t.Errorf("cwd: got %v, want /tmp/work", params["cwd"])
|
||||
}
|
||||
@@ -163,12 +164,239 @@ func TestBuildHermesSessionParamsIncludesModel(t *testing.T) {
|
||||
|
||||
func TestBuildHermesSessionParamsOmitsEmptyModel(t *testing.T) {
|
||||
t.Parallel()
|
||||
params := buildHermesSessionParams("/tmp/work", "")
|
||||
params := buildHermesSessionParams("/tmp/work", "", nil)
|
||||
if _, present := params["model"]; present {
|
||||
t.Error("expected model key to be omitted when model is empty")
|
||||
}
|
||||
}
|
||||
|
||||
func TestBuildHermesSessionParamsPassesThroughMcpServers(t *testing.T) {
|
||||
t.Parallel()
|
||||
servers := []any{map[string]any{"name": "fetch", "command": "uvx", "args": []string{}, "env": []map[string]any{}}}
|
||||
params := buildHermesSessionParams("/tmp/work", "", servers)
|
||||
got, ok := params["mcpServers"].([]any)
|
||||
if !ok {
|
||||
t.Fatalf("mcpServers: got %T, want []any", params["mcpServers"])
|
||||
}
|
||||
if len(got) != 1 {
|
||||
t.Fatalf("len(mcpServers): got %d, want 1", len(got))
|
||||
}
|
||||
}
|
||||
|
||||
func TestBuildHermesSessionParamsNilMcpServersBecomesEmptyArray(t *testing.T) {
|
||||
t.Parallel()
|
||||
// ACP requires the field; nil must surface as `[]` so the wire request
|
||||
// stays well-formed even when no MCP servers are configured.
|
||||
params := buildHermesSessionParams("/tmp/work", "", nil)
|
||||
got, ok := params["mcpServers"].([]any)
|
||||
if !ok {
|
||||
t.Fatalf("mcpServers: got %T, want []any", params["mcpServers"])
|
||||
}
|
||||
if len(got) != 0 {
|
||||
t.Errorf("len(mcpServers): got %d, want 0", len(got))
|
||||
}
|
||||
}
|
||||
|
||||
// ── buildACPMcpServers ──
|
||||
|
||||
func TestBuildACPMcpServersEmptyInputReturnsEmpty(t *testing.T) {
|
||||
t.Parallel()
|
||||
for _, raw := range []json.RawMessage{nil, {}, json.RawMessage("null"), json.RawMessage(" null "), json.RawMessage("{}"), json.RawMessage(`{"mcpServers":{}}`)} {
|
||||
got, err := buildACPMcpServers(raw, slog.Default())
|
||||
if err != nil {
|
||||
t.Fatalf("raw=%q: unexpected error: %v", string(raw), err)
|
||||
}
|
||||
if got == nil {
|
||||
t.Errorf("raw=%q: got nil, want non-nil empty slice", string(raw))
|
||||
}
|
||||
if len(got) != 0 {
|
||||
t.Errorf("raw=%q: got %d entries, want 0", string(raw), len(got))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestBuildACPMcpServersTranslatesStdioEntry(t *testing.T) {
|
||||
t.Parallel()
|
||||
raw := json.RawMessage(`{"mcpServers":{"fetch":{"command":"uvx","args":["mcp-server-fetch"],"env":{"API_KEY":"secret","HOME":"/tmp"}}}}`)
|
||||
got, err := buildACPMcpServers(raw, slog.Default())
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
if len(got) != 1 {
|
||||
t.Fatalf("len: got %d, want 1", len(got))
|
||||
}
|
||||
entry, ok := got[0].(map[string]any)
|
||||
if !ok {
|
||||
t.Fatalf("entry type: got %T, want map[string]any", got[0])
|
||||
}
|
||||
if entry["name"] != "fetch" {
|
||||
t.Errorf("name: got %v, want fetch", entry["name"])
|
||||
}
|
||||
if entry["command"] != "uvx" {
|
||||
t.Errorf("command: got %v, want uvx", entry["command"])
|
||||
}
|
||||
if _, hasType := entry["type"]; hasType {
|
||||
t.Errorf("stdio entry should not include type field, got %v", entry["type"])
|
||||
}
|
||||
args, ok := entry["args"].([]string)
|
||||
if !ok || len(args) != 1 || args[0] != "mcp-server-fetch" {
|
||||
t.Errorf("args: got %v, want [mcp-server-fetch]", entry["args"])
|
||||
}
|
||||
envArr, ok := entry["env"].([]map[string]any)
|
||||
if !ok {
|
||||
t.Fatalf("env type: got %T, want []map[string]any", entry["env"])
|
||||
}
|
||||
// Env entries sorted by key for determinism.
|
||||
if len(envArr) != 2 {
|
||||
t.Fatalf("len(env): got %d, want 2", len(envArr))
|
||||
}
|
||||
if envArr[0]["name"] != "API_KEY" || envArr[0]["value"] != "secret" {
|
||||
t.Errorf("env[0]: got %v, want {name:API_KEY,value:secret}", envArr[0])
|
||||
}
|
||||
if envArr[1]["name"] != "HOME" || envArr[1]["value"] != "/tmp" {
|
||||
t.Errorf("env[1]: got %v, want {name:HOME,value:/tmp}", envArr[1])
|
||||
}
|
||||
}
|
||||
|
||||
func TestBuildACPMcpServersStdioWithoutArgsOrEnvUsesEmptyArrays(t *testing.T) {
|
||||
t.Parallel()
|
||||
// ACP requires args and env to be arrays; missing fields must become
|
||||
// `[]` rather than null so the wire shape passes server-side validation.
|
||||
raw := json.RawMessage(`{"mcpServers":{"minimal":{"command":"echo"}}}`)
|
||||
got, err := buildACPMcpServers(raw, slog.Default())
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
entry := got[0].(map[string]any)
|
||||
if args, ok := entry["args"].([]string); !ok || len(args) != 0 {
|
||||
t.Errorf("args: got %v, want []", entry["args"])
|
||||
}
|
||||
if env, ok := entry["env"].([]map[string]any); !ok || len(env) != 0 {
|
||||
t.Errorf("env: got %v, want []", entry["env"])
|
||||
}
|
||||
}
|
||||
|
||||
func TestBuildACPMcpServersTranslatesHttpEntry(t *testing.T) {
|
||||
t.Parallel()
|
||||
raw := json.RawMessage(`{"mcpServers":{"remote":{"type":"http","url":"https://example.com/mcp","headers":{"Authorization":"Bearer x","X-Trace":"abc"}}}}`)
|
||||
got, err := buildACPMcpServers(raw, slog.Default())
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
if len(got) != 1 {
|
||||
t.Fatalf("len: got %d, want 1", len(got))
|
||||
}
|
||||
entry := got[0].(map[string]any)
|
||||
if entry["type"] != "http" {
|
||||
t.Errorf("type: got %v, want http", entry["type"])
|
||||
}
|
||||
if entry["name"] != "remote" {
|
||||
t.Errorf("name: got %v, want remote", entry["name"])
|
||||
}
|
||||
if entry["url"] != "https://example.com/mcp" {
|
||||
t.Errorf("url: got %v, want https://example.com/mcp", entry["url"])
|
||||
}
|
||||
headers, ok := entry["headers"].([]map[string]any)
|
||||
if !ok || len(headers) != 2 {
|
||||
t.Fatalf("headers: got %v, want 2 entries", entry["headers"])
|
||||
}
|
||||
if headers[0]["name"] != "Authorization" {
|
||||
t.Errorf("headers[0].name: got %v, want Authorization", headers[0]["name"])
|
||||
}
|
||||
}
|
||||
|
||||
func TestBuildACPMcpServersDefaultsRemoteTypeToHttp(t *testing.T) {
|
||||
t.Parallel()
|
||||
// A `url` without `type` should default to "http" rather than be classified
|
||||
// as stdio or get dropped.
|
||||
raw := json.RawMessage(`{"mcpServers":{"remote":{"url":"https://example.com/mcp"}}}`)
|
||||
got, err := buildACPMcpServers(raw, slog.Default())
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
entry := got[0].(map[string]any)
|
||||
if entry["type"] != "http" {
|
||||
t.Errorf("type: got %v, want http (default)", entry["type"])
|
||||
}
|
||||
}
|
||||
|
||||
func TestBuildACPMcpServersSupportsSseTransport(t *testing.T) {
|
||||
t.Parallel()
|
||||
raw := json.RawMessage(`{"mcpServers":{"remote":{"type":"sse","url":"https://example.com/sse"}}}`)
|
||||
got, err := buildACPMcpServers(raw, slog.Default())
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
entry := got[0].(map[string]any)
|
||||
if entry["type"] != "sse" {
|
||||
t.Errorf("type: got %v, want sse", entry["type"])
|
||||
}
|
||||
}
|
||||
|
||||
func TestBuildACPMcpServersAcceptsStreamableHttpAlias(t *testing.T) {
|
||||
t.Parallel()
|
||||
// Claude's MCP CLI uses "streamable-http" / "http_streamable" as
|
||||
// aliases for the http transport; ACP only knows "http", so the
|
||||
// translator must collapse the alias.
|
||||
for _, alias := range []string{"streamable-http", "http_streamable", "Streamable-HTTP"} {
|
||||
raw := json.RawMessage(`{"mcpServers":{"remote":{"type":"` + alias + `","url":"https://example.com/mcp"}}}`)
|
||||
got, err := buildACPMcpServers(raw, slog.Default())
|
||||
if err != nil {
|
||||
t.Fatalf("alias=%s: unexpected error: %v", alias, err)
|
||||
}
|
||||
entry := got[0].(map[string]any)
|
||||
if entry["type"] != "http" {
|
||||
t.Errorf("alias=%s: type got %v, want http", alias, entry["type"])
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestBuildACPMcpServersSortsEntriesByName(t *testing.T) {
|
||||
t.Parallel()
|
||||
// Map iteration is randomized in Go; the translator sorts by name so
|
||||
// the wire request and test assertions are deterministic.
|
||||
raw := json.RawMessage(`{"mcpServers":{"zeta":{"command":"z"},"alpha":{"command":"a"},"mid":{"command":"m"}}}`)
|
||||
got, err := buildACPMcpServers(raw, slog.Default())
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
want := []string{"alpha", "mid", "zeta"}
|
||||
for i, w := range want {
|
||||
if got[i].(map[string]any)["name"] != w {
|
||||
t.Errorf("position %d: got %v, want %s", i, got[i].(map[string]any)["name"], w)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestBuildACPMcpServersSkipsInvalidEntriesAndContinues(t *testing.T) {
|
||||
t.Parallel()
|
||||
// An entry with neither command nor url is invalid — drop it with a
|
||||
// warning rather than failing the whole launch, so a single bad entry
|
||||
// in the agent UI doesn't take MCP down for the rest of the agent.
|
||||
raw := json.RawMessage(`{"mcpServers":{"bad":{"args":["nothing"]},"good":{"command":"uvx"}}}`)
|
||||
got, err := buildACPMcpServers(raw, slog.Default())
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
if len(got) != 1 {
|
||||
t.Fatalf("len: got %d, want 1 (bad entry should be skipped)", len(got))
|
||||
}
|
||||
if got[0].(map[string]any)["name"] != "good" {
|
||||
t.Errorf("kept the wrong entry: %v", got[0])
|
||||
}
|
||||
}
|
||||
|
||||
func TestBuildACPMcpServersReturnsErrorOnMalformedJSON(t *testing.T) {
|
||||
t.Parallel()
|
||||
_, err := buildACPMcpServers(json.RawMessage(`not json`), slog.Default())
|
||||
if err == nil {
|
||||
t.Fatal("expected error for malformed JSON, got nil")
|
||||
}
|
||||
if !strings.Contains(err.Error(), "parse mcp_config json") {
|
||||
t.Errorf("error message: got %q, want it to mention parsing", err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
// ── hermesToolNameFromTitle ──
|
||||
|
||||
func TestHermesToolNameFromTitle(t *testing.T) {
|
||||
@@ -1362,3 +1590,373 @@ func TestHermesBackendDoesNotPromoteOnTransientRetry(t *testing.T) {
|
||||
t.Fatal("timeout waiting for result")
|
||||
}
|
||||
}
|
||||
|
||||
// ── extractACPMcpCapabilities ──
|
||||
|
||||
func TestExtractACPMcpCapabilities(t *testing.T) {
|
||||
t.Parallel()
|
||||
tests := []struct {
|
||||
name string
|
||||
raw string
|
||||
wantHTTP bool
|
||||
wantSSE bool
|
||||
}{
|
||||
{
|
||||
name: "both true",
|
||||
raw: `{"protocolVersion":1,"agentCapabilities":{"mcpCapabilities":{"http":true,"sse":true}}}`,
|
||||
wantHTTP: true,
|
||||
wantSSE: true,
|
||||
},
|
||||
{
|
||||
name: "http only",
|
||||
raw: `{"agentCapabilities":{"mcpCapabilities":{"http":true}}}`,
|
||||
wantHTTP: true,
|
||||
wantSSE: false,
|
||||
},
|
||||
{
|
||||
name: "sse only",
|
||||
raw: `{"agentCapabilities":{"mcpCapabilities":{"sse":true}}}`,
|
||||
wantHTTP: false,
|
||||
wantSSE: true,
|
||||
},
|
||||
{
|
||||
name: "block missing",
|
||||
raw: `{"agentCapabilities":{}}`,
|
||||
wantHTTP: false,
|
||||
wantSSE: false,
|
||||
},
|
||||
{
|
||||
name: "agentCapabilities missing",
|
||||
raw: `{"protocolVersion":1}`,
|
||||
wantHTTP: false,
|
||||
wantSSE: false,
|
||||
},
|
||||
{
|
||||
name: "malformed json",
|
||||
raw: `not json`,
|
||||
wantHTTP: false,
|
||||
wantSSE: false,
|
||||
},
|
||||
}
|
||||
for _, tc := range tests {
|
||||
got := extractACPMcpCapabilities(json.RawMessage(tc.raw))
|
||||
if got.HTTP != tc.wantHTTP || got.SSE != tc.wantSSE {
|
||||
t.Errorf("%s: got {HTTP:%v SSE:%v}, want {HTTP:%v SSE:%v}", tc.name, got.HTTP, got.SSE, tc.wantHTTP, tc.wantSSE)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ── filterACPMcpServersByCapability ──
|
||||
|
||||
func TestFilterACPMcpServersByCapabilityStdioAlwaysPassesThrough(t *testing.T) {
|
||||
t.Parallel()
|
||||
// Stdio entries have no `type` field — the ACP spec doesn't gate stdio,
|
||||
// so the filter must pass them through regardless of capabilities.
|
||||
servers := []any{
|
||||
map[string]any{"name": "fetch", "command": "uvx"},
|
||||
}
|
||||
got := filterACPMcpServersByCapability(servers, acpMcpTransportCapabilities{}, "hermes", slog.Default())
|
||||
if len(got) != 1 {
|
||||
t.Fatalf("len: got %d, want 1", len(got))
|
||||
}
|
||||
}
|
||||
|
||||
func TestFilterACPMcpServersByCapabilityDropsUnsupportedHttp(t *testing.T) {
|
||||
t.Parallel()
|
||||
servers := []any{
|
||||
map[string]any{"name": "stdio-ok", "command": "uvx"},
|
||||
map[string]any{"type": "http", "name": "http-drop", "url": "https://x/mcp"},
|
||||
map[string]any{"type": "sse", "name": "sse-keep", "url": "https://x/sse"},
|
||||
}
|
||||
got := filterACPMcpServersByCapability(servers, acpMcpTransportCapabilities{SSE: true}, "hermes", slog.Default())
|
||||
if len(got) != 2 {
|
||||
t.Fatalf("len: got %d, want 2 (http should be dropped, sse kept)", len(got))
|
||||
}
|
||||
names := []string{got[0].(map[string]any)["name"].(string), got[1].(map[string]any)["name"].(string)}
|
||||
wantNames := map[string]bool{"stdio-ok": true, "sse-keep": true}
|
||||
for _, n := range names {
|
||||
if !wantNames[n] {
|
||||
t.Errorf("unexpected entry kept: %q", n)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestFilterACPMcpServersByCapabilityDropsUnsupportedSse(t *testing.T) {
|
||||
t.Parallel()
|
||||
servers := []any{
|
||||
map[string]any{"type": "sse", "name": "sse-drop", "url": "https://x/sse"},
|
||||
map[string]any{"type": "http", "name": "http-keep", "url": "https://x/mcp"},
|
||||
}
|
||||
got := filterACPMcpServersByCapability(servers, acpMcpTransportCapabilities{HTTP: true}, "kimi", slog.Default())
|
||||
if len(got) != 1 {
|
||||
t.Fatalf("len: got %d, want 1", len(got))
|
||||
}
|
||||
if got[0].(map[string]any)["name"] != "http-keep" {
|
||||
t.Errorf("kept wrong entry: %v", got[0])
|
||||
}
|
||||
}
|
||||
|
||||
func TestFilterACPMcpServersByCapabilityKeepsAllWhenBothSupported(t *testing.T) {
|
||||
t.Parallel()
|
||||
servers := []any{
|
||||
map[string]any{"name": "stdio", "command": "uvx"},
|
||||
map[string]any{"type": "http", "name": "http", "url": "https://x/mcp"},
|
||||
map[string]any{"type": "sse", "name": "sse", "url": "https://x/sse"},
|
||||
}
|
||||
got := filterACPMcpServersByCapability(servers, acpMcpTransportCapabilities{HTTP: true, SSE: true}, "kiro", slog.Default())
|
||||
if len(got) != 3 {
|
||||
t.Fatalf("len: got %d, want 3", len(got))
|
||||
}
|
||||
}
|
||||
|
||||
func TestFilterACPMcpServersByCapabilityEmptyInputReturnsEmpty(t *testing.T) {
|
||||
t.Parallel()
|
||||
got := filterACPMcpServersByCapability(nil, acpMcpTransportCapabilities{HTTP: true, SSE: true}, "hermes", slog.Default())
|
||||
if len(got) != 0 {
|
||||
t.Errorf("len: got %d, want 0", len(got))
|
||||
}
|
||||
}
|
||||
|
||||
// TestHermesExecuteFailsClosedOnMalformedMcpConfig pins the contract that
|
||||
// a malformed mcp_config aborts the launch *before* the child is spawned.
|
||||
// Silently launching with no MCP servers would look indistinguishable
|
||||
// from "the saved config was applied" and is exactly the surprise the
|
||||
// MCP Tab is meant to remove.
|
||||
func TestHermesExecuteFailsClosedOnMalformedMcpConfig(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
// Any existing executable is fine — Execute returns before the spawn.
|
||||
fakePath := filepath.Join(t.TempDir(), "hermes")
|
||||
writeTestExecutable(t, fakePath, []byte("#!/bin/sh\nexit 0\n"))
|
||||
|
||||
backend, err := New("hermes", Config{ExecutablePath: fakePath, Logger: slog.Default()})
|
||||
if err != nil {
|
||||
t.Fatalf("new hermes backend: %v", err)
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
_, err = backend.Execute(ctx, "prompt", ExecOptions{
|
||||
Timeout: 2 * time.Second,
|
||||
McpConfig: json.RawMessage(`not json`),
|
||||
})
|
||||
if err == nil {
|
||||
t.Fatal("expected Execute to fail closed on malformed mcp_config, got nil error")
|
||||
}
|
||||
if !strings.Contains(err.Error(), "mcp_config") {
|
||||
t.Fatalf("expected error to mention mcp_config, got %q", err)
|
||||
}
|
||||
}
|
||||
|
||||
// fakeACPRecordingScript impersonates an ACP agent that records every
|
||||
// JSON-RPC frame it receives to a file (one per line) before responding.
|
||||
// The runtime name parameter lets the same script drive Hermes / Kimi /
|
||||
// Kiro fakes — only the session/load vs session/resume method differs.
|
||||
//
|
||||
// `caps` is the JSON for `agentCapabilities` returned from initialize so
|
||||
// tests can pin the capability gate (e.g. `{"mcpCapabilities":{"http":false}}`).
|
||||
//
|
||||
// session/new / session/resume both echo back the requested sessionId so
|
||||
// tests don't need to thread one through; session/prompt returns
|
||||
// end_turn so Execute completes cleanly.
|
||||
func fakeACPRecordingScript(recordPath, sessionID, caps string) string {
|
||||
return `#!/bin/sh
|
||||
RECORD_PATH=` + recordPath + `
|
||||
while IFS= read -r line; do
|
||||
printf '%s\n' "$line" >> "$RECORD_PATH"
|
||||
id=$(printf '%s' "$line" | sed -n 's/.*"id":\([0-9]*\).*/\1/p')
|
||||
case "$line" in
|
||||
*'"method":"initialize"'*)
|
||||
printf '{"jsonrpc":"2.0","id":%s,"result":{"protocolVersion":1,"agentCapabilities":` + caps + `}}\n' "$id"
|
||||
;;
|
||||
*'"method":"session/new"'*|*'"method":"session/resume"'*|*'"method":"session/load"'*)
|
||||
printf '{"jsonrpc":"2.0","id":%s,"result":{"sessionId":"` + sessionID + `"}}\n' "$id"
|
||||
;;
|
||||
*'"method":"session/prompt"'*)
|
||||
printf '{"jsonrpc":"2.0","id":%s,"result":{"stopReason":"end_turn"}}\n' "$id"
|
||||
exit 0
|
||||
;;
|
||||
esac
|
||||
done
|
||||
`
|
||||
}
|
||||
|
||||
// findRecordedFrame returns the first recorded JSON-RPC frame whose
|
||||
// `method` matches the requested one. Used by the resume / capability
|
||||
// tests below to inspect what we actually sent on the wire.
|
||||
func findRecordedFrame(t *testing.T, recordPath, method string) map[string]any {
|
||||
t.Helper()
|
||||
data, err := os.ReadFile(recordPath)
|
||||
if err != nil {
|
||||
t.Fatalf("read record file: %v", err)
|
||||
}
|
||||
for _, line := range strings.Split(strings.TrimSpace(string(data)), "\n") {
|
||||
line = strings.TrimSpace(line)
|
||||
if line == "" {
|
||||
continue
|
||||
}
|
||||
var frame map[string]any
|
||||
if err := json.Unmarshal([]byte(line), &frame); err != nil {
|
||||
continue
|
||||
}
|
||||
if frame["method"] == method {
|
||||
return frame
|
||||
}
|
||||
}
|
||||
t.Fatalf("no recorded frame for method %q in %s", method, string(data))
|
||||
return nil
|
||||
}
|
||||
|
||||
// TestHermesResumeIncludesMcpServers pins the contract that
|
||||
// session/resume carries the managed MCP set. Without this, a resumed
|
||||
// Hermes task lost access to MCP tools that a fresh task on the same
|
||||
// agent would have — which is the inconsistency Elon's review flagged.
|
||||
func TestHermesResumeIncludesMcpServers(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
recordPath := filepath.Join(t.TempDir(), "frames.jsonl")
|
||||
fakePath := filepath.Join(t.TempDir(), "hermes")
|
||||
writeTestExecutable(t, fakePath, []byte(fakeACPRecordingScript(recordPath, "ses_resume", `{}`)))
|
||||
|
||||
backend, err := New("hermes", Config{ExecutablePath: fakePath, Logger: slog.Default()})
|
||||
if err != nil {
|
||||
t.Fatalf("new hermes backend: %v", err)
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
defer cancel()
|
||||
session, err := backend.Execute(ctx, "prompt-ignored", ExecOptions{
|
||||
Timeout: 5 * time.Second,
|
||||
ResumeSessionID: "ses_resume",
|
||||
McpConfig: json.RawMessage(`{"mcpServers":{"fetch":{"command":"uvx"}}}`),
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("execute: %v", err)
|
||||
}
|
||||
go func() {
|
||||
for range session.Messages {
|
||||
}
|
||||
}()
|
||||
select {
|
||||
case <-session.Result:
|
||||
case <-time.After(10 * time.Second):
|
||||
t.Fatal("timeout waiting for result")
|
||||
}
|
||||
|
||||
frame := findRecordedFrame(t, recordPath, "session/resume")
|
||||
params, ok := frame["params"].(map[string]any)
|
||||
if !ok {
|
||||
t.Fatalf("session/resume params: got %T, want map", frame["params"])
|
||||
}
|
||||
servers, ok := params["mcpServers"].([]any)
|
||||
if !ok {
|
||||
t.Fatalf("session/resume.mcpServers: got %T, want []any", params["mcpServers"])
|
||||
}
|
||||
if len(servers) != 1 {
|
||||
t.Fatalf("session/resume.mcpServers: got %d entries, want 1", len(servers))
|
||||
}
|
||||
entry := servers[0].(map[string]any)
|
||||
if entry["name"] != "fetch" || entry["command"] != "uvx" {
|
||||
t.Errorf("session/resume.mcpServers[0]: got %v, want {name:fetch,command:uvx,...}", entry)
|
||||
}
|
||||
}
|
||||
|
||||
// TestHermesDropsRemoteMcpWhenCapabilityNotAdvertised pins the contract
|
||||
// that when the runtime's initialize response advertises no http/sse
|
||||
// support, those entries are filtered out of session/new — sending them
|
||||
// anyway is a protocol violation that reliably tanks the request.
|
||||
func TestHermesDropsRemoteMcpWhenCapabilityNotAdvertised(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
recordPath := filepath.Join(t.TempDir(), "frames.jsonl")
|
||||
fakePath := filepath.Join(t.TempDir(), "hermes")
|
||||
// agentCapabilities = {} → neither http nor sse advertised.
|
||||
writeTestExecutable(t, fakePath, []byte(fakeACPRecordingScript(recordPath, "ses_new", `{}`)))
|
||||
|
||||
backend, err := New("hermes", Config{ExecutablePath: fakePath, Logger: slog.Default()})
|
||||
if err != nil {
|
||||
t.Fatalf("new hermes backend: %v", err)
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
defer cancel()
|
||||
session, err := backend.Execute(ctx, "prompt-ignored", ExecOptions{
|
||||
Timeout: 5 * time.Second,
|
||||
McpConfig: json.RawMessage(`{"mcpServers":{
|
||||
"local":{"command":"uvx"},
|
||||
"remote-http":{"type":"http","url":"https://x/mcp"},
|
||||
"remote-sse":{"type":"sse","url":"https://x/sse"}
|
||||
}}`),
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("execute: %v", err)
|
||||
}
|
||||
go func() {
|
||||
for range session.Messages {
|
||||
}
|
||||
}()
|
||||
select {
|
||||
case <-session.Result:
|
||||
case <-time.After(10 * time.Second):
|
||||
t.Fatal("timeout waiting for result")
|
||||
}
|
||||
|
||||
frame := findRecordedFrame(t, recordPath, "session/new")
|
||||
params := frame["params"].(map[string]any)
|
||||
servers, ok := params["mcpServers"].([]any)
|
||||
if !ok {
|
||||
t.Fatalf("session/new.mcpServers: got %T, want []any", params["mcpServers"])
|
||||
}
|
||||
if len(servers) != 1 {
|
||||
t.Fatalf("session/new.mcpServers: got %d entries, want 1 (only stdio should remain)", len(servers))
|
||||
}
|
||||
if servers[0].(map[string]any)["name"] != "local" {
|
||||
t.Errorf("kept the wrong entry: %v", servers[0])
|
||||
}
|
||||
}
|
||||
|
||||
// TestHermesKeepsRemoteMcpWhenCapabilityAdvertised confirms the gate
|
||||
// doesn't over-filter: when the runtime advertises http+sse, all entries
|
||||
// must pass through to session/new.
|
||||
func TestHermesKeepsRemoteMcpWhenCapabilityAdvertised(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
recordPath := filepath.Join(t.TempDir(), "frames.jsonl")
|
||||
fakePath := filepath.Join(t.TempDir(), "hermes")
|
||||
writeTestExecutable(t, fakePath, []byte(fakeACPRecordingScript(recordPath, "ses_new", `{"mcpCapabilities":{"http":true,"sse":true}}`)))
|
||||
|
||||
backend, err := New("hermes", Config{ExecutablePath: fakePath, Logger: slog.Default()})
|
||||
if err != nil {
|
||||
t.Fatalf("new hermes backend: %v", err)
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
defer cancel()
|
||||
session, err := backend.Execute(ctx, "prompt-ignored", ExecOptions{
|
||||
Timeout: 5 * time.Second,
|
||||
McpConfig: json.RawMessage(`{"mcpServers":{
|
||||
"local":{"command":"uvx"},
|
||||
"remote-http":{"type":"http","url":"https://x/mcp"},
|
||||
"remote-sse":{"type":"sse","url":"https://x/sse"}
|
||||
}}`),
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("execute: %v", err)
|
||||
}
|
||||
go func() {
|
||||
for range session.Messages {
|
||||
}
|
||||
}()
|
||||
select {
|
||||
case <-session.Result:
|
||||
case <-time.After(10 * time.Second):
|
||||
t.Fatal("timeout waiting for result")
|
||||
}
|
||||
|
||||
frame := findRecordedFrame(t, recordPath, "session/new")
|
||||
params := frame["params"].(map[string]any)
|
||||
servers := params["mcpServers"].([]any)
|
||||
if len(servers) != 3 {
|
||||
t.Fatalf("session/new.mcpServers: got %d entries, want 3", len(servers))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -39,6 +39,15 @@ func (b *kimiBackend) Execute(ctx context.Context, prompt string, opts ExecOptio
|
||||
return nil, fmt.Errorf("kimi executable not found at %q: %w", execPath, err)
|
||||
}
|
||||
|
||||
// Translate the agent's mcp_config (Claude-style object of objects)
|
||||
// into the array shape ACP `session/new` expects. Fail closed on
|
||||
// malformed JSON so the launch surfaces the real error instead of
|
||||
// silently dropping all MCP servers.
|
||||
mcpServers, err := buildACPMcpServers(opts.McpConfig, b.cfg.Logger)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("kimi: invalid mcp_config: %w", err)
|
||||
}
|
||||
|
||||
timeout := opts.Timeout
|
||||
if timeout == 0 {
|
||||
timeout = 20 * time.Minute
|
||||
@@ -174,7 +183,7 @@ func (b *kimiBackend) Execute(ctx context.Context, prompt string, opts ExecOptio
|
||||
var sessionID string
|
||||
|
||||
// 1. Initialize handshake.
|
||||
_, err := c.request(runCtx, "initialize", map[string]any{
|
||||
initResult, err := c.request(runCtx, "initialize", map[string]any{
|
||||
"protocolVersion": 1,
|
||||
"clientInfo": map[string]any{
|
||||
"name": "multica-agent-sdk",
|
||||
@@ -189,6 +198,12 @@ func (b *kimiBackend) Execute(ctx context.Context, prompt string, opts ExecOptio
|
||||
return
|
||||
}
|
||||
|
||||
// Drop MCP entries whose remote transport the runtime didn't
|
||||
// advertise. See the matching comment in hermes.go for the why —
|
||||
// shipping an http/sse entry to a stdio-only runtime tanks the
|
||||
// whole session/new.
|
||||
mcpServers = filterACPMcpServersByCapability(mcpServers, extractACPMcpCapabilities(initResult), "kimi", b.cfg.Logger)
|
||||
|
||||
// 2. Create or resume a session.
|
||||
cwd := opts.Cwd
|
||||
if cwd == "" {
|
||||
@@ -196,9 +211,14 @@ func (b *kimiBackend) Execute(ctx context.Context, prompt string, opts ExecOptio
|
||||
}
|
||||
|
||||
if opts.ResumeSessionID != "" {
|
||||
// Per ACP Session Setup, session/resume accepts mcpServers and
|
||||
// the runtime re-connects them as part of the resume. Without
|
||||
// this, a resumed Kimi task lost access to MCP tools that a
|
||||
// fresh task on the same agent would have.
|
||||
result, err := c.request(runCtx, "session/resume", map[string]any{
|
||||
"cwd": cwd,
|
||||
"sessionId": opts.ResumeSessionID,
|
||||
"cwd": cwd,
|
||||
"sessionId": opts.ResumeSessionID,
|
||||
"mcpServers": mcpServers,
|
||||
})
|
||||
if err != nil {
|
||||
finalStatus = "failed"
|
||||
@@ -218,7 +238,7 @@ func (b *kimiBackend) Execute(ctx context.Context, prompt string, opts ExecOptio
|
||||
} else {
|
||||
result, err := c.request(runCtx, "session/new", map[string]any{
|
||||
"cwd": cwd,
|
||||
"mcpServers": []any{},
|
||||
"mcpServers": mcpServers,
|
||||
})
|
||||
if err != nil {
|
||||
finalStatus = "failed"
|
||||
|
||||
@@ -2,6 +2,7 @@ package agent
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"log/slog"
|
||||
"os"
|
||||
"path/filepath"
|
||||
@@ -209,3 +210,49 @@ func TestKimiBackendInvokesACPSubcommand(t *testing.T) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TestKimiResumeIncludesMcpServers pins the same contract as the matching
|
||||
// Hermes test: session/resume must carry the managed MCP set so a resumed
|
||||
// Kimi task has the same MCP tools as a fresh one.
|
||||
func TestKimiResumeIncludesMcpServers(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
recordPath := filepath.Join(t.TempDir(), "frames.jsonl")
|
||||
fakePath := filepath.Join(t.TempDir(), "kimi")
|
||||
writeTestExecutable(t, fakePath, []byte(fakeACPRecordingScript(recordPath, "ses_resume", `{}`)))
|
||||
|
||||
backend, err := New("kimi", Config{ExecutablePath: fakePath, Logger: slog.Default()})
|
||||
if err != nil {
|
||||
t.Fatalf("new kimi backend: %v", err)
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
defer cancel()
|
||||
session, err := backend.Execute(ctx, "prompt-ignored", ExecOptions{
|
||||
Timeout: 5 * time.Second,
|
||||
ResumeSessionID: "ses_resume",
|
||||
McpConfig: json.RawMessage(`{"mcpServers":{"fetch":{"command":"uvx"}}}`),
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("execute: %v", err)
|
||||
}
|
||||
go func() {
|
||||
for range session.Messages {
|
||||
}
|
||||
}()
|
||||
select {
|
||||
case <-session.Result:
|
||||
case <-time.After(10 * time.Second):
|
||||
t.Fatal("timeout waiting for result")
|
||||
}
|
||||
|
||||
frame := findRecordedFrame(t, recordPath, "session/resume")
|
||||
params := frame["params"].(map[string]any)
|
||||
servers, ok := params["mcpServers"].([]any)
|
||||
if !ok {
|
||||
t.Fatalf("session/resume.mcpServers: got %T, want []any", params["mcpServers"])
|
||||
}
|
||||
if len(servers) != 1 || servers[0].(map[string]any)["name"] != "fetch" {
|
||||
t.Fatalf("session/resume.mcpServers: got %v, want one entry named fetch", servers)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -44,6 +44,15 @@ func (b *kiroBackend) Execute(ctx context.Context, prompt string, opts ExecOptio
|
||||
return nil, fmt.Errorf("kiro executable not found at %q: %w", execPath, err)
|
||||
}
|
||||
|
||||
// Translate the agent's mcp_config (Claude-style object of objects)
|
||||
// into the array shape ACP `session/new` and `session/load` expect.
|
||||
// Fail closed on malformed JSON so the launch surfaces the real error
|
||||
// instead of silently dropping all MCP servers.
|
||||
mcpServers, err := buildACPMcpServers(opts.McpConfig, b.cfg.Logger)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("kiro: invalid mcp_config: %w", err)
|
||||
}
|
||||
|
||||
timeout := opts.Timeout
|
||||
if timeout == 0 {
|
||||
timeout = 20 * time.Minute
|
||||
@@ -165,7 +174,7 @@ func (b *kiroBackend) Execute(ctx context.Context, prompt string, opts ExecOptio
|
||||
var finalError string
|
||||
var sessionID string
|
||||
|
||||
_, err := c.request(runCtx, "initialize", map[string]any{
|
||||
initResult, err := c.request(runCtx, "initialize", map[string]any{
|
||||
"protocolVersion": 1,
|
||||
"clientInfo": map[string]any{
|
||||
"name": "multica-agent-sdk",
|
||||
@@ -180,6 +189,12 @@ func (b *kiroBackend) Execute(ctx context.Context, prompt string, opts ExecOptio
|
||||
return
|
||||
}
|
||||
|
||||
// Drop MCP entries whose remote transport the runtime didn't
|
||||
// advertise. See the matching comment in hermes.go for why
|
||||
// unconditionally sending http/sse to a stdio-only ACP runtime
|
||||
// tanks the whole session/new.
|
||||
mcpServers = filterACPMcpServersByCapability(mcpServers, extractACPMcpCapabilities(initResult), "kiro", b.cfg.Logger)
|
||||
|
||||
cwd := opts.Cwd
|
||||
if cwd == "" {
|
||||
cwd = "."
|
||||
@@ -189,7 +204,7 @@ func (b *kiroBackend) Execute(ctx context.Context, prompt string, opts ExecOptio
|
||||
result, err := c.request(runCtx, "session/load", map[string]any{
|
||||
"cwd": cwd,
|
||||
"sessionId": opts.ResumeSessionID,
|
||||
"mcpServers": []any{},
|
||||
"mcpServers": mcpServers,
|
||||
})
|
||||
if err != nil {
|
||||
finalStatus = "failed"
|
||||
@@ -217,7 +232,7 @@ func (b *kiroBackend) Execute(ctx context.Context, prompt string, opts ExecOptio
|
||||
} else {
|
||||
result, err := c.request(runCtx, "session/new", map[string]any{
|
||||
"cwd": cwd,
|
||||
"mcpServers": []any{},
|
||||
"mcpServers": mcpServers,
|
||||
})
|
||||
if err != nil {
|
||||
finalStatus = "failed"
|
||||
|
||||
@@ -2,6 +2,7 @@ package agent
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"log/slog"
|
||||
"os"
|
||||
"path/filepath"
|
||||
@@ -315,3 +316,50 @@ func TestKiroBackendUsesSessionLoadForResume(t *testing.T) {
|
||||
t.Fatalf("session/prompt must send standard ACP prompt field for Kiro 2.1.1 compatibility, got:\n%s", requests)
|
||||
}
|
||||
}
|
||||
|
||||
// TestKiroLoadIncludesMcpServersFromConfig pins that the agent's managed
|
||||
// MCP set actually reaches the wire on session/load — the resume path is
|
||||
// otherwise indistinguishable from the no-config case, which is how the
|
||||
// missing-on-resume bug got past the first round of review.
|
||||
func TestKiroLoadIncludesMcpServersFromConfig(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
recordPath := filepath.Join(t.TempDir(), "frames.jsonl")
|
||||
fakePath := filepath.Join(t.TempDir(), "kiro-cli")
|
||||
writeTestExecutable(t, fakePath, []byte(fakeACPRecordingScript(recordPath, "ses_load", `{}`)))
|
||||
|
||||
backend, err := New("kiro", Config{ExecutablePath: fakePath, Logger: slog.Default()})
|
||||
if err != nil {
|
||||
t.Fatalf("new kiro backend: %v", err)
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
defer cancel()
|
||||
session, err := backend.Execute(ctx, "prompt-ignored", ExecOptions{
|
||||
Timeout: 5 * time.Second,
|
||||
ResumeSessionID: "ses_load",
|
||||
McpConfig: json.RawMessage(`{"mcpServers":{"fetch":{"command":"uvx"}}}`),
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("execute: %v", err)
|
||||
}
|
||||
go func() {
|
||||
for range session.Messages {
|
||||
}
|
||||
}()
|
||||
select {
|
||||
case <-session.Result:
|
||||
case <-time.After(10 * time.Second):
|
||||
t.Fatal("timeout waiting for result")
|
||||
}
|
||||
|
||||
frame := findRecordedFrame(t, recordPath, "session/load")
|
||||
params := frame["params"].(map[string]any)
|
||||
servers, ok := params["mcpServers"].([]any)
|
||||
if !ok {
|
||||
t.Fatalf("session/load.mcpServers: got %T, want []any", params["mcpServers"])
|
||||
}
|
||||
if len(servers) != 1 || servers[0].(map[string]any)["name"] != "fetch" {
|
||||
t.Fatalf("session/load.mcpServers: got %v, want one entry named fetch", servers)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user