mirror of
https://github.com/multica-ai/multica.git
synced 2026-06-28 01:49:18 +02:00
Compare commits
2 Commits
agent/lamb
...
agent/j/26
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7fd3058a33 | ||
|
|
231d639a47 |
@@ -572,17 +572,22 @@ func (h *Handler) DeleteMember(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
}
|
||||
|
||||
if err := h.Queries.DeleteMember(r.Context(), target.ID); err != nil {
|
||||
requesterUserID := requestUserID(r)
|
||||
result, err := h.revokeAndRemoveMember(r.Context(), target.WorkspaceID, target.UserID, target.ID, parseUUID(requesterUserID))
|
||||
if err != nil {
|
||||
slog.Warn("delete member failed", append(logger.RequestAttrs(r), "error", err, "member_id", memberID, "workspace_id", workspaceID)...)
|
||||
writeError(w, http.StatusInternalServerError, "failed to delete member")
|
||||
return
|
||||
}
|
||||
|
||||
wsIDStr := uuidToString(requester.WorkspaceID)
|
||||
logRevocation(result, wsIDStr, uuidToString(target.UserID))
|
||||
h.publishRevocation(r.Context(), result, wsIDStr, "member", requesterUserID)
|
||||
|
||||
slog.Info("member removed", append(logger.RequestAttrs(r), "member_id", uuidToString(target.ID), "workspace_id", workspaceID, "user_id", uuidToString(target.UserID))...)
|
||||
userID := requestUserID(r)
|
||||
h.publish(protocol.EventMemberRemoved, uuidToString(requester.WorkspaceID), "member", userID, map[string]any{
|
||||
h.publish(protocol.EventMemberRemoved, wsIDStr, "member", requesterUserID, map[string]any{
|
||||
"member_id": uuidToString(target.ID),
|
||||
"workspace_id": uuidToString(requester.WorkspaceID),
|
||||
"workspace_id": wsIDStr,
|
||||
"user_id": uuidToString(target.UserID),
|
||||
})
|
||||
|
||||
@@ -608,14 +613,18 @@ func (h *Handler) LeaveWorkspace(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
}
|
||||
|
||||
if err := h.Queries.DeleteMember(r.Context(), member.ID); err != nil {
|
||||
result, err := h.revokeAndRemoveMember(r.Context(), member.WorkspaceID, member.UserID, member.ID, member.UserID)
|
||||
if err != nil {
|
||||
slog.Warn("leave workspace failed", append(logger.RequestAttrs(r), "error", err, "workspace_id", workspaceID)...)
|
||||
writeError(w, http.StatusInternalServerError, "failed to leave workspace")
|
||||
return
|
||||
}
|
||||
|
||||
slog.Info("member removed", append(logger.RequestAttrs(r), "member_id", uuidToString(member.ID), "workspace_id", workspaceID, "user_id", uuidToString(member.UserID))...)
|
||||
userID := requestUserID(r)
|
||||
logRevocation(result, workspaceID, uuidToString(member.UserID))
|
||||
h.publishRevocation(r.Context(), result, workspaceID, "member", userID)
|
||||
|
||||
slog.Info("member removed", append(logger.RequestAttrs(r), "member_id", uuidToString(member.ID), "workspace_id", workspaceID, "user_id", uuidToString(member.UserID))...)
|
||||
h.publish(protocol.EventMemberRemoved, workspaceID, "member", userID, map[string]any{
|
||||
"member_id": uuidToString(member.ID),
|
||||
"workspace_id": workspaceID,
|
||||
|
||||
201
server/internal/handler/workspace_revoke.go
Normal file
201
server/internal/handler/workspace_revoke.go
Normal file
@@ -0,0 +1,201 @@
|
||||
package handler
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log/slog"
|
||||
|
||||
"github.com/jackc/pgx/v5/pgtype"
|
||||
db "github.com/multica-ai/multica/server/pkg/db/generated"
|
||||
"github.com/multica-ai/multica/server/pkg/protocol"
|
||||
)
|
||||
|
||||
// revokeAndRemoveMember converges all server-side state that should follow a
|
||||
// member leaving a workspace: every runtime they own becomes unusable, every
|
||||
// agent pinned to one of those runtimes is archived, every in-flight task on
|
||||
// those runtimes is cancelled (cancelled rather than failed so the daemon's
|
||||
// per-task status poller interrupts the running agent gracefully), the
|
||||
// daemon_token rows for those runtimes are deleted, and finally the member row
|
||||
// itself is removed.
|
||||
//
|
||||
// All DB writes run inside a single transaction so a partial revocation never
|
||||
// leaves the workspace half-converged — e.g. a member who is "gone" but whose
|
||||
// runtime row is still active. Once the transaction commits, daemon_token
|
||||
// cache entries are invalidated and events are published (see
|
||||
// publishRevocation) so connected clients and other workspace members observe
|
||||
// the new state immediately.
|
||||
//
|
||||
// Note on scope: this revokes every runtime whose owner_id matches userID,
|
||||
// regardless of how the daemon authenticates. Today most daemons fall back to
|
||||
// PAT/JWT and `daemon_token` rows are unused in production; deleting them is
|
||||
// a no-op for those daemons but takes effect once the mdt_ flow is live.
|
||||
// Either way the agent-archive + task-cancel + force-offline writes are the
|
||||
// actual production safety net: even if the daemon races back online with a
|
||||
// still-valid PAT, it finds no agent it can run for, no queued task to claim,
|
||||
// and the dispatcher (which gates on agent.archived_at IS NULL) won't hand it
|
||||
// new work — and the member-row deletion in the same tx means subsequent
|
||||
// requireWorkspaceMember checks will reject the daemon's PAT-authenticated
|
||||
// requests with 404.
|
||||
//
|
||||
// archivedBy is the actor who triggered the revocation. For DeleteMember it's
|
||||
// the requester (the admin doing the kick); for LeaveWorkspace it's the leaver
|
||||
// themselves.
|
||||
func (h *Handler) revokeAndRemoveMember(ctx context.Context, workspaceID, userID, memberID, archivedBy pgtype.UUID) (revocationResult, error) {
|
||||
var empty revocationResult
|
||||
|
||||
tx, err := h.TxStarter.Begin(ctx)
|
||||
if err != nil {
|
||||
return empty, err
|
||||
}
|
||||
defer tx.Rollback(ctx)
|
||||
|
||||
qtx := h.Queries.WithTx(tx)
|
||||
|
||||
runtimes, err := qtx.ListAgentRuntimesByOwner(ctx, db.ListAgentRuntimesByOwnerParams{
|
||||
WorkspaceID: workspaceID,
|
||||
OwnerID: userID,
|
||||
})
|
||||
if err != nil {
|
||||
return empty, err
|
||||
}
|
||||
|
||||
result := revocationResult{Runtimes: runtimes}
|
||||
|
||||
if len(runtimes) > 0 {
|
||||
runtimeIDs := make([]pgtype.UUID, len(runtimes))
|
||||
daemonIDs := make([]string, 0, len(runtimes))
|
||||
for i, rt := range runtimes {
|
||||
runtimeIDs[i] = rt.ID
|
||||
if rt.DaemonID.Valid && rt.DaemonID.String != "" {
|
||||
daemonIDs = append(daemonIDs, rt.DaemonID.String)
|
||||
}
|
||||
}
|
||||
|
||||
result.ArchivedAgents, err = qtx.ArchiveAgentsByRuntime(ctx, db.ArchiveAgentsByRuntimeParams{
|
||||
ArchivedBy: archivedBy,
|
||||
RuntimeIds: runtimeIDs,
|
||||
})
|
||||
if err != nil {
|
||||
return empty, err
|
||||
}
|
||||
|
||||
// Cancel by runtime AND by archived agent. agent.runtime_id can be
|
||||
// reassigned via UpdateAgent without rewriting the runtime_id on
|
||||
// historical agent_task_queue rows, so an archived agent may still
|
||||
// have queued/running tasks pinned to a different runtime — and
|
||||
// ClaimAgentTask does not gate on agent.archived_at, so those tasks
|
||||
// would otherwise stay claimable after the agent is gone.
|
||||
archivedAgentIDs := make([]pgtype.UUID, len(result.ArchivedAgents))
|
||||
for i, a := range result.ArchivedAgents {
|
||||
archivedAgentIDs[i] = a.ID
|
||||
}
|
||||
result.CancelledTasks, err = qtx.CancelAgentTasksByRuntimeOrAgent(ctx, db.CancelAgentTasksByRuntimeOrAgentParams{
|
||||
RuntimeIds: runtimeIDs,
|
||||
AgentIds: archivedAgentIDs,
|
||||
})
|
||||
if err != nil {
|
||||
return empty, err
|
||||
}
|
||||
|
||||
result.OfflineRuntimeIDs, err = qtx.ForceOfflineRuntimesByIDs(ctx, runtimeIDs)
|
||||
if err != nil {
|
||||
return empty, err
|
||||
}
|
||||
|
||||
if len(daemonIDs) > 0 {
|
||||
result.RevokedTokenHashes, err = qtx.DeleteDaemonTokensByWorkspaceAndDaemons(ctx, db.DeleteDaemonTokensByWorkspaceAndDaemonsParams{
|
||||
WorkspaceID: workspaceID,
|
||||
DaemonIds: daemonIDs,
|
||||
})
|
||||
if err != nil {
|
||||
return empty, err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Member row deletion lives inside the same tx so a successful revoke is
|
||||
// never followed by a failed member-delete (which would leave the user
|
||||
// still a member with a dead runtime), and a failed revoke never leaves
|
||||
// the user out of the workspace with a still-online runtime.
|
||||
if err := qtx.DeleteMember(ctx, memberID); err != nil {
|
||||
return empty, err
|
||||
}
|
||||
|
||||
if err := tx.Commit(ctx); err != nil {
|
||||
return empty, err
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// revocationResult captures everything revokeMemberRuntimes touched so the
|
||||
// caller can fan out events and analytics after the transaction commits.
|
||||
// Publishing inside the transaction would let subscribers observe a state the
|
||||
// tx might still roll back (see TaskService.BroadcastCancelledTasks docstring).
|
||||
type revocationResult struct {
|
||||
Runtimes []db.AgentRuntime
|
||||
ArchivedAgents []db.Agent
|
||||
CancelledTasks []db.AgentTaskQueue
|
||||
OfflineRuntimeIDs []db.ForceOfflineRuntimesByIDsRow
|
||||
RevokedTokenHashes []string
|
||||
}
|
||||
|
||||
func (r revocationResult) isEmpty() bool {
|
||||
return len(r.Runtimes) == 0
|
||||
}
|
||||
|
||||
// publishRevocation runs all post-commit side effects: invalidate daemon token
|
||||
// cache, broadcast task:cancelled with per-agent reconciliation, broadcast
|
||||
// agent:archived, and signal a runtime-list refresh. Safe to call on an empty
|
||||
// result — it returns immediately.
|
||||
func (h *Handler) publishRevocation(ctx context.Context, result revocationResult, workspaceIDStr, actorType, actorIDStr string) {
|
||||
if result.isEmpty() {
|
||||
return
|
||||
}
|
||||
|
||||
for _, hash := range result.RevokedTokenHashes {
|
||||
h.DaemonTokenCache.Invalidate(ctx, hash)
|
||||
}
|
||||
|
||||
// Per-task cancellation: TaskService handles status reconciliation and
|
||||
// per-task event broadcast. Run this before the agent:archived burst so
|
||||
// subscribers see "task cancelled" before the parent agent disappears
|
||||
// from active lists, matching the order ArchiveAgent uses.
|
||||
if h.TaskService != nil && len(result.CancelledTasks) > 0 {
|
||||
h.TaskService.BroadcastCancelledTasks(ctx, result.CancelledTasks)
|
||||
}
|
||||
|
||||
for _, agent := range result.ArchivedAgents {
|
||||
h.publish(protocol.EventAgentArchived, workspaceIDStr, actorType, actorIDStr, map[string]any{
|
||||
"agent": agentToResponse(agent),
|
||||
})
|
||||
}
|
||||
|
||||
// Tell connected clients to refresh the runtime list. We piggyback on
|
||||
// EventDaemonRegister with a "revoke" action — same channel the runtime
|
||||
// delete handler uses — so the frontend invalidates its cached list
|
||||
// without us having to introduce a new event type the desktop app would
|
||||
// need a build to learn about.
|
||||
if len(result.OfflineRuntimeIDs) > 0 {
|
||||
h.publish(protocol.EventDaemonRegister, workspaceIDStr, actorType, actorIDStr, map[string]any{
|
||||
"action": "revoke",
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// logRevocation emits a structured info line summarising the revocation. Kept
|
||||
// separate from publish so the log is identical whether or not the bus is wired.
|
||||
func logRevocation(result revocationResult, workspaceID, userID string, attrs ...any) {
|
||||
if result.isEmpty() {
|
||||
return
|
||||
}
|
||||
base := []any{
|
||||
"workspace_id", workspaceID,
|
||||
"user_id", userID,
|
||||
"runtimes_revoked", len(result.Runtimes),
|
||||
"agents_archived", len(result.ArchivedAgents),
|
||||
"tasks_cancelled", len(result.CancelledTasks),
|
||||
"runtimes_taken_offline", len(result.OfflineRuntimeIDs),
|
||||
"daemon_tokens_revoked", len(result.RevokedTokenHashes),
|
||||
}
|
||||
slog.Info("member runtimes revoked", append(base, attrs...)...)
|
||||
}
|
||||
@@ -2,6 +2,8 @@ package handler
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/sha256"
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
@@ -128,3 +130,344 @@ VALUES ($1, $2, 'owner')
|
||||
t.Fatal("workspace still exists after owner DELETE")
|
||||
}
|
||||
}
|
||||
|
||||
// revocationFixture is a minimal (workspace, member-to-revoke, runtime,
|
||||
// agent, queued-task, daemon-token) bundle used to drive the revocation
|
||||
// tests. The "requester" is always testUserID (owner of the workspace) so
|
||||
// `newRequest` passes the existing fixtures' auth context unchanged.
|
||||
type revocationFixture struct {
|
||||
WorkspaceID string
|
||||
TargetUserID string
|
||||
MemberID string
|
||||
RuntimeID string
|
||||
AgentID string
|
||||
TaskID string
|
||||
DaemonID string
|
||||
TokenHash string
|
||||
}
|
||||
|
||||
func setupRevocationFixture(t *testing.T, slug, daemonID string) revocationFixture {
|
||||
t.Helper()
|
||||
ctx := context.Background()
|
||||
|
||||
_, _ = testPool.Exec(ctx, `DELETE FROM workspace WHERE slug = $1`, slug)
|
||||
|
||||
var wsID string
|
||||
if err := testPool.QueryRow(ctx, `
|
||||
INSERT INTO workspace (name, slug, description, issue_prefix)
|
||||
VALUES ($1, $2, $3, $4)
|
||||
RETURNING id
|
||||
`, "Revocation "+slug, slug, "revocation test", "REV").Scan(&wsID); err != nil {
|
||||
t.Fatalf("create workspace: %v", err)
|
||||
}
|
||||
|
||||
// Requester (= testUserID) is always an owner so DeleteMember authorization
|
||||
// passes. Two owners total so LeaveWorkspace doesn't trip the "must keep
|
||||
// at least one owner" guard.
|
||||
if _, err := testPool.Exec(ctx, `
|
||||
INSERT INTO member (workspace_id, user_id, role) VALUES ($1, $2, 'owner')
|
||||
`, wsID, testUserID); err != nil {
|
||||
t.Fatalf("create requester member: %v", err)
|
||||
}
|
||||
|
||||
targetEmail := fmt.Sprintf("revocation-%s@multica.ai", slug)
|
||||
var targetUserID string
|
||||
if err := testPool.QueryRow(ctx, `
|
||||
INSERT INTO "user" (name, email) VALUES ($1, $2) RETURNING id
|
||||
`, "Revocation Target "+slug, targetEmail).Scan(&targetUserID); err != nil {
|
||||
t.Fatalf("create target user: %v", err)
|
||||
}
|
||||
|
||||
// Cleanup ordering: workspace first (cascade clears agent_runtime,
|
||||
// agent, member, daemon_token), then user (whose deletion would
|
||||
// otherwise be blocked by agent.owner_id / agent_runtime.owner_id FKs).
|
||||
t.Cleanup(func() {
|
||||
_, _ = testPool.Exec(context.Background(), `DELETE FROM workspace WHERE id = $1`, wsID)
|
||||
_, _ = testPool.Exec(context.Background(), `DELETE FROM "user" WHERE id = $1`, targetUserID)
|
||||
})
|
||||
|
||||
var memberID string
|
||||
if err := testPool.QueryRow(ctx, `
|
||||
INSERT INTO member (workspace_id, user_id, role) VALUES ($1, $2, 'owner') RETURNING id
|
||||
`, wsID, targetUserID).Scan(&memberID); err != nil {
|
||||
t.Fatalf("create target member: %v", err)
|
||||
}
|
||||
|
||||
var runtimeID string
|
||||
if err := testPool.QueryRow(ctx, `
|
||||
INSERT INTO agent_runtime (
|
||||
workspace_id, daemon_id, name, runtime_mode, provider, status,
|
||||
device_info, metadata, owner_id, last_seen_at
|
||||
)
|
||||
VALUES ($1, $2, 'Target Runtime', 'local', 'multica_daemon', 'online', '', '{}'::jsonb, $3, now())
|
||||
RETURNING id
|
||||
`, wsID, daemonID, targetUserID).Scan(&runtimeID); err != nil {
|
||||
t.Fatalf("insert runtime: %v", err)
|
||||
}
|
||||
|
||||
var agentID string
|
||||
if err := testPool.QueryRow(ctx, `
|
||||
INSERT INTO agent (
|
||||
workspace_id, name, description, runtime_mode, runtime_config,
|
||||
runtime_id, visibility, max_concurrent_tasks, owner_id
|
||||
)
|
||||
VALUES ($1, 'Target Agent', '', 'local', '{}'::jsonb, $2, 'workspace', 1, $3)
|
||||
RETURNING id
|
||||
`, wsID, runtimeID, targetUserID).Scan(&agentID); err != nil {
|
||||
t.Fatalf("insert agent: %v", err)
|
||||
}
|
||||
|
||||
var taskID string
|
||||
if err := testPool.QueryRow(ctx, `
|
||||
INSERT INTO agent_task_queue (agent_id, runtime_id, status, priority)
|
||||
VALUES ($1, $2, 'queued', 0)
|
||||
RETURNING id
|
||||
`, agentID, runtimeID).Scan(&taskID); err != nil {
|
||||
t.Fatalf("insert task: %v", err)
|
||||
}
|
||||
|
||||
// daemon_token row — paired with the runtime's daemon_id so the
|
||||
// revocation should sweep its hash up via DeleteDaemonTokensByWorkspaceAndDaemons.
|
||||
rawToken := "mdt_test_" + slug
|
||||
sum := sha256.Sum256([]byte(rawToken))
|
||||
tokenHash := hex.EncodeToString(sum[:])
|
||||
if _, err := testPool.Exec(ctx, `
|
||||
INSERT INTO daemon_token (token_hash, workspace_id, daemon_id, expires_at)
|
||||
VALUES ($1, $2, $3, now() + interval '1 day')
|
||||
`, tokenHash, wsID, daemonID); err != nil {
|
||||
t.Fatalf("insert daemon_token: %v", err)
|
||||
}
|
||||
|
||||
return revocationFixture{
|
||||
WorkspaceID: wsID,
|
||||
TargetUserID: targetUserID,
|
||||
MemberID: memberID,
|
||||
RuntimeID: runtimeID,
|
||||
AgentID: agentID,
|
||||
TaskID: taskID,
|
||||
DaemonID: daemonID,
|
||||
TokenHash: tokenHash,
|
||||
}
|
||||
}
|
||||
|
||||
func assertRevoked(t *testing.T, fx revocationFixture) {
|
||||
t.Helper()
|
||||
ctx := context.Background()
|
||||
|
||||
var memberExists bool
|
||||
if err := testPool.QueryRow(ctx, `SELECT EXISTS (SELECT 1 FROM member WHERE id = $1)`, fx.MemberID).Scan(&memberExists); err != nil {
|
||||
t.Fatalf("query member: %v", err)
|
||||
}
|
||||
if memberExists {
|
||||
t.Fatal("member row was not deleted")
|
||||
}
|
||||
|
||||
var runtimeStatus string
|
||||
if err := testPool.QueryRow(ctx, `SELECT status FROM agent_runtime WHERE id = $1`, fx.RuntimeID).Scan(&runtimeStatus); err != nil {
|
||||
t.Fatalf("query runtime: %v", err)
|
||||
}
|
||||
if runtimeStatus != "offline" {
|
||||
t.Fatalf("expected runtime offline, got %q", runtimeStatus)
|
||||
}
|
||||
|
||||
var archivedAt *string
|
||||
if err := testPool.QueryRow(ctx, `SELECT archived_at::text FROM agent WHERE id = $1`, fx.AgentID).Scan(&archivedAt); err != nil {
|
||||
t.Fatalf("query agent: %v", err)
|
||||
}
|
||||
if archivedAt == nil {
|
||||
t.Fatal("agent was not archived")
|
||||
}
|
||||
|
||||
var taskStatus string
|
||||
if err := testPool.QueryRow(ctx, `SELECT status FROM agent_task_queue WHERE id = $1`, fx.TaskID).Scan(&taskStatus); err != nil {
|
||||
t.Fatalf("query task: %v", err)
|
||||
}
|
||||
if taskStatus != "cancelled" {
|
||||
t.Fatalf("expected task cancelled, got %q", taskStatus)
|
||||
}
|
||||
|
||||
var tokenExists bool
|
||||
if err := testPool.QueryRow(ctx, `SELECT EXISTS (SELECT 1 FROM daemon_token WHERE token_hash = $1)`, fx.TokenHash).Scan(&tokenExists); err != nil {
|
||||
t.Fatalf("query daemon_token: %v", err)
|
||||
}
|
||||
if tokenExists {
|
||||
t.Fatal("daemon_token row was not deleted")
|
||||
}
|
||||
}
|
||||
|
||||
// TestDeleteMember_RevokesTargetRuntimes verifies that when an admin removes
|
||||
// another member from a workspace, every runtime owned by the removed member
|
||||
// has its agents archived, its in-flight tasks cancelled, its row flipped
|
||||
// offline, and its daemon_token rows deleted — all atomically with the member
|
||||
// row deletion.
|
||||
func TestDeleteMember_RevokesTargetRuntimes(t *testing.T) {
|
||||
fx := setupRevocationFixture(t, "handler-tests-revoke-kick", "daemon-revoke-kick")
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
req := newRequest("DELETE", "/api/workspaces/"+fx.WorkspaceID+"/members/"+fx.MemberID, nil)
|
||||
req.Header.Set("X-Workspace-ID", fx.WorkspaceID)
|
||||
req = withURLParams(req, "id", fx.WorkspaceID, "memberId", fx.MemberID)
|
||||
testHandler.DeleteMember(w, req)
|
||||
|
||||
if w.Code != http.StatusNoContent {
|
||||
t.Fatalf("DeleteMember: expected 204, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
|
||||
assertRevoked(t, fx)
|
||||
}
|
||||
|
||||
// TestLeaveWorkspace_RevokesOwnRuntimes is the self-removal counterpart: when
|
||||
// a member leaves a workspace voluntarily, their own runtimes are revoked
|
||||
// with the same atomic write set as DeleteMember.
|
||||
func TestLeaveWorkspace_RevokesOwnRuntimes(t *testing.T) {
|
||||
fx := setupRevocationFixture(t, "handler-tests-revoke-leave", "daemon-revoke-leave")
|
||||
|
||||
// Re-target the request from the leaving member's perspective: the
|
||||
// leaver is the request actor, not the workspace owner.
|
||||
w := httptest.NewRecorder()
|
||||
req := newRequest("DELETE", "/api/workspaces/"+fx.WorkspaceID+"/leave", nil)
|
||||
req.Header.Set("X-User-ID", fx.TargetUserID)
|
||||
req.Header.Set("X-Workspace-ID", fx.WorkspaceID)
|
||||
req = withURLParam(req, "id", fx.WorkspaceID)
|
||||
testHandler.LeaveWorkspace(w, req)
|
||||
|
||||
if w.Code != http.StatusNoContent {
|
||||
t.Fatalf("LeaveWorkspace: expected 204, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
|
||||
assertRevoked(t, fx)
|
||||
}
|
||||
|
||||
// TestDeleteMember_CancelsTasksFromAgentReassignment covers a subtle
|
||||
// case: an agent's runtime_id can be changed via UpdateAgent, but
|
||||
// agent_task_queue.runtime_id keeps the value from when the task was
|
||||
// queued. So after a leaving member is removed, an agent currently bound
|
||||
// to their runtime gets archived — but tasks that agent queued under a
|
||||
// PRIOR runtime (still owned by another active member) keep their old
|
||||
// runtime_id and would not be caught by a runtime-only sweep. Because
|
||||
// ClaimAgentTask does not gate on agent.archived_at, those orphaned
|
||||
// queued tasks would remain claimable.
|
||||
func TestDeleteMember_CancelsTasksFromAgentReassignment(t *testing.T) {
|
||||
fx := setupRevocationFixture(t, "handler-tests-revoke-reassign", "daemon-revoke-reassign")
|
||||
ctx := context.Background()
|
||||
|
||||
// Create a SECOND runtime in the workspace owned by the requester
|
||||
// (not the leaving member). The agent originally lived here.
|
||||
var otherRuntimeID string
|
||||
if err := testPool.QueryRow(ctx, `
|
||||
INSERT INTO agent_runtime (
|
||||
workspace_id, daemon_id, name, runtime_mode, provider, status,
|
||||
device_info, metadata, owner_id, last_seen_at
|
||||
)
|
||||
VALUES ($1, $2, 'Other Runtime', 'local', 'multica_daemon', 'online', '', '{}'::jsonb, $3, now())
|
||||
RETURNING id
|
||||
`, fx.WorkspaceID, "daemon-revoke-reassign-other", testUserID).Scan(&otherRuntimeID); err != nil {
|
||||
t.Fatalf("insert other runtime: %v", err)
|
||||
}
|
||||
|
||||
// Queue a task on the agent while it was still pinned to the OTHER
|
||||
// runtime (simulating a task created before the agent was reassigned
|
||||
// to the leaving member's runtime).
|
||||
var orphanTaskID string
|
||||
if err := testPool.QueryRow(ctx, `
|
||||
INSERT INTO agent_task_queue (agent_id, runtime_id, status, priority)
|
||||
VALUES ($1, $2, 'queued', 0)
|
||||
RETURNING id
|
||||
`, fx.AgentID, otherRuntimeID).Scan(&orphanTaskID); err != nil {
|
||||
t.Fatalf("insert orphan task: %v", err)
|
||||
}
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
req := newRequest("DELETE", "/api/workspaces/"+fx.WorkspaceID+"/members/"+fx.MemberID, nil)
|
||||
req.Header.Set("X-Workspace-ID", fx.WorkspaceID)
|
||||
req = withURLParams(req, "id", fx.WorkspaceID, "memberId", fx.MemberID)
|
||||
testHandler.DeleteMember(w, req)
|
||||
|
||||
if w.Code != http.StatusNoContent {
|
||||
t.Fatalf("DeleteMember: expected 204, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
|
||||
assertRevoked(t, fx)
|
||||
|
||||
// The orphan task — same agent, different runtime — must also be
|
||||
// cancelled. Without the by-agent leg in CancelAgentTasksByRuntimeOrAgent
|
||||
// this stays 'queued' and would be picked up by the other runtime.
|
||||
var orphanStatus string
|
||||
if err := testPool.QueryRow(ctx, `SELECT status FROM agent_task_queue WHERE id = $1`, orphanTaskID).Scan(&orphanStatus); err != nil {
|
||||
t.Fatalf("query orphan task: %v", err)
|
||||
}
|
||||
if orphanStatus != "cancelled" {
|
||||
t.Fatalf("expected orphan task cancelled (archived agent leftover on other runtime), got %q", orphanStatus)
|
||||
}
|
||||
|
||||
// And the OTHER runtime — owned by an active member — must still be
|
||||
// online: revocation is scoped to the leaving member's owned runtimes.
|
||||
var otherStatus string
|
||||
if err := testPool.QueryRow(ctx, `SELECT status FROM agent_runtime WHERE id = $1`, otherRuntimeID).Scan(&otherStatus); err != nil {
|
||||
t.Fatalf("query other runtime: %v", err)
|
||||
}
|
||||
if otherStatus != "online" {
|
||||
t.Fatalf("expected other-member runtime to stay online, got %q", otherStatus)
|
||||
}
|
||||
}
|
||||
|
||||
// TestDeleteMember_NoRuntimes_DeletesMember covers the empty-revocation
|
||||
// path: a member with no owned runtimes should still have their member row
|
||||
// deleted by the same atomic transaction, with no spurious archive/cancel
|
||||
// writes.
|
||||
func TestDeleteMember_NoRuntimes_DeletesMember(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
const slug = "handler-tests-revoke-no-runtimes"
|
||||
_, _ = testPool.Exec(ctx, `DELETE FROM workspace WHERE slug = $1`, slug)
|
||||
|
||||
var wsID string
|
||||
if err := testPool.QueryRow(ctx, `
|
||||
INSERT INTO workspace (name, slug, description, issue_prefix)
|
||||
VALUES ($1, $2, $3, $4)
|
||||
RETURNING id
|
||||
`, "Revocation no runtimes", slug, "revocation no-runtimes test", "REV").Scan(&wsID); err != nil {
|
||||
t.Fatalf("create workspace: %v", err)
|
||||
}
|
||||
|
||||
if _, err := testPool.Exec(ctx, `
|
||||
INSERT INTO member (workspace_id, user_id, role) VALUES ($1, $2, 'owner')
|
||||
`, wsID, testUserID); err != nil {
|
||||
t.Fatalf("create requester member: %v", err)
|
||||
}
|
||||
|
||||
var targetUserID string
|
||||
if err := testPool.QueryRow(ctx, `
|
||||
INSERT INTO "user" (name, email) VALUES ($1, $2) RETURNING id
|
||||
`, "Revocation No Runtimes Target", "revocation-no-runtimes@multica.ai").Scan(&targetUserID); err != nil {
|
||||
t.Fatalf("create target user: %v", err)
|
||||
}
|
||||
t.Cleanup(func() {
|
||||
_, _ = testPool.Exec(context.Background(), `DELETE FROM workspace WHERE id = $1`, wsID)
|
||||
_, _ = testPool.Exec(context.Background(), `DELETE FROM "user" WHERE id = $1`, targetUserID)
|
||||
})
|
||||
|
||||
var memberID string
|
||||
if err := testPool.QueryRow(ctx, `
|
||||
INSERT INTO member (workspace_id, user_id, role) VALUES ($1, $2, 'admin') RETURNING id
|
||||
`, wsID, targetUserID).Scan(&memberID); err != nil {
|
||||
t.Fatalf("create target member: %v", err)
|
||||
}
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
req := newRequest("DELETE", "/api/workspaces/"+wsID+"/members/"+memberID, nil)
|
||||
req.Header.Set("X-Workspace-ID", wsID)
|
||||
req = withURLParams(req, "id", wsID, "memberId", memberID)
|
||||
testHandler.DeleteMember(w, req)
|
||||
|
||||
if w.Code != http.StatusNoContent {
|
||||
t.Fatalf("DeleteMember: expected 204, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
|
||||
var memberExists bool
|
||||
if err := testPool.QueryRow(ctx, `SELECT EXISTS (SELECT 1 FROM member WHERE id = $1)`, memberID).Scan(&memberExists); err != nil {
|
||||
t.Fatalf("query member: %v", err)
|
||||
}
|
||||
if memberExists {
|
||||
t.Fatal("member row was not deleted")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -51,6 +51,64 @@ func (q *Queries) ArchiveAgent(ctx context.Context, arg ArchiveAgentParams) (Age
|
||||
return i, err
|
||||
}
|
||||
|
||||
const archiveAgentsByRuntime = `-- name: ArchiveAgentsByRuntime :many
|
||||
UPDATE agent
|
||||
SET archived_at = now(), archived_by = $1, updated_at = now()
|
||||
WHERE runtime_id = ANY($2::uuid[]) AND archived_at IS NULL
|
||||
RETURNING id, workspace_id, name, avatar_url, runtime_mode, runtime_config, visibility, status, max_concurrent_tasks, owner_id, created_at, updated_at, description, runtime_id, instructions, archived_at, archived_by, custom_env, custom_args, mcp_config, model
|
||||
`
|
||||
|
||||
type ArchiveAgentsByRuntimeParams struct {
|
||||
ArchivedBy pgtype.UUID `json:"archived_by"`
|
||||
RuntimeIds []pgtype.UUID `json:"runtime_ids"`
|
||||
}
|
||||
|
||||
// Bulk-archives every active agent bound to any runtime in the given set.
|
||||
// Used when revoking a leaving member's runtimes so agents pinned to those
|
||||
// runtimes can no longer be assigned new work. Returns the affected rows so
|
||||
// the caller can broadcast agent:archived per agent.
|
||||
func (q *Queries) ArchiveAgentsByRuntime(ctx context.Context, arg ArchiveAgentsByRuntimeParams) ([]Agent, error) {
|
||||
rows, err := q.db.Query(ctx, archiveAgentsByRuntime, arg.ArchivedBy, arg.RuntimeIds)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
items := []Agent{}
|
||||
for rows.Next() {
|
||||
var i Agent
|
||||
if err := rows.Scan(
|
||||
&i.ID,
|
||||
&i.WorkspaceID,
|
||||
&i.Name,
|
||||
&i.AvatarUrl,
|
||||
&i.RuntimeMode,
|
||||
&i.RuntimeConfig,
|
||||
&i.Visibility,
|
||||
&i.Status,
|
||||
&i.MaxConcurrentTasks,
|
||||
&i.OwnerID,
|
||||
&i.CreatedAt,
|
||||
&i.UpdatedAt,
|
||||
&i.Description,
|
||||
&i.RuntimeID,
|
||||
&i.Instructions,
|
||||
&i.ArchivedAt,
|
||||
&i.ArchivedBy,
|
||||
&i.CustomEnv,
|
||||
&i.CustomArgs,
|
||||
&i.McpConfig,
|
||||
&i.Model,
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
items = append(items, i)
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return items, nil
|
||||
}
|
||||
|
||||
const cancelAgentTask = `-- name: CancelAgentTask :one
|
||||
UPDATE agent_task_queue
|
||||
SET status = 'cancelled', completed_at = now()
|
||||
|
||||
@@ -43,24 +43,43 @@ func (q *Queries) CreateDaemonToken(ctx context.Context, arg CreateDaemonTokenPa
|
||||
return i, err
|
||||
}
|
||||
|
||||
const deleteDaemonTokensByWorkspaceAndDaemon = `-- name: DeleteDaemonTokensByWorkspaceAndDaemon :exec
|
||||
const deleteDaemonTokensByWorkspaceAndDaemons = `-- name: DeleteDaemonTokensByWorkspaceAndDaemons :many
|
||||
DELETE FROM daemon_token
|
||||
WHERE workspace_id = $1 AND daemon_id = $2
|
||||
WHERE workspace_id = $1
|
||||
AND daemon_id = ANY($2::text[])
|
||||
RETURNING token_hash
|
||||
`
|
||||
|
||||
type DeleteDaemonTokensByWorkspaceAndDaemonParams struct {
|
||||
type DeleteDaemonTokensByWorkspaceAndDaemonsParams struct {
|
||||
WorkspaceID pgtype.UUID `json:"workspace_id"`
|
||||
DaemonID string `json:"daemon_id"`
|
||||
DaemonIds []string `json:"daemon_ids"`
|
||||
}
|
||||
|
||||
// Callers MUST also invalidate auth.DaemonTokenCache for each affected
|
||||
// token_hash so the deletion takes effect before the cache TTL expires.
|
||||
// Today this query has no caller; when a deregister / rotate flow lands,
|
||||
// change this to :many RETURNING token_hash and call
|
||||
// DaemonTokenCache.Invalidate(hash) for each row.
|
||||
func (q *Queries) DeleteDaemonTokensByWorkspaceAndDaemon(ctx context.Context, arg DeleteDaemonTokensByWorkspaceAndDaemonParams) error {
|
||||
_, err := q.db.Exec(ctx, deleteDaemonTokensByWorkspaceAndDaemon, arg.WorkspaceID, arg.DaemonID)
|
||||
return err
|
||||
// Deletes every daemon_token row matching the (workspace_id, daemon_id)
|
||||
// pairs implied by `daemon_ids`. Used by the member-revocation flow to
|
||||
// nuke tokens for all runtimes a leaving member owned in one shot.
|
||||
// Returns token_hash so the caller can invalidate auth.DaemonTokenCache
|
||||
// before the 10-minute TTL expires — without that invalidate, a daemon
|
||||
// can keep using its stale token until cache eviction even though the
|
||||
// DB row is gone.
|
||||
func (q *Queries) DeleteDaemonTokensByWorkspaceAndDaemons(ctx context.Context, arg DeleteDaemonTokensByWorkspaceAndDaemonsParams) ([]string, error) {
|
||||
rows, err := q.db.Query(ctx, deleteDaemonTokensByWorkspaceAndDaemons, arg.WorkspaceID, arg.DaemonIds)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
items := []string{}
|
||||
for rows.Next() {
|
||||
var token_hash string
|
||||
if err := rows.Scan(&token_hash); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
items = append(items, token_hash)
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return items, nil
|
||||
}
|
||||
|
||||
const deleteExpiredDaemonTokens = `-- name: DeleteExpiredDaemonTokens :exec
|
||||
|
||||
@@ -11,6 +11,78 @@ import (
|
||||
"github.com/jackc/pgx/v5/pgtype"
|
||||
)
|
||||
|
||||
const cancelAgentTasksByRuntimeOrAgent = `-- name: CancelAgentTasksByRuntimeOrAgent :many
|
||||
UPDATE agent_task_queue
|
||||
SET status = 'cancelled', completed_at = now()
|
||||
WHERE (runtime_id = ANY($1::uuid[]) OR agent_id = ANY($2::uuid[]))
|
||||
AND status IN ('queued', 'dispatched', 'running')
|
||||
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session
|
||||
`
|
||||
|
||||
type CancelAgentTasksByRuntimeOrAgentParams struct {
|
||||
RuntimeIds []pgtype.UUID `json:"runtime_ids"`
|
||||
AgentIds []pgtype.UUID `json:"agent_ids"`
|
||||
}
|
||||
|
||||
// Cancels every active task that either lives on one of the given runtimes
|
||||
// OR belongs to one of the given agents. Used by the member-revocation flow:
|
||||
// the runtime-side covers tasks queued against the leaving member's runtimes;
|
||||
// the agent-side covers tasks pinned to a different runtime that those agents
|
||||
// left behind from a prior UpdateAgent (agent.runtime_id can change, but
|
||||
// agent_task_queue.runtime_id does not get rewritten when it does, so a task
|
||||
// queued on runtime A by agent X — later moved to runtime B — survives the
|
||||
// runtime-only revoke and could still be claimed because ClaimAgentTask does
|
||||
// not gate on agent.archived_at).
|
||||
//
|
||||
// We use 'cancelled' rather than 'failed' so the daemon's per-task status
|
||||
// poller (watchTaskCancellation) interrupts the running agent gracefully.
|
||||
// Returns the affected rows so the caller can broadcast task:cancelled and
|
||||
// reconcile per-agent status.
|
||||
func (q *Queries) CancelAgentTasksByRuntimeOrAgent(ctx context.Context, arg CancelAgentTasksByRuntimeOrAgentParams) ([]AgentTaskQueue, error) {
|
||||
rows, err := q.db.Query(ctx, cancelAgentTasksByRuntimeOrAgent, arg.RuntimeIds, arg.AgentIds)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
items := []AgentTaskQueue{}
|
||||
for rows.Next() {
|
||||
var i AgentTaskQueue
|
||||
if err := rows.Scan(
|
||||
&i.ID,
|
||||
&i.AgentID,
|
||||
&i.IssueID,
|
||||
&i.Status,
|
||||
&i.Priority,
|
||||
&i.DispatchedAt,
|
||||
&i.StartedAt,
|
||||
&i.CompletedAt,
|
||||
&i.Result,
|
||||
&i.Error,
|
||||
&i.CreatedAt,
|
||||
&i.Context,
|
||||
&i.RuntimeID,
|
||||
&i.SessionID,
|
||||
&i.WorkDir,
|
||||
&i.TriggerCommentID,
|
||||
&i.ChatSessionID,
|
||||
&i.AutopilotRunID,
|
||||
&i.Attempt,
|
||||
&i.MaxAttempts,
|
||||
&i.ParentTaskID,
|
||||
&i.FailureReason,
|
||||
&i.TriggerSummary,
|
||||
&i.ForceFreshSession,
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
items = append(items, i)
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return items, nil
|
||||
}
|
||||
|
||||
const countActiveAgentsByRuntime = `-- name: CountActiveAgentsByRuntime :one
|
||||
SELECT count(*) FROM agent WHERE runtime_id = $1 AND archived_at IS NULL
|
||||
`
|
||||
@@ -228,6 +300,53 @@ func (q *Queries) FindLegacyRuntimesByDaemonID(ctx context.Context, arg FindLega
|
||||
return items, nil
|
||||
}
|
||||
|
||||
const forceOfflineRuntimesByIDs = `-- name: ForceOfflineRuntimesByIDs :many
|
||||
UPDATE agent_runtime
|
||||
SET status = 'offline', updated_at = now()
|
||||
WHERE id = ANY($1::uuid[]) AND status = 'online'
|
||||
RETURNING id, workspace_id, owner_id, daemon_id, provider
|
||||
`
|
||||
|
||||
type ForceOfflineRuntimesByIDsRow struct {
|
||||
ID pgtype.UUID `json:"id"`
|
||||
WorkspaceID pgtype.UUID `json:"workspace_id"`
|
||||
OwnerID pgtype.UUID `json:"owner_id"`
|
||||
DaemonID pgtype.Text `json:"daemon_id"`
|
||||
Provider string `json:"provider"`
|
||||
}
|
||||
|
||||
// Unconditionally flips a known set of runtime IDs to offline. Distinct from
|
||||
// MarkRuntimesOfflineByIDs (which keeps a stale-window predicate so the
|
||||
// sweeper cannot demote a runtime that just heartbeated): this variant is
|
||||
// used by intentional revocation paths — e.g. removing a workspace member —
|
||||
// where the caller has already decided the runtime should be offline
|
||||
// regardless of recent liveness.
|
||||
func (q *Queries) ForceOfflineRuntimesByIDs(ctx context.Context, runtimeIds []pgtype.UUID) ([]ForceOfflineRuntimesByIDsRow, error) {
|
||||
rows, err := q.db.Query(ctx, forceOfflineRuntimesByIDs, runtimeIds)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
items := []ForceOfflineRuntimesByIDsRow{}
|
||||
for rows.Next() {
|
||||
var i ForceOfflineRuntimesByIDsRow
|
||||
if err := rows.Scan(
|
||||
&i.ID,
|
||||
&i.WorkspaceID,
|
||||
&i.OwnerID,
|
||||
&i.DaemonID,
|
||||
&i.Provider,
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
items = append(items, i)
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return items, nil
|
||||
}
|
||||
|
||||
const getAgentRuntime = `-- name: GetAgentRuntime :one
|
||||
SELECT id, workspace_id, daemon_id, name, runtime_mode, provider, status, device_info, metadata, last_seen_at, created_at, updated_at, owner_id, legacy_daemon_id, timezone FROM agent_runtime
|
||||
WHERE id = $1
|
||||
|
||||
@@ -54,6 +54,16 @@ UPDATE agent SET archived_at = now(), archived_by = $2, updated_at = now()
|
||||
WHERE id = $1
|
||||
RETURNING *;
|
||||
|
||||
-- name: ArchiveAgentsByRuntime :many
|
||||
-- Bulk-archives every active agent bound to any runtime in the given set.
|
||||
-- Used when revoking a leaving member's runtimes so agents pinned to those
|
||||
-- runtimes can no longer be assigned new work. Returns the affected rows so
|
||||
-- the caller can broadcast agent:archived per agent.
|
||||
UPDATE agent
|
||||
SET archived_at = now(), archived_by = @archived_by, updated_at = now()
|
||||
WHERE runtime_id = ANY(@runtime_ids::uuid[]) AND archived_at IS NULL
|
||||
RETURNING *;
|
||||
|
||||
-- name: RestoreAgent :one
|
||||
UPDATE agent SET archived_at = NULL, archived_by = NULL, updated_at = now()
|
||||
WHERE id = $1
|
||||
|
||||
@@ -7,14 +7,18 @@ RETURNING *;
|
||||
SELECT * FROM daemon_token
|
||||
WHERE token_hash = $1 AND expires_at > now();
|
||||
|
||||
-- name: DeleteDaemonTokensByWorkspaceAndDaemon :exec
|
||||
-- Callers MUST also invalidate auth.DaemonTokenCache for each affected
|
||||
-- token_hash so the deletion takes effect before the cache TTL expires.
|
||||
-- Today this query has no caller; when a deregister / rotate flow lands,
|
||||
-- change this to :many RETURNING token_hash and call
|
||||
-- DaemonTokenCache.Invalidate(hash) for each row.
|
||||
-- name: DeleteDaemonTokensByWorkspaceAndDaemons :many
|
||||
-- Deletes every daemon_token row matching the (workspace_id, daemon_id)
|
||||
-- pairs implied by `daemon_ids`. Used by the member-revocation flow to
|
||||
-- nuke tokens for all runtimes a leaving member owned in one shot.
|
||||
-- Returns token_hash so the caller can invalidate auth.DaemonTokenCache
|
||||
-- before the 10-minute TTL expires — without that invalidate, a daemon
|
||||
-- can keep using its stale token until cache eviction even though the
|
||||
-- DB row is gone.
|
||||
DELETE FROM daemon_token
|
||||
WHERE workspace_id = $1 AND daemon_id = $2;
|
||||
WHERE workspace_id = @workspace_id
|
||||
AND daemon_id = ANY(@daemon_ids::text[])
|
||||
RETURNING token_hash;
|
||||
|
||||
-- name: DeleteExpiredDaemonTokens :exec
|
||||
DELETE FROM daemon_token
|
||||
|
||||
@@ -197,6 +197,39 @@ SELECT * FROM agent_runtime
|
||||
WHERE workspace_id = $1 AND owner_id = $2
|
||||
ORDER BY created_at ASC;
|
||||
|
||||
-- name: ForceOfflineRuntimesByIDs :many
|
||||
-- Unconditionally flips a known set of runtime IDs to offline. Distinct from
|
||||
-- MarkRuntimesOfflineByIDs (which keeps a stale-window predicate so the
|
||||
-- sweeper cannot demote a runtime that just heartbeated): this variant is
|
||||
-- used by intentional revocation paths — e.g. removing a workspace member —
|
||||
-- where the caller has already decided the runtime should be offline
|
||||
-- regardless of recent liveness.
|
||||
UPDATE agent_runtime
|
||||
SET status = 'offline', updated_at = now()
|
||||
WHERE id = ANY(@runtime_ids::uuid[]) AND status = 'online'
|
||||
RETURNING id, workspace_id, owner_id, daemon_id, provider;
|
||||
|
||||
-- name: CancelAgentTasksByRuntimeOrAgent :many
|
||||
-- Cancels every active task that either lives on one of the given runtimes
|
||||
-- OR belongs to one of the given agents. Used by the member-revocation flow:
|
||||
-- the runtime-side covers tasks queued against the leaving member's runtimes;
|
||||
-- the agent-side covers tasks pinned to a different runtime that those agents
|
||||
-- left behind from a prior UpdateAgent (agent.runtime_id can change, but
|
||||
-- agent_task_queue.runtime_id does not get rewritten when it does, so a task
|
||||
-- queued on runtime A by agent X — later moved to runtime B — survives the
|
||||
-- runtime-only revoke and could still be claimed because ClaimAgentTask does
|
||||
-- not gate on agent.archived_at).
|
||||
--
|
||||
-- We use 'cancelled' rather than 'failed' so the daemon's per-task status
|
||||
-- poller (watchTaskCancellation) interrupts the running agent gracefully.
|
||||
-- Returns the affected rows so the caller can broadcast task:cancelled and
|
||||
-- reconcile per-agent status.
|
||||
UPDATE agent_task_queue
|
||||
SET status = 'cancelled', completed_at = now()
|
||||
WHERE (runtime_id = ANY(@runtime_ids::uuid[]) OR agent_id = ANY(@agent_ids::uuid[]))
|
||||
AND status IN ('queued', 'dispatched', 'running')
|
||||
RETURNING *;
|
||||
|
||||
-- name: DeleteAgentRuntime :exec
|
||||
DELETE FROM agent_runtime WHERE id = $1;
|
||||
|
||||
|
||||
Reference in New Issue
Block a user