mirror of
https://github.com/multica-ai/multica.git
synced 2026-07-05 13:29:44 +02:00
Addresses three review findings on PR #2888: 1. Archived squad handling: validateAutopilotAssignee now rejects squads with archived_at set; resolveAutopilotLeader returns errSquadArchived so the admission gate fails closed; DeleteSquad now mirrors the issue transfer for autopilot rows (TransferSquadAutopilotsToLeader) so surviving autopilots flip to assignee_type='agent' (leader) instead of dangling at the archived squad. 2. dispatchRunOnly post-admission readiness: introduces errDispatchSkipped sentinel, recognised by DispatchAutopilot via handleDispatchSkip so the run is recorded as `skipped` (not `failed`). Manual triggers no longer 500 when the leader's runtime goes offline between admission and task creation. New TestManualTriggerDoesNotErrorOnPostAdmissionSkip locks the behaviour in. 3. Dangling agent assignee after migration 096 dropped the FK: shouldSkipDispatch now distinguishes pgx.ErrNoRows / errSquadArchived (hard skip — retrying won't help) from transient DB errors (fail-open). DeleteAgentRuntime pauses autopilots that target agents about to be hard-deleted (ListArchivedAgentIDsByRuntime + PauseAutopilotsByAgentAssignees) so the breakage surfaces as a paused row in the UI instead of a quiet skip-burning loop. Unit tests cover the sentinel unwrap contract and errSquadArchived errors.Is behaviour. Integration test TestAutopilotDispatchSkipsWhenRuntimeOffline re-verified against a fresh DB with migration 096 applied. Co-authored-by: multica-agent <github@multica.ai>
324 lines
14 KiB
SQL
324 lines
14 KiB
SQL
-- name: ListAgentRuntimes :many
-- Returns every runtime row that belongs to the workspace given as $1,
-- ordered oldest-first by created_at.
SELECT *
FROM agent_runtime
WHERE workspace_id = $1
ORDER BY created_at ASC;
|
|
|
|
-- name: GetAgentRuntime :one
-- Fetches a single runtime row by its id ($1). This lookup applies no
-- workspace filter; GetAgentRuntimeForWorkspace is the workspace-scoped form.
SELECT *
FROM agent_runtime
WHERE id = $1;
|
|
|
|
-- name: GetAgentRuntimeForWorkspace :one
-- Fetches the runtime with id $1, but only when it belongs to workspace $2.
-- A runtime that exists in a different workspace yields no row.
SELECT *
FROM agent_runtime
WHERE workspace_id = $2
  AND id = $1;
|
|
|
|
-- name: UpsertAgentRuntime :one
-- Registers a runtime, or refreshes the existing row that shares the same
-- (workspace_id, daemon_id, provider) key.
--
-- Result: the full row plus "inserted", computed as (xmax = 0). It is true
-- for a fresh insert and false when an existing row was updated. Analytics
-- reads it to fire runtime_registered/runtime_ready only on first-time
-- registration.
--
-- On conflict:
--   * name, runtime_mode, status, device_info and metadata take the incoming
--     values; last_seen_at and updated_at are set to now().
--   * owner_id is replaced only when the incoming value is non-NULL.
--   * timezone is deliberately NOT updated. @timezone applies on INSERT only:
--     once an operator overrides the tz via the web UI, a daemon reconnect
--     (which sends its own system tz) must not silently revert it. A daemon
--     can still set the initial value when it is the first to register a
--     brand-new runtime row.
INSERT INTO agent_runtime (
    workspace_id, daemon_id, name, runtime_mode, provider, status,
    device_info, metadata, owner_id, timezone, last_seen_at
)
VALUES (
    $1, $2, $3, $4, $5, $6,
    $7, $8, $9, @timezone, now()
)
ON CONFLICT (workspace_id, daemon_id, provider) DO UPDATE
SET
    name         = EXCLUDED.name,
    runtime_mode = EXCLUDED.runtime_mode,
    status       = EXCLUDED.status,
    device_info  = EXCLUDED.device_info,
    metadata     = EXCLUDED.metadata,
    owner_id     = COALESCE(EXCLUDED.owner_id, agent_runtime.owner_id),
    last_seen_at = now(),
    updated_at   = now()
RETURNING
    *,
    (xmax = 0) AS inserted;
