Files
multica/server/migrations/082_rollup_runtime_timezone.down.sql
Multica Eve d6349c16ec feat(runtime): per-runtime timezone for token-usage aggregation (MUL-1950) (#2394)
* feat: per-runtime timezone for token usage aggregation

The runtime token-usage charts (daily and hourly tabs on the
runtime-detail page) bucketed every event by the Postgres session
timezone, which is UTC in production. For an operator in UTC+8 that
meant a Tuesday afternoon's tasks landed in Tuesday early-morning's
bar — the chart was always one off.

Fix: store an IANA timezone on agent_runtime and aggregate under it.

* migrations 081 / 082 add agent_runtime.timezone (TEXT NOT NULL
  DEFAULT 'UTC') and rebuild the rollup pipeline (window function
  and both trigger functions) to compute bucket_date with
  AT TIME ZONE rt.timezone instead of bare DATE().
* No historical backfill — task_usage_daily rows already on disk
  keep their UTC bucket_date; only future writes / re-touches
  recompute under the new tz. (Product call from MUL-1950: 'guarantee
  future correctness'.)
* runtime_usage.sql gains a @tz parameter on ListRuntimeUsage and
  GetRuntimeUsageByHour and threads tz through
  GetRuntimeTaskHourlyActivity. ListRuntimeUsageDaily reads
  bucket_date as-is since the rollup already wrote it in tz.
* parseSinceParamInTZ replaces the raw N×24h cutoff with start-of-
  day-N in the runtime's tz so 'last 7 days' lines up with bucket
  boundaries.
* Daemon registration sends the host's IANA tz (TZ env, then
  time.Local), and UpsertAgentRuntime preserves any user override
  via a CASE-on-existing-value pattern so a daemon reconnect can't
  silently revert the operator's setting.
* New PATCH /api/runtimes/:id endpoint (UpdateAgentRuntime) lets
  the runtime detail page edit the tz; the editor seeds with the
  browser tz on first interaction.

Refs: MUL-1950

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Co-authored-by: multica-agent <github@multica.ai>

* fix: harden runtime timezone rollups

Co-authored-by: multica-agent <github@multica.ai>

* fix: address runtime timezone review nits

Co-authored-by: multica-agent <github@multica.ai>

---------

Co-authored-by: Eve <eve@multica.ai>
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Co-authored-by: multica-agent <github@multica.ai>
Co-authored-by: Eve <eve@multica-ai.local>
2026-05-11 14:39:35 +08:00

175 lines
6.9 KiB
PL/PgSQL

-- Restore the pre-082 function bodies (UTC-bucketed via bare DATE()).
-- Mirrors the definitions installed by migration 077.
-- Trigger function: marks task_usage_daily keys as dirty when a task row is
-- re-pointed at a different runtime or is deleted.
--
-- Reads OLD/NEW columns id, agent_id and runtime_id, so it is presumably bound
-- to agent_task_queue (the trigger binding itself is not part of this file).
--
-- Behaviour per operation:
--   DELETE  -> enqueue the keys of the deleted task under OLD.runtime_id
--              (skipped when OLD.runtime_id IS NULL); returns OLD.
--   UPDATE  -> only when runtime_id actually changed: enqueue the task's keys
--              under the old runtime and under the new runtime (each side
--              skipped when its runtime_id IS NULL); returns NEW.
--   other   -> no work; returns NULL.
--
-- bucket_date is derived with bare DATE(created_at), i.e. without any
-- AT TIME ZONE conversion (the pre-082 behaviour this migration restores).
-- enqueued_at is not supplied on insert, so it takes whatever the column
-- default is; on a key collision the later of the two timestamps is kept.
CREATE OR REPLACE FUNCTION enqueue_task_usage_daily_dirty_for_atq()
RETURNS TRIGGER
LANGUAGE plpgsql
AS $$
BEGIN
    -- Row removal: everything this task contributed to its runtime's
    -- buckets has to be recomputed.
    IF TG_OP = 'DELETE' THEN
        IF OLD.runtime_id IS NOT NULL THEN
            INSERT INTO task_usage_daily_dirty
                (bucket_date, workspace_id, runtime_id, provider, model)
            SELECT DISTINCT
                DATE(tu.created_at),
                a.workspace_id,
                OLD.runtime_id,
                tu.provider,
                tu.model
            FROM task_usage AS tu
            INNER JOIN agent AS a
                ON a.id = OLD.agent_id
            WHERE tu.task_id = OLD.id
            ON CONFLICT (bucket_date, workspace_id, runtime_id, provider, model)
            DO UPDATE
                SET enqueued_at = GREATEST(task_usage_daily_dirty.enqueued_at,
                                           EXCLUDED.enqueued_at);
        END IF;
        RETURN OLD;
    END IF;

    -- Anything that is neither DELETE nor UPDATE is ignored.
    IF TG_OP <> 'UPDATE' THEN
        RETURN NULL;
    END IF;

    -- UPDATE that left runtime_id untouched (NULL-safe comparison): no-op.
    IF OLD.runtime_id IS NOT DISTINCT FROM NEW.runtime_id THEN
        RETURN NEW;
    END IF;

    -- Runtime changed: the buckets of the runtime the task left are stale...
    IF OLD.runtime_id IS NOT NULL THEN
        INSERT INTO task_usage_daily_dirty
            (bucket_date, workspace_id, runtime_id, provider, model)
        SELECT DISTINCT
            DATE(tu.created_at),
            a.workspace_id,
            OLD.runtime_id,
            tu.provider,
            tu.model
        FROM task_usage AS tu
        INNER JOIN agent AS a
            ON a.id = OLD.agent_id
        WHERE tu.task_id = OLD.id
        ON CONFLICT (bucket_date, workspace_id, runtime_id, provider, model)
        DO UPDATE
            SET enqueued_at = GREATEST(task_usage_daily_dirty.enqueued_at,
                                       EXCLUDED.enqueued_at);
    END IF;

    -- ...and so are the buckets of the runtime it joined.
    IF NEW.runtime_id IS NOT NULL THEN
        INSERT INTO task_usage_daily_dirty
            (bucket_date, workspace_id, runtime_id, provider, model)
        SELECT DISTINCT
            DATE(tu.created_at),
            a.workspace_id,
            NEW.runtime_id,
            tu.provider,
            tu.model
        FROM task_usage AS tu
        INNER JOIN agent AS a
            ON a.id = NEW.agent_id
        WHERE tu.task_id = NEW.id
        ON CONFLICT (bucket_date, workspace_id, runtime_id, provider, model)
        DO UPDATE
            SET enqueued_at = GREATEST(task_usage_daily_dirty.enqueued_at,
                                       EXCLUDED.enqueued_at);
    END IF;

    RETURN NEW;
END;
$$;
-- Trigger function: marks the task_usage_daily key of a removed usage row as
-- dirty so the rollup recomputes that bucket.
--
-- Only OLD is read and OLD is returned, so this is presumably bound as a
-- row-level DELETE trigger on task_usage (the binding is not part of this
-- file). Nothing is enqueued when the owning task cannot be found in
-- agent_task_queue or has no runtime_id.
--
-- bucket_date is bare DATE(OLD.created_at), with no AT TIME ZONE conversion
-- (the pre-082 behaviour this migration restores). On a key collision the
-- later enqueued_at is kept.
CREATE OR REPLACE FUNCTION enqueue_task_usage_daily_dirty_for_tu()
RETURNS TRIGGER
LANGUAGE plpgsql
AS $$
BEGIN
    INSERT INTO task_usage_daily_dirty
        (bucket_date, workspace_id, runtime_id, provider, model)
    SELECT
        DATE(OLD.created_at),
        ag.workspace_id,
        q.runtime_id,
        OLD.provider,
        OLD.model
    FROM agent_task_queue AS q
    INNER JOIN agent AS ag
        ON ag.id = q.agent_id
    WHERE q.id = OLD.task_id
      AND q.runtime_id IS NOT NULL
    ON CONFLICT (bucket_date, workspace_id, runtime_id, provider, model)
    DO UPDATE
        SET enqueued_at = GREATEST(task_usage_daily_dirty.enqueued_at,
                                   EXCLUDED.enqueued_at);

    RETURN OLD;
END;
$$;
-- Recomputes task_usage_daily for every rollup key that became stale in the
-- half-open window [p_from, p_to), and drains the dirty queue.
--
-- Parameters:
--   p_from  inclusive lower bound of the window.
--   p_to    exclusive upper bound of the window; also the cut-off for which
--           task_usage_daily_dirty entries are consumed.
--
-- Returns: number of task_usage_daily rows upserted plus number deleted.
--          Returns 0 without doing any work when p_from >= p_to.
--
-- A key is (bucket_date, workspace_id, runtime_id, provider, model), where
-- bucket_date is bare DATE(task_usage.created_at) with no AT TIME ZONE
-- conversion (the pre-082 behaviour this migration restores).
--
-- Each stale key is recomputed from scratch over all of its task_usage rows
-- (not incrementally), so the result overwrites whatever was stored.
CREATE OR REPLACE FUNCTION rollup_task_usage_daily_window(
    p_from TIMESTAMPTZ,
    p_to   TIMESTAMPTZ
)
RETURNS BIGINT
LANGUAGE plpgsql
AS $$
DECLARE
    v_affected BIGINT;
BEGIN
    -- Empty or inverted window: nothing to do.
    IF p_to <= p_from THEN
        RETURN 0;
    END IF;

    WITH
    -- Keys whose usage rows were written inside the window. Rows with no
    -- updated_at fall back to created_at. Tasks without a runtime are ignored.
    touched_keys AS (
        SELECT DISTINCT
            DATE(u.created_at) AS bucket_date,
            ag.workspace_id    AS workspace_id,
            q.runtime_id       AS runtime_id,
            u.provider         AS provider,
            u.model            AS model
        FROM task_usage AS u
        INNER JOIN agent_task_queue AS q
            ON q.id = u.task_id
        INNER JOIN agent AS ag
            ON ag.id = q.agent_id
        WHERE q.runtime_id IS NOT NULL
          AND (
                (u.updated_at >= p_from AND u.updated_at < p_to)
                OR (
                    u.updated_at IS NULL
                    AND u.created_at >= p_from
                    AND u.created_at < p_to
                )
              )
    ),
    -- Keys explicitly queued by the enqueue triggers before the cut-off.
    queued_keys AS (
        SELECT
            dq.bucket_date,
            dq.workspace_id,
            dq.runtime_id,
            dq.provider,
            dq.model
        FROM task_usage_daily_dirty AS dq
        WHERE dq.enqueued_at < p_to
    ),
    -- De-duplicated union of both sources (UNION, not UNION ALL, so each key
    -- is recomputed exactly once).
    stale_keys AS (
        SELECT bucket_date, workspace_id, runtime_id, provider, model
        FROM touched_keys
        UNION
        SELECT bucket_date, workspace_id, runtime_id, provider, model
        FROM queued_keys
    ),
    -- Fresh totals for every stale key that still has source rows. Keys with
    -- no remaining task_usage rows produce no row here.
    fresh_totals AS (
        SELECT
            k.bucket_date,
            k.workspace_id,
            k.runtime_id,
            k.provider,
            k.model,
            CAST(SUM(u.input_tokens) AS BIGINT)       AS input_tokens,
            CAST(SUM(u.output_tokens) AS BIGINT)      AS output_tokens,
            CAST(SUM(u.cache_read_tokens) AS BIGINT)  AS cache_read_tokens,
            CAST(SUM(u.cache_write_tokens) AS BIGINT) AS cache_write_tokens,
            CAST(COUNT(*) AS BIGINT)                  AS event_count
        FROM stale_keys AS k
        INNER JOIN agent_task_queue AS q
            ON q.runtime_id = k.runtime_id
        INNER JOIN agent AS ag
            ON ag.id = q.agent_id
           AND ag.workspace_id = k.workspace_id
        INNER JOIN task_usage AS u
            ON u.task_id = q.id
           AND u.provider = k.provider
           AND u.model = k.model
           AND DATE(u.created_at) = k.bucket_date
        GROUP BY
            k.bucket_date,
            k.workspace_id,
            k.runtime_id,
            k.provider,
            k.model
    ),
    -- Write the fresh totals, overwriting any existing row for the same key.
    written AS (
        INSERT INTO task_usage_daily (
            bucket_date,
            workspace_id,
            runtime_id,
            provider,
            model,
            input_tokens,
            output_tokens,
            cache_read_tokens,
            cache_write_tokens,
            event_count
        )
        SELECT
            ft.bucket_date,
            ft.workspace_id,
            ft.runtime_id,
            ft.provider,
            ft.model,
            ft.input_tokens,
            ft.output_tokens,
            ft.cache_read_tokens,
            ft.cache_write_tokens,
            ft.event_count
        FROM fresh_totals AS ft
        ON CONFLICT (bucket_date, workspace_id, runtime_id, provider, model)
        DO UPDATE
            SET input_tokens       = EXCLUDED.input_tokens,
                output_tokens      = EXCLUDED.output_tokens,
                cache_read_tokens  = EXCLUDED.cache_read_tokens,
                cache_write_tokens = EXCLUDED.cache_write_tokens,
                event_count        = EXCLUDED.event_count,
                updated_at         = now()
        RETURNING 1
    ),
    -- Remove stored rows for stale keys that no longer have any source rows.
    pruned AS (
        DELETE FROM task_usage_daily AS tgt
        USING stale_keys AS k
        WHERE tgt.bucket_date = k.bucket_date
          AND tgt.workspace_id = k.workspace_id
          AND tgt.runtime_id = k.runtime_id
          AND tgt.provider = k.provider
          AND tgt.model = k.model
          AND NOT EXISTS (
                SELECT 1
                FROM fresh_totals AS ft
                WHERE ft.bucket_date = k.bucket_date
                  AND ft.workspace_id = k.workspace_id
                  AND ft.runtime_id = k.runtime_id
                  AND ft.provider = k.provider
                  AND ft.model = k.model
              )
        RETURNING 1
    )
    SELECT
        (SELECT COUNT(*) FROM written) + (SELECT COUNT(*) FROM pruned)
    INTO v_affected;

    -- Drain the queue entries covered by this run (same cut-off as above).
    DELETE FROM task_usage_daily_dirty
    WHERE enqueued_at < p_to;

    RETURN v_affected;
END;
$$;