mirror of
https://github.com/multica-ai/multica.git
synced 2026-06-28 10:02:36 +02:00
Compare commits
1 Commits
agent/lamb
...
agent/j/55
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7d30515f84 |
@@ -197,11 +197,27 @@ func normalizeAPIBaseURL(raw string) string {
|
||||
return raw
|
||||
}
|
||||
|
||||
// inAgentExecutionContext reports whether this CLI invocation originates
// from inside a daemon-managed agent task. The daemon exports
// MULTICA_AGENT_ID and MULTICA_TASK_ID into the agent's environment; a
// non-empty value for either one is enough to count as agent context.
//
// Callers use the result to refuse the user-global ~/.multica/config.json
// fallback: inside an agent task the workspace has to be supplied explicitly
// by the daemon, otherwise the agent would act on whichever workspace the
// user configured last — which is how cross-workspace contamination happens
// when several workspaces share one host.
func inAgentExecutionContext() bool {
	for _, marker := range [...]string{"MULTICA_AGENT_ID", "MULTICA_TASK_ID"} {
		if os.Getenv(marker) != "" {
			return true
		}
	}
	return false
}
|
||||
|
||||
func resolveWorkspaceID(cmd *cobra.Command) string {
|
||||
val := cli.FlagOrEnv(cmd, "workspace-id", "MULTICA_WORKSPACE_ID", "")
|
||||
if val != "" {
|
||||
return val
|
||||
}
|
||||
// Inside an agent task the daemon is the only authority on workspace
|
||||
// identity. Never read the user-global CLI config here.
|
||||
if inAgentExecutionContext() {
|
||||
return ""
|
||||
}
|
||||
profile := resolveProfile(cmd)
|
||||
cfg, _ := cli.LoadCLIConfigForProfile(profile)
|
||||
return cfg.WorkspaceID
|
||||
@@ -213,6 +229,9 @@ func resolveWorkspaceID(cmd *cobra.Command) string {
|
||||
func requireWorkspaceID(cmd *cobra.Command) (string, error) {
|
||||
id := resolveWorkspaceID(cmd)
|
||||
if id == "" {
|
||||
if inAgentExecutionContext() {
|
||||
return "", fmt.Errorf("workspace_id is required: MULTICA_WORKSPACE_ID must be set by the daemon in agent execution context (no fallback to user config)")
|
||||
}
|
||||
return "", fmt.Errorf("workspace_id is required: use --workspace-id flag, set MULTICA_WORKSPACE_ID env, or run 'multica config set workspace_id <id>'")
|
||||
}
|
||||
return id, nil
|
||||
|
||||
84
server/cmd/multica/cmd_agent_test.go
Normal file
84
server/cmd/multica/cmd_agent_test.go
Normal file
@@ -0,0 +1,84 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/multica-ai/multica/server/internal/cli"
|
||||
)
|
||||
|
||||
// TestResolveWorkspaceID_AgentContextSkipsConfig is a regression test for
|
||||
// the cross-workspace contamination bug (#1235). Inside a daemon-spawned
|
||||
// agent task (MULTICA_AGENT_ID / MULTICA_TASK_ID set), the CLI must NOT
|
||||
// silently read the user-global ~/.multica/config.json to recover a missing
|
||||
// workspace — that fallback is how agent operations leaked into an
|
||||
// unrelated workspace when the daemon failed to inject the right value.
|
||||
//
|
||||
// Outside agent context, the three-level fallback (flag → env → config) is
|
||||
// unchanged.
|
||||
func TestResolveWorkspaceID_AgentContextSkipsConfig(t *testing.T) {
|
||||
t.Setenv("HOME", t.TempDir())
|
||||
|
||||
// Seed the global CLI config with a workspace_id that must NOT be
|
||||
// picked up while running inside an agent task.
|
||||
if err := cli.SaveCLIConfig(cli.CLIConfig{WorkspaceID: "config-file-ws"}); err != nil {
|
||||
t.Fatalf("seed config: %v", err)
|
||||
}
|
||||
|
||||
t.Run("outside agent context falls back to config", func(t *testing.T) {
|
||||
t.Setenv("MULTICA_AGENT_ID", "")
|
||||
t.Setenv("MULTICA_TASK_ID", "")
|
||||
t.Setenv("MULTICA_WORKSPACE_ID", "")
|
||||
|
||||
got := resolveWorkspaceID(testCmd())
|
||||
if got != "config-file-ws" {
|
||||
t.Fatalf("resolveWorkspaceID() = %q, want %q (config fallback)", got, "config-file-ws")
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("agent context with explicit env uses env", func(t *testing.T) {
|
||||
t.Setenv("MULTICA_AGENT_ID", "agent-123")
|
||||
t.Setenv("MULTICA_TASK_ID", "task-456")
|
||||
t.Setenv("MULTICA_WORKSPACE_ID", "env-ws")
|
||||
|
||||
got := resolveWorkspaceID(testCmd())
|
||||
if got != "env-ws" {
|
||||
t.Fatalf("resolveWorkspaceID() = %q, want %q (env)", got, "env-ws")
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("agent context without env returns empty, never config", func(t *testing.T) {
|
||||
t.Setenv("MULTICA_AGENT_ID", "agent-123")
|
||||
t.Setenv("MULTICA_TASK_ID", "task-456")
|
||||
t.Setenv("MULTICA_WORKSPACE_ID", "")
|
||||
|
||||
got := resolveWorkspaceID(testCmd())
|
||||
if got != "" {
|
||||
t.Fatalf("resolveWorkspaceID() = %q, want empty (no silent config fallback in agent context)", got)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("task marker alone also counts as agent context", func(t *testing.T) {
|
||||
t.Setenv("MULTICA_AGENT_ID", "")
|
||||
t.Setenv("MULTICA_TASK_ID", "task-456")
|
||||
t.Setenv("MULTICA_WORKSPACE_ID", "")
|
||||
|
||||
if got := resolveWorkspaceID(testCmd()); got != "" {
|
||||
t.Fatalf("resolveWorkspaceID() = %q, want empty", got)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("requireWorkspaceID surfaces agent-context error", func(t *testing.T) {
|
||||
t.Setenv("MULTICA_AGENT_ID", "agent-123")
|
||||
t.Setenv("MULTICA_TASK_ID", "task-456")
|
||||
t.Setenv("MULTICA_WORKSPACE_ID", "")
|
||||
|
||||
_, err := requireWorkspaceID(testCmd())
|
||||
if err == nil {
|
||||
t.Fatal("requireWorkspaceID(): expected error inside agent context with empty env, got nil")
|
||||
}
|
||||
if !strings.Contains(err.Error(), "agent execution context") {
|
||||
t.Fatalf("requireWorkspaceID() error = %q, want it to mention agent execution context", err.Error())
|
||||
}
|
||||
})
|
||||
}
|
||||
@@ -890,6 +890,15 @@ func (d *Daemon) handleTask(ctx context.Context, task Task) {
|
||||
}
|
||||
|
||||
func (d *Daemon) runTask(ctx context.Context, task Task, provider string, taskLog *slog.Logger) (TaskResult, error) {
|
||||
// Refuse to spawn an agent without a workspace. An empty workspace_id
|
||||
// here would make MULTICA_WORKSPACE_ID empty in the agent env, and the
|
||||
// CLI would otherwise silently fall back to the user-global config — a
|
||||
// path that can leak operations into an unrelated workspace when
|
||||
// multiple workspaces share a host.
|
||||
if task.WorkspaceID == "" {
|
||||
return TaskResult{}, fmt.Errorf("refusing to spawn agent: task has no workspace_id (task_id=%s)", task.ID)
|
||||
}
|
||||
|
||||
entry, ok := d.cfg.Agents[provider]
|
||||
if !ok {
|
||||
return TaskResult{}, fmt.Errorf("no agent configured for provider %q", provider)
|
||||
|
||||
@@ -536,10 +536,16 @@ func (h *Handler) ClaimTaskByRuntime(w http.ResponseWriter, r *http.Request) {
|
||||
logClaimEndpointSlow(runtimeID, outcome, start, authMs, claimMs, buildMs)
|
||||
}()
|
||||
|
||||
// Verify the caller owns this runtime's workspace.
|
||||
if _, ok := h.requireDaemonRuntimeAccess(w, r, runtimeID); !ok {
|
||||
// Verify the caller owns this runtime's workspace. The runtime's
|
||||
// workspace_id is the authoritative value a claimed task must match
|
||||
// below — a task whose resolved workspace doesn't equal this runtime's
|
||||
// workspace is rejected even if it was enqueued against this
|
||||
// runtime_id (defense-in-depth against upstream routing bugs).
|
||||
runtime, ok := h.requireDaemonRuntimeAccess(w, r, runtimeID)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
runtimeWorkspaceID := uuidToString(runtime.WorkspaceID)
|
||||
authMs = time.Since(start).Milliseconds()
|
||||
|
||||
claimStart := time.Now()
|
||||
@@ -685,6 +691,33 @@ func (h *Handler) ClaimTaskByRuntime(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
}
|
||||
|
||||
// Workspace isolation check: the daemon uses this response's workspace_id
|
||||
// as the only authority for MULTICA_WORKSPACE_ID in the agent env. An
|
||||
// empty value would make the CLI silently fall back to the user-global
|
||||
// config and talk to whatever workspace the user happened to last
|
||||
// configure; a value that doesn't match the runtime's workspace means
|
||||
// upstream routed a foreign-workspace task here. Both cases must hard-
|
||||
// fail AND cancel the just-dispatched task so the queue / agent status
|
||||
// don't sit stuck until the stale-task sweeper fires minutes later.
|
||||
if resp.WorkspaceID == "" || resp.WorkspaceID != runtimeWorkspaceID {
|
||||
outcome = "error_workspace"
|
||||
slog.Error("task claim: workspace isolation check failed, cancelling task",
|
||||
"task_id", uuidToString(task.ID),
|
||||
"runtime_id", runtimeID,
|
||||
"runtime_workspace", runtimeWorkspaceID,
|
||||
"resolved_workspace", resp.WorkspaceID,
|
||||
"has_issue", task.IssueID.Valid,
|
||||
"has_chat", task.ChatSessionID.Valid,
|
||||
"has_autopilot_run", task.AutopilotRunID.Valid,
|
||||
)
|
||||
if _, cerr := h.TaskService.CancelTask(r.Context(), task.ID); cerr != nil {
|
||||
slog.Error("task claim: cancel after workspace check failed",
|
||||
"task_id", uuidToString(task.ID), "error", cerr)
|
||||
}
|
||||
writeError(w, http.StatusInternalServerError, "task workspace isolation check failed")
|
||||
return
|
||||
}
|
||||
|
||||
slog.Info("task claimed by runtime", "task_id", uuidToString(task.ID), "runtime_id", runtimeID, "agent_id", uuidToString(task.AgentID), "prior_session", resp.PriorSessionID)
|
||||
writeJSON(w, http.StatusOK, map[string]any{"task": resp})
|
||||
}
|
||||
|
||||
@@ -1222,3 +1222,85 @@ func TestClaimTask_AutopilotRunOnly_PopulatesWorkspaceID(t *testing.T) {
|
||||
t.Fatalf("expected workspace_id %q, got %q", testWorkspaceID, resp.Task.WorkspaceID)
|
||||
}
|
||||
}
|
||||
|
||||
// TestClaimTaskByRuntime_TaskWorkspaceMismatch_CancelsAndRejects verifies
|
||||
// the defense-in-depth check in ClaimTaskByRuntime: if a task is somehow
|
||||
// dispatched to a runtime whose workspace doesn't match the task's
|
||||
// resolved workspace (upstream routing / data-integrity bug), the handler
|
||||
// must 500 AND cancel the dispatched task so it doesn't sit in
|
||||
// 'dispatched' until the 5-minute sweeper — which would also leave the
|
||||
// agent stuck reporting 'working' in the UI.
|
||||
func TestClaimTaskByRuntime_TaskWorkspaceMismatch_CancelsAndRejects(t *testing.T) {
|
||||
if testHandler == nil {
|
||||
t.Skip("database not available")
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
// Local agent/runtime (belongs to testWorkspace).
|
||||
var localAgentID, localRuntimeID string
|
||||
if err := testPool.QueryRow(ctx,
|
||||
`SELECT id, runtime_id FROM agent WHERE workspace_id = $1 LIMIT 1`,
|
||||
testWorkspaceID,
|
||||
).Scan(&localAgentID, &localRuntimeID); err != nil {
|
||||
t.Fatalf("setup: get local agent: %v", err)
|
||||
}
|
||||
|
||||
// Foreign workspace with its own issue — what the misrouted task will
|
||||
// resolve to.
|
||||
var foreignWorkspaceID string
|
||||
if err := testPool.QueryRow(ctx, `
|
||||
INSERT INTO workspace (name, slug, description, issue_prefix)
|
||||
VALUES ($1, $2, $3, $4)
|
||||
RETURNING id
|
||||
`, "Mismatch Foreign", "mismatch-foreign-claim", "", "MFC").Scan(&foreignWorkspaceID); err != nil {
|
||||
t.Fatalf("setup: create foreign workspace: %v", err)
|
||||
}
|
||||
t.Cleanup(func() { testPool.Exec(context.Background(), `DELETE FROM workspace WHERE id = $1`, foreignWorkspaceID) })
|
||||
|
||||
var foreignIssueID string
|
||||
if err := testPool.QueryRow(ctx, `
|
||||
INSERT INTO issue (workspace_id, title, status, priority, creator_id, creator_type, number, position)
|
||||
VALUES ($1, 'mismatch-foreign-issue', 'todo', 'medium', $2, 'member', 77001, 0)
|
||||
RETURNING id
|
||||
`, foreignWorkspaceID, testUserID).Scan(&foreignIssueID); err != nil {
|
||||
t.Fatalf("setup: create foreign issue: %v", err)
|
||||
}
|
||||
|
||||
// Construct the inconsistent task: runtime_id belongs to testWorkspace,
|
||||
// but issue_id is in foreignWorkspace. This is the data shape a routing
|
||||
// bug would produce.
|
||||
var taskID string
|
||||
if err := testPool.QueryRow(ctx, `
|
||||
INSERT INTO agent_task_queue (agent_id, runtime_id, issue_id, status, priority)
|
||||
VALUES ($1, $2, $3, 'queued', 2)
|
||||
RETURNING id
|
||||
`, localAgentID, localRuntimeID, foreignIssueID).Scan(&taskID); err != nil {
|
||||
t.Fatalf("setup: create mismatched task: %v", err)
|
||||
}
|
||||
t.Cleanup(func() { testPool.Exec(context.Background(), `DELETE FROM agent_task_queue WHERE id = $1`, taskID) })
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
req := newDaemonTokenRequest("POST", "/api/daemon/runtimes/"+localRuntimeID+"/claim", nil,
|
||||
testWorkspaceID, "legit-daemon")
|
||||
rctx := chi.NewRouteContext()
|
||||
rctx.URLParams.Add("runtimeId", localRuntimeID)
|
||||
req = req.WithContext(context.WithValue(req.Context(), chi.RouteCtxKey, rctx))
|
||||
|
||||
testHandler.ClaimTaskByRuntime(w, req)
|
||||
if w.Code != http.StatusInternalServerError {
|
||||
t.Fatalf("ClaimTaskByRuntime (mismatch): expected 500, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
|
||||
// Task must NOT remain dispatched — it has to be cancelled so the agent
|
||||
// is released immediately rather than stuck until the sweeper fires.
|
||||
var status string
|
||||
if err := testPool.QueryRow(ctx,
|
||||
`SELECT status FROM agent_task_queue WHERE id = $1`, taskID,
|
||||
).Scan(&status); err != nil {
|
||||
t.Fatalf("read task status: %v", err)
|
||||
}
|
||||
if status != "cancelled" {
|
||||
t.Fatalf("ClaimTaskByRuntime (mismatch): expected task status=cancelled, got %q", status)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user