|
|
|
|
-- name: LockTaskUsageDailyRollup :exec
-- Acquires the transaction-scoped advisory lock keyed 4242, the same key
-- rollup_task_usage_daily() uses (migration 073). Holding it serializes an
-- explicit timezone rebuild with the cron rollup, so cron cannot write
-- old-timezone buckets while PATCH is deleting and rebuilding rows. As an
-- xact-level lock it is held until the surrounding transaction ends.
SELECT pg_advisory_xact_lock(4242);
|
|
|
|
-- name: UpdateAgentRuntimeTimezone :one
-- Applies an operator-driven override of the runtime's reporting timezone
-- (MUL-1950). Sets timezone to @timezone, bumps updated_at, and returns the
-- updated row for the runtime identified by @id.
UPDATE agent_runtime
SET
    timezone   = @timezone,
    updated_at = now()
WHERE id = @id
RETURNING *;
|
|
|
|
-- name: UpdateAgentRuntimeVisibility :one
-- Switches a runtime between the two visibility modes:
--   'private' - only the owner can bind agents (default for new rows, see
--               migration 083);
--   'public'  - any workspace member can.
-- No permission check happens in SQL; the handler layer restricts this call
-- to the owner or a workspace admin. Returns the updated row.
UPDATE agent_runtime
SET
    visibility = @visibility,
    updated_at = now()
WHERE id = @id
RETURNING *;
|
|
|
|
-- name: DeleteTaskUsageDailyForRuntime :execrows
-- Step one of the rebuild triggered by an explicit user timezone edit:
-- removes all materialized task_usage_daily rows for runtime $1 so they can
-- be re-inserted under the runtime's new timezone. Reports the number of
-- rows deleted.
DELETE FROM task_usage_daily
WHERE runtime_id = $1;
|
|
|
|
-- name: DeleteTaskUsageDailyDirtyForRuntime :execrows
-- Discards the queued dirty keys for runtime $1. They were computed under
-- the old timezone, and the ordered rebuild running in the same transaction
-- writes the current aggregate instead. Reports the number of rows deleted.
DELETE FROM task_usage_daily_dirty
WHERE runtime_id = $1;
|
|
|
|
-- name: InsertTaskUsageDailyForRuntime :execrows
-- Last step of the rebuild triggered by an explicit user timezone edit.
--
-- Re-aggregates all task_usage history for runtime $1 into task_usage_daily,
-- bucketing each event by its created_at converted to the runtime's current
-- timezone. One row is produced per (bucket_date, workspace_id, runtime_id,
-- provider, model); if such a row already exists its counters are overwritten
-- with the freshly computed totals and updated_at is bumped.
--
-- This is intentionally invoked only for user edits, never by the migration
-- itself: deploys do not backfill history, but a user-driven change must not
-- leave old UTC rows sitting next to newly computed local rows. The full
-- history scan is acceptable because timezone edits are owner/admin
-- operations and are expected to be rare.
INSERT INTO task_usage_daily (
    bucket_date,
    workspace_id,
    runtime_id,
    provider,
    model,
    input_tokens,
    output_tokens,
    cache_read_tokens,
    cache_write_tokens,
    event_count
)
SELECT
    DATE(tu.created_at AT TIME ZONE rt.timezone) AS bucket_date,
    a.workspace_id,
    atq.runtime_id,
    tu.provider,
    tu.model,
    SUM(tu.input_tokens)::bigint                 AS input_tokens,
    SUM(tu.output_tokens)::bigint                AS output_tokens,
    SUM(tu.cache_read_tokens)::bigint            AS cache_read_tokens,
    SUM(tu.cache_write_tokens)::bigint           AS cache_write_tokens,
    COUNT(*)::bigint                             AS event_count
FROM task_usage tu
JOIN agent_task_queue atq
    ON atq.id = tu.task_id
JOIN agent a
    ON a.id = atq.agent_id
JOIN agent_runtime rt
    ON rt.id = atq.runtime_id
WHERE atq.runtime_id = $1
GROUP BY
    DATE(tu.created_at AT TIME ZONE rt.timezone),
    a.workspace_id,
    atq.runtime_id,
    tu.provider,
    tu.model
ON CONFLICT (bucket_date, workspace_id, runtime_id, provider, model) DO UPDATE
SET
    input_tokens       = EXCLUDED.input_tokens,
    output_tokens      = EXCLUDED.output_tokens,
    cache_read_tokens  = EXCLUDED.cache_read_tokens,
    cache_write_tokens = EXCLUDED.cache_write_tokens,
    event_count        = EXCLUDED.event_count,
    updated_at         = now();
