Compare commits

...

2 Commits

Author SHA1 Message Date
J
78c9514499 MUL-2764 fix(agent): wire mcp_config through ACP resume + gate http/sse on capability
Addresses the two blockers Elon raised on #3439:

1. session/resume now carries mcpServers for Hermes and Kimi (Kiro's
   session/load already did). Per the ACP Session Setup spec the resume
   path re-attaches MCP servers, and without this a resumed task lost
   access to MCP tools that a fresh task on the same agent would have
   had. Pinned with new TestHermesResumeIncludesMcpServers and
   TestKimiResumeIncludesMcpServers integration tests that inspect the
   recorded wire request.

2. Added extractACPMcpCapabilities + filterACPMcpServersByCapability so
   http/sse MCP entries get dropped (with a daemon-log warning naming
   the entry) when the runtime's initialize response doesn't advertise
   mcpCapabilities.http / .sse. Sending those entries to a stdio-only
   runtime is a spec violation and reliably tanks session/new; now they
   get filtered and the rest of the session still starts. Stdio entries
   pass through unconditionally. Both backends wire the filter in right
   after initialize so session/new and session/resume see the same
   filtered list.

Also added TestKiroLoadIncludesMcpServersFromConfig — Elon flagged that
no test pinned "non-empty mcp_config actually reaches the wire" for
Kimi/Kiro, so the wire assertions go in for all three runtimes.

