// Code generated by sqlc. DO NOT EDIT. // versions: // sqlc v1.31.1 // source: runtime.sql package db import ( "context" "github.com/jackc/pgx/v5/pgtype" ) const cancelAgentTasksByRuntimeOrAgent = `-- name: CancelAgentTasksByRuntimeOrAgent :many UPDATE agent_task_queue SET status = 'cancelled', completed_at = now() WHERE (runtime_id = ANY($1::uuid[]) OR agent_id = ANY($2::uuid[])) AND status IN ('queued', 'dispatched', 'running', 'waiting_local_directory') RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason, initiator_user_id, handoff_note, prepare_lease_expires_at, squad_id, runtime_mcp_overlay, escalation_for_task_id, fire_at, originator_user_id, runtime_connected_apps ` type CancelAgentTasksByRuntimeOrAgentParams struct { RuntimeIds []pgtype.UUID `json:"runtime_ids"` AgentIds []pgtype.UUID `json:"agent_ids"` } // Cancels every active task that either lives on one of the given runtimes // OR belongs to one of the given agents. Used by the member-revocation flow: // the runtime-side covers tasks queued against the leaving member's runtimes; // the agent-side covers tasks pinned to a different runtime that those agents // left behind from a prior UpdateAgent (agent.runtime_id can change, but // agent_task_queue.runtime_id does not get rewritten when it does, so a task // queued on runtime A by agent X — later moved to runtime B — survives the // runtime-only revoke and could still be claimed because ClaimAgentTask does // not gate on agent.archived_at). // // We use 'cancelled' rather than 'failed' so the daemon's per-task status // poller (watchTaskCancellation) interrupts the running agent gracefully. // Returns the affected rows so the caller can broadcast task:cancelled and // reconcile per-agent status. func (q *Queries) CancelAgentTasksByRuntimeOrAgent(ctx context.Context, arg CancelAgentTasksByRuntimeOrAgentParams) ([]AgentTaskQueue, error) { rows, err := q.db.Query(ctx, cancelAgentTasksByRuntimeOrAgent, arg.RuntimeIds, arg.AgentIds) if err != nil { return nil, err } defer rows.Close() items := []AgentTaskQueue{} for rows.Next() { var i AgentTaskQueue if err := rows.Scan( &i.ID, &i.AgentID, &i.IssueID, &i.Status, &i.Priority, &i.DispatchedAt, &i.StartedAt, &i.CompletedAt, &i.Result, &i.Error, &i.CreatedAt, &i.Context, &i.RuntimeID, &i.SessionID, &i.WorkDir, &i.TriggerCommentID, &i.ChatSessionID, &i.AutopilotRunID, &i.Attempt, &i.MaxAttempts, &i.ParentTaskID, &i.FailureReason, &i.TriggerSummary, &i.ForceFreshSession, &i.IsLeaderTask, &i.WaitReason, &i.InitiatorUserID, &i.HandoffNote, &i.PrepareLeaseExpiresAt, &i.SquadID, &i.RuntimeMcpOverlay, &i.EscalationForTaskID, &i.FireAt, &i.OriginatorUserID, &i.RuntimeConnectedApps, ); err != nil { return nil, err } items = append(items, i) } if err := rows.Err(); err != nil { return nil, err } return items, nil } const countActiveAgentsByRuntime = `-- name: CountActiveAgentsByRuntime :one SELECT count(*) FROM agent WHERE runtime_id = $1 AND archived_at IS NULL ` func (q *Queries) CountActiveAgentsByRuntime(ctx context.Context, runtimeID pgtype.UUID) (int64, error) { row := q.db.QueryRow(ctx, countActiveAgentsByRuntime, runtimeID) var count int64 err := row.Scan(&count) return count, err } const countActiveSquadsWithArchivedLeadersByRuntime = `-- name: CountActiveSquadsWithArchivedLeadersByRuntime :one SELECT count(*) FROM squad WHERE archived_at IS NULL AND leader_id IN ( SELECT id FROM agent WHERE runtime_id = $1 AND archived_at IS NOT NULL ) ` func (q *Queries) CountActiveSquadsWithArchivedLeadersByRuntime(ctx context.Context, runtimeID pgtype.UUID) (int64, error) { row := q.db.QueryRow(ctx, countActiveSquadsWithArchivedLeadersByRuntime, runtimeID) var count int64 err := row.Scan(&count) return count, err } const deleteAgentRuntime = `-- name: DeleteAgentRuntime :exec DELETE FROM agent_runtime WHERE id = $1 ` func (q *Queries) DeleteAgentRuntime(ctx context.Context, id pgtype.UUID) error { _, err := q.db.Exec(ctx, deleteAgentRuntime, id) return err } const deleteArchivedAgentsByRuntime = `-- name: DeleteArchivedAgentsByRuntime :exec DELETE FROM agent WHERE runtime_id = $1 AND archived_at IS NOT NULL ` func (q *Queries) DeleteArchivedAgentsByRuntime(ctx context.Context, runtimeID pgtype.UUID) error { _, err := q.db.Exec(ctx, deleteArchivedAgentsByRuntime, runtimeID) return err } const deleteSquadsByArchivedAgentsOnRuntime = `-- name: DeleteSquadsByArchivedAgentsOnRuntime :exec DELETE FROM squad WHERE leader_id IN ( SELECT id FROM agent WHERE runtime_id = $1 AND archived_at IS NOT NULL ) AND archived_at IS NOT NULL ` // Removes archived squads whose leader_id references an archived agent on the // given runtime. Must run before DeleteArchivedAgentsByRuntime so the RESTRICT // FK on squad.leader_id does not block the agent deletion. Active squads are // handled separately by CountActiveSquadsWithArchivedLeadersByRuntime, which // returns a 409 until the caller archives them or assigns a new leader. func (q *Queries) DeleteSquadsByArchivedAgentsOnRuntime(ctx context.Context, runtimeID pgtype.UUID) error { _, err := q.db.Exec(ctx, deleteSquadsByArchivedAgentsOnRuntime, runtimeID) return err } const deleteStaleOfflineRuntimes = `-- name: DeleteStaleOfflineRuntimes :many DELETE FROM agent_runtime WHERE status = 'offline' AND last_seen_at < now() - make_interval(secs => $1::double precision) AND id NOT IN (SELECT DISTINCT runtime_id FROM agent) RETURNING id, workspace_id ` type DeleteStaleOfflineRuntimesRow struct { ID pgtype.UUID `json:"id"` WorkspaceID pgtype.UUID `json:"workspace_id"` } // Deletes runtimes that have been offline for longer than the TTL and have // no agents bound (active or archived). The FK constraint on agent.runtime_id // is ON DELETE RESTRICT, so we must exclude all agent references. func (q *Queries) DeleteStaleOfflineRuntimes(ctx context.Context, staleSeconds float64) ([]DeleteStaleOfflineRuntimesRow, error) { rows, err := q.db.Query(ctx, deleteStaleOfflineRuntimes, staleSeconds) if err != nil { return nil, err } defer rows.Close() items := []DeleteStaleOfflineRuntimesRow{} for rows.Next() { var i DeleteStaleOfflineRuntimesRow if err := rows.Scan(&i.ID, &i.WorkspaceID); err != nil { return nil, err } items = append(items, i) } if err := rows.Err(); err != nil { return nil, err } return items, nil } const failTasksForOfflineRuntimes = `-- name: FailTasksForOfflineRuntimes :many UPDATE agent_task_queue SET status = 'failed', completed_at = now(), error = 'runtime went offline', failure_reason = 'runtime_offline', wait_reason = NULL WHERE status IN ('dispatched', 'running', 'waiting_local_directory') AND runtime_id IN ( SELECT id FROM agent_runtime WHERE status = 'offline' ) RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason, initiator_user_id, handoff_note, prepare_lease_expires_at, squad_id, runtime_mcp_overlay, escalation_for_task_id, fire_at, originator_user_id, runtime_connected_apps ` // Marks dispatched/running/waiting_local_directory tasks as failed when // their runtime is offline. This cleans up orphaned tasks after a daemon // crash or network partition. func (q *Queries) FailTasksForOfflineRuntimes(ctx context.Context) ([]AgentTaskQueue, error) { rows, err := q.db.Query(ctx, failTasksForOfflineRuntimes) if err != nil { return nil, err } defer rows.Close() items := []AgentTaskQueue{} for rows.Next() { var i AgentTaskQueue if err := rows.Scan( &i.ID, &i.AgentID, &i.IssueID, &i.Status, &i.Priority, &i.DispatchedAt, &i.StartedAt, &i.CompletedAt, &i.Result, &i.Error, &i.CreatedAt, &i.Context, &i.RuntimeID, &i.SessionID, &i.WorkDir, &i.TriggerCommentID, &i.ChatSessionID, &i.AutopilotRunID, &i.Attempt, &i.MaxAttempts, &i.ParentTaskID, &i.FailureReason, &i.TriggerSummary, &i.ForceFreshSession, &i.IsLeaderTask, &i.WaitReason, &i.InitiatorUserID, &i.HandoffNote, &i.PrepareLeaseExpiresAt, &i.SquadID, &i.RuntimeMcpOverlay, &i.EscalationForTaskID, &i.FireAt, &i.OriginatorUserID, &i.RuntimeConnectedApps, ); err != nil { return nil, err } items = append(items, i) } if err := rows.Err(); err != nil { return nil, err } return items, nil } const findLegacyRuntimesByDaemonID = `-- name: FindLegacyRuntimesByDaemonID :many SELECT id, workspace_id, daemon_id, name, runtime_mode, provider, status, device_info, metadata, last_seen_at, created_at, updated_at, owner_id, legacy_daemon_id, visibility, profile_id FROM agent_runtime WHERE workspace_id = $1 AND provider = $2 AND LOWER(daemon_id) = LOWER($3) ` type FindLegacyRuntimesByDaemonIDParams struct { WorkspaceID pgtype.UUID `json:"workspace_id"` Provider string `json:"provider"` DaemonID string `json:"daemon_id"` } // Looks up runtime rows keyed on a prior (hostname-derived) daemon_id. Used // at register-time to find rows owned by the same machine under its old // identity so agents/tasks can be re-pointed at the new UUID-keyed row. // // Comparison is case-insensitive because os.Hostname() has been observed to // return different casings on the same machine (e.g. `Jiayuans-MacBook-Pro` // vs `jiayuans-macbook-pro`) across reboots/mDNS state changes. A case- // sensitive `=` would strand the old row; LOWER() on both sides handles drift // without forcing the daemon to enumerate cased permutations. // // Returns many rather than one because case drift may have already minted // duplicate rows historically (e.g. `Foo.local` AND `foo.local` under the // same workspace+provider). A single-row lookup would consolidate only one // of them and leave the rest orphaned. Callers must merge every returned // row into the new UUID-keyed runtime. func (q *Queries) FindLegacyRuntimesByDaemonID(ctx context.Context, arg FindLegacyRuntimesByDaemonIDParams) ([]AgentRuntime, error) { rows, err := q.db.Query(ctx, findLegacyRuntimesByDaemonID, arg.WorkspaceID, arg.Provider, arg.DaemonID) if err != nil { return nil, err } defer rows.Close() items := []AgentRuntime{} for rows.Next() { var i AgentRuntime if err := rows.Scan( &i.ID, &i.WorkspaceID, &i.DaemonID, &i.Name, &i.RuntimeMode, &i.Provider, &i.Status, &i.DeviceInfo, &i.Metadata, &i.LastSeenAt, &i.CreatedAt, &i.UpdatedAt, &i.OwnerID, &i.LegacyDaemonID, &i.Visibility, &i.ProfileID, ); err != nil { return nil, err } items = append(items, i) } if err := rows.Err(); err != nil { return nil, err } return items, nil } const forceOfflineRuntimesByIDs = `-- name: ForceOfflineRuntimesByIDs :many UPDATE agent_runtime SET status = 'offline', updated_at = now() WHERE id = ANY($1::uuid[]) AND status = 'online' RETURNING id, workspace_id, owner_id, daemon_id, provider ` type ForceOfflineRuntimesByIDsRow struct { ID pgtype.UUID `json:"id"` WorkspaceID pgtype.UUID `json:"workspace_id"` OwnerID pgtype.UUID `json:"owner_id"` DaemonID pgtype.Text `json:"daemon_id"` Provider string `json:"provider"` } // Unconditionally flips a known set of runtime IDs to offline. Distinct from // MarkRuntimesOfflineByIDs (which keeps a stale-window predicate so the // sweeper cannot demote a runtime that just heartbeated): this variant is // used by intentional revocation paths — e.g. removing a workspace member — // where the caller has already decided the runtime should be offline // regardless of recent liveness. func (q *Queries) ForceOfflineRuntimesByIDs(ctx context.Context, runtimeIds []pgtype.UUID) ([]ForceOfflineRuntimesByIDsRow, error) { rows, err := q.db.Query(ctx, forceOfflineRuntimesByIDs, runtimeIds) if err != nil { return nil, err } defer rows.Close() items := []ForceOfflineRuntimesByIDsRow{} for rows.Next() { var i ForceOfflineRuntimesByIDsRow if err := rows.Scan( &i.ID, &i.WorkspaceID, &i.OwnerID, &i.DaemonID, &i.Provider, ); err != nil { return nil, err } items = append(items, i) } if err := rows.Err(); err != nil { return nil, err } return items, nil } const getAgentRuntime = `-- name: GetAgentRuntime :one SELECT id, workspace_id, daemon_id, name, runtime_mode, provider, status, device_info, metadata, last_seen_at, created_at, updated_at, owner_id, legacy_daemon_id, visibility, profile_id FROM agent_runtime WHERE id = $1 ` func (q *Queries) GetAgentRuntime(ctx context.Context, id pgtype.UUID) (AgentRuntime, error) { row := q.db.QueryRow(ctx, getAgentRuntime, id) var i AgentRuntime err := row.Scan( &i.ID, &i.WorkspaceID, &i.DaemonID, &i.Name, &i.RuntimeMode, &i.Provider, &i.Status, &i.DeviceInfo, &i.Metadata, &i.LastSeenAt, &i.CreatedAt, &i.UpdatedAt, &i.OwnerID, &i.LegacyDaemonID, &i.Visibility, &i.ProfileID, ) return i, err } const getAgentRuntimeForWorkspace = `-- name: GetAgentRuntimeForWorkspace :one SELECT id, workspace_id, daemon_id, name, runtime_mode, provider, status, device_info, metadata, last_seen_at, created_at, updated_at, owner_id, legacy_daemon_id, visibility, profile_id FROM agent_runtime WHERE id = $1 AND workspace_id = $2 ` type GetAgentRuntimeForWorkspaceParams struct { ID pgtype.UUID `json:"id"` WorkspaceID pgtype.UUID `json:"workspace_id"` } func (q *Queries) GetAgentRuntimeForWorkspace(ctx context.Context, arg GetAgentRuntimeForWorkspaceParams) (AgentRuntime, error) { row := q.db.QueryRow(ctx, getAgentRuntimeForWorkspace, arg.ID, arg.WorkspaceID) var i AgentRuntime err := row.Scan( &i.ID, &i.WorkspaceID, &i.DaemonID, &i.Name, &i.RuntimeMode, &i.Provider, &i.Status, &i.DeviceInfo, &i.Metadata, &i.LastSeenAt, &i.CreatedAt, &i.UpdatedAt, &i.OwnerID, &i.LegacyDaemonID, &i.Visibility, &i.ProfileID, ) return i, err } const listAgentRuntimes = `-- name: ListAgentRuntimes :many SELECT id, workspace_id, daemon_id, name, runtime_mode, provider, status, device_info, metadata, last_seen_at, created_at, updated_at, owner_id, legacy_daemon_id, visibility, profile_id FROM agent_runtime WHERE workspace_id = $1 ORDER BY created_at ASC ` func (q *Queries) ListAgentRuntimes(ctx context.Context, workspaceID pgtype.UUID) ([]AgentRuntime, error) { rows, err := q.db.Query(ctx, listAgentRuntimes, workspaceID) if err != nil { return nil, err } defer rows.Close() items := []AgentRuntime{} for rows.Next() { var i AgentRuntime if err := rows.Scan( &i.ID, &i.WorkspaceID, &i.DaemonID, &i.Name, &i.RuntimeMode, &i.Provider, &i.Status, &i.DeviceInfo, &i.Metadata, &i.LastSeenAt, &i.CreatedAt, &i.UpdatedAt, &i.OwnerID, &i.LegacyDaemonID, &i.Visibility, &i.ProfileID, ); err != nil { return nil, err } items = append(items, i) } if err := rows.Err(); err != nil { return nil, err } return items, nil } const listAgentRuntimesByOwner = `-- name: ListAgentRuntimesByOwner :many SELECT id, workspace_id, daemon_id, name, runtime_mode, provider, status, device_info, metadata, last_seen_at, created_at, updated_at, owner_id, legacy_daemon_id, visibility, profile_id FROM agent_runtime WHERE workspace_id = $1 AND owner_id = $2 ORDER BY created_at ASC ` type ListAgentRuntimesByOwnerParams struct { WorkspaceID pgtype.UUID `json:"workspace_id"` OwnerID pgtype.UUID `json:"owner_id"` } func (q *Queries) ListAgentRuntimesByOwner(ctx context.Context, arg ListAgentRuntimesByOwnerParams) ([]AgentRuntime, error) { rows, err := q.db.Query(ctx, listAgentRuntimesByOwner, arg.WorkspaceID, arg.OwnerID) if err != nil { return nil, err } defer rows.Close() items := []AgentRuntime{} for rows.Next() { var i AgentRuntime if err := rows.Scan( &i.ID, &i.WorkspaceID, &i.DaemonID, &i.Name, &i.RuntimeMode, &i.Provider, &i.Status, &i.DeviceInfo, &i.Metadata, &i.LastSeenAt, &i.CreatedAt, &i.UpdatedAt, &i.OwnerID, &i.LegacyDaemonID, &i.Visibility, &i.ProfileID, ); err != nil { return nil, err } items = append(items, i) } if err := rows.Err(); err != nil { return nil, err } return items, nil } const listArchivedAgentIDsByRuntime = `-- name: ListArchivedAgentIDsByRuntime :many SELECT id FROM agent WHERE runtime_id = $1 AND archived_at IS NOT NULL ` // Companion to DeleteArchivedAgentsByRuntime: enumerates the archived agents // about to be hard-deleted so the runtime teardown can pause autopilots that // still point at them. Returns ids only — the caller only needs the set. func (q *Queries) ListArchivedAgentIDsByRuntime(ctx context.Context, runtimeID pgtype.UUID) ([]pgtype.UUID, error) { rows, err := q.db.Query(ctx, listArchivedAgentIDsByRuntime, runtimeID) if err != nil { return nil, err } defer rows.Close() items := []pgtype.UUID{} for rows.Next() { var id pgtype.UUID if err := rows.Scan(&id); err != nil { return nil, err } items = append(items, id) } if err := rows.Err(); err != nil { return nil, err } return items, nil } const lockAgentRuntime = `-- name: LockAgentRuntime :one SELECT id, workspace_id, daemon_id, name, runtime_mode, provider, status, device_info, metadata, last_seen_at, created_at, updated_at, owner_id, legacy_daemon_id, visibility, profile_id FROM agent_runtime WHERE id = $1 FOR UPDATE ` // Acquires a row-level exclusive lock on the runtime row. Used at the // top of the cascade-delete transaction so that: // 1. PostgreSQL's FK validation on agent.runtime_id (FK ... ON DELETE // RESTRICT) needs FOR KEY SHARE on the parent runtime row, which // conflicts with FOR UPDATE — so any concurrent INSERT or UPDATE // that would point a new/moved agent at this runtime blocks until // our transaction finishes; and // 2. concurrent UPDATE/DELETE of the runtime row itself (e.g. another // delete attempt) waits for us to commit. // // Combined with ListActiveAgentsByRuntimeForUpdate (which row-locks the // existing active set) this closes the plan-compare → archive race that // was possible at read-committed isolation between the snapshot and the // bulk archive. func (q *Queries) LockAgentRuntime(ctx context.Context, id pgtype.UUID) (AgentRuntime, error) { row := q.db.QueryRow(ctx, lockAgentRuntime, id) var i AgentRuntime err := row.Scan( &i.ID, &i.WorkspaceID, &i.DaemonID, &i.Name, &i.RuntimeMode, &i.Provider, &i.Status, &i.DeviceInfo, &i.Metadata, &i.LastSeenAt, &i.CreatedAt, &i.UpdatedAt, &i.OwnerID, &i.LegacyDaemonID, &i.Visibility, &i.ProfileID, ) return i, err } const markAgentRuntimeOnline = `-- name: MarkAgentRuntimeOnline :one UPDATE agent_runtime SET status = 'online', last_seen_at = now(), updated_at = now() WHERE id = $1 RETURNING id, workspace_id, daemon_id, name, runtime_mode, provider, status, device_info, metadata, last_seen_at, created_at, updated_at, owner_id, legacy_daemon_id, visibility, profile_id ` // Used on the offline→online transition (and on first heartbeat after // registration). Writes status, last_seen_at, and updated_at because the // status flip is a real state change and we want updated_at to reflect it. func (q *Queries) MarkAgentRuntimeOnline(ctx context.Context, id pgtype.UUID) (AgentRuntime, error) { row := q.db.QueryRow(ctx, markAgentRuntimeOnline, id) var i AgentRuntime err := row.Scan( &i.ID, &i.WorkspaceID, &i.DaemonID, &i.Name, &i.RuntimeMode, &i.Provider, &i.Status, &i.DeviceInfo, &i.Metadata, &i.LastSeenAt, &i.CreatedAt, &i.UpdatedAt, &i.OwnerID, &i.LegacyDaemonID, &i.Visibility, &i.ProfileID, ) return i, err } const markRuntimesOfflineByIDs = `-- name: MarkRuntimesOfflineByIDs :many UPDATE agent_runtime SET status = 'offline', updated_at = now() WHERE status = 'online' AND id = ANY($1::uuid[]) AND last_seen_at < now() - make_interval(secs => $2::double precision) RETURNING id, workspace_id, owner_id, daemon_id, provider ` type MarkRuntimesOfflineByIDsParams struct { Ids []pgtype.UUID `json:"ids"` StaleSeconds float64 `json:"stale_seconds"` } type MarkRuntimesOfflineByIDsRow struct { ID pgtype.UUID `json:"id"` WorkspaceID pgtype.UUID `json:"workspace_id"` OwnerID pgtype.UUID `json:"owner_id"` DaemonID pgtype.Text `json:"daemon_id"` Provider string `json:"provider"` } // Flips a known set of runtime IDs from online to offline. Paired with // SelectStaleOnlineRuntimes in the sweeper so the candidate selection and // the actual write are decoupled (the LivenessStore filter sits between). // // Re-checks the stale predicate inside the UPDATE so a concurrent heartbeat // between the SELECT (candidate gather), the LivenessStore filter, and this // UPDATE cannot demote a runtime that just refreshed last_seen_at. The // legacy MarkStaleRuntimesOffline UPDATE had this property implicitly // because the predicate and the write lived in one statement; here we // carry it forward explicitly so the SELECT/filter/UPDATE pipeline retains // the same race-freedom. func (q *Queries) MarkRuntimesOfflineByIDs(ctx context.Context, arg MarkRuntimesOfflineByIDsParams) ([]MarkRuntimesOfflineByIDsRow, error) { rows, err := q.db.Query(ctx, markRuntimesOfflineByIDs, arg.Ids, arg.StaleSeconds) if err != nil { return nil, err } defer rows.Close() items := []MarkRuntimesOfflineByIDsRow{} for rows.Next() { var i MarkRuntimesOfflineByIDsRow if err := rows.Scan( &i.ID, &i.WorkspaceID, &i.OwnerID, &i.DaemonID, &i.Provider, ); err != nil { return nil, err } items = append(items, i) } if err := rows.Err(); err != nil { return nil, err } return items, nil } const pauseAutopilotsByAgentAssignees = `-- name: PauseAutopilotsByAgentAssignees :exec UPDATE autopilot SET status = 'paused', updated_at = now() WHERE status = 'active' AND assignee_type = 'agent' AND assignee_id = ANY($1::uuid[]) ` // Pauses every active autopilot whose agent assignee is in the supplied list. // Called before hard-deleting archived agents on runtime teardown so the rows // do not become dangling (autopilot.assignee_id no longer has an agent FK // since migration 096). Status='paused' makes the breakage visible in the UI // — operators can re-point the autopilot at a live agent or delete it — // rather than silently piling skipped runs. func (q *Queries) PauseAutopilotsByAgentAssignees(ctx context.Context, assigneeIds []pgtype.UUID) error { _, err := q.db.Exec(ctx, pauseAutopilotsByAgentAssignees, assigneeIds) return err } const reassignAgentsToRuntime = `-- name: ReassignAgentsToRuntime :execrows UPDATE agent SET runtime_id = $1 WHERE runtime_id = $2 ` type ReassignAgentsToRuntimeParams struct { NewRuntimeID pgtype.UUID `json:"new_runtime_id"` OldRuntimeID pgtype.UUID `json:"old_runtime_id"` } // Re-points every agent referencing old_runtime_id at new_runtime_id. func (q *Queries) ReassignAgentsToRuntime(ctx context.Context, arg ReassignAgentsToRuntimeParams) (int64, error) { result, err := q.db.Exec(ctx, reassignAgentsToRuntime, arg.NewRuntimeID, arg.OldRuntimeID) if err != nil { return 0, err } return result.RowsAffected(), nil } const reassignTasksToRuntime = `-- name: ReassignTasksToRuntime :execrows UPDATE agent_task_queue SET runtime_id = $1 WHERE runtime_id = $2 ` type ReassignTasksToRuntimeParams struct { NewRuntimeID pgtype.UUID `json:"new_runtime_id"` OldRuntimeID pgtype.UUID `json:"old_runtime_id"` } // Re-points every queued/running/completed task referencing old_runtime_id. // Required before deleting the old runtime row because agent_task_queue has // an ON DELETE CASCADE FK that would otherwise drop historical tasks. func (q *Queries) ReassignTasksToRuntime(ctx context.Context, arg ReassignTasksToRuntimeParams) (int64, error) { result, err := q.db.Exec(ctx, reassignTasksToRuntime, arg.NewRuntimeID, arg.OldRuntimeID) if err != nil { return 0, err } return result.RowsAffected(), nil } const recordRuntimeLegacyDaemonID = `-- name: RecordRuntimeLegacyDaemonID :exec UPDATE agent_runtime SET legacy_daemon_id = COALESCE(legacy_daemon_id, $2) WHERE id = $1 ` type RecordRuntimeLegacyDaemonIDParams struct { ID pgtype.UUID `json:"id"` LegacyDaemonID pgtype.Text `json:"legacy_daemon_id"` } // Remembers the most recent hostname-derived daemon_id that was merged into // this row. Useful for debugging when tracing back why a given runtime row // subsumed an old one, and only overwrites NULL so the earliest merge is // preserved. func (q *Queries) RecordRuntimeLegacyDaemonID(ctx context.Context, arg RecordRuntimeLegacyDaemonIDParams) error { _, err := q.db.Exec(ctx, recordRuntimeLegacyDaemonID, arg.ID, arg.LegacyDaemonID) return err } const selectStaleOnlineRuntimes = `-- name: SelectStaleOnlineRuntimes :many SELECT id, workspace_id, owner_id, daemon_id, provider FROM agent_runtime WHERE status = 'online' AND last_seen_at < now() - make_interval(secs => $1::double precision) ` type SelectStaleOnlineRuntimesRow struct { ID pgtype.UUID `json:"id"` WorkspaceID pgtype.UUID `json:"workspace_id"` OwnerID pgtype.UUID `json:"owner_id"` DaemonID pgtype.Text `json:"daemon_id"` Provider string `json:"provider"` } // Lists online runtimes whose last_seen_at exceeds the stale window. The // sweeper uses this as a candidate set, then optionally filters via the // LivenessStore before flipping rows to offline (a fresh Redis liveness // record means the DB row is just lagging, not actually dead). func (q *Queries) SelectStaleOnlineRuntimes(ctx context.Context, staleSeconds float64) ([]SelectStaleOnlineRuntimesRow, error) { rows, err := q.db.Query(ctx, selectStaleOnlineRuntimes, staleSeconds) if err != nil { return nil, err } defer rows.Close() items := []SelectStaleOnlineRuntimesRow{} for rows.Next() { var i SelectStaleOnlineRuntimesRow if err := rows.Scan( &i.ID, &i.WorkspaceID, &i.OwnerID, &i.DaemonID, &i.Provider, ); err != nil { return nil, err } items = append(items, i) } if err := rows.Err(); err != nil { return nil, err } return items, nil } const setAgentRuntimeOffline = `-- name: SetAgentRuntimeOffline :exec UPDATE agent_runtime SET status = 'offline', updated_at = now() WHERE id = $1 ` func (q *Queries) SetAgentRuntimeOffline(ctx context.Context, id pgtype.UUID) error { _, err := q.db.Exec(ctx, setAgentRuntimeOffline, id) return err } const touchAgentRuntimeLastSeen = `-- name: TouchAgentRuntimeLastSeen :execrows UPDATE agent_runtime SET last_seen_at = now() WHERE id = $1 AND status = 'online' ` // Bumps last_seen_at on an already-online runtime. Deliberately does NOT // touch status or updated_at: status is unchanged on the hot heartbeat path, // and avoiding updated_at keeps the row HOT-eligible (no index columns // change) and avoids invalidating any downstream consumer that watches // updated_at. // // The status='online' predicate is load-bearing: callers read rt.Status from // a prior SELECT and may race with the sweeper, which can flip the row to // offline between that SELECT and this UPDATE. Without the predicate this // query would silently leave a freshly-heartbeated runtime stuck in offline. // Returning affected rows lets callers detect that race and fall back to // MarkAgentRuntimeOnline to flip the row back online. func (q *Queries) TouchAgentRuntimeLastSeen(ctx context.Context, id pgtype.UUID) (int64, error) { result, err := q.db.Exec(ctx, touchAgentRuntimeLastSeen, id) if err != nil { return 0, err } return result.RowsAffected(), nil } const touchAgentRuntimesLastSeenBatch = `-- name: TouchAgentRuntimesLastSeenBatch :execrows UPDATE agent_runtime SET last_seen_at = now() WHERE id = ANY($1::uuid[]) AND status = 'online' ` // Bulk variant of TouchAgentRuntimeLastSeen used by the BatchedHeartbeatScheduler: // coalesces N per-runtime "bump last_seen_at" requests into a single UPDATE so a // fleet beating every 15s costs ~1 DB transaction per batch tick instead of N. // // Same load-bearing predicate as the single-id form: status='online' avoids // silently un-deleting a sweeper-flipped offline row, and we deliberately do // NOT touch updated_at so the rows stay HOT-eligible. Affected-rows < len(ids) // means some IDs raced to offline between Schedule and flush; their next beat // will fall through the recordHeartbeat sync path and call MarkAgentRuntimeOnline. func (q *Queries) TouchAgentRuntimesLastSeenBatch(ctx context.Context, ids []pgtype.UUID) (int64, error) { result, err := q.db.Exec(ctx, touchAgentRuntimesLastSeenBatch, ids) if err != nil { return 0, err } return result.RowsAffected(), nil } const updateAgentRuntimeVisibility = `-- name: UpdateAgentRuntimeVisibility :one UPDATE agent_runtime SET visibility = $1, updated_at = now() WHERE id = $2 RETURNING id, workspace_id, daemon_id, name, runtime_mode, provider, status, device_info, metadata, last_seen_at, created_at, updated_at, owner_id, legacy_daemon_id, visibility, profile_id ` type UpdateAgentRuntimeVisibilityParams struct { Visibility string `json:"visibility"` ID pgtype.UUID `json:"id"` } // Toggles a runtime between 'private' (only owner can bind agents) and // 'public' (any workspace member can). Default for new rows is 'private' // (see migration 083). Gated at the handler layer to owner / workspace // admin only. func (q *Queries) UpdateAgentRuntimeVisibility(ctx context.Context, arg UpdateAgentRuntimeVisibilityParams) (AgentRuntime, error) { row := q.db.QueryRow(ctx, updateAgentRuntimeVisibility, arg.Visibility, arg.ID) var i AgentRuntime err := row.Scan( &i.ID, &i.WorkspaceID, &i.DaemonID, &i.Name, &i.RuntimeMode, &i.Provider, &i.Status, &i.DeviceInfo, &i.Metadata, &i.LastSeenAt, &i.CreatedAt, &i.UpdatedAt, &i.OwnerID, &i.LegacyDaemonID, &i.Visibility, &i.ProfileID, ) return i, err } const upsertAgentRuntime = `-- name: UpsertAgentRuntime :one INSERT INTO agent_runtime ( workspace_id, daemon_id, name, runtime_mode, provider, status, device_info, metadata, owner_id, last_seen_at ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, now()) ON CONFLICT (workspace_id, daemon_id, provider) WHERE profile_id IS NULL DO UPDATE SET name = EXCLUDED.name, runtime_mode = EXCLUDED.runtime_mode, status = EXCLUDED.status, device_info = EXCLUDED.device_info, metadata = EXCLUDED.metadata, owner_id = COALESCE(EXCLUDED.owner_id, agent_runtime.owner_id), last_seen_at = now(), updated_at = now() RETURNING id, workspace_id, daemon_id, name, runtime_mode, provider, status, device_info, metadata, last_seen_at, created_at, updated_at, owner_id, legacy_daemon_id, visibility, profile_id, (xmax = 0) AS inserted ` type UpsertAgentRuntimeParams struct { WorkspaceID pgtype.UUID `json:"workspace_id"` DaemonID pgtype.Text `json:"daemon_id"` Name string `json:"name"` RuntimeMode string `json:"runtime_mode"` Provider string `json:"provider"` Status string `json:"status"` DeviceInfo string `json:"device_info"` Metadata []byte `json:"metadata"` OwnerID pgtype.UUID `json:"owner_id"` } type UpsertAgentRuntimeRow struct { ID pgtype.UUID `json:"id"` WorkspaceID pgtype.UUID `json:"workspace_id"` DaemonID pgtype.Text `json:"daemon_id"` Name string `json:"name"` RuntimeMode string `json:"runtime_mode"` Provider string `json:"provider"` Status string `json:"status"` DeviceInfo string `json:"device_info"` Metadata []byte `json:"metadata"` LastSeenAt pgtype.Timestamptz `json:"last_seen_at"` CreatedAt pgtype.Timestamptz `json:"created_at"` UpdatedAt pgtype.Timestamptz `json:"updated_at"` OwnerID pgtype.UUID `json:"owner_id"` LegacyDaemonID pgtype.Text `json:"legacy_daemon_id"` Visibility string `json:"visibility"` ProfileID pgtype.UUID `json:"profile_id"` Inserted bool `json:"inserted"` } // (xmax = 0) AS inserted distinguishes a fresh insert (true) from an upsert // that updated an existing row (false). Analytics reads this to fire // runtime_registered/runtime_ready only on first-time registration. // Built-in runtimes carry no profile_id. The arbiter is the partial unique // index from migration 121 (WHERE profile_id IS NULL); the predicate must be // spelled out so Postgres selects that partial index, not the custom-runtime // one on (workspace_id, daemon_id, profile_id). func (q *Queries) UpsertAgentRuntime(ctx context.Context, arg UpsertAgentRuntimeParams) (UpsertAgentRuntimeRow, error) { row := q.db.QueryRow(ctx, upsertAgentRuntime, arg.WorkspaceID, arg.DaemonID, arg.Name, arg.RuntimeMode, arg.Provider, arg.Status, arg.DeviceInfo, arg.Metadata, arg.OwnerID, ) var i UpsertAgentRuntimeRow err := row.Scan( &i.ID, &i.WorkspaceID, &i.DaemonID, &i.Name, &i.RuntimeMode, &i.Provider, &i.Status, &i.DeviceInfo, &i.Metadata, &i.LastSeenAt, &i.CreatedAt, &i.UpdatedAt, &i.OwnerID, &i.LegacyDaemonID, &i.Visibility, &i.ProfileID, &i.Inserted, ) return i, err } const upsertAgentRuntimeWithProfile = `-- name: UpsertAgentRuntimeWithProfile :one INSERT INTO agent_runtime ( workspace_id, daemon_id, name, runtime_mode, provider, status, device_info, metadata, owner_id, profile_id, last_seen_at ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, now()) ON CONFLICT (workspace_id, daemon_id, profile_id) WHERE profile_id IS NOT NULL DO UPDATE SET name = EXCLUDED.name, runtime_mode = EXCLUDED.runtime_mode, provider = EXCLUDED.provider, status = EXCLUDED.status, device_info = EXCLUDED.device_info, metadata = EXCLUDED.metadata, owner_id = COALESCE(EXCLUDED.owner_id, agent_runtime.owner_id), last_seen_at = now(), updated_at = now() RETURNING id, workspace_id, daemon_id, name, runtime_mode, provider, status, device_info, metadata, last_seen_at, created_at, updated_at, owner_id, legacy_daemon_id, visibility, profile_id, (xmax = 0) AS inserted ` type UpsertAgentRuntimeWithProfileParams struct { WorkspaceID pgtype.UUID `json:"workspace_id"` DaemonID pgtype.Text `json:"daemon_id"` Name string `json:"name"` RuntimeMode string `json:"runtime_mode"` Provider string `json:"provider"` Status string `json:"status"` DeviceInfo string `json:"device_info"` Metadata []byte `json:"metadata"` OwnerID pgtype.UUID `json:"owner_id"` ProfileID pgtype.UUID `json:"profile_id"` } type UpsertAgentRuntimeWithProfileRow struct { ID pgtype.UUID `json:"id"` WorkspaceID pgtype.UUID `json:"workspace_id"` DaemonID pgtype.Text `json:"daemon_id"` Name string `json:"name"` RuntimeMode string `json:"runtime_mode"` Provider string `json:"provider"` Status string `json:"status"` DeviceInfo string `json:"device_info"` Metadata []byte `json:"metadata"` LastSeenAt pgtype.Timestamptz `json:"last_seen_at"` CreatedAt pgtype.Timestamptz `json:"created_at"` UpdatedAt pgtype.Timestamptz `json:"updated_at"` OwnerID pgtype.UUID `json:"owner_id"` LegacyDaemonID pgtype.Text `json:"legacy_daemon_id"` Visibility string `json:"visibility"` ProfileID pgtype.UUID `json:"profile_id"` Inserted bool `json:"inserted"` } // Custom-runtime registration: a daemon resolved a workspace runtime_profile's // command_name on PATH and is registering an instance of it. The arbiter is the // partial unique index from migration 120 (WHERE profile_id IS NOT NULL), so a // single daemon can host the built-in provider AND any number of custom // profiles of the same protocol family. provider stays the protocol family so // task routing (agent.New(provider)) is unchanged; profile_id is the stable // identity. (xmax = 0) AS inserted mirrors UpsertAgentRuntime. func (q *Queries) UpsertAgentRuntimeWithProfile(ctx context.Context, arg UpsertAgentRuntimeWithProfileParams) (UpsertAgentRuntimeWithProfileRow, error) { row := q.db.QueryRow(ctx, upsertAgentRuntimeWithProfile, arg.WorkspaceID, arg.DaemonID, arg.Name, arg.RuntimeMode, arg.Provider, arg.Status, arg.DeviceInfo, arg.Metadata, arg.OwnerID, arg.ProfileID, ) var i UpsertAgentRuntimeWithProfileRow err := row.Scan( &i.ID, &i.WorkspaceID, &i.DaemonID, &i.Name, &i.RuntimeMode, &i.Provider, &i.Status, &i.DeviceInfo, &i.Metadata, &i.LastSeenAt, &i.CreatedAt, &i.UpdatedAt, &i.OwnerID, &i.LegacyDaemonID, &i.Visibility, &i.ProfileID, &i.Inserted, ) return i, err }