|
|
|
|
-- name: TouchAgentRuntimeLastSeen :execrows
-- Hot heartbeat path: refreshes last_seen_at for runtime $1, and nothing else.
--
-- Why only last_seen_at: status does not change on this path, and leaving
-- updated_at alone keeps the row HOT-eligible (no index columns change) and
-- avoids invalidating downstream consumers that watch updated_at.
--
-- Why the status = 'online' predicate matters: callers read rt.Status in an
-- earlier SELECT, and the sweeper may flip the row to offline before this
-- UPDATE runs. Without the predicate the UPDATE would succeed and the
-- freshly-heartbeated runtime would silently stay offline. With it, the
-- affected-row count is 0 in that race, which tells the caller to fall back
-- to MarkAgentRuntimeOnline and flip the row back online.
UPDATE agent_runtime
SET last_seen_at = now()
WHERE status = 'online'
  AND id = $1;
|
|
|
|
-- name: TouchAgentRuntimesLastSeenBatch :execrows
-- Batch form of TouchAgentRuntimeLastSeen, used by the
-- BatchedHeartbeatScheduler. It folds N per-runtime "bump last_seen_at"
-- requests into one UPDATE, so a fleet beating every 15s costs roughly one DB
-- transaction per batch tick instead of N.
--
-- It follows the same two rules as the single-id query:
--   * status = 'online' is required, so a row the sweeper has already flipped
--     to offline is skipped rather than refreshed while still offline;
--   * updated_at is deliberately untouched so the rows stay HOT-eligible.
--
-- An affected-row count below len(ids) means some ids went offline between
-- Schedule and flush. Their next beat falls through the recordHeartbeat sync
-- path and calls MarkAgentRuntimeOnline.
UPDATE agent_runtime
SET last_seen_at = now()
WHERE status = 'online'
  AND id = ANY(@ids::uuid[]);
|
|
|
|
-- name: MarkAgentRuntimeOnline :one
-- Handles the offline -> online transition, and the first heartbeat after
-- registration. Unlike the hot heartbeat queries it also writes status and
-- updated_at: the flip is a real state change and updated_at should reflect
-- it. Returns the updated row for runtime $1.
UPDATE agent_runtime
SET
    status       = 'online',
    last_seen_at = now(),
    updated_at   = now()
WHERE id = $1
RETURNING *;
|
|
|
|
-- name: SetAgentRuntimeOffline :exec
-- Marks runtime $1 as offline and bumps updated_at. There is no predicate on
-- the current status, so updated_at is bumped even if the row was already
-- offline. last_seen_at is left unchanged.
UPDATE agent_runtime
SET
    status     = 'offline',
    updated_at = now()
WHERE id = $1;
|
|
|
|
-- name: SelectStaleOnlineRuntimes :many
-- Returns the online runtimes whose last_seen_at is older than
-- @stale_seconds seconds ago.
--
-- The result is only a candidate set for the sweeper. Before flipping rows
-- to offline it may filter the candidates through the LivenessStore: a fresh
-- Redis liveness record means the DB row is merely lagging, not dead.
SELECT
    id,
    workspace_id,
    owner_id,
    daemon_id,
    provider
FROM agent_runtime
WHERE status = 'online'
  AND last_seen_at < now() - make_interval(secs => @stale_seconds::double precision);
|
|
|
|
-- name: MarkRuntimesOfflineByIDs :many
-- Write half of the sweeper: flips the runtimes listed in @ids from online
-- to offline and returns the rows that were actually changed.
--
-- It pairs with SelectStaleOnlineRuntimes. Candidate selection and the write
-- are separate statements because the LivenessStore filter runs in between.
--
-- The stale predicate is repeated here on purpose. A heartbeat can land
-- between the candidate SELECT, the LivenessStore filter and this UPDATE;
-- re-checking last_seen_at ensures a runtime that just refreshed is not
-- demoted. The legacy MarkStaleRuntimesOffline UPDATE got this for free
-- because predicate and write were one statement; the split pipeline has to
-- state it explicitly to keep the same race-freedom.
UPDATE agent_runtime
SET
    status     = 'offline',
    updated_at = now()
WHERE id = ANY(@ids::uuid[])
  AND status = 'online'
  AND last_seen_at < now() - make_interval(secs => @stale_seconds::double precision)
RETURNING
    id,
    workspace_id,
    owner_id,
    daemon_id,
    provider;
