mirror of
https://github.com/multica-ai/multica.git
synced 2026-06-17 11:48:42 +02:00
feat(workspace): revoke a member's runtimes when they leave or are removed (#2401)
* feat(workspace): revoke a member's runtimes when they leave or are removed Previously, leaving or being removed from a workspace only deleted the member row — every runtime the departed user owned in that workspace remained in the DB, kept its daemon_token valid, and stayed reachable to the workspace's other members. The departed user lost access but their machine kept doing work. This change converges the runtime state in the same transaction as the member-row deletion: agents pinned to those runtimes are archived, in-flight tasks are cancelled (so the daemon's per-task status poller interrupts the running agent gracefully), the runtimes are forced offline, and the daemon_token rows are deleted. After commit the DaemonTokenCache is invalidated and agent:archived / daemon:register events fire so connected clients reconcile immediately. Server-side state convergence is the production safety net; the daemon_token revoke takes effect once the mdt_ flow is live (today most daemons fall back to PAT/JWT, and the member-row deletion is what stops those requests via requireWorkspaceMember). Daemon-side handling (recognising the resulting 401/404 and tearing down the local pairing for that workspace) lands in a follow-up. Co-authored-by: multica-agent <github@multica.ai> * fix(workspace): also cancel tasks for archived agents on member revoke CancelAgentTasksByRuntime only matched tasks whose runtime_id was in the revoked set, missing a real path: agent.runtime_id can be reassigned via UpdateAgent, but agent_task_queue.runtime_id keeps the value from when the task was queued. So an agent currently bound to the leaving member's runtime gets archived correctly, but its older tasks still pinned to a prior runtime stay 'queued' — and ClaimAgentTask does not gate on agent.archived_at, so those orphaned tasks remain claimable by the prior runtime. Replace CancelAgentTasksByRuntime with CancelAgentTasksByRuntimeOrAgent, which OR-matches runtime_ids and the archived agent IDs in one UPDATE. Pass the archived agent IDs through from revokeAndRemoveMember. Adds TestDeleteMember_CancelsTasksFromAgentReassignment as a regression guard: same agent, two runtimes, the older task on the surviving runtime must end up cancelled while the surviving runtime stays online. Co-authored-by: multica-agent <github@multica.ai> --------- Co-authored-by: multica-agent <github@multica.ai>
This commit is contained in:
@@ -572,17 +572,22 @@ func (h *Handler) DeleteMember(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
}
|
||||
|
||||
if err := h.Queries.DeleteMember(r.Context(), target.ID); err != nil {
|
||||
requesterUserID := requestUserID(r)
|
||||
result, err := h.revokeAndRemoveMember(r.Context(), target.WorkspaceID, target.UserID, target.ID, parseUUID(requesterUserID))
|
||||
if err != nil {
|
||||
slog.Warn("delete member failed", append(logger.RequestAttrs(r), "error", err, "member_id", memberID, "workspace_id", workspaceID)...)
|
||||
writeError(w, http.StatusInternalServerError, "failed to delete member")
|
||||
return
|
||||
}
|
||||
|
||||
wsIDStr := uuidToString(requester.WorkspaceID)
|
||||
logRevocation(result, wsIDStr, uuidToString(target.UserID))
|
||||
h.publishRevocation(r.Context(), result, wsIDStr, "member", requesterUserID)
|
||||
|
||||
slog.Info("member removed", append(logger.RequestAttrs(r), "member_id", uuidToString(target.ID), "workspace_id", workspaceID, "user_id", uuidToString(target.UserID))...)
|
||||
userID := requestUserID(r)
|
||||
h.publish(protocol.EventMemberRemoved, uuidToString(requester.WorkspaceID), "member", userID, map[string]any{
|
||||
h.publish(protocol.EventMemberRemoved, wsIDStr, "member", requesterUserID, map[string]any{
|
||||
"member_id": uuidToString(target.ID),
|
||||
"workspace_id": uuidToString(requester.WorkspaceID),
|
||||
"workspace_id": wsIDStr,
|
||||
"user_id": uuidToString(target.UserID),
|
||||
})
|
||||
|
||||
@@ -608,14 +613,18 @@ func (h *Handler) LeaveWorkspace(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
}
|
||||
|
||||
if err := h.Queries.DeleteMember(r.Context(), member.ID); err != nil {
|
||||
result, err := h.revokeAndRemoveMember(r.Context(), member.WorkspaceID, member.UserID, member.ID, member.UserID)
|
||||
if err != nil {
|
||||
slog.Warn("leave workspace failed", append(logger.RequestAttrs(r), "error", err, "workspace_id", workspaceID)...)
|
||||
writeError(w, http.StatusInternalServerError, "failed to leave workspace")
|
||||
return
|
||||
}
|
||||
|
||||
slog.Info("member removed", append(logger.RequestAttrs(r), "member_id", uuidToString(member.ID), "workspace_id", workspaceID, "user_id", uuidToString(member.UserID))...)
|
||||
userID := requestUserID(r)
|
||||
logRevocation(result, workspaceID, uuidToString(member.UserID))
|
||||
h.publishRevocation(r.Context(), result, workspaceID, "member", userID)
|
||||
|
||||
slog.Info("member removed", append(logger.RequestAttrs(r), "member_id", uuidToString(member.ID), "workspace_id", workspaceID, "user_id", uuidToString(member.UserID))...)
|
||||
h.publish(protocol.EventMemberRemoved, workspaceID, "member", userID, map[string]any{
|
||||
"member_id": uuidToString(member.ID),
|
||||
"workspace_id": workspaceID,
|
||||
|
||||
201
server/internal/handler/workspace_revoke.go
Normal file
201
server/internal/handler/workspace_revoke.go
Normal file
@@ -0,0 +1,201 @@
|
||||
package handler
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log/slog"
|
||||
|
||||
"github.com/jackc/pgx/v5/pgtype"
|
||||
db "github.com/multica-ai/multica/server/pkg/db/generated"
|
||||
"github.com/multica-ai/multica/server/pkg/protocol"
|
||||
)
|
||||
|
||||
// revokeAndRemoveMember converges all server-side state that should follow a
|
||||
// member leaving a workspace: every runtime they own becomes unusable, every
|
||||
// agent pinned to one of those runtimes is archived, every in-flight task on
|
||||
// those runtimes is cancelled (cancelled rather than failed so the daemon's
|
||||
// per-task status poller interrupts the running agent gracefully), the
|
||||
// daemon_token rows for those runtimes are deleted, and finally the member row
|
||||
// itself is removed.
|
||||
//
|
||||
// All DB writes run inside a single transaction so a partial revocation never
|
||||
// leaves the workspace half-converged — e.g. a member who is "gone" but whose
|
||||
// runtime row is still active. Once the transaction commits, daemon_token
|
||||
// cache entries are invalidated and events are published (see
|
||||
// publishRevocation) so connected clients and other workspace members observe
|
||||
// the new state immediately.
|
||||
//
|
||||
// Note on scope: this revokes every runtime whose owner_id matches userID,
|
||||
// regardless of how the daemon authenticates. Today most daemons fall back to
|
||||
// PAT/JWT and `daemon_token` rows are unused in production; deleting them is
|
||||
// a no-op for those daemons but takes effect once the mdt_ flow is live.
|
||||
// Either way the agent-archive + task-cancel + force-offline writes are the
|
||||
// actual production safety net: even if the daemon races back online with a
|
||||
// still-valid PAT, it finds no agent it can run for, no queued task to claim,
|
||||
// and the dispatcher (which gates on agent.archived_at IS NULL) won't hand it
|
||||
// new work — and the member-row deletion in the same tx means subsequent
|
||||
// requireWorkspaceMember checks will reject the daemon's PAT-authenticated
|
||||
// requests with 404.
|
||||
//
|
||||
// archivedBy is the actor who triggered the revocation. For DeleteMember it's
|
||||
// the requester (the admin doing the kick); for LeaveWorkspace it's the leaver
|
||||
// themselves.
|
||||
func (h *Handler) revokeAndRemoveMember(ctx context.Context, workspaceID, userID, memberID, archivedBy pgtype.UUID) (revocationResult, error) {
|
||||
var empty revocationResult
|
||||
|
||||
tx, err := h.TxStarter.Begin(ctx)
|
||||
if err != nil {
|
||||
return empty, err
|
||||
}
|
||||
defer tx.Rollback(ctx)
|
||||
|
||||
qtx := h.Queries.WithTx(tx)
|
||||
|
||||
runtimes, err := qtx.ListAgentRuntimesByOwner(ctx, db.ListAgentRuntimesByOwnerParams{
|
||||
WorkspaceID: workspaceID,
|
||||
OwnerID: userID,
|
||||
})
|
||||
if err != nil {
|
||||
return empty, err
|
||||
}
|
||||
|
||||
result := revocationResult{Runtimes: runtimes}
|
||||
|
||||
if len(runtimes) > 0 {
|
||||
runtimeIDs := make([]pgtype.UUID, len(runtimes))
|
||||
daemonIDs := make([]string, 0, len(runtimes))
|
||||
for i, rt := range runtimes {
|
||||
runtimeIDs[i] = rt.ID
|
||||
if rt.DaemonID.Valid && rt.DaemonID.String != "" {
|
||||
daemonIDs = append(daemonIDs, rt.DaemonID.String)
|
||||
}
|
||||
}
|
||||
|
||||
result.ArchivedAgents, err = qtx.ArchiveAgentsByRuntime(ctx, db.ArchiveAgentsByRuntimeParams{
|
||||
ArchivedBy: archivedBy,
|
||||
RuntimeIds: runtimeIDs,
|
||||
})
|
||||
if err != nil {
|
||||
return empty, err
|
||||
}
|
||||
|
||||
// Cancel by runtime AND by archived agent. agent.runtime_id can be
|
||||
// reassigned via UpdateAgent without rewriting the runtime_id on
|
||||
// historical agent_task_queue rows, so an archived agent may still
|
||||
// have queued/running tasks pinned to a different runtime — and
|
||||
// ClaimAgentTask does not gate on agent.archived_at, so those tasks
|
||||
// would otherwise stay claimable after the agent is gone.
|
||||
archivedAgentIDs := make([]pgtype.UUID, len(result.ArchivedAgents))
|
||||
for i, a := range result.ArchivedAgents {
|
||||
archivedAgentIDs[i] = a.ID
|
||||
}
|
||||
result.CancelledTasks, err = qtx.CancelAgentTasksByRuntimeOrAgent(ctx, db.CancelAgentTasksByRuntimeOrAgentParams{
|
||||
RuntimeIds: runtimeIDs,
|
||||
AgentIds: archivedAgentIDs,
|
||||
})
|
||||
if err != nil {
|
||||
return empty, err
|
||||
}
|
||||
|
||||
result.OfflineRuntimeIDs, err = qtx.ForceOfflineRuntimesByIDs(ctx, runtimeIDs)
|
||||
if err != nil {
|
||||
return empty, err
|
||||
}
|
||||
|
||||
if len(daemonIDs) > 0 {
|
||||
result.RevokedTokenHashes, err = qtx.DeleteDaemonTokensByWorkspaceAndDaemons(ctx, db.DeleteDaemonTokensByWorkspaceAndDaemonsParams{
|
||||
WorkspaceID: workspaceID,
|
||||
DaemonIds: daemonIDs,
|
||||
})
|
||||
if err != nil {
|
||||
return empty, err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Member row deletion lives inside the same tx so a successful revoke is
|
||||
// never followed by a failed member-delete (which would leave the user
|
||||
// still a member with a dead runtime), and a failed revoke never leaves
|
||||
// the user out of the workspace with a still-online runtime.
|
||||
if err := qtx.DeleteMember(ctx, memberID); err != nil {
|
||||
return empty, err
|
||||
}
|
||||
|
||||
if err := tx.Commit(ctx); err != nil {
|
||||
return empty, err
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// revocationResult captures everything revokeMemberRuntimes touched so the
|
||||
// caller can fan out events and analytics after the transaction commits.
|
||||
// Publishing inside the transaction would let subscribers observe a state the
|
||||
// tx might still roll back (see TaskService.BroadcastCancelledTasks docstring).
|
||||
type revocationResult struct {
|
||||
Runtimes []db.AgentRuntime
|
||||
ArchivedAgents []db.Agent
|
||||
CancelledTasks []db.AgentTaskQueue
|
||||
OfflineRuntimeIDs []db.ForceOfflineRuntimesByIDsRow
|
||||
RevokedTokenHashes []string
|
||||
}
|
||||
|
||||
func (r revocationResult) isEmpty() bool {
|
||||
return len(r.Runtimes) == 0
|
||||
}
|
||||
|
||||
// publishRevocation runs all post-commit side effects: invalidate daemon token
|
||||
// cache, broadcast task:cancelled with per-agent reconciliation, broadcast
|
||||
// agent:archived, and signal a runtime-list refresh. Safe to call on an empty
|
||||
// result — it returns immediately.
|
||||
func (h *Handler) publishRevocation(ctx context.Context, result revocationResult, workspaceIDStr, actorType, actorIDStr string) {
|
||||
if result.isEmpty() {
|
||||
return
|
||||
}
|
||||
|
||||
for _, hash := range result.RevokedTokenHashes {
|
||||
h.DaemonTokenCache.Invalidate(ctx, hash)
|
||||
}
|
||||
|
||||
// Per-task cancellation: TaskService handles status reconciliation and
|
||||
// per-task event broadcast. Run this before the agent:archived burst so
|
||||
// subscribers see "task cancelled" before the parent agent disappears
|
||||
// from active lists, matching the order ArchiveAgent uses.
|
||||
if h.TaskService != nil && len(result.CancelledTasks) > 0 {
|
||||
h.TaskService.BroadcastCancelledTasks(ctx, result.CancelledTasks)
|
||||
}
|
||||
|
||||
for _, agent := range result.ArchivedAgents {
|
||||
h.publish(protocol.EventAgentArchived, workspaceIDStr, actorType, actorIDStr, map[string]any{
|
||||
"agent": agentToResponse(agent),
|
||||
})
|
||||
}
|
||||
|
||||
// Tell connected clients to refresh the runtime list. We piggyback on
|
||||
// EventDaemonRegister with a "revoke" action — same channel the runtime
|
||||
// delete handler uses — so the frontend invalidates its cached list
|
||||
// without us having to introduce a new event type the desktop app would
|
||||
// need a build to learn about.
|
||||
if len(result.OfflineRuntimeIDs) > 0 {
|
||||
h.publish(protocol.EventDaemonRegister, workspaceIDStr, actorType, actorIDStr, map[string]any{
|
||||
"action": "revoke",
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// logRevocation emits a structured info line summarising the revocation. Kept
|
||||
// separate from publish so the log is identical whether or not the bus is wired.
|
||||
func logRevocation(result revocationResult, workspaceID, userID string, attrs ...any) {
|
||||
if result.isEmpty() {
|
||||
return
|
||||
}
|
||||
base := []any{
|
||||
"workspace_id", workspaceID,
|
||||
"user_id", userID,
|
||||
"runtimes_revoked", len(result.Runtimes),
|
||||
"agents_archived", len(result.ArchivedAgents),
|
||||
"tasks_cancelled", len(result.CancelledTasks),
|
||||
"runtimes_taken_offline", len(result.OfflineRuntimeIDs),
|
||||
"daemon_tokens_revoked", len(result.RevokedTokenHashes),
|
||||
}
|
||||
slog.Info("member runtimes revoked", append(base, attrs...)...)
|
||||
}
|
||||
@@ -2,6 +2,8 @@ package handler
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/sha256"
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
@@ -128,3 +130,344 @@ VALUES ($1, $2, 'owner')
|
||||
t.Fatal("workspace still exists after owner DELETE")
|
||||
}
|
||||
}
|
||||
|
||||
// revocationFixture is a minimal (workspace, member-to-revoke, runtime,
|
||||
// agent, queued-task, daemon-token) bundle used to drive the revocation
|
||||
// tests. The "requester" is always testUserID (owner of the workspace) so
|
||||
// `newRequest` passes the existing fixtures' auth context unchanged.
|
||||
type revocationFixture struct {
|
||||
WorkspaceID string
|
||||
TargetUserID string
|
||||
MemberID string
|
||||
RuntimeID string
|
||||
AgentID string
|
||||
TaskID string
|
||||
DaemonID string
|
||||
TokenHash string
|
||||
}
|
||||
|
||||
func setupRevocationFixture(t *testing.T, slug, daemonID string) revocationFixture {
|
||||
t.Helper()
|
||||
ctx := context.Background()
|
||||
|
||||
_, _ = testPool.Exec(ctx, `DELETE FROM workspace WHERE slug = $1`, slug)
|
||||
|
||||
var wsID string
|
||||
if err := testPool.QueryRow(ctx, `
|
||||
INSERT INTO workspace (name, slug, description, issue_prefix)
|
||||
VALUES ($1, $2, $3, $4)
|
||||
RETURNING id
|
||||
`, "Revocation "+slug, slug, "revocation test", "REV").Scan(&wsID); err != nil {
|
||||
t.Fatalf("create workspace: %v", err)
|
||||
}
|
||||
|
||||
// Requester (= testUserID) is always an owner so DeleteMember authorization
|
||||
// passes. Two owners total so LeaveWorkspace doesn't trip the "must keep
|
||||
// at least one owner" guard.
|
||||
if _, err := testPool.Exec(ctx, `
|
||||
INSERT INTO member (workspace_id, user_id, role) VALUES ($1, $2, 'owner')
|
||||
`, wsID, testUserID); err != nil {
|
||||
t.Fatalf("create requester member: %v", err)
|
||||
}
|
||||
|
||||
targetEmail := fmt.Sprintf("revocation-%s@multica.ai", slug)
|
||||
var targetUserID string
|
||||
if err := testPool.QueryRow(ctx, `
|
||||
INSERT INTO "user" (name, email) VALUES ($1, $2) RETURNING id
|
||||
`, "Revocation Target "+slug, targetEmail).Scan(&targetUserID); err != nil {
|
||||
t.Fatalf("create target user: %v", err)
|
||||
}
|
||||
|
||||
// Cleanup ordering: workspace first (cascade clears agent_runtime,
|
||||
// agent, member, daemon_token), then user (whose deletion would
|
||||
// otherwise be blocked by agent.owner_id / agent_runtime.owner_id FKs).
|
||||
t.Cleanup(func() {
|
||||
_, _ = testPool.Exec(context.Background(), `DELETE FROM workspace WHERE id = $1`, wsID)
|
||||
_, _ = testPool.Exec(context.Background(), `DELETE FROM "user" WHERE id = $1`, targetUserID)
|
||||
})
|
||||
|
||||
var memberID string
|
||||
if err := testPool.QueryRow(ctx, `
|
||||
INSERT INTO member (workspace_id, user_id, role) VALUES ($1, $2, 'owner') RETURNING id
|
||||
`, wsID, targetUserID).Scan(&memberID); err != nil {
|
||||
t.Fatalf("create target member: %v", err)
|
||||
}
|
||||
|
||||
var runtimeID string
|
||||
if err := testPool.QueryRow(ctx, `
|
||||
INSERT INTO agent_runtime (
|
||||
workspace_id, daemon_id, name, runtime_mode, provider, status,
|
||||
device_info, metadata, owner_id, last_seen_at
|
||||
)
|
||||
VALUES ($1, $2, 'Target Runtime', 'local', 'multica_daemon', 'online', '', '{}'::jsonb, $3, now())
|
||||
RETURNING id
|
||||
`, wsID, daemonID, targetUserID).Scan(&runtimeID); err != nil {
|
||||
t.Fatalf("insert runtime: %v", err)
|
||||
}
|
||||
|
||||
var agentID string
|
||||
if err := testPool.QueryRow(ctx, `
|
||||
INSERT INTO agent (
|
||||
workspace_id, name, description, runtime_mode, runtime_config,
|
||||
runtime_id, visibility, max_concurrent_tasks, owner_id
|
||||
)
|
||||
VALUES ($1, 'Target Agent', '', 'local', '{}'::jsonb, $2, 'workspace', 1, $3)
|
||||
RETURNING id
|
||||
`, wsID, runtimeID, targetUserID).Scan(&agentID); err != nil {
|
||||
t.Fatalf("insert agent: %v", err)
|
||||
}
|
||||
|
||||
var taskID string
|
||||
if err := testPool.QueryRow(ctx, `
|
||||
INSERT INTO agent_task_queue (agent_id, runtime_id, status, priority)
|
||||
VALUES ($1, $2, 'queued', 0)
|
||||
RETURNING id
|
||||
`, agentID, runtimeID).Scan(&taskID); err != nil {
|
||||
t.Fatalf("insert task: %v", err)
|
||||
}
|
||||
|
||||
// daemon_token row — paired with the runtime's daemon_id so the
|
||||
// revocation should sweep its hash up via DeleteDaemonTokensByWorkspaceAndDaemons.
|
||||
rawToken := "mdt_test_" + slug
|
||||
sum := sha256.Sum256([]byte(rawToken))
|
||||
tokenHash := hex.EncodeToString(sum[:])
|
||||
if _, err := testPool.Exec(ctx, `
|
||||
INSERT INTO daemon_token (token_hash, workspace_id, daemon_id, expires_at)
|
||||
VALUES ($1, $2, $3, now() + interval '1 day')
|
||||
`, tokenHash, wsID, daemonID); err != nil {
|
||||
t.Fatalf("insert daemon_token: %v", err)
|
||||
}
|
||||
|
||||
return revocationFixture{
|
||||
WorkspaceID: wsID,
|
||||
TargetUserID: targetUserID,
|
||||
MemberID: memberID,
|
||||
RuntimeID: runtimeID,
|
||||
AgentID: agentID,
|
||||
TaskID: taskID,
|
||||
DaemonID: daemonID,
|
||||
TokenHash: tokenHash,
|
||||
}
|
||||
}
|
||||
|
||||
func assertRevoked(t *testing.T, fx revocationFixture) {
|
||||
t.Helper()
|
||||
ctx := context.Background()
|
||||
|
||||
var memberExists bool
|
||||
if err := testPool.QueryRow(ctx, `SELECT EXISTS (SELECT 1 FROM member WHERE id = $1)`, fx.MemberID).Scan(&memberExists); err != nil {
|
||||
t.Fatalf("query member: %v", err)
|
||||
}
|
||||
if memberExists {
|
||||
t.Fatal("member row was not deleted")
|
||||
}
|
||||
|
||||
var runtimeStatus string
|
||||
if err := testPool.QueryRow(ctx, `SELECT status FROM agent_runtime WHERE id = $1`, fx.RuntimeID).Scan(&runtimeStatus); err != nil {
|
||||
t.Fatalf("query runtime: %v", err)
|
||||
}
|
||||
if runtimeStatus != "offline" {
|
||||
t.Fatalf("expected runtime offline, got %q", runtimeStatus)
|
||||
}
|
||||
|
||||
var archivedAt *string
|
||||
if err := testPool.QueryRow(ctx, `SELECT archived_at::text FROM agent WHERE id = $1`, fx.AgentID).Scan(&archivedAt); err != nil {
|
||||
t.Fatalf("query agent: %v", err)
|
||||
}
|
||||
if archivedAt == nil {
|
||||
t.Fatal("agent was not archived")
|
||||
}
|
||||
|
||||
var taskStatus string
|
||||
if err := testPool.QueryRow(ctx, `SELECT status FROM agent_task_queue WHERE id = $1`, fx.TaskID).Scan(&taskStatus); err != nil {
|
||||
t.Fatalf("query task: %v", err)
|
||||
}
|
||||
if taskStatus != "cancelled" {
|
||||
t.Fatalf("expected task cancelled, got %q", taskStatus)
|
||||
}
|
||||
|
||||
var tokenExists bool
|
||||
if err := testPool.QueryRow(ctx, `SELECT EXISTS (SELECT 1 FROM daemon_token WHERE token_hash = $1)`, fx.TokenHash).Scan(&tokenExists); err != nil {
|
||||
t.Fatalf("query daemon_token: %v", err)
|
||||
}
|
||||
if tokenExists {
|
||||
t.Fatal("daemon_token row was not deleted")
|
||||
}
|
||||
}
|
||||
|
||||
// TestDeleteMember_RevokesTargetRuntimes verifies that when an admin removes
|
||||
// another member from a workspace, every runtime owned by the removed member
|
||||
// has its agents archived, its in-flight tasks cancelled, its row flipped
|
||||
// offline, and its daemon_token rows deleted — all atomically with the member
|
||||
// row deletion.
|
||||
func TestDeleteMember_RevokesTargetRuntimes(t *testing.T) {
|
||||
fx := setupRevocationFixture(t, "handler-tests-revoke-kick", "daemon-revoke-kick")
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
req := newRequest("DELETE", "/api/workspaces/"+fx.WorkspaceID+"/members/"+fx.MemberID, nil)
|
||||
req.Header.Set("X-Workspace-ID", fx.WorkspaceID)
|
||||
req = withURLParams(req, "id", fx.WorkspaceID, "memberId", fx.MemberID)
|
||||
testHandler.DeleteMember(w, req)
|
||||
|
||||
if w.Code != http.StatusNoContent {
|
||||
t.Fatalf("DeleteMember: expected 204, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
|
||||
assertRevoked(t, fx)
|
||||
}
|
||||
|
||||
// TestLeaveWorkspace_RevokesOwnRuntimes is the self-removal counterpart: when
|
||||
// a member leaves a workspace voluntarily, their own runtimes are revoked
|
||||
// with the same atomic write set as DeleteMember.
|
||||
func TestLeaveWorkspace_RevokesOwnRuntimes(t *testing.T) {
|
||||
fx := setupRevocationFixture(t, "handler-tests-revoke-leave", "daemon-revoke-leave")
|
||||
|
||||
// Re-target the request from the leaving member's perspective: the
|
||||
// leaver is the request actor, not the workspace owner.
|
||||
w := httptest.NewRecorder()
|
||||
req := newRequest("DELETE", "/api/workspaces/"+fx.WorkspaceID+"/leave", nil)
|
||||
req.Header.Set("X-User-ID", fx.TargetUserID)
|
||||
req.Header.Set("X-Workspace-ID", fx.WorkspaceID)
|
||||
req = withURLParam(req, "id", fx.WorkspaceID)
|
||||
testHandler.LeaveWorkspace(w, req)
|
||||
|
||||
if w.Code != http.StatusNoContent {
|
||||
t.Fatalf("LeaveWorkspace: expected 204, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
|
||||
assertRevoked(t, fx)
|
||||
}
|
||||
|
||||
// TestDeleteMember_CancelsTasksFromAgentReassignment covers a subtle
|
||||
// case: an agent's runtime_id can be changed via UpdateAgent, but
|
||||
// agent_task_queue.runtime_id keeps the value from when the task was
|
||||
// queued. So after a leaving member is removed, an agent currently bound
|
||||
// to their runtime gets archived — but tasks that agent queued under a
|
||||
// PRIOR runtime (still owned by another active member) keep their old
|
||||
// runtime_id and would not be caught by a runtime-only sweep. Because
|
||||
// ClaimAgentTask does not gate on agent.archived_at, those orphaned
|
||||
// queued tasks would remain claimable.
|
||||
func TestDeleteMember_CancelsTasksFromAgentReassignment(t *testing.T) {
|
||||
fx := setupRevocationFixture(t, "handler-tests-revoke-reassign", "daemon-revoke-reassign")
|
||||
ctx := context.Background()
|
||||
|
||||
// Create a SECOND runtime in the workspace owned by the requester
|
||||
// (not the leaving member). The agent originally lived here.
|
||||
var otherRuntimeID string
|
||||
if err := testPool.QueryRow(ctx, `
|
||||
INSERT INTO agent_runtime (
|
||||
workspace_id, daemon_id, name, runtime_mode, provider, status,
|
||||
device_info, metadata, owner_id, last_seen_at
|
||||
)
|
||||
VALUES ($1, $2, 'Other Runtime', 'local', 'multica_daemon', 'online', '', '{}'::jsonb, $3, now())
|
||||
RETURNING id
|
||||
`, fx.WorkspaceID, "daemon-revoke-reassign-other", testUserID).Scan(&otherRuntimeID); err != nil {
|
||||
t.Fatalf("insert other runtime: %v", err)
|
||||
}
|
||||
|
||||
// Queue a task on the agent while it was still pinned to the OTHER
|
||||
// runtime (simulating a task created before the agent was reassigned
|
||||
// to the leaving member's runtime).
|
||||
var orphanTaskID string
|
||||
if err := testPool.QueryRow(ctx, `
|
||||
INSERT INTO agent_task_queue (agent_id, runtime_id, status, priority)
|
||||
VALUES ($1, $2, 'queued', 0)
|
||||
RETURNING id
|
||||
`, fx.AgentID, otherRuntimeID).Scan(&orphanTaskID); err != nil {
|
||||
t.Fatalf("insert orphan task: %v", err)
|
||||
}
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
req := newRequest("DELETE", "/api/workspaces/"+fx.WorkspaceID+"/members/"+fx.MemberID, nil)
|
||||
req.Header.Set("X-Workspace-ID", fx.WorkspaceID)
|
||||
req = withURLParams(req, "id", fx.WorkspaceID, "memberId", fx.MemberID)
|
||||
testHandler.DeleteMember(w, req)
|
||||
|
||||
if w.Code != http.StatusNoContent {
|
||||
t.Fatalf("DeleteMember: expected 204, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
|
||||
assertRevoked(t, fx)
|
||||
|
||||
// The orphan task — same agent, different runtime — must also be
|
||||
// cancelled. Without the by-agent leg in CancelAgentTasksByRuntimeOrAgent
|
||||
// this stays 'queued' and would be picked up by the other runtime.
|
||||
var orphanStatus string
|
||||
if err := testPool.QueryRow(ctx, `SELECT status FROM agent_task_queue WHERE id = $1`, orphanTaskID).Scan(&orphanStatus); err != nil {
|
||||
t.Fatalf("query orphan task: %v", err)
|
||||
}
|
||||
if orphanStatus != "cancelled" {
|
||||
t.Fatalf("expected orphan task cancelled (archived agent leftover on other runtime), got %q", orphanStatus)
|
||||
}
|
||||
|
||||
// And the OTHER runtime — owned by an active member — must still be
|
||||
// online: revocation is scoped to the leaving member's owned runtimes.
|
||||
var otherStatus string
|
||||
if err := testPool.QueryRow(ctx, `SELECT status FROM agent_runtime WHERE id = $1`, otherRuntimeID).Scan(&otherStatus); err != nil {
|
||||
t.Fatalf("query other runtime: %v", err)
|
||||
}
|
||||
if otherStatus != "online" {
|
||||
t.Fatalf("expected other-member runtime to stay online, got %q", otherStatus)
|
||||
}
|
||||
}
|
||||
|
||||
// TestDeleteMember_NoRuntimes_DeletesMember covers the empty-revocation
|
||||
// path: a member with no owned runtimes should still have their member row
|
||||
// deleted by the same atomic transaction, with no spurious archive/cancel
|
||||
// writes.
|
||||
func TestDeleteMember_NoRuntimes_DeletesMember(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
const slug = "handler-tests-revoke-no-runtimes"
|
||||
_, _ = testPool.Exec(ctx, `DELETE FROM workspace WHERE slug = $1`, slug)
|
||||
|
||||
var wsID string
|
||||
if err := testPool.QueryRow(ctx, `
|
||||
INSERT INTO workspace (name, slug, description, issue_prefix)
|
||||
VALUES ($1, $2, $3, $4)
|
||||
RETURNING id
|
||||
`, "Revocation no runtimes", slug, "revocation no-runtimes test", "REV").Scan(&wsID); err != nil {
|
||||
t.Fatalf("create workspace: %v", err)
|
||||
}
|
||||
|
||||
if _, err := testPool.Exec(ctx, `
|
||||
INSERT INTO member (workspace_id, user_id, role) VALUES ($1, $2, 'owner')
|
||||
`, wsID, testUserID); err != nil {
|
||||
t.Fatalf("create requester member: %v", err)
|
||||
}
|
||||
|
||||
var targetUserID string
|
||||
if err := testPool.QueryRow(ctx, `
|
||||
INSERT INTO "user" (name, email) VALUES ($1, $2) RETURNING id
|
||||
`, "Revocation No Runtimes Target", "revocation-no-runtimes@multica.ai").Scan(&targetUserID); err != nil {
|
||||
t.Fatalf("create target user: %v", err)
|
||||
}
|
||||
t.Cleanup(func() {
|
||||
_, _ = testPool.Exec(context.Background(), `DELETE FROM workspace WHERE id = $1`, wsID)
|
||||
_, _ = testPool.Exec(context.Background(), `DELETE FROM "user" WHERE id = $1`, targetUserID)
|
||||
})
|
||||
|
||||
var memberID string
|
||||
if err := testPool.QueryRow(ctx, `
|
||||
INSERT INTO member (workspace_id, user_id, role) VALUES ($1, $2, 'admin') RETURNING id
|
||||
`, wsID, targetUserID).Scan(&memberID); err != nil {
|
||||
t.Fatalf("create target member: %v", err)
|
||||
}
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
req := newRequest("DELETE", "/api/workspaces/"+wsID+"/members/"+memberID, nil)
|
||||
req.Header.Set("X-Workspace-ID", wsID)
|
||||
req = withURLParams(req, "id", wsID, "memberId", memberID)
|
||||
testHandler.DeleteMember(w, req)
|
||||
|
||||
if w.Code != http.StatusNoContent {
|
||||
t.Fatalf("DeleteMember: expected 204, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
|
||||
var memberExists bool
|
||||
if err := testPool.QueryRow(ctx, `SELECT EXISTS (SELECT 1 FROM member WHERE id = $1)`, memberID).Scan(&memberExists); err != nil {
|
||||
t.Fatalf("query member: %v", err)
|
||||
}
|
||||
if memberExists {
|
||||
t.Fatal("member row was not deleted")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -51,6 +51,64 @@ func (q *Queries) ArchiveAgent(ctx context.Context, arg ArchiveAgentParams) (Age
|
||||
return i, err
|
||||
}
|
||||
|
||||
const archiveAgentsByRuntime = `-- name: ArchiveAgentsByRuntime :many
|
||||
UPDATE agent
|
||||
SET archived_at = now(), archived_by = $1, updated_at = now()
|
||||
WHERE runtime_id = ANY($2::uuid[]) AND archived_at IS NULL
|
||||
RETURNING id, workspace_id, name, avatar_url, runtime_mode, runtime_config, visibility, status, max_concurrent_tasks, owner_id, created_at, updated_at, description, runtime_id, instructions, archived_at, archived_by, custom_env, custom_args, mcp_config, model
|
||||
`
|
||||
|
||||
type ArchiveAgentsByRuntimeParams struct {
|
||||
ArchivedBy pgtype.UUID `json:"archived_by"`
|
||||
RuntimeIds []pgtype.UUID `json:"runtime_ids"`
|
||||
}
|
||||
|
||||
// Bulk-archives every active agent bound to any runtime in the given set.
|
||||
// Used when revoking a leaving member's runtimes so agents pinned to those
|
||||
// runtimes can no longer be assigned new work. Returns the affected rows so
|
||||
// the caller can broadcast agent:archived per agent.
|
||||
func (q *Queries) ArchiveAgentsByRuntime(ctx context.Context, arg ArchiveAgentsByRuntimeParams) ([]Agent, error) {
|
||||
rows, err := q.db.Query(ctx, archiveAgentsByRuntime, arg.ArchivedBy, arg.RuntimeIds)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
items := []Agent{}
|
||||
for rows.Next() {
|
||||
var i Agent
|
||||
if err := rows.Scan(
|
||||
&i.ID,
|
||||
&i.WorkspaceID,
|
||||
&i.Name,
|
||||
&i.AvatarUrl,
|
||||
&i.RuntimeMode,
|
||||
&i.RuntimeConfig,
|
||||
&i.Visibility,
|
||||
&i.Status,
|
||||
&i.MaxConcurrentTasks,
|
||||
&i.OwnerID,
|
||||
&i.CreatedAt,
|
||||
&i.UpdatedAt,
|
||||
&i.Description,
|
||||
&i.RuntimeID,
|
||||
&i.Instructions,
|
||||
&i.ArchivedAt,
|
||||
&i.ArchivedBy,
|
||||
&i.CustomEnv,
|
||||
&i.CustomArgs,
|
||||
&i.McpConfig,
|
||||
&i.Model,
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
items = append(items, i)
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return items, nil
|
||||
}
|
||||
|
||||
const cancelAgentTask = `-- name: CancelAgentTask :one
|
||||
UPDATE agent_task_queue
|
||||
SET status = 'cancelled', completed_at = now()
|
||||
|
||||
@@ -43,24 +43,43 @@ func (q *Queries) CreateDaemonToken(ctx context.Context, arg CreateDaemonTokenPa
|
||||
return i, err
|
||||
}
|
||||
|
||||
const deleteDaemonTokensByWorkspaceAndDaemon = `-- name: DeleteDaemonTokensByWorkspaceAndDaemon :exec
|
||||
const deleteDaemonTokensByWorkspaceAndDaemons = `-- name: DeleteDaemonTokensByWorkspaceAndDaemons :many
|
||||
DELETE FROM daemon_token
|
||||
WHERE workspace_id = $1 AND daemon_id = $2
|
||||
WHERE workspace_id = $1
|
||||
AND daemon_id = ANY($2::text[])
|
||||
RETURNING token_hash
|
||||
`
|
||||
|
||||
type DeleteDaemonTokensByWorkspaceAndDaemonParams struct {
|
||||
type DeleteDaemonTokensByWorkspaceAndDaemonsParams struct {
|
||||
WorkspaceID pgtype.UUID `json:"workspace_id"`
|
||||
DaemonID string `json:"daemon_id"`
|
||||
DaemonIds []string `json:"daemon_ids"`
|
||||
}
|
||||
|
||||
// Callers MUST also invalidate auth.DaemonTokenCache for each affected
|
||||
// token_hash so the deletion takes effect before the cache TTL expires.
|
||||
// Today this query has no caller; when a deregister / rotate flow lands,
|
||||
// change this to :many RETURNING token_hash and call
|
||||
// DaemonTokenCache.Invalidate(hash) for each row.
|
||||
func (q *Queries) DeleteDaemonTokensByWorkspaceAndDaemon(ctx context.Context, arg DeleteDaemonTokensByWorkspaceAndDaemonParams) error {
|
||||
_, err := q.db.Exec(ctx, deleteDaemonTokensByWorkspaceAndDaemon, arg.WorkspaceID, arg.DaemonID)
|
||||
return err
|
||||
// Deletes every daemon_token row matching the (workspace_id, daemon_id)
|
||||
// pairs implied by `daemon_ids`. Used by the member-revocation flow to
|
||||
// nuke tokens for all runtimes a leaving member owned in one shot.
|
||||
// Returns token_hash so the caller can invalidate auth.DaemonTokenCache
|
||||
// before the 10-minute TTL expires — without that invalidate, a daemon
|
||||
// can keep using its stale token until cache eviction even though the
|
||||
// DB row is gone.
|
||||
func (q *Queries) DeleteDaemonTokensByWorkspaceAndDaemons(ctx context.Context, arg DeleteDaemonTokensByWorkspaceAndDaemonsParams) ([]string, error) {
|
||||
rows, err := q.db.Query(ctx, deleteDaemonTokensByWorkspaceAndDaemons, arg.WorkspaceID, arg.DaemonIds)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
items := []string{}
|
||||
for rows.Next() {
|
||||
var token_hash string
|
||||
if err := rows.Scan(&token_hash); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
items = append(items, token_hash)
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return items, nil
|
||||
}
|
||||
|
||||
const deleteExpiredDaemonTokens = `-- name: DeleteExpiredDaemonTokens :exec
|
||||
|
||||
@@ -11,6 +11,78 @@ import (
|
||||
"github.com/jackc/pgx/v5/pgtype"
|
||||
)
|
||||
|
||||
const cancelAgentTasksByRuntimeOrAgent = `-- name: CancelAgentTasksByRuntimeOrAgent :many
|
||||
UPDATE agent_task_queue
|
||||
SET status = 'cancelled', completed_at = now()
|
||||
WHERE (runtime_id = ANY($1::uuid[]) OR agent_id = ANY($2::uuid[]))
|
||||
AND status IN ('queued', 'dispatched', 'running')
|
||||
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session
|
||||
`
|
||||
|
||||
type CancelAgentTasksByRuntimeOrAgentParams struct {
|
||||
RuntimeIds []pgtype.UUID `json:"runtime_ids"`
|
||||
AgentIds []pgtype.UUID `json:"agent_ids"`
|
||||
}
|
||||
|
||||
// Cancels every active task that either lives on one of the given runtimes
|
||||
// OR belongs to one of the given agents. Used by the member-revocation flow:
|
||||
// the runtime-side covers tasks queued against the leaving member's runtimes;
|
||||
// the agent-side covers tasks pinned to a different runtime that those agents
|
||||
// left behind from a prior UpdateAgent (agent.runtime_id can change, but
|
||||
// agent_task_queue.runtime_id does not get rewritten when it does, so a task
|
||||
// queued on runtime A by agent X — later moved to runtime B — survives the
|
||||
// runtime-only revoke and could still be claimed because ClaimAgentTask does
|
||||
// not gate on agent.archived_at).
|
||||
//
|
||||
// We use 'cancelled' rather than 'failed' so the daemon's per-task status
|
||||
// poller (watchTaskCancellation) interrupts the running agent gracefully.
|
||||
// Returns the affected rows so the caller can broadcast task:cancelled and
|
||||
// reconcile per-agent status.
|
||||
func (q *Queries) CancelAgentTasksByRuntimeOrAgent(ctx context.Context, arg CancelAgentTasksByRuntimeOrAgentParams) ([]AgentTaskQueue, error) {
|
||||
rows, err := q.db.Query(ctx, cancelAgentTasksByRuntimeOrAgent, arg.RuntimeIds, arg.AgentIds)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
items := []AgentTaskQueue{}
|
||||
for rows.Next() {
|
||||
var i AgentTaskQueue
|
||||
if err := rows.Scan(
|
||||
&i.ID,
|
||||
&i.AgentID,
|
||||
&i.IssueID,
|
||||
&i.Status,
|
||||
&i.Priority,
|
||||
&i.DispatchedAt,
|
||||
&i.StartedAt,
|
||||
&i.CompletedAt,
|
||||
&i.Result,
|
||||
&i.Error,
|
||||
&i.CreatedAt,
|
||||
&i.Context,
|
||||
&i.RuntimeID,
|
||||
&i.SessionID,
|
||||
&i.WorkDir,
|
||||
&i.TriggerCommentID,
|
||||
&i.ChatSessionID,
|
||||
&i.AutopilotRunID,
|
||||
&i.Attempt,
|
||||
&i.MaxAttempts,
|
||||
&i.ParentTaskID,
|
||||
&i.FailureReason,
|
||||
&i.TriggerSummary,
|
||||
&i.ForceFreshSession,
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
items = append(items, i)
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return items, nil
|
||||
}
|
||||
|
||||
const countActiveAgentsByRuntime = `-- name: CountActiveAgentsByRuntime :one
|
||||
SELECT count(*) FROM agent WHERE runtime_id = $1 AND archived_at IS NULL
|
||||
`
|
||||
@@ -228,6 +300,53 @@ func (q *Queries) FindLegacyRuntimesByDaemonID(ctx context.Context, arg FindLega
|
||||
return items, nil
|
||||
}
|
||||
|
||||
const forceOfflineRuntimesByIDs = `-- name: ForceOfflineRuntimesByIDs :many
|
||||
UPDATE agent_runtime
|
||||
SET status = 'offline', updated_at = now()
|
||||
WHERE id = ANY($1::uuid[]) AND status = 'online'
|
||||
RETURNING id, workspace_id, owner_id, daemon_id, provider
|
||||
`
|
||||
|
||||
type ForceOfflineRuntimesByIDsRow struct {
|
||||
ID pgtype.UUID `json:"id"`
|
||||
WorkspaceID pgtype.UUID `json:"workspace_id"`
|
||||
OwnerID pgtype.UUID `json:"owner_id"`
|
||||
DaemonID pgtype.Text `json:"daemon_id"`
|
||||
Provider string `json:"provider"`
|
||||
}
|
||||
|
||||
// Unconditionally flips a known set of runtime IDs to offline. Distinct from
|
||||
// MarkRuntimesOfflineByIDs (which keeps a stale-window predicate so the
|
||||
// sweeper cannot demote a runtime that just heartbeated): this variant is
|
||||
// used by intentional revocation paths — e.g. removing a workspace member —
|
||||
// where the caller has already decided the runtime should be offline
|
||||
// regardless of recent liveness.
|
||||
func (q *Queries) ForceOfflineRuntimesByIDs(ctx context.Context, runtimeIds []pgtype.UUID) ([]ForceOfflineRuntimesByIDsRow, error) {
|
||||
rows, err := q.db.Query(ctx, forceOfflineRuntimesByIDs, runtimeIds)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
items := []ForceOfflineRuntimesByIDsRow{}
|
||||
for rows.Next() {
|
||||
var i ForceOfflineRuntimesByIDsRow
|
||||
if err := rows.Scan(
|
||||
&i.ID,
|
||||
&i.WorkspaceID,
|
||||
&i.OwnerID,
|
||||
&i.DaemonID,
|
||||
&i.Provider,
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
items = append(items, i)
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return items, nil
|
||||
}
|
||||
|
||||
const getAgentRuntime = `-- name: GetAgentRuntime :one
|
||||
SELECT id, workspace_id, daemon_id, name, runtime_mode, provider, status, device_info, metadata, last_seen_at, created_at, updated_at, owner_id, legacy_daemon_id, timezone FROM agent_runtime
|
||||
WHERE id = $1
|
||||
|
||||
@@ -54,6 +54,16 @@ UPDATE agent SET archived_at = now(), archived_by = $2, updated_at = now()
|
||||
WHERE id = $1
|
||||
RETURNING *;
|
||||
|
||||
-- name: ArchiveAgentsByRuntime :many
|
||||
-- Bulk-archives every active agent bound to any runtime in the given set.
|
||||
-- Used when revoking a leaving member's runtimes so agents pinned to those
|
||||
-- runtimes can no longer be assigned new work. Returns the affected rows so
|
||||
-- the caller can broadcast agent:archived per agent.
|
||||
UPDATE agent
|
||||
SET archived_at = now(), archived_by = @archived_by, updated_at = now()
|
||||
WHERE runtime_id = ANY(@runtime_ids::uuid[]) AND archived_at IS NULL
|
||||
RETURNING *;
|
||||
|
||||
-- name: RestoreAgent :one
|
||||
UPDATE agent SET archived_at = NULL, archived_by = NULL, updated_at = now()
|
||||
WHERE id = $1
|
||||
|
||||
@@ -7,14 +7,18 @@ RETURNING *;
|
||||
SELECT * FROM daemon_token
|
||||
WHERE token_hash = $1 AND expires_at > now();
|
||||
|
||||
-- name: DeleteDaemonTokensByWorkspaceAndDaemon :exec
|
||||
-- Callers MUST also invalidate auth.DaemonTokenCache for each affected
|
||||
-- token_hash so the deletion takes effect before the cache TTL expires.
|
||||
-- Today this query has no caller; when a deregister / rotate flow lands,
|
||||
-- change this to :many RETURNING token_hash and call
|
||||
-- DaemonTokenCache.Invalidate(hash) for each row.
|
||||
-- name: DeleteDaemonTokensByWorkspaceAndDaemons :many
|
||||
-- Deletes every daemon_token row matching the (workspace_id, daemon_id)
|
||||
-- pairs implied by `daemon_ids`. Used by the member-revocation flow to
|
||||
-- nuke tokens for all runtimes a leaving member owned in one shot.
|
||||
-- Returns token_hash so the caller can invalidate auth.DaemonTokenCache
|
||||
-- before the 10-minute TTL expires — without that invalidate, a daemon
|
||||
-- can keep using its stale token until cache eviction even though the
|
||||
-- DB row is gone.
|
||||
DELETE FROM daemon_token
|
||||
WHERE workspace_id = $1 AND daemon_id = $2;
|
||||
WHERE workspace_id = @workspace_id
|
||||
AND daemon_id = ANY(@daemon_ids::text[])
|
||||
RETURNING token_hash;
|
||||
|
||||
-- name: DeleteExpiredDaemonTokens :exec
|
||||
DELETE FROM daemon_token
|
||||
|
||||
@@ -197,6 +197,39 @@ SELECT * FROM agent_runtime
|
||||
WHERE workspace_id = $1 AND owner_id = $2
|
||||
ORDER BY created_at ASC;
|
||||
|
||||
-- name: ForceOfflineRuntimesByIDs :many
|
||||
-- Unconditionally flips a known set of runtime IDs to offline. Distinct from
|
||||
-- MarkRuntimesOfflineByIDs (which keeps a stale-window predicate so the
|
||||
-- sweeper cannot demote a runtime that just heartbeated): this variant is
|
||||
-- used by intentional revocation paths — e.g. removing a workspace member —
|
||||
-- where the caller has already decided the runtime should be offline
|
||||
-- regardless of recent liveness.
|
||||
UPDATE agent_runtime
|
||||
SET status = 'offline', updated_at = now()
|
||||
WHERE id = ANY(@runtime_ids::uuid[]) AND status = 'online'
|
||||
RETURNING id, workspace_id, owner_id, daemon_id, provider;
|
||||
|
||||
-- name: CancelAgentTasksByRuntimeOrAgent :many
|
||||
-- Cancels every active task that either lives on one of the given runtimes
|
||||
-- OR belongs to one of the given agents. Used by the member-revocation flow:
|
||||
-- the runtime-side covers tasks queued against the leaving member's runtimes;
|
||||
-- the agent-side covers tasks pinned to a different runtime that those agents
|
||||
-- left behind from a prior UpdateAgent (agent.runtime_id can change, but
|
||||
-- agent_task_queue.runtime_id does not get rewritten when it does, so a task
|
||||
-- queued on runtime A by agent X — later moved to runtime B — survives the
|
||||
-- runtime-only revoke and could still be claimed because ClaimAgentTask does
|
||||
-- not gate on agent.archived_at).
|
||||
--
|
||||
-- We use 'cancelled' rather than 'failed' so the daemon's per-task status
|
||||
-- poller (watchTaskCancellation) interrupts the running agent gracefully.
|
||||
-- Returns the affected rows so the caller can broadcast task:cancelled and
|
||||
-- reconcile per-agent status.
|
||||
UPDATE agent_task_queue
|
||||
SET status = 'cancelled', completed_at = now()
|
||||
WHERE (runtime_id = ANY(@runtime_ids::uuid[]) OR agent_id = ANY(@agent_ids::uuid[]))
|
||||
AND status IN ('queued', 'dispatched', 'running')
|
||||
RETURNING *;
|
||||
|
||||
-- name: DeleteAgentRuntime :exec
|
||||
DELETE FROM agent_runtime WHERE id = $1;
|
||||
|
||||
|
||||
Reference in New Issue
Block a user