diff --git a/server/internal/handler/workspace.go b/server/internal/handler/workspace.go index ed01afb68..d6ff13f37 100644 --- a/server/internal/handler/workspace.go +++ b/server/internal/handler/workspace.go @@ -572,17 +572,22 @@ func (h *Handler) DeleteMember(w http.ResponseWriter, r *http.Request) { } } - if err := h.Queries.DeleteMember(r.Context(), target.ID); err != nil { + requesterUserID := requestUserID(r) + result, err := h.revokeAndRemoveMember(r.Context(), target.WorkspaceID, target.UserID, target.ID, parseUUID(requesterUserID)) + if err != nil { slog.Warn("delete member failed", append(logger.RequestAttrs(r), "error", err, "member_id", memberID, "workspace_id", workspaceID)...) writeError(w, http.StatusInternalServerError, "failed to delete member") return } + wsIDStr := uuidToString(requester.WorkspaceID) + logRevocation(result, wsIDStr, uuidToString(target.UserID)) + h.publishRevocation(r.Context(), result, wsIDStr, "member", requesterUserID) + slog.Info("member removed", append(logger.RequestAttrs(r), "member_id", uuidToString(target.ID), "workspace_id", workspaceID, "user_id", uuidToString(target.UserID))...) - userID := requestUserID(r) - h.publish(protocol.EventMemberRemoved, uuidToString(requester.WorkspaceID), "member", userID, map[string]any{ + h.publish(protocol.EventMemberRemoved, wsIDStr, "member", requesterUserID, map[string]any{ "member_id": uuidToString(target.ID), - "workspace_id": uuidToString(requester.WorkspaceID), + "workspace_id": wsIDStr, "user_id": uuidToString(target.UserID), }) @@ -608,14 +613,18 @@ func (h *Handler) LeaveWorkspace(w http.ResponseWriter, r *http.Request) { } } - if err := h.Queries.DeleteMember(r.Context(), member.ID); err != nil { + result, err := h.revokeAndRemoveMember(r.Context(), member.WorkspaceID, member.UserID, member.ID, member.UserID) + if err != nil { slog.Warn("leave workspace failed", append(logger.RequestAttrs(r), "error", err, "workspace_id", workspaceID)...) writeError(w, http.StatusInternalServerError, "failed to leave workspace") return } - slog.Info("member removed", append(logger.RequestAttrs(r), "member_id", uuidToString(member.ID), "workspace_id", workspaceID, "user_id", uuidToString(member.UserID))...) userID := requestUserID(r) + logRevocation(result, workspaceID, uuidToString(member.UserID)) + h.publishRevocation(r.Context(), result, workspaceID, "member", userID) + + slog.Info("member removed", append(logger.RequestAttrs(r), "member_id", uuidToString(member.ID), "workspace_id", workspaceID, "user_id", uuidToString(member.UserID))...) h.publish(protocol.EventMemberRemoved, workspaceID, "member", userID, map[string]any{ "member_id": uuidToString(member.ID), "workspace_id": workspaceID, diff --git a/server/internal/handler/workspace_revoke.go b/server/internal/handler/workspace_revoke.go new file mode 100644 index 000000000..0d0ce1dcf --- /dev/null +++ b/server/internal/handler/workspace_revoke.go @@ -0,0 +1,201 @@ +package handler + +import ( + "context" + "log/slog" + + "github.com/jackc/pgx/v5/pgtype" + db "github.com/multica-ai/multica/server/pkg/db/generated" + "github.com/multica-ai/multica/server/pkg/protocol" +) + +// revokeAndRemoveMember converges all server-side state that should follow a +// member leaving a workspace: every runtime they own becomes unusable, every +// agent pinned to one of those runtimes is archived, every in-flight task on +// those runtimes is cancelled (cancelled rather than failed so the daemon's +// per-task status poller interrupts the running agent gracefully), the +// daemon_token rows for those runtimes are deleted, and finally the member row +// itself is removed. +// +// All DB writes run inside a single transaction so a partial revocation never +// leaves the workspace half-converged — e.g. a member who is "gone" but whose +// runtime row is still active. Once the transaction commits, daemon_token +// cache entries are invalidated and events are published (see +// publishRevocation) so connected clients and other workspace members observe +// the new state immediately. +// +// Note on scope: this revokes every runtime whose owner_id matches userID, +// regardless of how the daemon authenticates. Today most daemons fall back to +// PAT/JWT and `daemon_token` rows are unused in production; deleting them is +// a no-op for those daemons but takes effect once the mdt_ flow is live. +// Either way the agent-archive + task-cancel + force-offline writes are the +// actual production safety net: even if the daemon races back online with a +// still-valid PAT, it finds no agent it can run for, no queued task to claim, +// and the dispatcher (which gates on agent.archived_at IS NULL) won't hand it +// new work — and the member-row deletion in the same tx means subsequent +// requireWorkspaceMember checks will reject the daemon's PAT-authenticated +// requests with 404. +// +// archivedBy is the actor who triggered the revocation. For DeleteMember it's +// the requester (the admin doing the kick); for LeaveWorkspace it's the leaver +// themselves. +func (h *Handler) revokeAndRemoveMember(ctx context.Context, workspaceID, userID, memberID, archivedBy pgtype.UUID) (revocationResult, error) { + var empty revocationResult + + tx, err := h.TxStarter.Begin(ctx) + if err != nil { + return empty, err + } + defer tx.Rollback(ctx) + + qtx := h.Queries.WithTx(tx) + + runtimes, err := qtx.ListAgentRuntimesByOwner(ctx, db.ListAgentRuntimesByOwnerParams{ + WorkspaceID: workspaceID, + OwnerID: userID, + }) + if err != nil { + return empty, err + } + + result := revocationResult{Runtimes: runtimes} + + if len(runtimes) > 0 { + runtimeIDs := make([]pgtype.UUID, len(runtimes)) + daemonIDs := make([]string, 0, len(runtimes)) + for i, rt := range runtimes { + runtimeIDs[i] = rt.ID + if rt.DaemonID.Valid && rt.DaemonID.String != "" { + daemonIDs = append(daemonIDs, rt.DaemonID.String) + } + } + + result.ArchivedAgents, err = qtx.ArchiveAgentsByRuntime(ctx, db.ArchiveAgentsByRuntimeParams{ + ArchivedBy: archivedBy, + RuntimeIds: runtimeIDs, + }) + if err != nil { + return empty, err + } + + // Cancel by runtime AND by archived agent. agent.runtime_id can be + // reassigned via UpdateAgent without rewriting the runtime_id on + // historical agent_task_queue rows, so an archived agent may still + // have queued/running tasks pinned to a different runtime — and + // ClaimAgentTask does not gate on agent.archived_at, so those tasks + // would otherwise stay claimable after the agent is gone. + archivedAgentIDs := make([]pgtype.UUID, len(result.ArchivedAgents)) + for i, a := range result.ArchivedAgents { + archivedAgentIDs[i] = a.ID + } + result.CancelledTasks, err = qtx.CancelAgentTasksByRuntimeOrAgent(ctx, db.CancelAgentTasksByRuntimeOrAgentParams{ + RuntimeIds: runtimeIDs, + AgentIds: archivedAgentIDs, + }) + if err != nil { + return empty, err + } + + result.OfflineRuntimeIDs, err = qtx.ForceOfflineRuntimesByIDs(ctx, runtimeIDs) + if err != nil { + return empty, err + } + + if len(daemonIDs) > 0 { + result.RevokedTokenHashes, err = qtx.DeleteDaemonTokensByWorkspaceAndDaemons(ctx, db.DeleteDaemonTokensByWorkspaceAndDaemonsParams{ + WorkspaceID: workspaceID, + DaemonIds: daemonIDs, + }) + if err != nil { + return empty, err + } + } + } + + // Member row deletion lives inside the same tx so a successful revoke is + // never followed by a failed member-delete (which would leave the user + // still a member with a dead runtime), and a failed revoke never leaves + // the user out of the workspace with a still-online runtime. + if err := qtx.DeleteMember(ctx, memberID); err != nil { + return empty, err + } + + if err := tx.Commit(ctx); err != nil { + return empty, err + } + + return result, nil +} + +// revocationResult captures everything revokeMemberRuntimes touched so the +// caller can fan out events and analytics after the transaction commits. +// Publishing inside the transaction would let subscribers observe a state the +// tx might still roll back (see TaskService.BroadcastCancelledTasks docstring). +type revocationResult struct { + Runtimes []db.AgentRuntime + ArchivedAgents []db.Agent + CancelledTasks []db.AgentTaskQueue + OfflineRuntimeIDs []db.ForceOfflineRuntimesByIDsRow + RevokedTokenHashes []string +} + +func (r revocationResult) isEmpty() bool { + return len(r.Runtimes) == 0 +} + +// publishRevocation runs all post-commit side effects: invalidate daemon token +// cache, broadcast task:cancelled with per-agent reconciliation, broadcast +// agent:archived, and signal a runtime-list refresh. Safe to call on an empty +// result — it returns immediately. +func (h *Handler) publishRevocation(ctx context.Context, result revocationResult, workspaceIDStr, actorType, actorIDStr string) { + if result.isEmpty() { + return + } + + for _, hash := range result.RevokedTokenHashes { + h.DaemonTokenCache.Invalidate(ctx, hash) + } + + // Per-task cancellation: TaskService handles status reconciliation and + // per-task event broadcast. Run this before the agent:archived burst so + // subscribers see "task cancelled" before the parent agent disappears + // from active lists, matching the order ArchiveAgent uses. + if h.TaskService != nil && len(result.CancelledTasks) > 0 { + h.TaskService.BroadcastCancelledTasks(ctx, result.CancelledTasks) + } + + for _, agent := range result.ArchivedAgents { + h.publish(protocol.EventAgentArchived, workspaceIDStr, actorType, actorIDStr, map[string]any{ + "agent": agentToResponse(agent), + }) + } + + // Tell connected clients to refresh the runtime list. We piggyback on + // EventDaemonRegister with a "revoke" action — same channel the runtime + // delete handler uses — so the frontend invalidates its cached list + // without us having to introduce a new event type the desktop app would + // need a build to learn about. + if len(result.OfflineRuntimeIDs) > 0 { + h.publish(protocol.EventDaemonRegister, workspaceIDStr, actorType, actorIDStr, map[string]any{ + "action": "revoke", + }) + } +} + +// logRevocation emits a structured info line summarising the revocation. Kept +// separate from publish so the log is identical whether or not the bus is wired. +func logRevocation(result revocationResult, workspaceID, userID string, attrs ...any) { + if result.isEmpty() { + return + } + base := []any{ + "workspace_id", workspaceID, + "user_id", userID, + "runtimes_revoked", len(result.Runtimes), + "agents_archived", len(result.ArchivedAgents), + "tasks_cancelled", len(result.CancelledTasks), + "runtimes_taken_offline", len(result.OfflineRuntimeIDs), + "daemon_tokens_revoked", len(result.RevokedTokenHashes), + } + slog.Info("member runtimes revoked", append(base, attrs...)...) +} diff --git a/server/internal/handler/workspace_test.go b/server/internal/handler/workspace_test.go index cf67632d4..cb52369f3 100644 --- a/server/internal/handler/workspace_test.go +++ b/server/internal/handler/workspace_test.go @@ -2,6 +2,8 @@ package handler import ( "context" + "crypto/sha256" + "encoding/hex" "fmt" "net/http" "net/http/httptest" @@ -128,3 +130,344 @@ VALUES ($1, $2, 'owner') t.Fatal("workspace still exists after owner DELETE") } } + +// revocationFixture is a minimal (workspace, member-to-revoke, runtime, +// agent, queued-task, daemon-token) bundle used to drive the revocation +// tests. The "requester" is always testUserID (owner of the workspace) so +// `newRequest` passes the existing fixtures' auth context unchanged. +type revocationFixture struct { + WorkspaceID string + TargetUserID string + MemberID string + RuntimeID string + AgentID string + TaskID string + DaemonID string + TokenHash string +} + +func setupRevocationFixture(t *testing.T, slug, daemonID string) revocationFixture { + t.Helper() + ctx := context.Background() + + _, _ = testPool.Exec(ctx, `DELETE FROM workspace WHERE slug = $1`, slug) + + var wsID string + if err := testPool.QueryRow(ctx, ` +INSERT INTO workspace (name, slug, description, issue_prefix) +VALUES ($1, $2, $3, $4) +RETURNING id +`, "Revocation "+slug, slug, "revocation test", "REV").Scan(&wsID); err != nil { + t.Fatalf("create workspace: %v", err) + } + + // Requester (= testUserID) is always an owner so DeleteMember authorization + // passes. Two owners total so LeaveWorkspace doesn't trip the "must keep + // at least one owner" guard. + if _, err := testPool.Exec(ctx, ` +INSERT INTO member (workspace_id, user_id, role) VALUES ($1, $2, 'owner') +`, wsID, testUserID); err != nil { + t.Fatalf("create requester member: %v", err) + } + + targetEmail := fmt.Sprintf("revocation-%s@multica.ai", slug) + var targetUserID string + if err := testPool.QueryRow(ctx, ` +INSERT INTO "user" (name, email) VALUES ($1, $2) RETURNING id +`, "Revocation Target "+slug, targetEmail).Scan(&targetUserID); err != nil { + t.Fatalf("create target user: %v", err) + } + + // Cleanup ordering: workspace first (cascade clears agent_runtime, + // agent, member, daemon_token), then user (whose deletion would + // otherwise be blocked by agent.owner_id / agent_runtime.owner_id FKs). + t.Cleanup(func() { + _, _ = testPool.Exec(context.Background(), `DELETE FROM workspace WHERE id = $1`, wsID) + _, _ = testPool.Exec(context.Background(), `DELETE FROM "user" WHERE id = $1`, targetUserID) + }) + + var memberID string + if err := testPool.QueryRow(ctx, ` +INSERT INTO member (workspace_id, user_id, role) VALUES ($1, $2, 'owner') RETURNING id +`, wsID, targetUserID).Scan(&memberID); err != nil { + t.Fatalf("create target member: %v", err) + } + + var runtimeID string + if err := testPool.QueryRow(ctx, ` +INSERT INTO agent_runtime ( + workspace_id, daemon_id, name, runtime_mode, provider, status, + device_info, metadata, owner_id, last_seen_at +) +VALUES ($1, $2, 'Target Runtime', 'local', 'multica_daemon', 'online', '', '{}'::jsonb, $3, now()) +RETURNING id +`, wsID, daemonID, targetUserID).Scan(&runtimeID); err != nil { + t.Fatalf("insert runtime: %v", err) + } + + var agentID string + if err := testPool.QueryRow(ctx, ` +INSERT INTO agent ( + workspace_id, name, description, runtime_mode, runtime_config, + runtime_id, visibility, max_concurrent_tasks, owner_id +) +VALUES ($1, 'Target Agent', '', 'local', '{}'::jsonb, $2, 'workspace', 1, $3) +RETURNING id +`, wsID, runtimeID, targetUserID).Scan(&agentID); err != nil { + t.Fatalf("insert agent: %v", err) + } + + var taskID string + if err := testPool.QueryRow(ctx, ` +INSERT INTO agent_task_queue (agent_id, runtime_id, status, priority) +VALUES ($1, $2, 'queued', 0) +RETURNING id +`, agentID, runtimeID).Scan(&taskID); err != nil { + t.Fatalf("insert task: %v", err) + } + + // daemon_token row — paired with the runtime's daemon_id so the + // revocation should sweep its hash up via DeleteDaemonTokensByWorkspaceAndDaemons. + rawToken := "mdt_test_" + slug + sum := sha256.Sum256([]byte(rawToken)) + tokenHash := hex.EncodeToString(sum[:]) + if _, err := testPool.Exec(ctx, ` +INSERT INTO daemon_token (token_hash, workspace_id, daemon_id, expires_at) +VALUES ($1, $2, $3, now() + interval '1 day') +`, tokenHash, wsID, daemonID); err != nil { + t.Fatalf("insert daemon_token: %v", err) + } + + return revocationFixture{ + WorkspaceID: wsID, + TargetUserID: targetUserID, + MemberID: memberID, + RuntimeID: runtimeID, + AgentID: agentID, + TaskID: taskID, + DaemonID: daemonID, + TokenHash: tokenHash, + } +} + +func assertRevoked(t *testing.T, fx revocationFixture) { + t.Helper() + ctx := context.Background() + + var memberExists bool + if err := testPool.QueryRow(ctx, `SELECT EXISTS (SELECT 1 FROM member WHERE id = $1)`, fx.MemberID).Scan(&memberExists); err != nil { + t.Fatalf("query member: %v", err) + } + if memberExists { + t.Fatal("member row was not deleted") + } + + var runtimeStatus string + if err := testPool.QueryRow(ctx, `SELECT status FROM agent_runtime WHERE id = $1`, fx.RuntimeID).Scan(&runtimeStatus); err != nil { + t.Fatalf("query runtime: %v", err) + } + if runtimeStatus != "offline" { + t.Fatalf("expected runtime offline, got %q", runtimeStatus) + } + + var archivedAt *string + if err := testPool.QueryRow(ctx, `SELECT archived_at::text FROM agent WHERE id = $1`, fx.AgentID).Scan(&archivedAt); err != nil { + t.Fatalf("query agent: %v", err) + } + if archivedAt == nil { + t.Fatal("agent was not archived") + } + + var taskStatus string + if err := testPool.QueryRow(ctx, `SELECT status FROM agent_task_queue WHERE id = $1`, fx.TaskID).Scan(&taskStatus); err != nil { + t.Fatalf("query task: %v", err) + } + if taskStatus != "cancelled" { + t.Fatalf("expected task cancelled, got %q", taskStatus) + } + + var tokenExists bool + if err := testPool.QueryRow(ctx, `SELECT EXISTS (SELECT 1 FROM daemon_token WHERE token_hash = $1)`, fx.TokenHash).Scan(&tokenExists); err != nil { + t.Fatalf("query daemon_token: %v", err) + } + if tokenExists { + t.Fatal("daemon_token row was not deleted") + } +} + +// TestDeleteMember_RevokesTargetRuntimes verifies that when an admin removes +// another member from a workspace, every runtime owned by the removed member +// has its agents archived, its in-flight tasks cancelled, its row flipped +// offline, and its daemon_token rows deleted — all atomically with the member +// row deletion. +func TestDeleteMember_RevokesTargetRuntimes(t *testing.T) { + fx := setupRevocationFixture(t, "handler-tests-revoke-kick", "daemon-revoke-kick") + + w := httptest.NewRecorder() + req := newRequest("DELETE", "/api/workspaces/"+fx.WorkspaceID+"/members/"+fx.MemberID, nil) + req.Header.Set("X-Workspace-ID", fx.WorkspaceID) + req = withURLParams(req, "id", fx.WorkspaceID, "memberId", fx.MemberID) + testHandler.DeleteMember(w, req) + + if w.Code != http.StatusNoContent { + t.Fatalf("DeleteMember: expected 204, got %d: %s", w.Code, w.Body.String()) + } + + assertRevoked(t, fx) +} + +// TestLeaveWorkspace_RevokesOwnRuntimes is the self-removal counterpart: when +// a member leaves a workspace voluntarily, their own runtimes are revoked +// with the same atomic write set as DeleteMember. +func TestLeaveWorkspace_RevokesOwnRuntimes(t *testing.T) { + fx := setupRevocationFixture(t, "handler-tests-revoke-leave", "daemon-revoke-leave") + + // Re-target the request from the leaving member's perspective: the + // leaver is the request actor, not the workspace owner. + w := httptest.NewRecorder() + req := newRequest("DELETE", "/api/workspaces/"+fx.WorkspaceID+"/leave", nil) + req.Header.Set("X-User-ID", fx.TargetUserID) + req.Header.Set("X-Workspace-ID", fx.WorkspaceID) + req = withURLParam(req, "id", fx.WorkspaceID) + testHandler.LeaveWorkspace(w, req) + + if w.Code != http.StatusNoContent { + t.Fatalf("LeaveWorkspace: expected 204, got %d: %s", w.Code, w.Body.String()) + } + + assertRevoked(t, fx) +} + +// TestDeleteMember_CancelsTasksFromAgentReassignment covers a subtle +// case: an agent's runtime_id can be changed via UpdateAgent, but +// agent_task_queue.runtime_id keeps the value from when the task was +// queued. So after a leaving member is removed, an agent currently bound +// to their runtime gets archived — but tasks that agent queued under a +// PRIOR runtime (still owned by another active member) keep their old +// runtime_id and would not be caught by a runtime-only sweep. Because +// ClaimAgentTask does not gate on agent.archived_at, those orphaned +// queued tasks would remain claimable. +func TestDeleteMember_CancelsTasksFromAgentReassignment(t *testing.T) { + fx := setupRevocationFixture(t, "handler-tests-revoke-reassign", "daemon-revoke-reassign") + ctx := context.Background() + + // Create a SECOND runtime in the workspace owned by the requester + // (not the leaving member). The agent originally lived here. + var otherRuntimeID string + if err := testPool.QueryRow(ctx, ` +INSERT INTO agent_runtime ( + workspace_id, daemon_id, name, runtime_mode, provider, status, + device_info, metadata, owner_id, last_seen_at +) +VALUES ($1, $2, 'Other Runtime', 'local', 'multica_daemon', 'online', '', '{}'::jsonb, $3, now()) +RETURNING id +`, fx.WorkspaceID, "daemon-revoke-reassign-other", testUserID).Scan(&otherRuntimeID); err != nil { + t.Fatalf("insert other runtime: %v", err) + } + + // Queue a task on the agent while it was still pinned to the OTHER + // runtime (simulating a task created before the agent was reassigned + // to the leaving member's runtime). + var orphanTaskID string + if err := testPool.QueryRow(ctx, ` +INSERT INTO agent_task_queue (agent_id, runtime_id, status, priority) +VALUES ($1, $2, 'queued', 0) +RETURNING id +`, fx.AgentID, otherRuntimeID).Scan(&orphanTaskID); err != nil { + t.Fatalf("insert orphan task: %v", err) + } + + w := httptest.NewRecorder() + req := newRequest("DELETE", "/api/workspaces/"+fx.WorkspaceID+"/members/"+fx.MemberID, nil) + req.Header.Set("X-Workspace-ID", fx.WorkspaceID) + req = withURLParams(req, "id", fx.WorkspaceID, "memberId", fx.MemberID) + testHandler.DeleteMember(w, req) + + if w.Code != http.StatusNoContent { + t.Fatalf("DeleteMember: expected 204, got %d: %s", w.Code, w.Body.String()) + } + + assertRevoked(t, fx) + + // The orphan task — same agent, different runtime — must also be + // cancelled. Without the by-agent leg in CancelAgentTasksByRuntimeOrAgent + // this stays 'queued' and would be picked up by the other runtime. + var orphanStatus string + if err := testPool.QueryRow(ctx, `SELECT status FROM agent_task_queue WHERE id = $1`, orphanTaskID).Scan(&orphanStatus); err != nil { + t.Fatalf("query orphan task: %v", err) + } + if orphanStatus != "cancelled" { + t.Fatalf("expected orphan task cancelled (archived agent leftover on other runtime), got %q", orphanStatus) + } + + // And the OTHER runtime — owned by an active member — must still be + // online: revocation is scoped to the leaving member's owned runtimes. + var otherStatus string + if err := testPool.QueryRow(ctx, `SELECT status FROM agent_runtime WHERE id = $1`, otherRuntimeID).Scan(&otherStatus); err != nil { + t.Fatalf("query other runtime: %v", err) + } + if otherStatus != "online" { + t.Fatalf("expected other-member runtime to stay online, got %q", otherStatus) + } +} + +// TestDeleteMember_NoRuntimes_DeletesMember covers the empty-revocation +// path: a member with no owned runtimes should still have their member row +// deleted by the same atomic transaction, with no spurious archive/cancel +// writes. +func TestDeleteMember_NoRuntimes_DeletesMember(t *testing.T) { + ctx := context.Background() + const slug = "handler-tests-revoke-no-runtimes" + _, _ = testPool.Exec(ctx, `DELETE FROM workspace WHERE slug = $1`, slug) + + var wsID string + if err := testPool.QueryRow(ctx, ` +INSERT INTO workspace (name, slug, description, issue_prefix) +VALUES ($1, $2, $3, $4) +RETURNING id +`, "Revocation no runtimes", slug, "revocation no-runtimes test", "REV").Scan(&wsID); err != nil { + t.Fatalf("create workspace: %v", err) + } + + if _, err := testPool.Exec(ctx, ` +INSERT INTO member (workspace_id, user_id, role) VALUES ($1, $2, 'owner') +`, wsID, testUserID); err != nil { + t.Fatalf("create requester member: %v", err) + } + + var targetUserID string + if err := testPool.QueryRow(ctx, ` +INSERT INTO "user" (name, email) VALUES ($1, $2) RETURNING id +`, "Revocation No Runtimes Target", "revocation-no-runtimes@multica.ai").Scan(&targetUserID); err != nil { + t.Fatalf("create target user: %v", err) + } + t.Cleanup(func() { + _, _ = testPool.Exec(context.Background(), `DELETE FROM workspace WHERE id = $1`, wsID) + _, _ = testPool.Exec(context.Background(), `DELETE FROM "user" WHERE id = $1`, targetUserID) + }) + + var memberID string + if err := testPool.QueryRow(ctx, ` +INSERT INTO member (workspace_id, user_id, role) VALUES ($1, $2, 'admin') RETURNING id +`, wsID, targetUserID).Scan(&memberID); err != nil { + t.Fatalf("create target member: %v", err) + } + + w := httptest.NewRecorder() + req := newRequest("DELETE", "/api/workspaces/"+wsID+"/members/"+memberID, nil) + req.Header.Set("X-Workspace-ID", wsID) + req = withURLParams(req, "id", wsID, "memberId", memberID) + testHandler.DeleteMember(w, req) + + if w.Code != http.StatusNoContent { + t.Fatalf("DeleteMember: expected 204, got %d: %s", w.Code, w.Body.String()) + } + + var memberExists bool + if err := testPool.QueryRow(ctx, `SELECT EXISTS (SELECT 1 FROM member WHERE id = $1)`, memberID).Scan(&memberExists); err != nil { + t.Fatalf("query member: %v", err) + } + if memberExists { + t.Fatal("member row was not deleted") + } +} diff --git a/server/pkg/db/generated/agent.sql.go b/server/pkg/db/generated/agent.sql.go index c44f30b91..924052130 100644 --- a/server/pkg/db/generated/agent.sql.go +++ b/server/pkg/db/generated/agent.sql.go @@ -51,6 +51,64 @@ func (q *Queries) ArchiveAgent(ctx context.Context, arg ArchiveAgentParams) (Age return i, err } +const archiveAgentsByRuntime = `-- name: ArchiveAgentsByRuntime :many +UPDATE agent +SET archived_at = now(), archived_by = $1, updated_at = now() +WHERE runtime_id = ANY($2::uuid[]) AND archived_at IS NULL +RETURNING id, workspace_id, name, avatar_url, runtime_mode, runtime_config, visibility, status, max_concurrent_tasks, owner_id, created_at, updated_at, description, runtime_id, instructions, archived_at, archived_by, custom_env, custom_args, mcp_config, model +` + +type ArchiveAgentsByRuntimeParams struct { + ArchivedBy pgtype.UUID `json:"archived_by"` + RuntimeIds []pgtype.UUID `json:"runtime_ids"` +} + +// Bulk-archives every active agent bound to any runtime in the given set. +// Used when revoking a leaving member's runtimes so agents pinned to those +// runtimes can no longer be assigned new work. Returns the affected rows so +// the caller can broadcast agent:archived per agent. +func (q *Queries) ArchiveAgentsByRuntime(ctx context.Context, arg ArchiveAgentsByRuntimeParams) ([]Agent, error) { + rows, err := q.db.Query(ctx, archiveAgentsByRuntime, arg.ArchivedBy, arg.RuntimeIds) + if err != nil { + return nil, err + } + defer rows.Close() + items := []Agent{} + for rows.Next() { + var i Agent + if err := rows.Scan( + &i.ID, + &i.WorkspaceID, + &i.Name, + &i.AvatarUrl, + &i.RuntimeMode, + &i.RuntimeConfig, + &i.Visibility, + &i.Status, + &i.MaxConcurrentTasks, + &i.OwnerID, + &i.CreatedAt, + &i.UpdatedAt, + &i.Description, + &i.RuntimeID, + &i.Instructions, + &i.ArchivedAt, + &i.ArchivedBy, + &i.CustomEnv, + &i.CustomArgs, + &i.McpConfig, + &i.Model, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + const cancelAgentTask = `-- name: CancelAgentTask :one UPDATE agent_task_queue SET status = 'cancelled', completed_at = now() diff --git a/server/pkg/db/generated/daemon_token.sql.go b/server/pkg/db/generated/daemon_token.sql.go index 6165d6714..61cd912c4 100644 --- a/server/pkg/db/generated/daemon_token.sql.go +++ b/server/pkg/db/generated/daemon_token.sql.go @@ -43,24 +43,43 @@ func (q *Queries) CreateDaemonToken(ctx context.Context, arg CreateDaemonTokenPa return i, err } -const deleteDaemonTokensByWorkspaceAndDaemon = `-- name: DeleteDaemonTokensByWorkspaceAndDaemon :exec +const deleteDaemonTokensByWorkspaceAndDaemons = `-- name: DeleteDaemonTokensByWorkspaceAndDaemons :many DELETE FROM daemon_token -WHERE workspace_id = $1 AND daemon_id = $2 +WHERE workspace_id = $1 + AND daemon_id = ANY($2::text[]) +RETURNING token_hash ` -type DeleteDaemonTokensByWorkspaceAndDaemonParams struct { +type DeleteDaemonTokensByWorkspaceAndDaemonsParams struct { WorkspaceID pgtype.UUID `json:"workspace_id"` - DaemonID string `json:"daemon_id"` + DaemonIds []string `json:"daemon_ids"` } -// Callers MUST also invalidate auth.DaemonTokenCache for each affected -// token_hash so the deletion takes effect before the cache TTL expires. -// Today this query has no caller; when a deregister / rotate flow lands, -// change this to :many RETURNING token_hash and call -// DaemonTokenCache.Invalidate(hash) for each row. -func (q *Queries) DeleteDaemonTokensByWorkspaceAndDaemon(ctx context.Context, arg DeleteDaemonTokensByWorkspaceAndDaemonParams) error { - _, err := q.db.Exec(ctx, deleteDaemonTokensByWorkspaceAndDaemon, arg.WorkspaceID, arg.DaemonID) - return err +// Deletes every daemon_token row matching the (workspace_id, daemon_id) +// pairs implied by `daemon_ids`. Used by the member-revocation flow to +// nuke tokens for all runtimes a leaving member owned in one shot. +// Returns token_hash so the caller can invalidate auth.DaemonTokenCache +// before the 10-minute TTL expires — without that invalidate, a daemon +// can keep using its stale token until cache eviction even though the +// DB row is gone. +func (q *Queries) DeleteDaemonTokensByWorkspaceAndDaemons(ctx context.Context, arg DeleteDaemonTokensByWorkspaceAndDaemonsParams) ([]string, error) { + rows, err := q.db.Query(ctx, deleteDaemonTokensByWorkspaceAndDaemons, arg.WorkspaceID, arg.DaemonIds) + if err != nil { + return nil, err + } + defer rows.Close() + items := []string{} + for rows.Next() { + var token_hash string + if err := rows.Scan(&token_hash); err != nil { + return nil, err + } + items = append(items, token_hash) + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil } const deleteExpiredDaemonTokens = `-- name: DeleteExpiredDaemonTokens :exec diff --git a/server/pkg/db/generated/runtime.sql.go b/server/pkg/db/generated/runtime.sql.go index ed6350f4f..f819affe1 100644 --- a/server/pkg/db/generated/runtime.sql.go +++ b/server/pkg/db/generated/runtime.sql.go @@ -11,6 +11,78 @@ import ( "github.com/jackc/pgx/v5/pgtype" ) +const cancelAgentTasksByRuntimeOrAgent = `-- name: CancelAgentTasksByRuntimeOrAgent :many +UPDATE agent_task_queue +SET status = 'cancelled', completed_at = now() +WHERE (runtime_id = ANY($1::uuid[]) OR agent_id = ANY($2::uuid[])) + AND status IN ('queued', 'dispatched', 'running') +RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session +` + +type CancelAgentTasksByRuntimeOrAgentParams struct { + RuntimeIds []pgtype.UUID `json:"runtime_ids"` + AgentIds []pgtype.UUID `json:"agent_ids"` +} + +// Cancels every active task that either lives on one of the given runtimes +// OR belongs to one of the given agents. Used by the member-revocation flow: +// the runtime-side covers tasks queued against the leaving member's runtimes; +// the agent-side covers tasks pinned to a different runtime that those agents +// left behind from a prior UpdateAgent (agent.runtime_id can change, but +// agent_task_queue.runtime_id does not get rewritten when it does, so a task +// queued on runtime A by agent X — later moved to runtime B — survives the +// runtime-only revoke and could still be claimed because ClaimAgentTask does +// not gate on agent.archived_at). +// +// We use 'cancelled' rather than 'failed' so the daemon's per-task status +// poller (watchTaskCancellation) interrupts the running agent gracefully. +// Returns the affected rows so the caller can broadcast task:cancelled and +// reconcile per-agent status. +func (q *Queries) CancelAgentTasksByRuntimeOrAgent(ctx context.Context, arg CancelAgentTasksByRuntimeOrAgentParams) ([]AgentTaskQueue, error) { + rows, err := q.db.Query(ctx, cancelAgentTasksByRuntimeOrAgent, arg.RuntimeIds, arg.AgentIds) + if err != nil { + return nil, err + } + defer rows.Close() + items := []AgentTaskQueue{} + for rows.Next() { + var i AgentTaskQueue + if err := rows.Scan( + &i.ID, + &i.AgentID, + &i.IssueID, + &i.Status, + &i.Priority, + &i.DispatchedAt, + &i.StartedAt, + &i.CompletedAt, + &i.Result, + &i.Error, + &i.CreatedAt, + &i.Context, + &i.RuntimeID, + &i.SessionID, + &i.WorkDir, + &i.TriggerCommentID, + &i.ChatSessionID, + &i.AutopilotRunID, + &i.Attempt, + &i.MaxAttempts, + &i.ParentTaskID, + &i.FailureReason, + &i.TriggerSummary, + &i.ForceFreshSession, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + const countActiveAgentsByRuntime = `-- name: CountActiveAgentsByRuntime :one SELECT count(*) FROM agent WHERE runtime_id = $1 AND archived_at IS NULL ` @@ -228,6 +300,53 @@ func (q *Queries) FindLegacyRuntimesByDaemonID(ctx context.Context, arg FindLega return items, nil } +const forceOfflineRuntimesByIDs = `-- name: ForceOfflineRuntimesByIDs :many +UPDATE agent_runtime +SET status = 'offline', updated_at = now() +WHERE id = ANY($1::uuid[]) AND status = 'online' +RETURNING id, workspace_id, owner_id, daemon_id, provider +` + +type ForceOfflineRuntimesByIDsRow struct { + ID pgtype.UUID `json:"id"` + WorkspaceID pgtype.UUID `json:"workspace_id"` + OwnerID pgtype.UUID `json:"owner_id"` + DaemonID pgtype.Text `json:"daemon_id"` + Provider string `json:"provider"` +} + +// Unconditionally flips a known set of runtime IDs to offline. Distinct from +// MarkRuntimesOfflineByIDs (which keeps a stale-window predicate so the +// sweeper cannot demote a runtime that just heartbeated): this variant is +// used by intentional revocation paths — e.g. removing a workspace member — +// where the caller has already decided the runtime should be offline +// regardless of recent liveness. +func (q *Queries) ForceOfflineRuntimesByIDs(ctx context.Context, runtimeIds []pgtype.UUID) ([]ForceOfflineRuntimesByIDsRow, error) { + rows, err := q.db.Query(ctx, forceOfflineRuntimesByIDs, runtimeIds) + if err != nil { + return nil, err + } + defer rows.Close() + items := []ForceOfflineRuntimesByIDsRow{} + for rows.Next() { + var i ForceOfflineRuntimesByIDsRow + if err := rows.Scan( + &i.ID, + &i.WorkspaceID, + &i.OwnerID, + &i.DaemonID, + &i.Provider, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + const getAgentRuntime = `-- name: GetAgentRuntime :one SELECT id, workspace_id, daemon_id, name, runtime_mode, provider, status, device_info, metadata, last_seen_at, created_at, updated_at, owner_id, legacy_daemon_id, timezone FROM agent_runtime WHERE id = $1 diff --git a/server/pkg/db/queries/agent.sql b/server/pkg/db/queries/agent.sql index 32f3b1410..188e2546d 100644 --- a/server/pkg/db/queries/agent.sql +++ b/server/pkg/db/queries/agent.sql @@ -54,6 +54,16 @@ UPDATE agent SET archived_at = now(), archived_by = $2, updated_at = now() WHERE id = $1 RETURNING *; +-- name: ArchiveAgentsByRuntime :many +-- Bulk-archives every active agent bound to any runtime in the given set. +-- Used when revoking a leaving member's runtimes so agents pinned to those +-- runtimes can no longer be assigned new work. Returns the affected rows so +-- the caller can broadcast agent:archived per agent. +UPDATE agent +SET archived_at = now(), archived_by = @archived_by, updated_at = now() +WHERE runtime_id = ANY(@runtime_ids::uuid[]) AND archived_at IS NULL +RETURNING *; + -- name: RestoreAgent :one UPDATE agent SET archived_at = NULL, archived_by = NULL, updated_at = now() WHERE id = $1 diff --git a/server/pkg/db/queries/daemon_token.sql b/server/pkg/db/queries/daemon_token.sql index 3544bfea9..ec4a139b5 100644 --- a/server/pkg/db/queries/daemon_token.sql +++ b/server/pkg/db/queries/daemon_token.sql @@ -7,14 +7,18 @@ RETURNING *; SELECT * FROM daemon_token WHERE token_hash = $1 AND expires_at > now(); --- name: DeleteDaemonTokensByWorkspaceAndDaemon :exec --- Callers MUST also invalidate auth.DaemonTokenCache for each affected --- token_hash so the deletion takes effect before the cache TTL expires. --- Today this query has no caller; when a deregister / rotate flow lands, --- change this to :many RETURNING token_hash and call --- DaemonTokenCache.Invalidate(hash) for each row. +-- name: DeleteDaemonTokensByWorkspaceAndDaemons :many +-- Deletes every daemon_token row matching the (workspace_id, daemon_id) +-- pairs implied by `daemon_ids`. Used by the member-revocation flow to +-- nuke tokens for all runtimes a leaving member owned in one shot. +-- Returns token_hash so the caller can invalidate auth.DaemonTokenCache +-- before the 10-minute TTL expires — without that invalidate, a daemon +-- can keep using its stale token until cache eviction even though the +-- DB row is gone. DELETE FROM daemon_token -WHERE workspace_id = $1 AND daemon_id = $2; +WHERE workspace_id = @workspace_id + AND daemon_id = ANY(@daemon_ids::text[]) +RETURNING token_hash; -- name: DeleteExpiredDaemonTokens :exec DELETE FROM daemon_token diff --git a/server/pkg/db/queries/runtime.sql b/server/pkg/db/queries/runtime.sql index 5172bb2d9..350658125 100644 --- a/server/pkg/db/queries/runtime.sql +++ b/server/pkg/db/queries/runtime.sql @@ -197,6 +197,39 @@ SELECT * FROM agent_runtime WHERE workspace_id = $1 AND owner_id = $2 ORDER BY created_at ASC; +-- name: ForceOfflineRuntimesByIDs :many +-- Unconditionally flips a known set of runtime IDs to offline. Distinct from +-- MarkRuntimesOfflineByIDs (which keeps a stale-window predicate so the +-- sweeper cannot demote a runtime that just heartbeated): this variant is +-- used by intentional revocation paths — e.g. removing a workspace member — +-- where the caller has already decided the runtime should be offline +-- regardless of recent liveness. +UPDATE agent_runtime +SET status = 'offline', updated_at = now() +WHERE id = ANY(@runtime_ids::uuid[]) AND status = 'online' +RETURNING id, workspace_id, owner_id, daemon_id, provider; + +-- name: CancelAgentTasksByRuntimeOrAgent :many +-- Cancels every active task that either lives on one of the given runtimes +-- OR belongs to one of the given agents. Used by the member-revocation flow: +-- the runtime-side covers tasks queued against the leaving member's runtimes; +-- the agent-side covers tasks pinned to a different runtime that those agents +-- left behind from a prior UpdateAgent (agent.runtime_id can change, but +-- agent_task_queue.runtime_id does not get rewritten when it does, so a task +-- queued on runtime A by agent X — later moved to runtime B — survives the +-- runtime-only revoke and could still be claimed because ClaimAgentTask does +-- not gate on agent.archived_at). +-- +-- We use 'cancelled' rather than 'failed' so the daemon's per-task status +-- poller (watchTaskCancellation) interrupts the running agent gracefully. +-- Returns the affected rows so the caller can broadcast task:cancelled and +-- reconcile per-agent status. +UPDATE agent_task_queue +SET status = 'cancelled', completed_at = now() +WHERE (runtime_id = ANY(@runtime_ids::uuid[]) OR agent_id = ANY(@agent_ids::uuid[])) + AND status IN ('queued', 'dispatched', 'running') +RETURNING *; + -- name: DeleteAgentRuntime :exec DELETE FROM agent_runtime WHERE id = $1;