|
|
|
|
-- name: FailTasksForOfflineRuntimes :many
-- Fails every task that is still 'dispatched' or 'running' on a runtime
-- whose status is 'offline'. This cleans up tasks orphaned by a daemon crash
-- or a network partition. Each affected task gets status 'failed',
-- completed_at = now(), error 'runtime went offline' and failure_reason
-- 'runtime_offline'. Returns the updated task rows.
UPDATE agent_task_queue
SET
    status         = 'failed',
    completed_at   = now(),
    error          = 'runtime went offline',
    failure_reason = 'runtime_offline'
WHERE status IN ('dispatched', 'running')
  AND EXISTS (
      SELECT 1
      FROM agent_runtime rt
      WHERE rt.id = agent_task_queue.runtime_id
        AND rt.status = 'offline'
  )
RETURNING *;
|
|
|
|
-- name: ListAgentRuntimesByOwner :many
-- Returns the runtimes in workspace $1 whose owner_id is $2, ordered
-- oldest-first by created_at.
SELECT *
FROM agent_runtime
WHERE workspace_id = $1
  AND owner_id = $2
ORDER BY created_at ASC;
|
|
|
|
-- name: ForceOfflineRuntimesByIDs :many
-- Flips the runtimes listed in @runtime_ids to offline regardless of how
-- recently they heartbeated, and returns the rows that were changed. Only
-- rows currently 'online' are touched, so already-offline runtimes are
-- neither rewritten nor returned.
--
-- This differs from MarkRuntimesOfflineByIDs, which keeps a stale-window
-- predicate so the sweeper cannot demote a runtime that just heartbeated.
-- This variant serves intentional revocation paths (for example removing a
-- workspace member), where the caller has already decided the runtime must
-- go offline irrespective of liveness.
UPDATE agent_runtime
SET
    status     = 'offline',
    updated_at = now()
WHERE id = ANY(@runtime_ids::uuid[])
  AND status = 'online'
RETURNING
    id,
    workspace_id,
    owner_id,
    daemon_id,
    provider;
|
|
|
|
-- name: CancelAgentTasksByRuntimeOrAgent :many
-- Cancels every active ('queued', 'dispatched' or 'running') task that
-- either sits on one of the runtimes in @runtime_ids or belongs to one of
-- the agents in @agent_ids. Used by the member-revocation flow.
--
-- Both sides are needed:
--   * the runtime side covers tasks queued against the leaving member's
--     runtimes;
--   * the agent side covers tasks pinned to a different runtime that those
--     agents left behind after an earlier UpdateAgent. agent.runtime_id can
--     change, but agent_task_queue.runtime_id is not rewritten when it does.
--     A task queued on runtime A by agent X (later moved to runtime B) would
--     survive a runtime-only revoke and could still be claimed, because
--     ClaimAgentTask does not gate on agent.archived_at.
--
-- The status written is 'cancelled', not 'failed', so that the daemon's
-- per-task status poller (watchTaskCancellation) interrupts the running agent
-- gracefully. The affected rows are returned so the caller can broadcast
-- task:cancelled and reconcile per-agent status.
UPDATE agent_task_queue
SET
    status       = 'cancelled',
    completed_at = now()
WHERE status IN ('queued', 'dispatched', 'running')
  AND (
      runtime_id = ANY(@runtime_ids::uuid[])
      OR agent_id = ANY(@agent_ids::uuid[])
  )
RETURNING *;
|
|
|
|
-- name: DeleteAgentRuntime :exec
-- Hard-deletes the runtime row with id $1.
DELETE FROM agent_runtime
WHERE id = $1;
|
|
|
|
-- name: CountActiveAgentsByRuntime :one
-- Counts the agents bound to runtime $1 that are not archived
-- (archived_at IS NULL).
SELECT count(*)
FROM agent
WHERE runtime_id = $1
  AND archived_at IS NULL;
|
|
|
|
-- name: DeleteArchivedAgentsByRuntime :exec
-- Hard-deletes the archived agents (archived_at IS NOT NULL) bound to
-- runtime $1. Non-archived agents on the runtime are left in place.
DELETE FROM agent
WHERE runtime_id = $1
  AND archived_at IS NOT NULL;
|
|
|
|
-- name: PauseAutopilotsByAgentAssignees :exec
-- Sets status = 'paused' on every active autopilot whose assignee is an
-- agent listed in @assignee_ids.
--
-- Runtime teardown calls this before hard-deleting archived agents. Since
-- migration 096 autopilot.assignee_id no longer has an agent FK, so without
-- this step those autopilots would be left pointing at deleted agents. A
-- paused row makes the breakage visible in the UI, where operators can
-- re-point the autopilot at a live agent or delete it, instead of silently
-- piling up skipped runs.
UPDATE autopilot
SET
    status     = 'paused',
    updated_at = now()
