Files
multica/server/pkg/db/generated/runtime.sql.go
Multica Eve d6349c16ec feat(runtime): per-runtime timezone for token-usage aggregation (MUL-1950) (#2394)
* feat: per-runtime timezone for token usage aggregation

The runtime token-usage charts (daily and hourly tabs on the
runtime-detail page) bucketed every event by the Postgres session
timezone, which is UTC in production. For an operator in UTC+8 that
meant a Tuesday afternoon's tasks landed in Tuesday early-morning's
bar — the chart was always one off.

Fix: store an IANA timezone on agent_runtime and aggregate under it.

* migrations 081 / 082 add agent_runtime.timezone (TEXT NOT NULL
  DEFAULT 'UTC') and rebuild the rollup pipeline (window function
  and both trigger functions) to compute bucket_date with
  AT TIME ZONE rt.timezone instead of bare DATE().
* No historical backfill — task_usage_daily rows already on disk
  keep their UTC bucket_date; only future writes / re-touches
  recompute under the new tz. (Product call from MUL-1950: 'guarantee
  future correctness'.)
* runtime_usage.sql gains a @tz parameter on ListRuntimeUsage and
  GetRuntimeUsageByHour and threads tz through
  GetRuntimeTaskHourlyActivity. ListRuntimeUsageDaily reads bucket_date
  as-is since the
  rollup already wrote it in tz.
* parseSinceParamInTZ replaces the raw N×24h cutoff with start-of-
  day-N in the runtime's tz so 'last 7 days' lines up with bucket
  boundaries.
* Daemon registration sends the host's IANA tz (TZ env, then
  time.Local), and UpsertAgentRuntime preserves any user override
  via a CASE-on-existing-value pattern so a daemon reconnect can't
  silently revert the operator's setting.
* New PATCH /api/runtimes/:id endpoint (UpdateAgentRuntime) lets
  the runtime detail page edit the tz; the editor seeds with the
  browser tz on first interaction.

Refs: MUL-1950

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Co-authored-by: multica-agent <github@multica.ai>

* fix: harden runtime timezone rollups

Co-authored-by: multica-agent <github@multica.ai>

* fix: address runtime timezone review nits

Co-authored-by: multica-agent <github@multica.ai>

---------

Co-authored-by: Eve <eve@multica.ai>
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Co-authored-by: multica-agent <github@multica.ai>
Co-authored-by: Eve <eve@multica-ai.local>
2026-05-11 14:39:35 +08:00

834 lines
27 KiB
Go

// Code generated by sqlc. DO NOT EDIT.
// versions:
// sqlc v1.30.0
// source: runtime.sql
package db
import (
"context"
"github.com/jackc/pgx/v5/pgtype"
)
const countActiveAgentsByRuntime = `-- name: CountActiveAgentsByRuntime :one
SELECT count(*) FROM agent WHERE runtime_id = $1 AND archived_at IS NULL
`

// CountActiveAgentsByRuntime returns the number of agent rows bound to
// runtimeID whose archived_at is NULL. On a scan error the returned count is
// whatever Scan left in the zero-initialised value, alongside the error.
func (q *Queries) CountActiveAgentsByRuntime(ctx context.Context, runtimeID pgtype.UUID) (int64, error) {
	var total int64
	err := q.db.QueryRow(ctx, countActiveAgentsByRuntime, runtimeID).Scan(&total)
	return total, err
}
const deleteAgentRuntime = `-- name: DeleteAgentRuntime :exec
DELETE FROM agent_runtime WHERE id = $1
`

// DeleteAgentRuntime deletes the agent_runtime row with the given id. It
// reports only the execution error; the number of affected rows is discarded.
func (q *Queries) DeleteAgentRuntime(ctx context.Context, id pgtype.UUID) error {
	if _, err := q.db.Exec(ctx, deleteAgentRuntime, id); err != nil {
		return err
	}
	return nil
}
const deleteArchivedAgentsByRuntime = `-- name: DeleteArchivedAgentsByRuntime :exec
DELETE FROM agent WHERE runtime_id = $1 AND archived_at IS NOT NULL
`

// DeleteArchivedAgentsByRuntime deletes every agent row bound to runtimeID
// that has a non-NULL archived_at. Agents with archived_at IS NULL are left
// untouched. Only the execution error is reported.
func (q *Queries) DeleteArchivedAgentsByRuntime(ctx context.Context, runtimeID pgtype.UUID) error {
	if _, err := q.db.Exec(ctx, deleteArchivedAgentsByRuntime, runtimeID); err != nil {
		return err
	}
	return nil
}
const deleteStaleOfflineRuntimes = `-- name: DeleteStaleOfflineRuntimes :many
DELETE FROM agent_runtime
WHERE status = 'offline'
AND last_seen_at < now() - make_interval(secs => $1::double precision)
AND id NOT IN (SELECT DISTINCT runtime_id FROM agent)
RETURNING id, workspace_id
`

// DeleteStaleOfflineRuntimesRow is one deleted runtime as reported by the
// RETURNING clause of DeleteStaleOfflineRuntimes.
type DeleteStaleOfflineRuntimesRow struct {
	ID          pgtype.UUID `json:"id"`
	WorkspaceID pgtype.UUID `json:"workspace_id"`
}

// DeleteStaleOfflineRuntimes removes runtimes that are offline, were last seen
// more than staleSeconds ago, and are not referenced by any agent row (active
// or archived). All agent references are excluded because the FK on
// agent.runtime_id is ON DELETE RESTRICT.
//
// It returns the id and workspace_id of every deleted runtime; the slice is
// empty (not nil) when nothing matched, and nil when an error is returned.
//
// NOTE(review): `NOT IN (subquery)` matches no rows at all if the subquery
// yields a NULL — confirm agent.runtime_id is declared NOT NULL.
func (q *Queries) DeleteStaleOfflineRuntimes(ctx context.Context, staleSeconds float64) ([]DeleteStaleOfflineRuntimesRow, error) {
	rows, err := q.db.Query(ctx, deleteStaleOfflineRuntimes, staleSeconds)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	deleted := []DeleteStaleOfflineRuntimesRow{}
	for rows.Next() {
		var row DeleteStaleOfflineRuntimesRow
		if err = rows.Scan(&row.ID, &row.WorkspaceID); err != nil {
			return nil, err
		}
		deleted = append(deleted, row)
	}
	// Surface any error that terminated iteration early.
	if err = rows.Err(); err != nil {
		return nil, err
	}
	return deleted, nil
}
const deleteTaskUsageDailyDirtyForRuntime = `-- name: DeleteTaskUsageDailyDirtyForRuntime :execrows
DELETE FROM task_usage_daily_dirty
WHERE runtime_id = $1
`

// DeleteTaskUsageDailyDirtyForRuntime deletes every task_usage_daily_dirty row
// for runtimeID and returns the number of rows removed.
//
// The queued dirty keys were computed under the old timezone; the ordered
// rebuild running in the same transaction writes the current aggregate
// instead, so the stale keys are simply dropped.
func (q *Queries) DeleteTaskUsageDailyDirtyForRuntime(ctx context.Context, runtimeID pgtype.UUID) (int64, error) {
	tag, err := q.db.Exec(ctx, deleteTaskUsageDailyDirtyForRuntime, runtimeID)
	if err != nil {
		return 0, err
	}
	removed := tag.RowsAffected()
	return removed, nil
}
const deleteTaskUsageDailyForRuntime = `-- name: DeleteTaskUsageDailyForRuntime :execrows
DELETE FROM task_usage_daily
WHERE runtime_id = $1
`

// DeleteTaskUsageDailyForRuntime deletes every materialized task_usage_daily
// row for runtimeID and returns the number of rows removed.
//
// This is the first step of the rebuild triggered by an explicit user
// timezone edit: old rows are cleared before they are re-inserted under the
// runtime's new timezone.
func (q *Queries) DeleteTaskUsageDailyForRuntime(ctx context.Context, runtimeID pgtype.UUID) (int64, error) {
	tag, err := q.db.Exec(ctx, deleteTaskUsageDailyForRuntime, runtimeID)
	if err != nil {
		return 0, err
	}
	removed := tag.RowsAffected()
	return removed, nil
}
const failTasksForOfflineRuntimes = `-- name: FailTasksForOfflineRuntimes :many
UPDATE agent_task_queue
SET status = 'failed', completed_at = now(), error = 'runtime went offline',
failure_reason = 'runtime_offline'
WHERE status IN ('dispatched', 'running')
AND runtime_id IN (
SELECT id FROM agent_runtime WHERE status = 'offline'
)
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session
`

// FailTasksForOfflineRuntimes fails every task that is still 'dispatched' or
// 'running' on a runtime whose status is 'offline'. Each matched task gets
// status 'failed', completed_at = now(), error 'runtime went offline' and
// failure_reason 'runtime_offline'.
//
// This cleans up tasks orphaned by a daemon crash or network partition. The
// updated rows are returned; the slice is empty (not nil) when no task
// matched, and nil when an error is returned.
func (q *Queries) FailTasksForOfflineRuntimes(ctx context.Context) ([]AgentTaskQueue, error) {
	rows, err := q.db.Query(ctx, failTasksForOfflineRuntimes)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	failed := []AgentTaskQueue{}
	for rows.Next() {
		var task AgentTaskQueue
		// Destination order must mirror the RETURNING column list.
		err = rows.Scan(
			&task.ID,
			&task.AgentID,
			&task.IssueID,
			&task.Status,
			&task.Priority,
			&task.DispatchedAt,
			&task.StartedAt,
			&task.CompletedAt,
			&task.Result,
			&task.Error,
			&task.CreatedAt,
			&task.Context,
			&task.RuntimeID,
			&task.SessionID,
			&task.WorkDir,
			&task.TriggerCommentID,
			&task.ChatSessionID,
			&task.AutopilotRunID,
			&task.Attempt,
			&task.MaxAttempts,
			&task.ParentTaskID,
			&task.FailureReason,
			&task.TriggerSummary,
			&task.ForceFreshSession,
		)
		if err != nil {
			return nil, err
		}
		failed = append(failed, task)
	}
	if err = rows.Err(); err != nil {
		return nil, err
	}
	return failed, nil
}
const findLegacyRuntimesByDaemonID = `-- name: FindLegacyRuntimesByDaemonID :many
SELECT id, workspace_id, daemon_id, name, runtime_mode, provider, status, device_info, metadata, last_seen_at, created_at, updated_at, owner_id, legacy_daemon_id, timezone FROM agent_runtime
WHERE workspace_id = $1
AND provider = $2
AND LOWER(daemon_id) = LOWER($3)
`

// FindLegacyRuntimesByDaemonIDParams holds the bind arguments for
// FindLegacyRuntimesByDaemonID.
type FindLegacyRuntimesByDaemonIDParams struct {
	WorkspaceID pgtype.UUID `json:"workspace_id"`
	Provider    string      `json:"provider"`
	DaemonID    string      `json:"daemon_id"`
}

// FindLegacyRuntimesByDaemonID looks up runtime rows keyed on a prior
// (hostname-derived) daemon_id within one workspace and provider. It is used
// at register time to find rows owned by the same machine under its old
// identity so that agents and tasks can be re-pointed at the new UUID-keyed
// row.
//
// The daemon_id match is case-insensitive (LOWER on both sides) because
// os.Hostname() has been observed to return different casings on the same
// machine (e.g. `Jiayuans-MacBook-Pro` vs `jiayuans-macbook-pro`) across
// reboots/mDNS state changes. A case-sensitive `=` would strand the old row;
// folding case handles the drift without making the daemon enumerate cased
// permutations.
//
// The query returns many rows rather than one because earlier case drift may
// already have minted duplicates (e.g. `Foo.local` AND `foo.local` under the
// same workspace+provider). A single-row lookup would consolidate only one of
// them and orphan the rest, so callers must merge every returned row into the
// new UUID-keyed runtime. The slice is empty (not nil) when nothing matched.
func (q *Queries) FindLegacyRuntimesByDaemonID(ctx context.Context, arg FindLegacyRuntimesByDaemonIDParams) ([]AgentRuntime, error) {
	rows, err := q.db.Query(ctx, findLegacyRuntimesByDaemonID, arg.WorkspaceID, arg.Provider, arg.DaemonID)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	legacy := []AgentRuntime{}
	for rows.Next() {
		var rt AgentRuntime
		// Destination order must mirror the SELECT column list.
		err = rows.Scan(
			&rt.ID,
			&rt.WorkspaceID,
			&rt.DaemonID,
			&rt.Name,
			&rt.RuntimeMode,
			&rt.Provider,
			&rt.Status,
			&rt.DeviceInfo,
			&rt.Metadata,
			&rt.LastSeenAt,
			&rt.CreatedAt,
			&rt.UpdatedAt,
			&rt.OwnerID,
			&rt.LegacyDaemonID,
			&rt.Timezone,
		)
		if err != nil {
			return nil, err
		}
		legacy = append(legacy, rt)
	}
	if err = rows.Err(); err != nil {
		return nil, err
	}
	return legacy, nil
}
const getAgentRuntime = `-- name: GetAgentRuntime :one
SELECT id, workspace_id, daemon_id, name, runtime_mode, provider, status, device_info, metadata, last_seen_at, created_at, updated_at, owner_id, legacy_daemon_id, timezone FROM agent_runtime
WHERE id = $1
`

// GetAgentRuntime fetches the agent_runtime row with the given id. The lookup
// is not scoped to a workspace. The error from Scan is returned as-is
// alongside the (possibly partially filled) row.
func (q *Queries) GetAgentRuntime(ctx context.Context, id pgtype.UUID) (AgentRuntime, error) {
	var rt AgentRuntime
	// Destination order must mirror the SELECT column list.
	err := q.db.QueryRow(ctx, getAgentRuntime, id).Scan(
		&rt.ID,
		&rt.WorkspaceID,
		&rt.DaemonID,
		&rt.Name,
		&rt.RuntimeMode,
		&rt.Provider,
		&rt.Status,
		&rt.DeviceInfo,
		&rt.Metadata,
		&rt.LastSeenAt,
		&rt.CreatedAt,
		&rt.UpdatedAt,
		&rt.OwnerID,
		&rt.LegacyDaemonID,
		&rt.Timezone,
	)
	return rt, err
}
const getAgentRuntimeForWorkspace = `-- name: GetAgentRuntimeForWorkspace :one
SELECT id, workspace_id, daemon_id, name, runtime_mode, provider, status, device_info, metadata, last_seen_at, created_at, updated_at, owner_id, legacy_daemon_id, timezone FROM agent_runtime
WHERE id = $1 AND workspace_id = $2
`

// GetAgentRuntimeForWorkspaceParams holds the bind arguments for
// GetAgentRuntimeForWorkspace.
type GetAgentRuntimeForWorkspaceParams struct {
	ID          pgtype.UUID `json:"id"`
	WorkspaceID pgtype.UUID `json:"workspace_id"`
}

// GetAgentRuntimeForWorkspace fetches the agent_runtime row matching both the
// given id and workspace_id, so a runtime belonging to a different workspace
// is not returned. The error from Scan is returned as-is alongside the
// (possibly partially filled) row.
func (q *Queries) GetAgentRuntimeForWorkspace(ctx context.Context, arg GetAgentRuntimeForWorkspaceParams) (AgentRuntime, error) {
	var rt AgentRuntime
	// Destination order must mirror the SELECT column list.
	err := q.db.QueryRow(ctx, getAgentRuntimeForWorkspace, arg.ID, arg.WorkspaceID).Scan(
		&rt.ID,
		&rt.WorkspaceID,
		&rt.DaemonID,
		&rt.Name,
		&rt.RuntimeMode,
		&rt.Provider,
		&rt.Status,
		&rt.DeviceInfo,
		&rt.Metadata,
		&rt.LastSeenAt,
		&rt.CreatedAt,
		&rt.UpdatedAt,
		&rt.OwnerID,
		&rt.LegacyDaemonID,
		&rt.Timezone,
	)
	return rt, err
}
const insertTaskUsageDailyForRuntime = `-- name: InsertTaskUsageDailyForRuntime :execrows
INSERT INTO task_usage_daily AS d (
bucket_date, workspace_id, runtime_id, provider, model,
input_tokens, output_tokens, cache_read_tokens, cache_write_tokens,
event_count
)
SELECT
DATE(tu.created_at AT TIME ZONE rt.timezone) AS bucket_date,
a.workspace_id,
atq.runtime_id,
tu.provider,
tu.model,
SUM(tu.input_tokens)::bigint AS input_tokens,
SUM(tu.output_tokens)::bigint AS output_tokens,
SUM(tu.cache_read_tokens)::bigint AS cache_read_tokens,
SUM(tu.cache_write_tokens)::bigint AS cache_write_tokens,
COUNT(*)::bigint AS event_count
FROM task_usage tu
JOIN agent_task_queue atq ON atq.id = tu.task_id
JOIN agent a ON a.id = atq.agent_id
JOIN agent_runtime rt ON rt.id = atq.runtime_id
WHERE atq.runtime_id = $1
GROUP BY 1, 2, 3, 4, 5
ON CONFLICT (bucket_date, workspace_id, runtime_id, provider, model) DO UPDATE
SET input_tokens = EXCLUDED.input_tokens,
output_tokens = EXCLUDED.output_tokens,
cache_read_tokens = EXCLUDED.cache_read_tokens,
cache_write_tokens = EXCLUDED.cache_write_tokens,
event_count = EXCLUDED.event_count,
updated_at = now()
`

// InsertTaskUsageDailyForRuntime re-aggregates all task_usage history for
// runtimeID into task_usage_daily, bucketing each event by its date in the
// runtime's own timezone (created_at AT TIME ZONE agent_runtime.timezone).
// Existing rows with the same (bucket_date, workspace_id, runtime_id,
// provider, model) key are overwritten with the fresh totals. It returns the
// number of rows affected.
//
// This is the final step of the rebuild triggered by an explicit user
// timezone edit. It is intentionally not run by the migration itself: deploys
// do not backfill history, but a user-driven change must not leave old UTC
// rows next to newly computed local rows. The query scans the runtime's whole
// history; timezone edits are owner/admin operations and are expected to be
// rare.
func (q *Queries) InsertTaskUsageDailyForRuntime(ctx context.Context, runtimeID pgtype.UUID) (int64, error) {
	tag, err := q.db.Exec(ctx, insertTaskUsageDailyForRuntime, runtimeID)
	if err != nil {
		return 0, err
	}
	written := tag.RowsAffected()
	return written, nil
}
const listAgentRuntimes = `-- name: ListAgentRuntimes :many
SELECT id, workspace_id, daemon_id, name, runtime_mode, provider, status, device_info, metadata, last_seen_at, created_at, updated_at, owner_id, legacy_daemon_id, timezone FROM agent_runtime
WHERE workspace_id = $1
ORDER BY created_at ASC
`

// ListAgentRuntimes returns every agent_runtime row in the given workspace,
// oldest first (ORDER BY created_at ASC). The slice is empty (not nil) when
// the workspace has no runtimes, and nil when an error is returned.
func (q *Queries) ListAgentRuntimes(ctx context.Context, workspaceID pgtype.UUID) ([]AgentRuntime, error) {
	rows, err := q.db.Query(ctx, listAgentRuntimes, workspaceID)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	runtimes := []AgentRuntime{}
	for rows.Next() {
		var rt AgentRuntime
		// Destination order must mirror the SELECT column list.
		err = rows.Scan(
			&rt.ID,
			&rt.WorkspaceID,
			&rt.DaemonID,
			&rt.Name,
			&rt.RuntimeMode,
			&rt.Provider,
			&rt.Status,
			&rt.DeviceInfo,
			&rt.Metadata,
			&rt.LastSeenAt,
			&rt.CreatedAt,
			&rt.UpdatedAt,
			&rt.OwnerID,
			&rt.LegacyDaemonID,
			&rt.Timezone,
		)
		if err != nil {
			return nil, err
		}
		runtimes = append(runtimes, rt)
	}
	if err = rows.Err(); err != nil {
		return nil, err
	}
	return runtimes, nil
}
const listAgentRuntimesByOwner = `-- name: ListAgentRuntimesByOwner :many
SELECT id, workspace_id, daemon_id, name, runtime_mode, provider, status, device_info, metadata, last_seen_at, created_at, updated_at, owner_id, legacy_daemon_id, timezone FROM agent_runtime
WHERE workspace_id = $1 AND owner_id = $2
ORDER BY created_at ASC
`

// ListAgentRuntimesByOwnerParams holds the bind arguments for
// ListAgentRuntimesByOwner.
type ListAgentRuntimesByOwnerParams struct {
	WorkspaceID pgtype.UUID `json:"workspace_id"`
	OwnerID     pgtype.UUID `json:"owner_id"`
}

// ListAgentRuntimesByOwner returns the agent_runtime rows in a workspace that
// belong to the given owner, oldest first (ORDER BY created_at ASC). The
// slice is empty (not nil) when nothing matched, and nil when an error is
// returned.
func (q *Queries) ListAgentRuntimesByOwner(ctx context.Context, arg ListAgentRuntimesByOwnerParams) ([]AgentRuntime, error) {
	rows, err := q.db.Query(ctx, listAgentRuntimesByOwner, arg.WorkspaceID, arg.OwnerID)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	owned := []AgentRuntime{}
	for rows.Next() {
		var rt AgentRuntime
		// Destination order must mirror the SELECT column list.
		err = rows.Scan(
			&rt.ID,
			&rt.WorkspaceID,
			&rt.DaemonID,
			&rt.Name,
			&rt.RuntimeMode,
			&rt.Provider,
			&rt.Status,
			&rt.DeviceInfo,
			&rt.Metadata,
			&rt.LastSeenAt,
			&rt.CreatedAt,
			&rt.UpdatedAt,
			&rt.OwnerID,
			&rt.LegacyDaemonID,
			&rt.Timezone,
		)
		if err != nil {
			return nil, err
		}
		owned = append(owned, rt)
	}
	if err = rows.Err(); err != nil {
		return nil, err
	}
	return owned, nil
}
const lockTaskUsageDailyRollup = `-- name: LockTaskUsageDailyRollup :exec
SELECT pg_advisory_xact_lock(4242)
`

// LockTaskUsageDailyRollup takes the transaction-scoped advisory lock with
// key 4242.
//
// It serializes explicit timezone rebuilds with rollup_task_usage_daily(),
// which uses the same advisory key (migration 073), so that cron cannot write
// old-timezone buckets while a PATCH is deleting and rebuilding rows.
func (q *Queries) LockTaskUsageDailyRollup(ctx context.Context) error {
	if _, err := q.db.Exec(ctx, lockTaskUsageDailyRollup); err != nil {
		return err
	}
	return nil
}
const markAgentRuntimeOnline = `-- name: MarkAgentRuntimeOnline :one
UPDATE agent_runtime
SET status = 'online', last_seen_at = now(), updated_at = now()
WHERE id = $1
RETURNING id, workspace_id, daemon_id, name, runtime_mode, provider, status, device_info, metadata, last_seen_at, created_at, updated_at, owner_id, legacy_daemon_id, timezone
`

// MarkAgentRuntimeOnline sets the runtime's status to 'online' and stamps both
// last_seen_at and updated_at with now(), returning the updated row.
//
// It is used on the offline→online transition (and on the first heartbeat
// after registration). updated_at is written here because the status flip is
// a real state change that updated_at should reflect.
func (q *Queries) MarkAgentRuntimeOnline(ctx context.Context, id pgtype.UUID) (AgentRuntime, error) {
	var rt AgentRuntime
	// Destination order must mirror the RETURNING column list.
	err := q.db.QueryRow(ctx, markAgentRuntimeOnline, id).Scan(
		&rt.ID,
		&rt.WorkspaceID,
		&rt.DaemonID,
		&rt.Name,
		&rt.RuntimeMode,
		&rt.Provider,
		&rt.Status,
		&rt.DeviceInfo,
		&rt.Metadata,
		&rt.LastSeenAt,
		&rt.CreatedAt,
		&rt.UpdatedAt,
		&rt.OwnerID,
		&rt.LegacyDaemonID,
		&rt.Timezone,
	)
	return rt, err
}
const markRuntimesOfflineByIDs = `-- name: MarkRuntimesOfflineByIDs :many
UPDATE agent_runtime
SET status = 'offline', updated_at = now()
WHERE status = 'online'
AND id = ANY($1::uuid[])
AND last_seen_at < now() - make_interval(secs => $2::double precision)
RETURNING id, workspace_id, owner_id, daemon_id, provider
`

// MarkRuntimesOfflineByIDsParams holds the bind arguments for
// MarkRuntimesOfflineByIDs: the candidate runtime ids and the stale window in
// seconds.
type MarkRuntimesOfflineByIDsParams struct {
	Ids          []pgtype.UUID `json:"ids"`
	StaleSeconds float64       `json:"stale_seconds"`
}

// MarkRuntimesOfflineByIDsRow identifies one runtime that
// MarkRuntimesOfflineByIDs flipped to offline.
type MarkRuntimesOfflineByIDsRow struct {
	ID          pgtype.UUID `json:"id"`
	WorkspaceID pgtype.UUID `json:"workspace_id"`
	OwnerID     pgtype.UUID `json:"owner_id"`
	DaemonID    pgtype.Text `json:"daemon_id"`
	Provider    string      `json:"provider"`
}

// MarkRuntimesOfflineByIDs flips a known set of runtime ids from online to
// offline and returns the rows it actually changed. It is paired with
// SelectStaleOnlineRuntimes in the sweeper so that candidate selection and the
// write are decoupled (the LivenessStore filter sits between them).
//
// The stale predicate is re-checked inside the UPDATE so that a heartbeat
// landing between the candidate SELECT, the LivenessStore filter, and this
// UPDATE cannot demote a runtime that has just refreshed last_seen_at. The
// legacy MarkStaleRuntimesOffline UPDATE had this property implicitly because
// predicate and write lived in one statement; it is carried forward
// explicitly here so the SELECT/filter/UPDATE pipeline keeps the same
// race-freedom.
//
// The slice is empty (not nil) when no row was updated, and nil on error.
func (q *Queries) MarkRuntimesOfflineByIDs(ctx context.Context, arg MarkRuntimesOfflineByIDsParams) ([]MarkRuntimesOfflineByIDsRow, error) {
	rows, err := q.db.Query(ctx, markRuntimesOfflineByIDs, arg.Ids, arg.StaleSeconds)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	demoted := []MarkRuntimesOfflineByIDsRow{}
	for rows.Next() {
		var row MarkRuntimesOfflineByIDsRow
		err = rows.Scan(
			&row.ID,
			&row.WorkspaceID,
			&row.OwnerID,
			&row.DaemonID,
			&row.Provider,
		)
		if err != nil {
			return nil, err
		}
		demoted = append(demoted, row)
	}
	if err = rows.Err(); err != nil {
		return nil, err
	}
	return demoted, nil
}
const reassignAgentsToRuntime = `-- name: ReassignAgentsToRuntime :execrows
UPDATE agent
SET runtime_id = $1
WHERE runtime_id = $2
`

// ReassignAgentsToRuntimeParams holds the bind arguments for
// ReassignAgentsToRuntime: the destination runtime and the runtime being
// replaced.
type ReassignAgentsToRuntimeParams struct {
	NewRuntimeID pgtype.UUID `json:"new_runtime_id"`
	OldRuntimeID pgtype.UUID `json:"old_runtime_id"`
}

// ReassignAgentsToRuntime re-points every agent that references
// arg.OldRuntimeID at arg.NewRuntimeID and returns the number of agents
// updated.
func (q *Queries) ReassignAgentsToRuntime(ctx context.Context, arg ReassignAgentsToRuntimeParams) (int64, error) {
	tag, err := q.db.Exec(ctx, reassignAgentsToRuntime, arg.NewRuntimeID, arg.OldRuntimeID)
	if err != nil {
		return 0, err
	}
	moved := tag.RowsAffected()
	return moved, nil
}
const reassignTasksToRuntime = `-- name: ReassignTasksToRuntime :execrows
UPDATE agent_task_queue
SET runtime_id = $1
WHERE runtime_id = $2
`

// ReassignTasksToRuntimeParams holds the bind arguments for
// ReassignTasksToRuntime: the destination runtime and the runtime being
// replaced.
type ReassignTasksToRuntimeParams struct {
	NewRuntimeID pgtype.UUID `json:"new_runtime_id"`
	OldRuntimeID pgtype.UUID `json:"old_runtime_id"`
}

// ReassignTasksToRuntime re-points every agent_task_queue row that references
// arg.OldRuntimeID — queued, running or completed, since the UPDATE does not
// filter on status — at arg.NewRuntimeID, and returns the number of tasks
// updated.
//
// This must run before the old runtime row is deleted: agent_task_queue has
// an ON DELETE CASCADE FK that would otherwise drop the historical tasks.
func (q *Queries) ReassignTasksToRuntime(ctx context.Context, arg ReassignTasksToRuntimeParams) (int64, error) {
	tag, err := q.db.Exec(ctx, reassignTasksToRuntime, arg.NewRuntimeID, arg.OldRuntimeID)
	if err != nil {
		return 0, err
	}
	moved := tag.RowsAffected()
	return moved, nil
}
const recordRuntimeLegacyDaemonID = `-- name: RecordRuntimeLegacyDaemonID :exec
UPDATE agent_runtime
SET legacy_daemon_id = COALESCE(legacy_daemon_id, $2)
WHERE id = $1
`

// RecordRuntimeLegacyDaemonIDParams holds the bind arguments for
// RecordRuntimeLegacyDaemonID.
type RecordRuntimeLegacyDaemonIDParams struct {
	ID             pgtype.UUID `json:"id"`
	LegacyDaemonID pgtype.Text `json:"legacy_daemon_id"`
}

// RecordRuntimeLegacyDaemonID remembers a hostname-derived daemon_id that was
// merged into this runtime row, which helps when tracing back why a given row
// subsumed an old one.
//
// The column is written through COALESCE(legacy_daemon_id, $2), so only a
// NULL value is ever filled in: the earliest recorded merge is preserved and
// later calls leave it unchanged.
func (q *Queries) RecordRuntimeLegacyDaemonID(ctx context.Context, arg RecordRuntimeLegacyDaemonIDParams) error {
	if _, err := q.db.Exec(ctx, recordRuntimeLegacyDaemonID, arg.ID, arg.LegacyDaemonID); err != nil {
		return err
	}
	return nil
}
const selectStaleOnlineRuntimes = `-- name: SelectStaleOnlineRuntimes :many
SELECT id, workspace_id, owner_id, daemon_id, provider FROM agent_runtime
WHERE status = 'online'
AND last_seen_at < now() - make_interval(secs => $1::double precision)
`

// SelectStaleOnlineRuntimesRow identifies one online runtime whose
// last_seen_at fell outside the stale window.
type SelectStaleOnlineRuntimesRow struct {
	ID          pgtype.UUID `json:"id"`
	WorkspaceID pgtype.UUID `json:"workspace_id"`
	OwnerID     pgtype.UUID `json:"owner_id"`
	DaemonID    pgtype.Text `json:"daemon_id"`
	Provider    string      `json:"provider"`
}

// SelectStaleOnlineRuntimes lists runtimes that are still marked online but
// whose last_seen_at is older than staleSeconds. The query only reads; it
// does not change any status.
//
// The sweeper treats the result as a candidate set and may filter it through
// the LivenessStore before flipping rows to offline (a fresh Redis liveness
// record means the DB row is merely lagging, not actually dead). The slice is
// empty (not nil) when nothing is stale, and nil when an error is returned.
func (q *Queries) SelectStaleOnlineRuntimes(ctx context.Context, staleSeconds float64) ([]SelectStaleOnlineRuntimesRow, error) {
	rows, err := q.db.Query(ctx, selectStaleOnlineRuntimes, staleSeconds)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	candidates := []SelectStaleOnlineRuntimesRow{}
	for rows.Next() {
		var row SelectStaleOnlineRuntimesRow
		err = rows.Scan(
			&row.ID,
			&row.WorkspaceID,
			&row.OwnerID,
			&row.DaemonID,
			&row.Provider,
		)
		if err != nil {
			return nil, err
		}
		candidates = append(candidates, row)
	}
	if err = rows.Err(); err != nil {
		return nil, err
	}
	return candidates, nil
}
const setAgentRuntimeOffline = `-- name: SetAgentRuntimeOffline :exec
UPDATE agent_runtime
SET status = 'offline', updated_at = now()
WHERE id = $1
`

// SetAgentRuntimeOffline unconditionally sets the runtime's status to
// 'offline' and bumps updated_at; last_seen_at is left as-is. Only the
// execution error is reported.
func (q *Queries) SetAgentRuntimeOffline(ctx context.Context, id pgtype.UUID) error {
	if _, err := q.db.Exec(ctx, setAgentRuntimeOffline, id); err != nil {
		return err
	}
	return nil
}
const touchAgentRuntimeLastSeen = `-- name: TouchAgentRuntimeLastSeen :execrows
UPDATE agent_runtime
SET last_seen_at = now()
WHERE id = $1 AND status = 'online'
`

// TouchAgentRuntimeLastSeen bumps last_seen_at on a runtime that is already
// online and returns the number of rows updated (0 when the row is missing or
// not online).
//
// status and updated_at are deliberately left alone: status does not change
// on the hot heartbeat path, and skipping updated_at keeps the row
// HOT-eligible (no index columns change) and avoids invalidating any
// downstream consumer that watches updated_at.
//
// The status='online' predicate is load-bearing. Callers read rt.Status from
// an earlier SELECT and can race with the sweeper, which may flip the row to
// offline between that SELECT and this UPDATE. Without the predicate a
// freshly-heartbeated runtime would silently stay stuck in offline. The
// affected-row count lets callers detect that race and fall back to
// MarkAgentRuntimeOnline to bring the row back online.
func (q *Queries) TouchAgentRuntimeLastSeen(ctx context.Context, id pgtype.UUID) (int64, error) {
	tag, err := q.db.Exec(ctx, touchAgentRuntimeLastSeen, id)
	if err != nil {
		return 0, err
	}
	touched := tag.RowsAffected()
	return touched, nil
}
const touchAgentRuntimesLastSeenBatch = `-- name: TouchAgentRuntimesLastSeenBatch :execrows
UPDATE agent_runtime
SET last_seen_at = now()
WHERE id = ANY($1::uuid[]) AND status = 'online'
`

// TouchAgentRuntimesLastSeenBatch is the bulk variant of
// TouchAgentRuntimeLastSeen used by the BatchedHeartbeatScheduler. It
// coalesces N per-runtime "bump last_seen_at" requests into a single UPDATE,
// so a fleet beating every 15s costs ~1 DB transaction per batch tick instead
// of N. It returns the number of rows updated.
//
// The status='online' predicate is load-bearing here as well: a row the
// sweeper has already flipped to offline is skipped rather than having its
// last_seen_at refreshed while it stays offline. updated_at is deliberately
// not touched so the rows stay HOT-eligible.
//
// A result smaller than len(ids) means some ids raced to offline between
// Schedule and flush; their next beat falls through the recordHeartbeat sync
// path and calls MarkAgentRuntimeOnline.
func (q *Queries) TouchAgentRuntimesLastSeenBatch(ctx context.Context, ids []pgtype.UUID) (int64, error) {
	tag, err := q.db.Exec(ctx, touchAgentRuntimesLastSeenBatch, ids)
	if err != nil {
		return 0, err
	}
	touched := tag.RowsAffected()
	return touched, nil
}
const updateAgentRuntimeTimezone = `-- name: UpdateAgentRuntimeTimezone :one
UPDATE agent_runtime
SET timezone = $1, updated_at = now()
WHERE id = $2
RETURNING id, workspace_id, daemon_id, name, runtime_mode, provider, status, device_info, metadata, last_seen_at, created_at, updated_at, owner_id, legacy_daemon_id, timezone
`

// UpdateAgentRuntimeTimezoneParams holds the bind arguments for
// UpdateAgentRuntimeTimezone: the new timezone value and the runtime id.
type UpdateAgentRuntimeTimezoneParams struct {
	Timezone string      `json:"timezone"`
	ID       pgtype.UUID `json:"id"`
}

// UpdateAgentRuntimeTimezone applies an operator-driven override of the
// runtime's reporting timezone (MUL-1950). It stores arg.Timezone, bumps
// updated_at, and returns the updated row.
//
// The value is written as given; this query performs no validation of the
// timezone string, and the WHERE clause is not scoped to a workspace.
func (q *Queries) UpdateAgentRuntimeTimezone(ctx context.Context, arg UpdateAgentRuntimeTimezoneParams) (AgentRuntime, error) {
	var rt AgentRuntime
	// Destination order must mirror the RETURNING column list.
	err := q.db.QueryRow(ctx, updateAgentRuntimeTimezone, arg.Timezone, arg.ID).Scan(
		&rt.ID,
		&rt.WorkspaceID,
		&rt.DaemonID,
		&rt.Name,
		&rt.RuntimeMode,
		&rt.Provider,
		&rt.Status,
		&rt.DeviceInfo,
		&rt.Metadata,
		&rt.LastSeenAt,
		&rt.CreatedAt,
		&rt.UpdatedAt,
		&rt.OwnerID,
		&rt.LegacyDaemonID,
		&rt.Timezone,
	)
	return rt, err
}
const upsertAgentRuntime = `-- name: UpsertAgentRuntime :one
INSERT INTO agent_runtime (
workspace_id,
daemon_id,
name,
runtime_mode,
provider,
status,
device_info,
metadata,
owner_id,
timezone,
last_seen_at
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, now())
ON CONFLICT (workspace_id, daemon_id, provider)
DO UPDATE SET
name = EXCLUDED.name,
runtime_mode = EXCLUDED.runtime_mode,
status = EXCLUDED.status,
device_info = EXCLUDED.device_info,
metadata = EXCLUDED.metadata,
owner_id = COALESCE(EXCLUDED.owner_id, agent_runtime.owner_id),
last_seen_at = now(),
updated_at = now()
RETURNING id, workspace_id, daemon_id, name, runtime_mode, provider, status, device_info, metadata, last_seen_at, created_at, updated_at, owner_id, legacy_daemon_id, timezone, (xmax = 0) AS inserted
`

// UpsertAgentRuntimeParams holds the bind arguments for UpsertAgentRuntime, in
// the order of the INSERT column list ($1..$10).
type UpsertAgentRuntimeParams struct {
	WorkspaceID pgtype.UUID `json:"workspace_id"`
	DaemonID    pgtype.Text `json:"daemon_id"`
	Name        string      `json:"name"`
	RuntimeMode string      `json:"runtime_mode"`
	Provider    string      `json:"provider"`
	Status      string      `json:"status"`
	DeviceInfo  string      `json:"device_info"`
	Metadata    []byte      `json:"metadata"`
	OwnerID     pgtype.UUID `json:"owner_id"`
	Timezone    string      `json:"timezone"`
}

// UpsertAgentRuntimeRow is the runtime row returned by UpsertAgentRuntime plus
// the Inserted flag computed by the query.
type UpsertAgentRuntimeRow struct {
	ID             pgtype.UUID        `json:"id"`
	WorkspaceID    pgtype.UUID        `json:"workspace_id"`
	DaemonID       pgtype.Text        `json:"daemon_id"`
	Name           string             `json:"name"`
	RuntimeMode    string             `json:"runtime_mode"`
	Provider       string             `json:"provider"`
	Status         string             `json:"status"`
	DeviceInfo     string             `json:"device_info"`
	Metadata       []byte             `json:"metadata"`
	LastSeenAt     pgtype.Timestamptz `json:"last_seen_at"`
	CreatedAt      pgtype.Timestamptz `json:"created_at"`
	UpdatedAt      pgtype.Timestamptz `json:"updated_at"`
	OwnerID        pgtype.UUID        `json:"owner_id"`
	LegacyDaemonID pgtype.Text        `json:"legacy_daemon_id"`
	Timezone       string             `json:"timezone"`
	Inserted       bool               `json:"inserted"`
}

// UpsertAgentRuntime registers a runtime keyed on (workspace_id, daemon_id,
// provider). A new row is inserted when the key is unseen; otherwise the
// existing row's name, runtime_mode, status, device_info and metadata are
// refreshed, owner_id is replaced only when the incoming value is non-NULL,
// and last_seen_at/updated_at are stamped with now().
//
// The Inserted field comes from `(xmax = 0) AS inserted`, which distinguishes
// a fresh insert (true) from an upsert that updated an existing row (false).
// Analytics reads it to fire runtime_registered/runtime_ready only on
// first-time registration.
//
// arg.Timezone is applied on INSERT only. On conflict the existing
// agent_runtime.timezone is deliberately kept: once an operator has
// overridden the tz via the web UI, a daemon reconnect (which sends its own
// system tz) must not silently revert it. Daemons can still set the initial
// value when they are the first to register a brand-new runtime row.
func (q *Queries) UpsertAgentRuntime(ctx context.Context, arg UpsertAgentRuntimeParams) (UpsertAgentRuntimeRow, error) {
	var out UpsertAgentRuntimeRow
	// Arguments follow the $1..$10 placeholders; scan destinations follow the
	// RETURNING column list, with the computed `inserted` flag last.
	err := q.db.QueryRow(ctx, upsertAgentRuntime,
		arg.WorkspaceID,
		arg.DaemonID,
		arg.Name,
		arg.RuntimeMode,
		arg.Provider,
		arg.Status,
		arg.DeviceInfo,
		arg.Metadata,
		arg.OwnerID,
		arg.Timezone,
	).Scan(
		&out.ID,
		&out.WorkspaceID,
		&out.DaemonID,
		&out.Name,
		&out.RuntimeMode,
		&out.Provider,
		&out.Status,
		&out.DeviceInfo,
		&out.Metadata,
		&out.LastSeenAt,
		&out.CreatedAt,
		&out.UpdatedAt,
		&out.OwnerID,
		&out.LegacyDaemonID,
		&out.Timezone,
		&out.Inserted,
	)
	return out, err
}