mirror of
https://github.com/multica-ai/multica.git
synced 2026-07-05 21:39:54 +02:00
654 lines
20 KiB
Go
654 lines
20 KiB
Go
// Code generated by sqlc. DO NOT EDIT.
|
|
// versions:
|
|
// sqlc v1.30.0
|
|
// source: runtime.sql
|
|
|
|
package db
|
|
|
|
import (
|
|
"context"
|
|
|
|
"github.com/jackc/pgx/v5/pgtype"
|
|
)
|
|
|
|
const countActiveAgentsByRuntime = `-- name: CountActiveAgentsByRuntime :one
|
|
SELECT count(*) FROM agent WHERE runtime_id = $1 AND archived_at IS NULL
|
|
`
|
|
|
|
func (q *Queries) CountActiveAgentsByRuntime(ctx context.Context, runtimeID pgtype.UUID) (int64, error) {
|
|
row := q.db.QueryRow(ctx, countActiveAgentsByRuntime, runtimeID)
|
|
var count int64
|
|
err := row.Scan(&count)
|
|
return count, err
|
|
}
|
|
|
|
const deleteAgentRuntime = `-- name: DeleteAgentRuntime :exec
|
|
DELETE FROM agent_runtime WHERE id = $1
|
|
`
|
|
|
|
func (q *Queries) DeleteAgentRuntime(ctx context.Context, id pgtype.UUID) error {
|
|
_, err := q.db.Exec(ctx, deleteAgentRuntime, id)
|
|
return err
|
|
}
|
|
|
|
const deleteArchivedAgentsByRuntime = `-- name: DeleteArchivedAgentsByRuntime :exec
|
|
DELETE FROM agent WHERE runtime_id = $1 AND archived_at IS NOT NULL
|
|
`
|
|
|
|
func (q *Queries) DeleteArchivedAgentsByRuntime(ctx context.Context, runtimeID pgtype.UUID) error {
|
|
_, err := q.db.Exec(ctx, deleteArchivedAgentsByRuntime, runtimeID)
|
|
return err
|
|
}
|
|
|
|
const deleteStaleOfflineRuntimes = `-- name: DeleteStaleOfflineRuntimes :many
|
|
DELETE FROM agent_runtime
|
|
WHERE status = 'offline'
|
|
AND last_seen_at < now() - make_interval(secs => $1::double precision)
|
|
AND id NOT IN (SELECT DISTINCT runtime_id FROM agent)
|
|
RETURNING id, workspace_id
|
|
`
|
|
|
|
type DeleteStaleOfflineRuntimesRow struct {
|
|
ID pgtype.UUID `json:"id"`
|
|
WorkspaceID pgtype.UUID `json:"workspace_id"`
|
|
}
|
|
|
|
// Deletes runtimes that have been offline for longer than the TTL and have
|
|
// no agents bound (active or archived). The FK constraint on agent.runtime_id
|
|
// is ON DELETE RESTRICT, so we must exclude all agent references.
|
|
func (q *Queries) DeleteStaleOfflineRuntimes(ctx context.Context, staleSeconds float64) ([]DeleteStaleOfflineRuntimesRow, error) {
|
|
rows, err := q.db.Query(ctx, deleteStaleOfflineRuntimes, staleSeconds)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
items := []DeleteStaleOfflineRuntimesRow{}
|
|
for rows.Next() {
|
|
var i DeleteStaleOfflineRuntimesRow
|
|
if err := rows.Scan(&i.ID, &i.WorkspaceID); err != nil {
|
|
return nil, err
|
|
}
|
|
items = append(items, i)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
const failTasksForOfflineRuntimes = `-- name: FailTasksForOfflineRuntimes :many
|
|
UPDATE agent_task_queue
|
|
SET status = 'failed', completed_at = now(), error = 'runtime went offline',
|
|
failure_reason = 'runtime_offline'
|
|
WHERE status IN ('dispatched', 'running')
|
|
AND runtime_id IN (
|
|
SELECT id FROM agent_runtime WHERE status = 'offline'
|
|
)
|
|
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary, force_fresh_session
|
|
`
|
|
|
|
// Marks dispatched/running tasks as failed when their runtime is offline.
|
|
// This cleans up orphaned tasks after a daemon crash or network partition.
|
|
func (q *Queries) FailTasksForOfflineRuntimes(ctx context.Context) ([]AgentTaskQueue, error) {
|
|
rows, err := q.db.Query(ctx, failTasksForOfflineRuntimes)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
items := []AgentTaskQueue{}
|
|
for rows.Next() {
|
|
var i AgentTaskQueue
|
|
if err := rows.Scan(
|
|
&i.ID,
|
|
&i.AgentID,
|
|
&i.IssueID,
|
|
&i.Status,
|
|
&i.Priority,
|
|
&i.DispatchedAt,
|
|
&i.StartedAt,
|
|
&i.CompletedAt,
|
|
&i.Result,
|
|
&i.Error,
|
|
&i.CreatedAt,
|
|
&i.Context,
|
|
&i.RuntimeID,
|
|
&i.SessionID,
|
|
&i.WorkDir,
|
|
&i.TriggerCommentID,
|
|
&i.ChatSessionID,
|
|
&i.AutopilotRunID,
|
|
&i.Attempt,
|
|
&i.MaxAttempts,
|
|
&i.ParentTaskID,
|
|
&i.FailureReason,
|
|
&i.LastHeartbeatAt,
|
|
&i.TriggerSummary,
|
|
&i.ForceFreshSession,
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
items = append(items, i)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
const findLegacyRuntimesByDaemonID = `-- name: FindLegacyRuntimesByDaemonID :many
|
|
SELECT id, workspace_id, daemon_id, name, runtime_mode, provider, status, device_info, metadata, last_seen_at, created_at, updated_at, owner_id, legacy_daemon_id FROM agent_runtime
|
|
WHERE workspace_id = $1
|
|
AND provider = $2
|
|
AND LOWER(daemon_id) = LOWER($3)
|
|
`
|
|
|
|
type FindLegacyRuntimesByDaemonIDParams struct {
|
|
WorkspaceID pgtype.UUID `json:"workspace_id"`
|
|
Provider string `json:"provider"`
|
|
DaemonID string `json:"daemon_id"`
|
|
}
|
|
|
|
// Looks up runtime rows keyed on a prior (hostname-derived) daemon_id. Used
|
|
// at register-time to find rows owned by the same machine under its old
|
|
// identity so agents/tasks can be re-pointed at the new UUID-keyed row.
|
|
//
|
|
// Comparison is case-insensitive because os.Hostname() has been observed to
|
|
// return different casings on the same machine (e.g. `Jiayuans-MacBook-Pro`
|
|
// vs `jiayuans-macbook-pro`) across reboots/mDNS state changes. A case-
|
|
// sensitive `=` would strand the old row; LOWER() on both sides handles drift
|
|
// without forcing the daemon to enumerate cased permutations.
|
|
//
|
|
// Returns many rather than one because case drift may have already minted
|
|
// duplicate rows historically (e.g. `Foo.local` AND `foo.local` under the
|
|
// same workspace+provider). A single-row lookup would consolidate only one
|
|
// of them and leave the rest orphaned. Callers must merge every returned
|
|
// row into the new UUID-keyed runtime.
|
|
func (q *Queries) FindLegacyRuntimesByDaemonID(ctx context.Context, arg FindLegacyRuntimesByDaemonIDParams) ([]AgentRuntime, error) {
|
|
rows, err := q.db.Query(ctx, findLegacyRuntimesByDaemonID, arg.WorkspaceID, arg.Provider, arg.DaemonID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
items := []AgentRuntime{}
|
|
for rows.Next() {
|
|
var i AgentRuntime
|
|
if err := rows.Scan(
|
|
&i.ID,
|
|
&i.WorkspaceID,
|
|
&i.DaemonID,
|
|
&i.Name,
|
|
&i.RuntimeMode,
|
|
&i.Provider,
|
|
&i.Status,
|
|
&i.DeviceInfo,
|
|
&i.Metadata,
|
|
&i.LastSeenAt,
|
|
&i.CreatedAt,
|
|
&i.UpdatedAt,
|
|
&i.OwnerID,
|
|
&i.LegacyDaemonID,
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
items = append(items, i)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
const getAgentRuntime = `-- name: GetAgentRuntime :one
|
|
SELECT id, workspace_id, daemon_id, name, runtime_mode, provider, status, device_info, metadata, last_seen_at, created_at, updated_at, owner_id, legacy_daemon_id FROM agent_runtime
|
|
WHERE id = $1
|
|
`
|
|
|
|
func (q *Queries) GetAgentRuntime(ctx context.Context, id pgtype.UUID) (AgentRuntime, error) {
|
|
row := q.db.QueryRow(ctx, getAgentRuntime, id)
|
|
var i AgentRuntime
|
|
err := row.Scan(
|
|
&i.ID,
|
|
&i.WorkspaceID,
|
|
&i.DaemonID,
|
|
&i.Name,
|
|
&i.RuntimeMode,
|
|
&i.Provider,
|
|
&i.Status,
|
|
&i.DeviceInfo,
|
|
&i.Metadata,
|
|
&i.LastSeenAt,
|
|
&i.CreatedAt,
|
|
&i.UpdatedAt,
|
|
&i.OwnerID,
|
|
&i.LegacyDaemonID,
|
|
)
|
|
return i, err
|
|
}
|
|
|
|
const getAgentRuntimeForWorkspace = `-- name: GetAgentRuntimeForWorkspace :one
|
|
SELECT id, workspace_id, daemon_id, name, runtime_mode, provider, status, device_info, metadata, last_seen_at, created_at, updated_at, owner_id, legacy_daemon_id FROM agent_runtime
|
|
WHERE id = $1 AND workspace_id = $2
|
|
`
|
|
|
|
type GetAgentRuntimeForWorkspaceParams struct {
|
|
ID pgtype.UUID `json:"id"`
|
|
WorkspaceID pgtype.UUID `json:"workspace_id"`
|
|
}
|
|
|
|
func (q *Queries) GetAgentRuntimeForWorkspace(ctx context.Context, arg GetAgentRuntimeForWorkspaceParams) (AgentRuntime, error) {
|
|
row := q.db.QueryRow(ctx, getAgentRuntimeForWorkspace, arg.ID, arg.WorkspaceID)
|
|
var i AgentRuntime
|
|
err := row.Scan(
|
|
&i.ID,
|
|
&i.WorkspaceID,
|
|
&i.DaemonID,
|
|
&i.Name,
|
|
&i.RuntimeMode,
|
|
&i.Provider,
|
|
&i.Status,
|
|
&i.DeviceInfo,
|
|
&i.Metadata,
|
|
&i.LastSeenAt,
|
|
&i.CreatedAt,
|
|
&i.UpdatedAt,
|
|
&i.OwnerID,
|
|
&i.LegacyDaemonID,
|
|
)
|
|
return i, err
|
|
}
|
|
|
|
const listAgentRuntimes = `-- name: ListAgentRuntimes :many
|
|
SELECT id, workspace_id, daemon_id, name, runtime_mode, provider, status, device_info, metadata, last_seen_at, created_at, updated_at, owner_id, legacy_daemon_id FROM agent_runtime
|
|
WHERE workspace_id = $1
|
|
ORDER BY created_at ASC
|
|
`
|
|
|
|
func (q *Queries) ListAgentRuntimes(ctx context.Context, workspaceID pgtype.UUID) ([]AgentRuntime, error) {
|
|
rows, err := q.db.Query(ctx, listAgentRuntimes, workspaceID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
items := []AgentRuntime{}
|
|
for rows.Next() {
|
|
var i AgentRuntime
|
|
if err := rows.Scan(
|
|
&i.ID,
|
|
&i.WorkspaceID,
|
|
&i.DaemonID,
|
|
&i.Name,
|
|
&i.RuntimeMode,
|
|
&i.Provider,
|
|
&i.Status,
|
|
&i.DeviceInfo,
|
|
&i.Metadata,
|
|
&i.LastSeenAt,
|
|
&i.CreatedAt,
|
|
&i.UpdatedAt,
|
|
&i.OwnerID,
|
|
&i.LegacyDaemonID,
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
items = append(items, i)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
const listAgentRuntimesByOwner = `-- name: ListAgentRuntimesByOwner :many
|
|
SELECT id, workspace_id, daemon_id, name, runtime_mode, provider, status, device_info, metadata, last_seen_at, created_at, updated_at, owner_id, legacy_daemon_id FROM agent_runtime
|
|
WHERE workspace_id = $1 AND owner_id = $2
|
|
ORDER BY created_at ASC
|
|
`
|
|
|
|
type ListAgentRuntimesByOwnerParams struct {
|
|
WorkspaceID pgtype.UUID `json:"workspace_id"`
|
|
OwnerID pgtype.UUID `json:"owner_id"`
|
|
}
|
|
|
|
func (q *Queries) ListAgentRuntimesByOwner(ctx context.Context, arg ListAgentRuntimesByOwnerParams) ([]AgentRuntime, error) {
|
|
rows, err := q.db.Query(ctx, listAgentRuntimesByOwner, arg.WorkspaceID, arg.OwnerID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
items := []AgentRuntime{}
|
|
for rows.Next() {
|
|
var i AgentRuntime
|
|
if err := rows.Scan(
|
|
&i.ID,
|
|
&i.WorkspaceID,
|
|
&i.DaemonID,
|
|
&i.Name,
|
|
&i.RuntimeMode,
|
|
&i.Provider,
|
|
&i.Status,
|
|
&i.DeviceInfo,
|
|
&i.Metadata,
|
|
&i.LastSeenAt,
|
|
&i.CreatedAt,
|
|
&i.UpdatedAt,
|
|
&i.OwnerID,
|
|
&i.LegacyDaemonID,
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
items = append(items, i)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
const markAgentRuntimeOnline = `-- name: MarkAgentRuntimeOnline :one
|
|
UPDATE agent_runtime
|
|
SET status = 'online', last_seen_at = now(), updated_at = now()
|
|
WHERE id = $1
|
|
RETURNING id, workspace_id, daemon_id, name, runtime_mode, provider, status, device_info, metadata, last_seen_at, created_at, updated_at, owner_id, legacy_daemon_id
|
|
`
|
|
|
|
// Used on the offline→online transition (and on first heartbeat after
|
|
// registration). Writes status, last_seen_at, and updated_at because the
|
|
// status flip is a real state change and we want updated_at to reflect it.
|
|
func (q *Queries) MarkAgentRuntimeOnline(ctx context.Context, id pgtype.UUID) (AgentRuntime, error) {
|
|
row := q.db.QueryRow(ctx, markAgentRuntimeOnline, id)
|
|
var i AgentRuntime
|
|
err := row.Scan(
|
|
&i.ID,
|
|
&i.WorkspaceID,
|
|
&i.DaemonID,
|
|
&i.Name,
|
|
&i.RuntimeMode,
|
|
&i.Provider,
|
|
&i.Status,
|
|
&i.DeviceInfo,
|
|
&i.Metadata,
|
|
&i.LastSeenAt,
|
|
&i.CreatedAt,
|
|
&i.UpdatedAt,
|
|
&i.OwnerID,
|
|
&i.LegacyDaemonID,
|
|
)
|
|
return i, err
|
|
}
|
|
|
|
const markRuntimesOfflineByIDs = `-- name: MarkRuntimesOfflineByIDs :many
|
|
UPDATE agent_runtime
|
|
SET status = 'offline', updated_at = now()
|
|
WHERE status = 'online'
|
|
AND id = ANY($1::uuid[])
|
|
AND last_seen_at < now() - make_interval(secs => $2::double precision)
|
|
RETURNING id, workspace_id
|
|
`
|
|
|
|
type MarkRuntimesOfflineByIDsParams struct {
|
|
Ids []pgtype.UUID `json:"ids"`
|
|
StaleSeconds float64 `json:"stale_seconds"`
|
|
}
|
|
|
|
type MarkRuntimesOfflineByIDsRow struct {
|
|
ID pgtype.UUID `json:"id"`
|
|
WorkspaceID pgtype.UUID `json:"workspace_id"`
|
|
}
|
|
|
|
// Flips a known set of runtime IDs from online to offline. Paired with
|
|
// SelectStaleOnlineRuntimes in the sweeper so the candidate selection and
|
|
// the actual write are decoupled (the LivenessStore filter sits between).
|
|
//
|
|
// Re-checks the stale predicate inside the UPDATE so a concurrent heartbeat
|
|
// between the SELECT (candidate gather), the LivenessStore filter, and this
|
|
// UPDATE cannot demote a runtime that just refreshed last_seen_at. The
|
|
// legacy MarkStaleRuntimesOffline UPDATE had this property implicitly
|
|
// because the predicate and the write lived in one statement; here we
|
|
// carry it forward explicitly so the SELECT/filter/UPDATE pipeline retains
|
|
// the same race-freedom.
|
|
func (q *Queries) MarkRuntimesOfflineByIDs(ctx context.Context, arg MarkRuntimesOfflineByIDsParams) ([]MarkRuntimesOfflineByIDsRow, error) {
|
|
rows, err := q.db.Query(ctx, markRuntimesOfflineByIDs, arg.Ids, arg.StaleSeconds)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
items := []MarkRuntimesOfflineByIDsRow{}
|
|
for rows.Next() {
|
|
var i MarkRuntimesOfflineByIDsRow
|
|
if err := rows.Scan(&i.ID, &i.WorkspaceID); err != nil {
|
|
return nil, err
|
|
}
|
|
items = append(items, i)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
const reassignAgentsToRuntime = `-- name: ReassignAgentsToRuntime :execrows
|
|
UPDATE agent
|
|
SET runtime_id = $1
|
|
WHERE runtime_id = $2
|
|
`
|
|
|
|
type ReassignAgentsToRuntimeParams struct {
|
|
NewRuntimeID pgtype.UUID `json:"new_runtime_id"`
|
|
OldRuntimeID pgtype.UUID `json:"old_runtime_id"`
|
|
}
|
|
|
|
// Re-points every agent referencing old_runtime_id at new_runtime_id.
|
|
func (q *Queries) ReassignAgentsToRuntime(ctx context.Context, arg ReassignAgentsToRuntimeParams) (int64, error) {
|
|
result, err := q.db.Exec(ctx, reassignAgentsToRuntime, arg.NewRuntimeID, arg.OldRuntimeID)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
return result.RowsAffected(), nil
|
|
}
|
|
|
|
const reassignTasksToRuntime = `-- name: ReassignTasksToRuntime :execrows
|
|
UPDATE agent_task_queue
|
|
SET runtime_id = $1
|
|
WHERE runtime_id = $2
|
|
`
|
|
|
|
type ReassignTasksToRuntimeParams struct {
|
|
NewRuntimeID pgtype.UUID `json:"new_runtime_id"`
|
|
OldRuntimeID pgtype.UUID `json:"old_runtime_id"`
|
|
}
|
|
|
|
// Re-points every queued/running/completed task referencing old_runtime_id.
|
|
// Required before deleting the old runtime row because agent_task_queue has
|
|
// an ON DELETE CASCADE FK that would otherwise drop historical tasks.
|
|
func (q *Queries) ReassignTasksToRuntime(ctx context.Context, arg ReassignTasksToRuntimeParams) (int64, error) {
|
|
result, err := q.db.Exec(ctx, reassignTasksToRuntime, arg.NewRuntimeID, arg.OldRuntimeID)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
return result.RowsAffected(), nil
|
|
}
|
|
|
|
const recordRuntimeLegacyDaemonID = `-- name: RecordRuntimeLegacyDaemonID :exec
|
|
UPDATE agent_runtime
|
|
SET legacy_daemon_id = COALESCE(legacy_daemon_id, $2)
|
|
WHERE id = $1
|
|
`
|
|
|
|
type RecordRuntimeLegacyDaemonIDParams struct {
|
|
ID pgtype.UUID `json:"id"`
|
|
LegacyDaemonID pgtype.Text `json:"legacy_daemon_id"`
|
|
}
|
|
|
|
// Remembers the most recent hostname-derived daemon_id that was merged into
|
|
// this row. Useful for debugging when tracing back why a given runtime row
|
|
// subsumed an old one, and only overwrites NULL so the earliest merge is
|
|
// preserved.
|
|
func (q *Queries) RecordRuntimeLegacyDaemonID(ctx context.Context, arg RecordRuntimeLegacyDaemonIDParams) error {
|
|
_, err := q.db.Exec(ctx, recordRuntimeLegacyDaemonID, arg.ID, arg.LegacyDaemonID)
|
|
return err
|
|
}
|
|
|
|
const selectStaleOnlineRuntimes = `-- name: SelectStaleOnlineRuntimes :many
|
|
SELECT id, workspace_id FROM agent_runtime
|
|
WHERE status = 'online'
|
|
AND last_seen_at < now() - make_interval(secs => $1::double precision)
|
|
`
|
|
|
|
type SelectStaleOnlineRuntimesRow struct {
|
|
ID pgtype.UUID `json:"id"`
|
|
WorkspaceID pgtype.UUID `json:"workspace_id"`
|
|
}
|
|
|
|
// Lists online runtimes whose last_seen_at exceeds the stale window. The
|
|
// sweeper uses this as a candidate set, then optionally filters via the
|
|
// LivenessStore before flipping rows to offline (a fresh Redis liveness
|
|
// record means the DB row is just lagging, not actually dead).
|
|
func (q *Queries) SelectStaleOnlineRuntimes(ctx context.Context, staleSeconds float64) ([]SelectStaleOnlineRuntimesRow, error) {
|
|
rows, err := q.db.Query(ctx, selectStaleOnlineRuntimes, staleSeconds)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
items := []SelectStaleOnlineRuntimesRow{}
|
|
for rows.Next() {
|
|
var i SelectStaleOnlineRuntimesRow
|
|
if err := rows.Scan(&i.ID, &i.WorkspaceID); err != nil {
|
|
return nil, err
|
|
}
|
|
items = append(items, i)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
const setAgentRuntimeOffline = `-- name: SetAgentRuntimeOffline :exec
|
|
UPDATE agent_runtime
|
|
SET status = 'offline', updated_at = now()
|
|
WHERE id = $1
|
|
`
|
|
|
|
func (q *Queries) SetAgentRuntimeOffline(ctx context.Context, id pgtype.UUID) error {
|
|
_, err := q.db.Exec(ctx, setAgentRuntimeOffline, id)
|
|
return err
|
|
}
|
|
|
|
const touchAgentRuntimeLastSeen = `-- name: TouchAgentRuntimeLastSeen :execrows
|
|
UPDATE agent_runtime
|
|
SET last_seen_at = now()
|
|
WHERE id = $1 AND status = 'online'
|
|
`
|
|
|
|
// Bumps last_seen_at on an already-online runtime. Deliberately does NOT
|
|
// touch status or updated_at: status is unchanged on the hot heartbeat path,
|
|
// and avoiding updated_at keeps the row HOT-eligible (no index columns
|
|
// change) and avoids invalidating any downstream consumer that watches
|
|
// updated_at.
|
|
//
|
|
// The status='online' predicate is load-bearing: callers read rt.Status from
|
|
// a prior SELECT and may race with the sweeper, which can flip the row to
|
|
// offline between that SELECT and this UPDATE. Without the predicate this
|
|
// query would silently leave a freshly-heartbeated runtime stuck in offline.
|
|
// Returning affected rows lets callers detect that race and fall back to
|
|
// MarkAgentRuntimeOnline to flip the row back online.
|
|
func (q *Queries) TouchAgentRuntimeLastSeen(ctx context.Context, id pgtype.UUID) (int64, error) {
|
|
result, err := q.db.Exec(ctx, touchAgentRuntimeLastSeen, id)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
return result.RowsAffected(), nil
|
|
}
|
|
|
|
const upsertAgentRuntime = `-- name: UpsertAgentRuntime :one
|
|
INSERT INTO agent_runtime (
|
|
workspace_id,
|
|
daemon_id,
|
|
name,
|
|
runtime_mode,
|
|
provider,
|
|
status,
|
|
device_info,
|
|
metadata,
|
|
owner_id,
|
|
last_seen_at
|
|
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, now())
|
|
ON CONFLICT (workspace_id, daemon_id, provider)
|
|
DO UPDATE SET
|
|
name = EXCLUDED.name,
|
|
runtime_mode = EXCLUDED.runtime_mode,
|
|
status = EXCLUDED.status,
|
|
device_info = EXCLUDED.device_info,
|
|
metadata = EXCLUDED.metadata,
|
|
owner_id = COALESCE(EXCLUDED.owner_id, agent_runtime.owner_id),
|
|
last_seen_at = now(),
|
|
updated_at = now()
|
|
RETURNING id, workspace_id, daemon_id, name, runtime_mode, provider, status, device_info, metadata, last_seen_at, created_at, updated_at, owner_id, legacy_daemon_id, (xmax = 0) AS inserted
|
|
`
|
|
|
|
type UpsertAgentRuntimeParams struct {
|
|
WorkspaceID pgtype.UUID `json:"workspace_id"`
|
|
DaemonID pgtype.Text `json:"daemon_id"`
|
|
Name string `json:"name"`
|
|
RuntimeMode string `json:"runtime_mode"`
|
|
Provider string `json:"provider"`
|
|
Status string `json:"status"`
|
|
DeviceInfo string `json:"device_info"`
|
|
Metadata []byte `json:"metadata"`
|
|
OwnerID pgtype.UUID `json:"owner_id"`
|
|
}
|
|
|
|
type UpsertAgentRuntimeRow struct {
|
|
ID pgtype.UUID `json:"id"`
|
|
WorkspaceID pgtype.UUID `json:"workspace_id"`
|
|
DaemonID pgtype.Text `json:"daemon_id"`
|
|
Name string `json:"name"`
|
|
RuntimeMode string `json:"runtime_mode"`
|
|
Provider string `json:"provider"`
|
|
Status string `json:"status"`
|
|
DeviceInfo string `json:"device_info"`
|
|
Metadata []byte `json:"metadata"`
|
|
LastSeenAt pgtype.Timestamptz `json:"last_seen_at"`
|
|
CreatedAt pgtype.Timestamptz `json:"created_at"`
|
|
UpdatedAt pgtype.Timestamptz `json:"updated_at"`
|
|
OwnerID pgtype.UUID `json:"owner_id"`
|
|
LegacyDaemonID pgtype.Text `json:"legacy_daemon_id"`
|
|
Inserted bool `json:"inserted"`
|
|
}
|
|
|
|
// (xmax = 0) AS inserted distinguishes a fresh insert (true) from an upsert
|
|
// that updated an existing row (false). Analytics reads this to fire the
|
|
// runtime_registered event only on first-time registration.
|
|
func (q *Queries) UpsertAgentRuntime(ctx context.Context, arg UpsertAgentRuntimeParams) (UpsertAgentRuntimeRow, error) {
|
|
row := q.db.QueryRow(ctx, upsertAgentRuntime,
|
|
arg.WorkspaceID,
|
|
arg.DaemonID,
|
|
arg.Name,
|
|
arg.RuntimeMode,
|
|
arg.Provider,
|
|
arg.Status,
|
|
arg.DeviceInfo,
|
|
arg.Metadata,
|
|
arg.OwnerID,
|
|
)
|
|
var i UpsertAgentRuntimeRow
|
|
err := row.Scan(
|
|
&i.ID,
|
|
&i.WorkspaceID,
|
|
&i.DaemonID,
|
|
&i.Name,
|
|
&i.RuntimeMode,
|
|
&i.Provider,
|
|
&i.Status,
|
|
&i.DeviceInfo,
|
|
&i.Metadata,
|
|
&i.LastSeenAt,
|
|
&i.CreatedAt,
|
|
&i.UpdatedAt,
|
|
&i.OwnerID,
|
|
&i.LegacyDaemonID,
|
|
&i.Inserted,
|
|
)
|
|
return i, err
|
|
}
|