WHERE assignee_type = 'agent'
  AND assignee_id = ANY(@assignee_ids::uuid[])
  AND status = 'active';
|
|
|
|
-- name: ListArchivedAgentIDsByRuntime :many
-- Lists the ids of the archived agents bound to runtime $1. It uses the same
-- predicate as DeleteArchivedAgentsByRuntime, so the result is the set about
-- to be hard-deleted. Runtime teardown uses it to pause autopilots that still
-- point at those agents. Only ids are returned because the caller needs
-- nothing else.
SELECT id
FROM agent
WHERE runtime_id = $1
  AND archived_at IS NOT NULL;
|
|
|
|
-- name: FindLegacyRuntimesByDaemonID :many
-- Finds runtime rows in @workspace_id for @provider that were registered
-- under a prior, hostname-derived daemon_id. Register-time code uses this to
-- locate rows owned by the same machine under its old identity, so agents
-- and tasks can be re-pointed at the new UUID-keyed row.
--
-- Matching is case-insensitive (LOWER() on both sides). os.Hostname() has
-- been observed to return different casings on the same machine across
-- reboots/mDNS state changes, e.g. `Jiayuans-MacBook-Pro` vs
-- `jiayuans-macbook-pro`. A case-sensitive `=` would strand the old row;
-- lowering both sides absorbs the drift without making the daemon enumerate
-- cased permutations.
--
-- The query is :many rather than :one because case drift may already have
-- minted duplicates historically (e.g. `Foo.local` AND `foo.local` under the
-- same workspace+provider). A single-row lookup would consolidate one and
-- orphan the rest. Callers must merge every returned row into the new
-- UUID-keyed runtime.
SELECT *
FROM agent_runtime
WHERE workspace_id = @workspace_id
  AND provider = @provider
  AND LOWER(daemon_id) = LOWER(@daemon_id);
|
|
|
|
-- name: ReassignAgentsToRuntime :execrows
-- Moves every agent currently bound to @old_runtime_id onto @new_runtime_id.
-- Reports how many agent rows were re-pointed.
UPDATE agent
SET runtime_id = @new_runtime_id
WHERE runtime_id = @old_runtime_id;
|
|
|
|
-- name: ReassignTasksToRuntime :execrows
-- Moves every task that references @old_runtime_id onto @new_runtime_id.
-- There is no status filter, so queued, running and completed tasks are all
-- included.
--
-- This must run before the old runtime row is deleted: agent_task_queue has
-- an ON DELETE CASCADE FK to the runtime, so deleting first would drop the
-- historical tasks. Reports how many task rows were re-pointed.
UPDATE agent_task_queue
SET runtime_id = @new_runtime_id
WHERE runtime_id = @old_runtime_id;
|
|
|
|
-- name: RecordRuntimeLegacyDaemonID :exec
-- Records on runtime $1 the hostname-derived daemon_id ($2) of a row that
-- was merged into it. This is useful when tracing back why a given runtime
-- row subsumed an old one.
--
-- The value is written only while legacy_daemon_id is still NULL. COALESCE
-- keeps any value already stored, so the first merge recorded is preserved
-- and later merges do not overwrite it.
UPDATE agent_runtime
SET legacy_daemon_id = COALESCE(legacy_daemon_id, $2)
WHERE id = $1;
|
|
|
|
-- name: DeleteStaleOfflineRuntimes :many
-- Garbage-collects runtimes that are offline, were last seen more than
-- @stale_seconds seconds ago, and have no agents bound to them (active or
-- archived). Returns the id and workspace_id of every deleted runtime.
--
-- All agent references must be excluded because the FK on agent.runtime_id
-- is ON DELETE RESTRICT: deleting a runtime that any agent still points at
-- would fail the statement.
--
-- The "no agents" check is a correlated NOT EXISTS rather than
-- `id NOT IN (SELECT runtime_id FROM agent)`. If any agent.runtime_id were
-- NULL, NOT IN would evaluate to NULL for every runtime and this query
-- would silently delete nothing. NOT EXISTS gives the same result when the
-- column has no NULLs and stays correct when it does.
DELETE FROM agent_runtime
WHERE status = 'offline'
  AND last_seen_at < now() - make_interval(secs => @stale_seconds::double precision)
  AND NOT EXISTS (
      SELECT 1
      FROM agent
      WHERE agent.runtime_id = agent_runtime.id
  )
RETURNING
    id,
    workspace_id;
|