Co-authored-by: multica-agent <github@multica.ai>
2026-05-28 16:21:50 +08:00
J
0a49fef396 MUL-2764 feat(agent): wire mcp_config through ACP runtimes (Hermes / Kimi / Kiro)
The MCP config Tab (#3419) already lets admins save mcp_config on an
agent, and the daemon plumbs it through to `agent.ExecOptions.McpConfig`
for every runtime. Claude and Codex consume it; the three ACP runtimes
(Hermes / Kimi / Kiro) ignored the field and hardcoded an empty
`mcpServers: []` in their `session/new` requests.

Add `buildACPMcpServers` to translate the Claude-style `{"mcpServers":
{"<name>": {...}}}` object-of-objects into the array shape ACP requires
(`[{name, command, args, env: [{name,value}, ...]}, ...]` for stdio;
`[{type, name, url, headers: [...]}, ...]` for http/sse), then pass the
translated array on `session/new` (all three) and `session/load` (kiro
resume). Malformed JSON fails the launch closed — same contract Codex's
`renderCodexMcpServersBlock` uses — so users see a real error instead of
silently running with no MCP servers. Individual unclassifiable entries
(no command, no url) are skipped with a warning so one bad row can't
take MCP down for the rest of the agent.

Co-authored-by: multica-agent <github@multica.ai>
2026-05-28 15:59:36 +08:00
6 changed files with 997 additions and 15 deletions

View File

@@ -2,14 +2,17 @@ package agent
import (
"bufio"
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"log/slog"
"os"
"os/exec"
"path/filepath"
"regexp"
"sort"
"strconv"
"strings"
"sync"
@@ -42,6 +45,15 @@ func (b *hermesBackend) Execute(ctx context.Context, prompt string, opts ExecOpt
return nil, fmt.Errorf("hermes executable not found at %q: %w", execPath, err)
}
// Translate the agent's mcp_config (Claude-style object of objects)
// into the array shape ACP `session/new` expects. Fail closed on
// malformed JSON so the launch surfaces the real error instead of
// silently dropping all MCP servers.
mcpServers, err := buildACPMcpServers(opts.McpConfig, b.cfg.Logger)
if err != nil {
return nil, fmt.Errorf("hermes: invalid mcp_config: %w", err)
}
timeout := opts.Timeout
if timeout == 0 {
timeout = 20 * time.Minute
@@ -194,7 +206,7 @@ func (b *hermesBackend) Execute(ctx context.Context, prompt string, opts ExecOpt
effectiveModel := strings.TrimSpace(opts.Model)
// 1. Initialize handshake.
_, err := c.request(runCtx, "initialize", map[string]any{
initResult, err := c.request(runCtx, "initialize", map[string]any{
"protocolVersion": 1,
"clientInfo": map[string]any{
"name": "multica-agent-sdk",
@@ -209,6 +221,13 @@ func (b *hermesBackend) Execute(ctx context.Context, prompt string, opts ExecOpt
return
}
// Drop MCP entries whose remote transport the runtime didn't
// advertise. ACP requires the client to honour
// agentCapabilities.mcpCapabilities; sending an http/sse entry to
// a runtime that says it only supports stdio reliably rejects the
// whole session/new request.
mcpServers = filterACPMcpServersByCapability(mcpServers, extractACPMcpCapabilities(initResult), "hermes", b.cfg.Logger)
// 2. Create or resume a session.
cwd := opts.Cwd
if cwd == "" {
@@ -216,9 +235,14 @@ func (b *hermesBackend) Execute(ctx context.Context, prompt string, opts ExecOpt
}
if opts.ResumeSessionID != "" {
// Per ACP Session Setup, session/resume accepts mcpServers and
// the runtime re-connects them as part of the resume. Without
// this, a resumed Hermes task lost access to MCP tools that a
// fresh task on the same agent would have.
result, err := c.request(runCtx, "session/resume", map[string]any{
"cwd": cwd,
"sessionId": opts.ResumeSessionID,
"cwd": cwd,
"sessionId": opts.ResumeSessionID,
"mcpServers": mcpServers,
})
if err != nil {
finalStatus = "failed"
@@ -239,7 +263,7 @@ func (b *hermesBackend) Execute(ctx context.Context, prompt string, opts ExecOpt
effectiveModel = extractACPCurrentModelID(result)
}
} else {
result, err := c.request(runCtx, "session/new", buildHermesSessionParams(cwd, opts.Model))
result, err := c.request(runCtx, "session/new", buildHermesSessionParams(cwd, opts.Model, mcpServers))
if err != nil {
finalStatus = "failed"
finalError = fmt.Sprintf("hermes session/new failed: %v", err)
@@ -1193,10 +1217,17 @@ func resolveResumedSessionID(requested string, response json.RawMessage) (string
// buildHermesSessionParams constructs the params map for the ACP `session/new`
// request. The `model` field is only included when non-empty so Hermes falls
// back to its default only when no explicit model was configured.
func buildHermesSessionParams(cwd, model string) map[string]any {
//
// mcpServers should be the ACP-shaped array produced by buildACPMcpServers
// from the agent's mcp_config; a nil slice is normalised to an empty array
// so the wire request always carries the field (ACP requires it).
func buildHermesSessionParams(cwd, model string, mcpServers []any) map[string]any {
if mcpServers == nil {
mcpServers = []any{}
}
params := map[string]any{
"cwd": cwd,
"mcpServers": []any{},
"mcpServers": mcpServers,
}
if model != "" {
params["model"] = model
@@ -1204,6 +1235,229 @@ func buildHermesSessionParams(cwd, model string) map[string]any {
return params
}
// buildACPMcpServers translates an agent's Claude-style mcp_config
// (`{"mcpServers": {"<name>": {...}}}`) into the array shape that ACP's
// `session/new` and `session/load` requests expect.
//
// Each Claude-style entry maps to one of:
//
// - Stdio: `{name, command, args, env: [{name,value}, ...]}` —
// when the entry has a `command` field. No `type` field is emitted;
// ACP treats untagged entries as stdio.
// - HTTP / SSE: `{type, name, url, headers: [{name,value}, ...]}` —
// when the entry has a `url` field. `type` defaults to "http"; Claude's
// "sse" and "streamable-http" / "http_streamable" aliases are accepted.
//
// Empty / null input returns an empty slice — the launch proceeds with no
// MCP servers (the existing default for ACP backends). Malformed top-level
// JSON returns an error so the launch fails closed, mirroring codex's
// `renderCodexMcpServersBlock` contract. Individual entries that have
// neither `command` nor `url` are skipped with a warning rather than
// failing the whole launch, so a single bad entry can't kill the agent.
//
// Output entries are sorted by name and each entry's env / headers are
// sorted by key, so the wire request is deterministic across reruns —
// useful for tests, log diffs, and reproducibility.
func buildACPMcpServers(raw json.RawMessage, logger *slog.Logger) ([]any, error) {
trimmed := bytes.TrimSpace(raw)
if len(trimmed) == 0 || bytes.Equal(trimmed, []byte("null")) {
return []any{}, nil
}
var parsed struct {
McpServers map[string]json.RawMessage `json:"mcpServers"`
}
if err := json.Unmarshal(trimmed, &parsed); err != nil {
return nil, fmt.Errorf("parse mcp_config json: %w", err)
}
if len(parsed.McpServers) == 0 {
return []any{}, nil
}
names := make([]string, 0, len(parsed.McpServers))
for name := range parsed.McpServers {
names = append(names, name)
}
sort.Strings(names)
out := make([]any, 0, len(names))
for _, name := range names {
entry, err := convertACPMcpServer(name, parsed.McpServers[name])
if err != nil {
if logger != nil {
logger.Warn("skipping invalid mcp_config entry", "name", name, "error", err)
}
continue
}
out = append(out, entry)
}
return out, nil
}
// convertACPMcpServer converts a single Claude-style entry into the ACP
// McpServer wire shape. Returns an error for entries that can't be
// classified (no command and no url).
func convertACPMcpServer(name string, raw json.RawMessage) (map[string]any, error) {
var entry struct {
Type string `json:"type"`
Command string `json:"command"`
Args []string `json:"args"`
Env map[string]string `json:"env"`
URL string `json:"url"`
Headers map[string]string `json:"headers"`
}
if err := json.Unmarshal(raw, &entry); err != nil {
return nil, fmt.Errorf("parse entry: %w", err)
}
command := strings.TrimSpace(entry.Command)
url := strings.TrimSpace(entry.URL)
if command != "" {
args := entry.Args
if args == nil {
args = []string{}
}
envArr := make([]map[string]any, 0, len(entry.Env))
for _, k := range sortedStringMapKeys(entry.Env) {
envArr = append(envArr, map[string]any{
"name": k,
"value": entry.Env[k],
})
}
return map[string]any{
"name": name,
"command": command,
"args": args,
"env": envArr,
}, nil
}
if url != "" {
t := strings.ToLower(strings.TrimSpace(entry.Type))
switch t {
case "sse":
t = "sse"
case "", "http", "streamable-http", "http_streamable":
t = "http"
default:
// Unknown remote transport — degrade to "http" rather than fail.
// ACP servers that don't recognise the type will reject the
// session/new request and surface a real error to the user.
t = "http"
}
headerArr := make([]map[string]any, 0, len(entry.Headers))
for _, k := range sortedStringMapKeys(entry.Headers) {
headerArr = append(headerArr, map[string]any{
"name": k,
"value": entry.Headers[k],
})
}
return map[string]any{
"type": t,
"name": name,
"url": url,
"headers": headerArr,
}, nil
}
return nil, fmt.Errorf("entry has neither command nor url")
}
func sortedStringMapKeys(m map[string]string) []string {
keys := make([]string, 0, len(m))
for k := range m {
keys = append(keys, k)
}
sort.Strings(keys)
return keys
}
// acpMcpTransportCapabilities reports which remote MCP transports the ACP
// runtime advertised in its `initialize` response. Stdio is always
// supported (it's the baseline transport and the spec does not gate it),
// so it's not represented here.
type acpMcpTransportCapabilities struct {
HTTP bool
SSE bool
}
// extractACPMcpCapabilities reads `agentCapabilities.mcpCapabilities.http`
// and `.sse` out of an ACP `initialize` response. Missing or false fields
// stay false, matching the spec default: the runtime must opt-in to
// remote MCP transports. Unparseable responses degrade to "neither
// supported" so we fail closed on remote entries.
//
// See https://agentclientprotocol.com/protocol/initialization — clients
// MUST NOT send `mcpServers` entries with a type the agent did not
// advertise support for.
func extractACPMcpCapabilities(result json.RawMessage) acpMcpTransportCapabilities {
var r struct {
AgentCapabilities struct {
McpCapabilities struct {
HTTP bool `json:"http"`
SSE bool `json:"sse"`
} `json:"mcpCapabilities"`
} `json:"agentCapabilities"`
}
if err := json.Unmarshal(result, &r); err != nil {
return acpMcpTransportCapabilities{}
}
return acpMcpTransportCapabilities{
HTTP: r.AgentCapabilities.McpCapabilities.HTTP,
SSE: r.AgentCapabilities.McpCapabilities.SSE,
}
}
// filterACPMcpServersByCapability drops remote MCP entries whose transport
// the runtime didn't advertise in its initialize response. Stdio entries
// (no `type` field) always pass through.
//
// Sending an http/sse entry to a runtime that doesn't support it is a
// protocol violation per the ACP spec, and Hermes / Kimi observed in
// practice reject the whole session/new request with a JSON-RPC error.
// Dropping the offending entries with a warning lets the rest of the
// session start and surfaces the problem in the daemon log instead of
// tanking every task on that agent.
func filterACPMcpServersByCapability(
servers []any,
caps acpMcpTransportCapabilities,
backend string,
logger *slog.Logger,
) []any {
if len(servers) == 0 {
return servers
}
filtered := make([]any, 0, len(servers))
for _, raw := range servers {
entry, ok := raw.(map[string]any)
if !ok {
filtered = append(filtered, raw)
continue
}
transport, _ := entry["type"].(string)
switch transport {
case "http":
if !caps.HTTP {
if logger != nil {
logger.Warn("dropping http MCP server: runtime did not advertise mcpCapabilities.http",
"backend", backend, "name", entry["name"])
}
continue
}
case "sse":
if !caps.SSE {
if logger != nil {
logger.Warn("dropping sse MCP server: runtime did not advertise mcpCapabilities.sse",
"backend", backend, "name", entry["name"])
}
continue
}
}
filtered = append(filtered, entry)
}
return filtered
}
// hermesToolNameFromTitle extracts a tool name from the ACP tool call title.
// Hermes ACP titles look like "terminal: ls -la", "read: /path/to/file", etc.
// Some titles have no colon (e.g. "execute code").

View File

@@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"log/slog"
"os"
"path/filepath"
"strings"
"sync"
@@ -149,7 +150,7 @@ func TestResolveResumedSessionIDEmptyResponse(t *testing.T) {
func TestBuildHermesSessionParamsIncludesModel(t *testing.T) {
t.Parallel()
params := buildHermesSessionParams("/tmp/work", "gpt-4o")
params := buildHermesSessionParams("/tmp/work", "gpt-4o", nil)
if params["cwd"] != "/tmp/work" {
t.Errorf("cwd: got %v, want /tmp/work", params["cwd"])
}
@@ -163,12 +164,239 @@ func TestBuildHermesSessionParamsIncludesModel(t *testing.T) {
func TestBuildHermesSessionParamsOmitsEmptyModel(t *testing.T) {
t.Parallel()
params := buildHermesSessionParams("/tmp/work", "")
params := buildHermesSessionParams("/tmp/work", "", nil)
if _, present := params["model"]; present {
t.Error("expected model key to be omitted when model is empty")
}
}
func TestBuildHermesSessionParamsPassesThroughMcpServers(t *testing.T) {
t.Parallel()
servers := []any{map[string]any{"name": "fetch", "command": "uvx", "args": []string{}, "env": []map[string]any{}}}
params := buildHermesSessionParams("/tmp/work", "", servers)
got, ok := params["mcpServers"].([]any)
if !ok {
t.Fatalf("mcpServers: got %T, want []any", params["mcpServers"])
}
if len(got) != 1 {
t.Fatalf("len(mcpServers): got %d, want 1", len(got))
}
}
func TestBuildHermesSessionParamsNilMcpServersBecomesEmptyArray(t *testing.T) {
t.Parallel()
// ACP requires the field; nil must surface as `[]` so the wire request
// stays well-formed even when no MCP servers are configured.
params := buildHermesSessionParams("/tmp/work", "", nil)
got, ok := params["mcpServers"].([]any)
if !ok {
t.Fatalf("mcpServers: got %T, want []any", params["mcpServers"])
}
if len(got) != 0 {
t.Errorf("len(mcpServers): got %d, want 0", len(got))
}
}
// ── buildACPMcpServers ──
func TestBuildACPMcpServersEmptyInputReturnsEmpty(t *testing.T) {
t.Parallel()
for _, raw := range []json.RawMessage{nil, {}, json.RawMessage("null"), json.RawMessage(" null "), json.RawMessage("{}"), json.RawMessage(`{"mcpServers":{}}`)} {
got, err := buildACPMcpServers(raw, slog.Default())
if err != nil {
t.Fatalf("raw=%q: unexpected error: %v", string(raw), err)
}
if got == nil {
t.Errorf("raw=%q: got nil, want non-nil empty slice", string(raw))
}
if len(got) != 0 {
t.Errorf("raw=%q: got %d entries, want 0", string(raw), len(got))
}
}
}
func TestBuildACPMcpServersTranslatesStdioEntry(t *testing.T) {
t.Parallel()
raw := json.RawMessage(`{"mcpServers":{"fetch":{"command":"uvx","args":["mcp-server-fetch"],"env":{"API_KEY":"secret","HOME":"/tmp"}}}}`)
got, err := buildACPMcpServers(raw, slog.Default())
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if len(got) != 1 {
t.Fatalf("len: got %d, want 1", len(got))
}
entry, ok := got[0].(map[string]any)
if !ok {
t.Fatalf("entry type: got %T, want map[string]any", got[0])
}
if entry["name"] != "fetch" {
t.Errorf("name: got %v, want fetch", entry["name"])
}
if entry["command"] != "uvx" {
t.Errorf("command: got %v, want uvx", entry["command"])
}
if _, hasType := entry["type"]; hasType {
t.Errorf("stdio entry should not include type field, got %v", entry["type"])
}
args, ok := entry["args"].([]string)
if !ok || len(args) != 1 || args[0] != "mcp-server-fetch" {
t.Errorf("args: got %v, want [mcp-server-fetch]", entry["args"])
}
envArr, ok := entry["env"].([]map[string]any)
if !ok {
t.Fatalf("env type: got %T, want []map[string]any", entry["env"])
}
// Env entries sorted by key for determinism.
if len(envArr) != 2 {
t.Fatalf("len(env): got %d, want 2", len(envArr))
}
if envArr[0]["name"] != "API_KEY" || envArr[0]["value"] != "secret" {
t.Errorf("env[0]: got %v, want {name:API_KEY,value:secret}", envArr[0])
}
if envArr[1]["name"] != "HOME" || envArr[1]["value"] != "/tmp" {
t.Errorf("env[1]: got %v, want {name:HOME,value:/tmp}", envArr[1])
}
}
func TestBuildACPMcpServersStdioWithoutArgsOrEnvUsesEmptyArrays(t *testing.T) {
t.Parallel()
// ACP requires args and env to be arrays; missing fields must become
// `[]` rather than null so the wire shape passes server-side validation.
raw := json.RawMessage(`{"mcpServers":{"minimal":{"command":"echo"}}}`)
got, err := buildACPMcpServers(raw, slog.Default())
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
entry := got[0].(map[string]any)
if args, ok := entry["args"].([]string); !ok || len(args) != 0 {
t.Errorf("args: got %v, want []", entry["args"])
}
if env, ok := entry["env"].([]map[string]any); !ok || len(env) != 0 {
t.Errorf("env: got %v, want []", entry["env"])
}
}
func TestBuildACPMcpServersTranslatesHttpEntry(t *testing.T) {
t.Parallel()
raw := json.RawMessage(`{"mcpServers":{"remote":{"type":"http","url":"https://example.com/mcp","headers":{"Authorization":"Bearer x","X-Trace":"abc"}}}}`)
got, err := buildACPMcpServers(raw, slog.Default())
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if len(got) != 1 {
t.Fatalf("len: got %d, want 1", len(got))
}
entry := got[0].(map[string]any)
if entry["type"] != "http" {
t.Errorf("type: got %v, want http", entry["type"])
}
if entry["name"] != "remote" {
t.Errorf("name: got %v, want remote", entry["name"])
}
if entry["url"] != "https://example.com/mcp" {
t.Errorf("url: got %v, want https://example.com/mcp", entry["url"])
}
headers, ok := entry["headers"].([]map[string]any)
if !ok || len(headers) != 2 {
t.Fatalf("headers: got %v, want 2 entries", entry["headers"])
}
if headers[0]["name"] != "Authorization" {
t.Errorf("headers[0].name: got %v, want Authorization", headers[0]["name"])
}
}
func TestBuildACPMcpServersDefaultsRemoteTypeToHttp(t *testing.T) {
t.Parallel()
// A `url` without `type` should default to "http" rather than be classified
// as stdio or get dropped.
raw := json.RawMessage(`{"mcpServers":{"remote":{"url":"https://example.com/mcp"}}}`)
got, err := buildACPMcpServers(raw, slog.Default())
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
entry := got[0].(map[string]any)
if entry["type"] != "http" {
t.Errorf("type: got %v, want http (default)", entry["type"])
}
}
func TestBuildACPMcpServersSupportsSseTransport(t *testing.T) {
t.Parallel()
raw := json.RawMessage(`{"mcpServers":{"remote":{"type":"sse","url":"https://example.com/sse"}}}`)
got, err := buildACPMcpServers(raw, slog.Default())
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
entry := got[0].(map[string]any)
if entry["type"] != "sse" {
t.Errorf("type: got %v, want sse", entry["type"])
}
}
func TestBuildACPMcpServersAcceptsStreamableHttpAlias(t *testing.T) {
t.Parallel()
// Claude's MCP CLI uses "streamable-http" / "http_streamable" as
// aliases for the http transport; ACP only knows "http", so the
// translator must collapse the alias.
for _, alias := range []string{"streamable-http", "http_streamable", "Streamable-HTTP"} {
raw := json.RawMessage(`{"mcpServers":{"remote":{"type":"` + alias + `","url":"https://example.com/mcp"}}}`)
got, err := buildACPMcpServers(raw, slog.Default())
if err != nil {
t.Fatalf("alias=%s: unexpected error: %v", alias, err)
}
entry := got[0].(map[string]any)
if entry["type"] != "http" {
t.Errorf("alias=%s: type got %v, want http", alias, entry["type"])
}
}
}
func TestBuildACPMcpServersSortsEntriesByName(t *testing.T) {
t.Parallel()
// Map iteration is randomized in Go; the translator sorts by name so
// the wire request and test assertions are deterministic.
raw := json.RawMessage(`{"mcpServers":{"zeta":{"command":"z"},"alpha":{"command":"a"},"mid":{"command":"m"}}}`)
got, err := buildACPMcpServers(raw, slog.Default())
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
want := []string{"alpha", "mid", "zeta"}
for i, w := range want {
if got[i].(map[string]any)["name"] != w {
t.Errorf("position %d: got %v, want %s", i, got[i].(map[string]any)["name"], w)
}
}
}
func TestBuildACPMcpServersSkipsInvalidEntriesAndContinues(t *testing.T) {
t.Parallel()
// An entry with neither command nor url is invalid — drop it with a
// warning rather than failing the whole launch, so a single bad entry
// in the agent UI doesn't take MCP down for the rest of the agent.
raw := json.RawMessage(`{"mcpServers":{"bad":{"args":["nothing"]},"good":{"command":"uvx"}}}`)
got, err := buildACPMcpServers(raw, slog.Default())
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if len(got) != 1 {
t.Fatalf("len: got %d, want 1 (bad entry should be skipped)", len(got))
}
if got[0].(map[string]any)["name"] != "good" {
t.Errorf("kept the wrong entry: %v", got[0])
}
}
func TestBuildACPMcpServersReturnsErrorOnMalformedJSON(t *testing.T) {
t.Parallel()
_, err := buildACPMcpServers(json.RawMessage(`not json`), slog.Default())
if err == nil {
t.Fatal("expected error for malformed JSON, got nil")
}
if !strings.Contains(err.Error(), "parse mcp_config json") {
t.Errorf("error message: got %q, want it to mention parsing", err.Error())
}
}
// ── hermesToolNameFromTitle ──
func TestHermesToolNameFromTitle(t *testing.T) {
@@ -1362,3 +1590,373 @@ func TestHermesBackendDoesNotPromoteOnTransientRetry(t *testing.T) {
t.Fatal("timeout waiting for result")
}
}
// ── extractACPMcpCapabilities ──
func TestExtractACPMcpCapabilities(t *testing.T) {
t.Parallel()
tests := []struct {
name string
raw string
wantHTTP bool
wantSSE bool
}{
{
name: "both true",
raw: `{"protocolVersion":1,"agentCapabilities":{"mcpCapabilities":{"http":true,"sse":true}}}`,
wantHTTP: true,
wantSSE: true,
},
{
name: "http only",
raw: `{"agentCapabilities":{"mcpCapabilities":{"http":true}}}`,
wantHTTP: true,
wantSSE: false,
},
{
name: "sse only",
raw: `{"agentCapabilities":{"mcpCapabilities":{"sse":true}}}`,
wantHTTP: false,
wantSSE: true,
},
{
name: "block missing",
raw: `{"agentCapabilities":{}}`,
wantHTTP: false,
wantSSE: false,
},
{
name: "agentCapabilities missing",
raw: `{"protocolVersion":1}`,
wantHTTP: false,
wantSSE: false,
},
{
name: "malformed json",
raw: `not json`,
wantHTTP: false,
wantSSE: false,
},
}
for _, tc := range tests {
got := extractACPMcpCapabilities(json.RawMessage(tc.raw))
if got.HTTP != tc.wantHTTP || got.SSE != tc.wantSSE {
t.Errorf("%s: got {HTTP:%v SSE:%v}, want {HTTP:%v SSE:%v}", tc.name, got.HTTP, got.SSE, tc.wantHTTP, tc.wantSSE)
}
}
}
// ── filterACPMcpServersByCapability ──
func TestFilterACPMcpServersByCapabilityStdioAlwaysPassesThrough(t *testing.T) {
t.Parallel()
// Stdio entries have no `type` field — the ACP spec doesn't gate stdio,
// so the filter must pass them through regardless of capabilities.
servers := []any{
map[string]any{"name": "fetch", "command": "uvx"},
}
got := filterACPMcpServersByCapability(servers, acpMcpTransportCapabilities{}, "hermes", slog.Default())
if len(got) != 1 {
t.Fatalf("len: got %d, want 1", len(got))
}
}
func TestFilterACPMcpServersByCapabilityDropsUnsupportedHttp(t *testing.T) {
t.Parallel()
servers := []any{
map[string]any{"name": "stdio-ok", "command": "uvx"},
map[string]any{"type": "http", "name": "http-drop", "url": "https://x/mcp"},
map[string]any{"type": "sse", "name": "sse-keep", "url": "https://x/sse"},
}
got := filterACPMcpServersByCapability(servers, acpMcpTransportCapabilities{SSE: true}, "hermes", slog.Default())
if len(got) != 2 {
t.Fatalf("len: got %d, want 2 (http should be dropped, sse kept)", len(got))
}
names := []string{got[0].(map[string]any)["name"].(string), got[1].(map[string]any)["name"].(string)}
wantNames := map[string]bool{"stdio-ok": true, "sse-keep": true}
for _, n := range names {
if !wantNames[n] {
t.Errorf("unexpected entry kept: %q", n)
}
}
}
func TestFilterACPMcpServersByCapabilityDropsUnsupportedSse(t *testing.T) {
t.Parallel()
servers := []any{
map[string]any{"type": "sse", "name": "sse-drop", "url": "https://x/sse"},
map[string]any{"type": "http", "name": "http-keep", "url": "https://x/mcp"},
}
got := filterACPMcpServersByCapability(servers, acpMcpTransportCapabilities{HTTP: true}, "kimi", slog.Default())
if len(got) != 1 {
t.Fatalf("len: got %d, want 1", len(got))
}
if got[0].(map[string]any)["name"] != "http-keep" {
t.Errorf("kept wrong entry: %v", got[0])
}
}
func TestFilterACPMcpServersByCapabilityKeepsAllWhenBothSupported(t *testing.T) {
t.Parallel()
servers := []any{
map[string]any{"name": "stdio", "command": "uvx"},
map[string]any{"type": "http", "name": "http", "url": "https://x/mcp"},
map[string]any{"type": "sse", "name": "sse", "url": "https://x/sse"},
}
got := filterACPMcpServersByCapability(servers, acpMcpTransportCapabilities{HTTP: true, SSE: true}, "kiro", slog.Default())
if len(got) != 3 {
t.Fatalf("len: got %d, want 3", len(got))
}
}
func TestFilterACPMcpServersByCapabilityEmptyInputReturnsEmpty(t *testing.T) {
t.Parallel()
got := filterACPMcpServersByCapability(nil, acpMcpTransportCapabilities{HTTP: true, SSE: true}, "hermes", slog.Default())
if len(got) != 0 {
t.Errorf("len: got %d, want 0", len(got))
}
}
// TestHermesExecuteFailsClosedOnMalformedMcpConfig pins the contract that
// a malformed mcp_config aborts the launch *before* the child is spawned.
// Silently launching with no MCP servers would look indistinguishable
// from "the saved config was applied" and is exactly the surprise the
// MCP Tab is meant to remove.
func TestHermesExecuteFailsClosedOnMalformedMcpConfig(t *testing.T) {
t.Parallel()
// Any existing executable is fine — Execute returns before the spawn.
fakePath := filepath.Join(t.TempDir(), "hermes")
writeTestExecutable(t, fakePath, []byte("#!/bin/sh\nexit 0\n"))
backend, err := New("hermes", Config{ExecutablePath: fakePath, Logger: slog.Default()})
if err != nil {
t.Fatalf("new hermes backend: %v", err)
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_, err = backend.Execute(ctx, "prompt", ExecOptions{
Timeout: 2 * time.Second,
McpConfig: json.RawMessage(`not json`),
})
if err == nil {
t.Fatal("expected Execute to fail closed on malformed mcp_config, got nil error")
}
if !strings.Contains(err.Error(), "mcp_config") {
t.Fatalf("expected error to mention mcp_config, got %q", err)
}
}
// fakeACPRecordingScript impersonates an ACP agent that records every
// JSON-RPC frame it receives to a file (one per line) before responding.
// The runtime name parameter lets the same script drive Hermes / Kimi /
// Kiro fakes — only the session/load vs session/resume method differs.
//
// `caps` is the JSON for `agentCapabilities` returned from initialize so
// tests can pin the capability gate (e.g. `{"mcpCapabilities":{"http":false}}`).
//
// session/new / session/resume both echo back the requested sessionId so
// tests don't need to thread one through; session/prompt returns
// end_turn so Execute completes cleanly.
func fakeACPRecordingScript(recordPath, sessionID, caps string) string {
return `#!/bin/sh
RECORD_PATH=` + recordPath + `
while IFS= read -r line; do
printf '%s\n' "$line" >> "$RECORD_PATH"
id=$(printf '%s' "$line" | sed -n 's/.*"id":\([0-9]*\).*/\1/p')
case "$line" in
*'"method":"initialize"'*)
printf '{"jsonrpc":"2.0","id":%s,"result":{"protocolVersion":1,"agentCapabilities":` + caps + `}}\n' "$id"
;;
*'"method":"session/new"'*|*'"method":"session/resume"'*|*'"method":"session/load"'*)
printf '{"jsonrpc":"2.0","id":%s,"result":{"sessionId":"` + sessionID + `"}}\n' "$id"
;;
*'"method":"session/prompt"'*)
printf '{"jsonrpc":"2.0","id":%s,"result":{"stopReason":"end_turn"}}\n' "$id"
exit 0
;;
esac
done
`
}
// findRecordedFrame returns the first recorded JSON-RPC frame whose
// `method` matches the requested one. Used by the resume / capability
// tests below to inspect what we actually sent on the wire.
func findRecordedFrame(t *testing.T, recordPath, method string) map[string]any {
t.Helper()
data, err := os.ReadFile(recordPath)
if err != nil {
t.Fatalf("read record file: %v", err)
}
for _, line := range strings.Split(strings.TrimSpace(string(data)), "\n") {
line = strings.TrimSpace(line)
if line == "" {
continue
}
var frame map[string]any
if err := json.Unmarshal([]byte(line), &frame); err != nil {
continue
}
if frame["method"] == method {
return frame
}
}
t.Fatalf("no recorded frame for method %q in %s", method, string(data))
return nil
}
// TestHermesResumeIncludesMcpServers pins the contract that
// session/resume carries the managed MCP set. Without this, a resumed
// Hermes task lost access to MCP tools that a fresh task on the same
// agent would have — which is the inconsistency Elon's review flagged.
func TestHermesResumeIncludesMcpServers(t *testing.T) {
t.Parallel()
recordPath := filepath.Join(t.TempDir(), "frames.jsonl")
fakePath := filepath.Join(t.TempDir(), "hermes")
writeTestExecutable(t, fakePath, []byte(fakeACPRecordingScript(recordPath, "ses_resume", `{}`)))
backend, err := New("hermes", Config{ExecutablePath: fakePath, Logger: slog.Default()})
if err != nil {
t.Fatalf("new hermes backend: %v", err)
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
session, err := backend.Execute(ctx, "prompt-ignored", ExecOptions{
Timeout: 5 * time.Second,
ResumeSessionID: "ses_resume",
McpConfig: json.RawMessage(`{"mcpServers":{"fetch":{"command":"uvx"}}}`),
})
if err != nil {
t.Fatalf("execute: %v", err)
}
go func() {
for range session.Messages {
}
}()
select {
case <-session.Result:
case <-time.After(10 * time.Second):
t.Fatal("timeout waiting for result")
}
frame := findRecordedFrame(t, recordPath, "session/resume")
params, ok := frame["params"].(map[string]any)
if !ok {
t.Fatalf("session/resume params: got %T, want map", frame["params"])
}
servers, ok := params["mcpServers"].([]any)
if !ok {
t.Fatalf("session/resume.mcpServers: got %T, want []any", params["mcpServers"])
}
if len(servers) != 1 {
t.Fatalf("session/resume.mcpServers: got %d entries, want 1", len(servers))
}
entry := servers[0].(map[string]any)
if entry["name"] != "fetch" || entry["command"] != "uvx" {
t.Errorf("session/resume.mcpServers[0]: got %v, want {name:fetch,command:uvx,...}", entry)
}
}
// TestHermesDropsRemoteMcpWhenCapabilityNotAdvertised pins the contract
// that when the runtime's initialize response advertises no http/sse
// support, those entries are filtered out of session/new — sending them
// anyway is a protocol violation that reliably tanks the request.
func TestHermesDropsRemoteMcpWhenCapabilityNotAdvertised(t *testing.T) {
t.Parallel()
recordPath := filepath.Join(t.TempDir(), "frames.jsonl")
fakePath := filepath.Join(t.TempDir(), "hermes")
// agentCapabilities = {} → neither http nor sse advertised.
writeTestExecutable(t, fakePath, []byte(fakeACPRecordingScript(recordPath, "ses_new", `{}`)))
backend, err := New("hermes", Config{ExecutablePath: fakePath, Logger: slog.Default()})
if err != nil {
t.Fatalf("new hermes backend: %v", err)
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
session, err := backend.Execute(ctx, "prompt-ignored", ExecOptions{
Timeout: 5 * time.Second,
McpConfig: json.RawMessage(`{"mcpServers":{
"local":{"command":"uvx"},
"remote-http":{"type":"http","url":"https://x/mcp"},
"remote-sse":{"type":"sse","url":"https://x/sse"}
}}`),
})
if err != nil {
t.Fatalf("execute: %v", err)
}
go func() {
for range session.Messages {
}
}()
select {
case <-session.Result:
case <-time.After(10 * time.Second):
t.Fatal("timeout waiting for result")
}
frame := findRecordedFrame(t, recordPath, "session/new")
params := frame["params"].(map[string]any)
servers, ok := params["mcpServers"].([]any)
if !ok {
t.Fatalf("session/new.mcpServers: got %T, want []any", params["mcpServers"])
}
if len(servers) != 1 {
t.Fatalf("session/new.mcpServers: got %d entries, want 1 (only stdio should remain)", len(servers))
}
if servers[0].(map[string]any)["name"] != "local" {
t.Errorf("kept the wrong entry: %v", servers[0])
}
}
// TestHermesKeepsRemoteMcpWhenCapabilityAdvertised confirms the gate
// doesn't over-filter: when the runtime advertises http+sse, all entries
// must pass through to session/new.
func TestHermesKeepsRemoteMcpWhenCapabilityAdvertised(t *testing.T) {
t.Parallel()
recordPath := filepath.Join(t.TempDir(), "frames.jsonl")
fakePath := filepath.Join(t.TempDir(), "hermes")
writeTestExecutable(t, fakePath, []byte(fakeACPRecordingScript(recordPath, "ses_new", `{"mcpCapabilities":{"http":true,"sse":true}}`)))
backend, err := New("hermes", Config{ExecutablePath: fakePath, Logger: slog.Default()})
if err != nil {
t.Fatalf("new hermes backend: %v", err)
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
session, err := backend.Execute(ctx, "prompt-ignored", ExecOptions{
Timeout: 5 * time.Second,
McpConfig: json.RawMessage(`{"mcpServers":{
"local":{"command":"uvx"},
"remote-http":{"type":"http","url":"https://x/mcp"},
"remote-sse":{"type":"sse","url":"https://x/sse"}
}}`),
})
if err != nil {
t.Fatalf("execute: %v", err)
}
go func() {
for range session.Messages {
}
}()
select {
case <-session.Result:
case <-time.After(10 * time.Second):
t.Fatal("timeout waiting for result")
}
frame := findRecordedFrame(t, recordPath, "session/new")
params := frame["params"].(map[string]any)
servers := params["mcpServers"].([]any)
if len(servers) != 3 {
t.Fatalf("session/new.mcpServers: got %d entries, want 3", len(servers))
}
}

View File

@@ -39,6 +39,15 @@ func (b *kimiBackend) Execute(ctx context.Context, prompt string, opts ExecOptio
return nil, fmt.Errorf("kimi executable not found at %q: %w", execPath, err)
}
// Translate the agent's mcp_config (Claude-style object of objects)
// into the array shape ACP `session/new` expects. Fail closed on
// malformed JSON so the launch surfaces the real error instead of
// silently dropping all MCP servers.
mcpServers, err := buildACPMcpServers(opts.McpConfig, b.cfg.Logger)
if err != nil {
return nil, fmt.Errorf("kimi: invalid mcp_config: %w", err)
}
timeout := opts.Timeout
if timeout == 0 {
timeout = 20 * time.Minute
@@ -174,7 +183,7 @@ func (b *kimiBackend) Execute(ctx context.Context, prompt string, opts ExecOptio
var sessionID string
// 1. Initialize handshake.
_, err := c.request(runCtx, "initialize", map[string]any{
initResult, err := c.request(runCtx, "initialize", map[string]any{
"protocolVersion": 1,
"clientInfo": map[string]any{
"name": "multica-agent-sdk",
@@ -189,6 +198,12 @@ func (b *kimiBackend) Execute(ctx context.Context, prompt string, opts ExecOptio
return
}
// Drop MCP entries whose remote transport the runtime didn't
// advertise. See the matching comment in hermes.go for the why —
// shipping an http/sse entry to a stdio-only runtime tanks the
// whole session/new.
mcpServers = filterACPMcpServersByCapability(mcpServers, extractACPMcpCapabilities(initResult), "kimi", b.cfg.Logger)
// 2. Create or resume a session.
cwd := opts.Cwd
if cwd == "" {
@@ -196,9 +211,14 @@ func (b *kimiBackend) Execute(ctx context.Context, prompt string, opts ExecOptio
}
if opts.ResumeSessionID != "" {
// Per ACP Session Setup, session/resume accepts mcpServers and
// the runtime re-connects them as part of the resume. Without
// this, a resumed Kimi task lost access to MCP tools that a
// fresh task on the same agent would have.
result, err := c.request(runCtx, "session/resume", map[string]any{
"cwd": cwd,
"sessionId": opts.ResumeSessionID,
"cwd": cwd,
"sessionId": opts.ResumeSessionID,
"mcpServers": mcpServers,
})
if err != nil {
finalStatus = "failed"
@@ -218,7 +238,7 @@ func (b *kimiBackend) Execute(ctx context.Context, prompt string, opts ExecOptio
} else {
result, err := c.request(runCtx, "session/new", map[string]any{
"cwd": cwd,
"mcpServers": []any{},
"mcpServers": mcpServers,
})
if err != nil {
finalStatus = "failed"

View File

@@ -2,6 +2,7 @@ package agent
import (
"context"
"encoding/json"
"log/slog"
"os"
"path/filepath"
@@ -209,3 +210,49 @@ func TestKimiBackendInvokesACPSubcommand(t *testing.T) {
}
}
}
// TestKimiResumeIncludesMcpServers pins the same contract as the matching
// Hermes test: session/resume must carry the managed MCP set so a resumed
// Kimi task has the same MCP tools as a fresh one.
func TestKimiResumeIncludesMcpServers(t *testing.T) {
t.Parallel()
recordPath := filepath.Join(t.TempDir(), "frames.jsonl")
fakePath := filepath.Join(t.TempDir(), "kimi")
writeTestExecutable(t, fakePath, []byte(fakeACPRecordingScript(recordPath, "ses_resume", `{}`)))
backend, err := New("kimi", Config{ExecutablePath: fakePath, Logger: slog.Default()})
if err != nil {
t.Fatalf("new kimi backend: %v", err)
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
session, err := backend.Execute(ctx, "prompt-ignored", ExecOptions{
Timeout: 5 * time.Second,
ResumeSessionID: "ses_resume",
McpConfig: json.RawMessage(`{"mcpServers":{"fetch":{"command":"uvx"}}}`),
})
if err != nil {
t.Fatalf("execute: %v", err)
}
go func() {
for range session.Messages {
}
}()
select {
case <-session.Result:
case <-time.After(10 * time.Second):
t.Fatal("timeout waiting for result")
}
frame := findRecordedFrame(t, recordPath, "session/resume")
params := frame["params"].(map[string]any)
servers, ok := params["mcpServers"].([]any)
if !ok {
t.Fatalf("session/resume.mcpServers: got %T, want []any", params["mcpServers"])
}
if len(servers) != 1 || servers[0].(map[string]any)["name"] != "fetch" {
t.Fatalf("session/resume.mcpServers: got %v, want one entry named fetch", servers)
}
}

View File

@@ -44,6 +44,15 @@ func (b *kiroBackend) Execute(ctx context.Context, prompt string, opts ExecOptio
return nil, fmt.Errorf("kiro executable not found at %q: %w", execPath, err)
}
// Translate the agent's mcp_config (Claude-style object of objects)
// into the array shape ACP `session/new` and `session/load` expect.
// Fail closed on malformed JSON so the launch surfaces the real error
// instead of silently dropping all MCP servers.
mcpServers, err := buildACPMcpServers(opts.McpConfig, b.cfg.Logger)
if err != nil {
return nil, fmt.Errorf("kiro: invalid mcp_config: %w", err)
}
timeout := opts.Timeout
if timeout == 0 {
timeout = 20 * time.Minute
@@ -165,7 +174,7 @@ func (b *kiroBackend) Execute(ctx context.Context, prompt string, opts ExecOptio
var finalError string
var sessionID string
_, err := c.request(runCtx, "initialize", map[string]any{
initResult, err := c.request(runCtx, "initialize", map[string]any{
"protocolVersion": 1,
"clientInfo": map[string]any{
"name": "multica-agent-sdk",
@@ -180,6 +189,12 @@ func (b *kiroBackend) Execute(ctx context.Context, prompt string, opts ExecOptio
return
}
// Drop MCP entries whose remote transport the runtime didn't
// advertise. See the matching comment in hermes.go for why
// unconditionally sending http/sse to a stdio-only ACP runtime
// tanks the whole session/new.
mcpServers = filterACPMcpServersByCapability(mcpServers, extractACPMcpCapabilities(initResult), "kiro", b.cfg.Logger)
cwd := opts.Cwd
if cwd == "" {
cwd = "."
@@ -189,7 +204,7 @@ func (b *kiroBackend) Execute(ctx context.Context, prompt string, opts ExecOptio
result, err := c.request(runCtx, "session/load", map[string]any{
"cwd": cwd,
"sessionId": opts.ResumeSessionID,
"mcpServers": []any{},
"mcpServers": mcpServers,
})
if err != nil {
finalStatus = "failed"
@@ -217,7 +232,7 @@ func (b *kiroBackend) Execute(ctx context.Context, prompt string, opts ExecOptio
} else {
result, err := c.request(runCtx, "session/new", map[string]any{
"cwd": cwd,
"mcpServers": []any{},
"mcpServers": mcpServers,
})
if err != nil {
finalStatus = "failed"

View File

@@ -2,6 +2,7 @@ package agent
import (
"context"
"encoding/json"
"log/slog"
"os"
"path/filepath"
@@ -315,3 +316,50 @@ func TestKiroBackendUsesSessionLoadForResume(t *testing.T) {
t.Fatalf("session/prompt must send standard ACP prompt field for Kiro 2.1.1 compatibility, got:\n%s", requests)
}
}
// TestKiroLoadIncludesMcpServersFromConfig pins that the agent's managed
// MCP set actually reaches the wire on session/load — the resume path is
// otherwise indistinguishable from the no-config case, which is how the
// missing-on-resume bug got past the first round of review.
func TestKiroLoadIncludesMcpServersFromConfig(t *testing.T) {
t.Parallel()
recordPath := filepath.Join(t.TempDir(), "frames.jsonl")
fakePath := filepath.Join(t.TempDir(), "kiro-cli")
writeTestExecutable(t, fakePath, []byte(fakeACPRecordingScript(recordPath, "ses_load", `{}`)))
backend, err := New("kiro", Config{ExecutablePath: fakePath, Logger: slog.Default()})
if err != nil {
t.Fatalf("new kiro backend: %v", err)
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
session, err := backend.Execute(ctx, "prompt-ignored", ExecOptions{
Timeout: 5 * time.Second,
ResumeSessionID: "ses_load",
McpConfig: json.RawMessage(`{"mcpServers":{"fetch":{"command":"uvx"}}}`),
})
if err != nil {
t.Fatalf("execute: %v", err)
}
go func() {
for range session.Messages {
}
}()
select {
case <-session.Result:
case <-time.After(10 * time.Second):
t.Fatal("timeout waiting for result")
}
frame := findRecordedFrame(t, recordPath, "session/load")
params := frame["params"].(map[string]any)
servers, ok := params["mcpServers"].([]any)
if !ok {
t.Fatalf("session/load.mcpServers: got %T, want []any", params["mcpServers"])
}
if len(servers) != 1 || servers[0].(map[string]any)["name"] != "fetch" {
t.Fatalf("session/load.mcpServers: got %v, want one entry named fetch", servers)
}
}