-- name: ListAgentRuntimes :many SELECT * FROM agent_runtime WHERE workspace_id = $1 ORDER BY created_at ASC; -- name: GetAgentRuntime :one SELECT * FROM agent_runtime WHERE id = $1; -- name: GetAgentRuntimeForWorkspace :one SELECT * FROM agent_runtime WHERE id = $1 AND workspace_id = $2; -- name: UpsertAgentRuntime :one -- (xmax = 0) AS inserted distinguishes a fresh insert (true) from an upsert -- that updated an existing row (false). Analytics reads this to fire -- runtime_registered/runtime_ready only on first-time registration. -- -- @timezone is set on INSERT only. On conflict we deliberately KEEP the -- existing agent_runtime.timezone — once an operator overrides the tz via -- the web UI we don't want a daemon reconnect (which sends its own system -- tz) to silently revert it. Daemons can still set the initial value when -- they're the first to register a brand-new runtime row. INSERT INTO agent_runtime ( workspace_id, daemon_id, name, runtime_mode, provider, status, device_info, metadata, owner_id, timezone, last_seen_at ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, @timezone, now()) ON CONFLICT (workspace_id, daemon_id, provider) DO UPDATE SET name = EXCLUDED.name, runtime_mode = EXCLUDED.runtime_mode, status = EXCLUDED.status, device_info = EXCLUDED.device_info, metadata = EXCLUDED.metadata, owner_id = COALESCE(EXCLUDED.owner_id, agent_runtime.owner_id), last_seen_at = now(), updated_at = now() RETURNING *, (xmax = 0) AS inserted; -- name: LockTaskUsageDailyRollup :exec -- Serialize explicit timezone rebuilds with rollup_task_usage_daily(), which -- uses the same advisory key in migration 073. This prevents cron from -- writing old-timezone buckets while PATCH is deleting/rebuilding rows. SELECT pg_advisory_xact_lock(4242); -- name: UpdateAgentRuntimeTimezone :one -- Operator-driven override of the runtime's reporting timezone (MUL-1950). UPDATE agent_runtime SET timezone = @timezone, updated_at = now() WHERE id = @id RETURNING *; -- name: DeleteTaskUsageDailyForRuntime :execrows -- First step of an explicit user timezone edit rebuild. Delete old materialized -- rows before re-inserting under the runtime's new timezone. DELETE FROM task_usage_daily WHERE runtime_id = $1; -- name: DeleteTaskUsageDailyDirtyForRuntime :execrows -- Drop queued dirty keys computed under the old timezone; the ordered rebuild -- in the same transaction will write the current aggregate instead. DELETE FROM task_usage_daily_dirty WHERE runtime_id = $1; -- name: InsertTaskUsageDailyForRuntime :execrows -- Final step of an explicit user timezone edit rebuild. This is intentionally -- called only for user edits, not by the migration itself: deploys do not -- backfill history, but a user-driven change must not leave old UTC rows next -- to newly computed local rows. This scans all history for the edited runtime; -- timezone edits are owner/admin operations and are expected to be rare. INSERT INTO task_usage_daily AS d ( bucket_date, workspace_id, runtime_id, provider, model, input_tokens, output_tokens, cache_read_tokens, cache_write_tokens, event_count ) SELECT DATE(tu.created_at AT TIME ZONE rt.timezone) AS bucket_date, a.workspace_id, atq.runtime_id, tu.provider, tu.model, SUM(tu.input_tokens)::bigint AS input_tokens, SUM(tu.output_tokens)::bigint AS output_tokens, SUM(tu.cache_read_tokens)::bigint AS cache_read_tokens, SUM(tu.cache_write_tokens)::bigint AS cache_write_tokens, COUNT(*)::bigint AS event_count FROM task_usage tu JOIN agent_task_queue atq ON atq.id = tu.task_id JOIN agent a ON a.id = atq.agent_id JOIN agent_runtime rt ON rt.id = atq.runtime_id WHERE atq.runtime_id = $1 GROUP BY 1, 2, 3, 4, 5 ON CONFLICT (bucket_date, workspace_id, runtime_id, provider, model) DO UPDATE SET input_tokens = EXCLUDED.input_tokens, output_tokens = EXCLUDED.output_tokens, cache_read_tokens = EXCLUDED.cache_read_tokens, cache_write_tokens = EXCLUDED.cache_write_tokens, event_count = EXCLUDED.event_count, updated_at = now(); -- name: TouchAgentRuntimeLastSeen :execrows -- Bumps last_seen_at on an already-online runtime. Deliberately does NOT -- touch status or updated_at: status is unchanged on the hot heartbeat path, -- and avoiding updated_at keeps the row HOT-eligible (no index columns -- change) and avoids invalidating any downstream consumer that watches -- updated_at. -- -- The status='online' predicate is load-bearing: callers read rt.Status from -- a prior SELECT and may race with the sweeper, which can flip the row to -- offline between that SELECT and this UPDATE. Without the predicate this -- query would silently leave a freshly-heartbeated runtime stuck in offline. -- Returning affected rows lets callers detect that race and fall back to -- MarkAgentRuntimeOnline to flip the row back online. UPDATE agent_runtime SET last_seen_at = now() WHERE id = $1 AND status = 'online'; -- name: TouchAgentRuntimesLastSeenBatch :execrows -- Bulk variant of TouchAgentRuntimeLastSeen used by the BatchedHeartbeatScheduler: -- coalesces N per-runtime "bump last_seen_at" requests into a single UPDATE so a -- fleet beating every 15s costs ~1 DB transaction per batch tick instead of N. -- -- Same load-bearing predicate as the single-id form: status='online' avoids -- silently un-deleting a sweeper-flipped offline row, and we deliberately do -- NOT touch updated_at so the rows stay HOT-eligible. Affected-rows < len(ids) -- means some IDs raced to offline between Schedule and flush; their next beat -- will fall through the recordHeartbeat sync path and call MarkAgentRuntimeOnline. UPDATE agent_runtime SET last_seen_at = now() WHERE id = ANY(@ids::uuid[]) AND status = 'online'; -- name: MarkAgentRuntimeOnline :one -- Used on the offline→online transition (and on first heartbeat after -- registration). Writes status, last_seen_at, and updated_at because the -- status flip is a real state change and we want updated_at to reflect it. UPDATE agent_runtime SET status = 'online', last_seen_at = now(), updated_at = now() WHERE id = $1 RETURNING *; -- name: SetAgentRuntimeOffline :exec UPDATE agent_runtime SET status = 'offline', updated_at = now() WHERE id = $1; -- name: SelectStaleOnlineRuntimes :many -- Lists online runtimes whose last_seen_at exceeds the stale window. The -- sweeper uses this as a candidate set, then optionally filters via the -- LivenessStore before flipping rows to offline (a fresh Redis liveness -- record means the DB row is just lagging, not actually dead). SELECT id, workspace_id, owner_id, daemon_id, provider FROM agent_runtime WHERE status = 'online' AND last_seen_at < now() - make_interval(secs => @stale_seconds::double precision); -- name: MarkRuntimesOfflineByIDs :many -- Flips a known set of runtime IDs from online to offline. Paired with -- SelectStaleOnlineRuntimes in the sweeper so the candidate selection and -- the actual write are decoupled (the LivenessStore filter sits between). -- -- Re-checks the stale predicate inside the UPDATE so a concurrent heartbeat -- between the SELECT (candidate gather), the LivenessStore filter, and this -- UPDATE cannot demote a runtime that just refreshed last_seen_at. The -- legacy MarkStaleRuntimesOffline UPDATE had this property implicitly -- because the predicate and the write lived in one statement; here we -- carry it forward explicitly so the SELECT/filter/UPDATE pipeline retains -- the same race-freedom. UPDATE agent_runtime SET status = 'offline', updated_at = now() WHERE status = 'online' AND id = ANY(@ids::uuid[]) AND last_seen_at < now() - make_interval(secs => @stale_seconds::double precision) RETURNING id, workspace_id, owner_id, daemon_id, provider; -- name: FailTasksForOfflineRuntimes :many -- Marks dispatched/running tasks as failed when their runtime is offline. -- This cleans up orphaned tasks after a daemon crash or network partition. UPDATE agent_task_queue SET status = 'failed', completed_at = now(), error = 'runtime went offline', failure_reason = 'runtime_offline' WHERE status IN ('dispatched', 'running') AND runtime_id IN ( SELECT id FROM agent_runtime WHERE status = 'offline' ) RETURNING *; -- name: ListAgentRuntimesByOwner :many SELECT * FROM agent_runtime WHERE workspace_id = $1 AND owner_id = $2 ORDER BY created_at ASC; -- name: ForceOfflineRuntimesByIDs :many -- Unconditionally flips a known set of runtime IDs to offline. Distinct from -- MarkRuntimesOfflineByIDs (which keeps a stale-window predicate so the -- sweeper cannot demote a runtime that just heartbeated): this variant is -- used by intentional revocation paths — e.g. removing a workspace member — -- where the caller has already decided the runtime should be offline -- regardless of recent liveness. UPDATE agent_runtime SET status = 'offline', updated_at = now() WHERE id = ANY(@runtime_ids::uuid[]) AND status = 'online' RETURNING id, workspace_id, owner_id, daemon_id, provider; -- name: CancelAgentTasksByRuntimeOrAgent :many -- Cancels every active task that either lives on one of the given runtimes -- OR belongs to one of the given agents. Used by the member-revocation flow: -- the runtime-side covers tasks queued against the leaving member's runtimes; -- the agent-side covers tasks pinned to a different runtime that those agents -- left behind from a prior UpdateAgent (agent.runtime_id can change, but -- agent_task_queue.runtime_id does not get rewritten when it does, so a task -- queued on runtime A by agent X — later moved to runtime B — survives the -- runtime-only revoke and could still be claimed because ClaimAgentTask does -- not gate on agent.archived_at). -- -- We use 'cancelled' rather than 'failed' so the daemon's per-task status -- poller (watchTaskCancellation) interrupts the running agent gracefully. -- Returns the affected rows so the caller can broadcast task:cancelled and -- reconcile per-agent status. UPDATE agent_task_queue SET status = 'cancelled', completed_at = now() WHERE (runtime_id = ANY(@runtime_ids::uuid[]) OR agent_id = ANY(@agent_ids::uuid[])) AND status IN ('queued', 'dispatched', 'running') RETURNING *; -- name: DeleteAgentRuntime :exec DELETE FROM agent_runtime WHERE id = $1; -- name: CountActiveAgentsByRuntime :one SELECT count(*) FROM agent WHERE runtime_id = $1 AND archived_at IS NULL; -- name: DeleteArchivedAgentsByRuntime :exec DELETE FROM agent WHERE runtime_id = $1 AND archived_at IS NOT NULL; -- name: FindLegacyRuntimesByDaemonID :many -- Looks up runtime rows keyed on a prior (hostname-derived) daemon_id. Used -- at register-time to find rows owned by the same machine under its old -- identity so agents/tasks can be re-pointed at the new UUID-keyed row. -- -- Comparison is case-insensitive because os.Hostname() has been observed to -- return different casings on the same machine (e.g. `Jiayuans-MacBook-Pro` -- vs `jiayuans-macbook-pro`) across reboots/mDNS state changes. A case- -- sensitive `=` would strand the old row; LOWER() on both sides handles drift -- without forcing the daemon to enumerate cased permutations. -- -- Returns many rather than one because case drift may have already minted -- duplicate rows historically (e.g. `Foo.local` AND `foo.local` under the -- same workspace+provider). A single-row lookup would consolidate only one -- of them and leave the rest orphaned. Callers must merge every returned -- row into the new UUID-keyed runtime. SELECT * FROM agent_runtime WHERE workspace_id = @workspace_id AND provider = @provider AND LOWER(daemon_id) = LOWER(@daemon_id); -- name: ReassignAgentsToRuntime :execrows -- Re-points every agent referencing old_runtime_id at new_runtime_id. UPDATE agent SET runtime_id = @new_runtime_id WHERE runtime_id = @old_runtime_id; -- name: ReassignTasksToRuntime :execrows -- Re-points every queued/running/completed task referencing old_runtime_id. -- Required before deleting the old runtime row because agent_task_queue has -- an ON DELETE CASCADE FK that would otherwise drop historical tasks. UPDATE agent_task_queue SET runtime_id = @new_runtime_id WHERE runtime_id = @old_runtime_id; -- name: RecordRuntimeLegacyDaemonID :exec -- Remembers the most recent hostname-derived daemon_id that was merged into -- this row. Useful for debugging when tracing back why a given runtime row -- subsumed an old one, and only overwrites NULL so the earliest merge is -- preserved. UPDATE agent_runtime SET legacy_daemon_id = COALESCE(legacy_daemon_id, $2) WHERE id = $1; -- name: DeleteStaleOfflineRuntimes :many -- Deletes runtimes that have been offline for longer than the TTL and have -- no agents bound (active or archived). The FK constraint on agent.runtime_id -- is ON DELETE RESTRICT, so we must exclude all agent references. DELETE FROM agent_runtime WHERE status = 'offline' AND last_seen_at < now() - make_interval(secs => @stale_seconds::double precision) AND id NOT IN (SELECT DISTINCT runtime_id FROM agent) RETURNING id, workspace_id;