Files
multica/server/internal/handler/handler.go
Multica Eve eb067ff077 fix(server): aggregate task_usage into daily rollup table to cut DB load (#2256)
* fix(server): aggregate task_usage into daily rollup table to cut DB load

ListRuntimeUsage previously did a SUM(...) GROUP BY DATE(created_at), provider,
model over the raw task_usage stream once per runtime row on the runtimes
list and once per detail page load, scaling O(events) per call. This is the
hot read path responsible for sustained load on Postgres.

Switch the read path to a materialized daily rollup table maintained by a
pg_cron job:

- 072_task_usage_daily_rollup: schema for task_usage_daily +
  task_usage_rollup_state, plus rollup_task_usage_daily_window(p_from, p_to)
  (window primitive used by both cron and offline backfill, idempotent via
  ON CONFLICT DO UPDATE adding deltas) and rollup_task_usage_daily() (cron
  entry point — pg_try_advisory_lock(4242) for serialization, watermark
  advancement, 5-minute safety lag for late-visible inserts). Also adds
  idx_task_usage_created_at to help the two lazy endpoints
  (ListRuntimeUsageByAgent / GetRuntimeUsageByHour) that still hit the
  raw table.

- 073_task_usage_daily_pgcron: CREATE EXTENSION IF NOT EXISTS pg_cron in a
  DO/EXCEPTION block (mirrors the migration 032 pg_bigm pattern so envs
  without shared_preload_libraries=pg_cron skip gracefully) and schedules
  rollup_task_usage_daily() every 5 minutes when the extension is present.

- queries/runtime_usage.sql ListRuntimeUsage rewritten to read from
  task_usage_daily; sqlc regenerated. Other usage queries unchanged.

- cmd/backfill_task_usage_daily: one-shot Go command that walks
  task_usage in monthly slices through rollup_task_usage_daily_window,
  then stamps the watermark to now()-5m so the cron resumes cleanly.
  Run once after migrations have applied, before relying on the rollup.

- runtime_test.go: TestGetRuntimeUsage_BucketsByUsageTime now invokes
  rollup_task_usage_daily_window after fixture inserts so the handler
  sees the rolled-up rows. Synthetic daily rows cleaned up after each
  test.

- runtime_rollup_test.go: new tests covering aggregation correctness,
  idempotency contract of ON CONFLICT DO UPDATE, and the watermark
  advancing exactly to now()-5m via the cron entry point.

Deployment order: apply migrations → run backfill_task_usage_daily once
→ pg_cron picks up subsequent windows automatically. Today bucket may be
up to ~10 minutes stale (5 min cron + 5 min lag) by design.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Co-authored-by: multica-agent <github@multica.ai>

* fix(server): make task_usage_daily rollup safe to overlap, replay, and correct

Addresses 4 review blockers on the original PR:

1. Cron/backfill double-count race: the rollup function is now idempotent.
   Window calls find DIRTY KEYS via task_usage.updated_at, then RECOMPUTE
   each bucket from ground truth and REPLACE the daily row (no more
   additive ON CONFLICT). Cron and backfill can now overlap safely.

2. Silent pg_cron absence: the read path is gated behind a new
   USAGE_DAILY_ROLLUP_ENABLED feature flag (default off). The raw
   task_usage scan is preserved as the fallback. Operators flip the
   flag per-environment after backfill + cron are confirmed healthy
   (task_usage_rollup_lag_seconds() helper added for monitoring).

3. UpsertTaskUsage corrections invisible to rollup: added
   task_usage.updated_at column (default now(), backfilled from
   created_at), and bumped it on conflict. Corrections now mark the
   bucket dirty and the next window call recomputes it correctly.

4. CREATE INDEX blocking writes on hot table: split into separate
   single-statement migrations using CREATE INDEX CONCURRENTLY
   (074, 075), matching the 035/067 pattern.

Also: cron.schedule() removed from migrations entirely. Migration 076
only enables the extension (gracefully on unsupported envs); the actual
schedule is a documented operator runbook step that runs AFTER backfill.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Co-authored-by: multica-agent <github@multica.ai>

* fix(server): trigger-driven invalidation + online-safe migration for task_usage_daily

Round-2 review feedback on PR #2256:

1. Add explicit dirty-bucket queue (task_usage_daily_dirty) populated by
   triggers on agent_task_queue (UPDATE OF runtime_id, DELETE) and
   task_usage (DELETE). The rollup window function drains both this queue
   and the updated_at-based discovery, so runtime reassignment and
   issue-cascade deletes no longer leave the rollup divergent from the
   raw query.

   Triggers join via agent (not issue) to look up workspace_id, because
   when the cascade comes from issue, the issue row is already gone by
   the time atq's BEFORE DELETE fires; agent stays alive.

2. Make migration 072 online-safe: only ADD COLUMN updated_at TIMESTAMPTZ
   (nullable, no default → metadata-only ALTER, no row rewrite) and a
   separate ALTER for SET DEFAULT now() (also metadata-only). No bulk
   UPDATE on the hot task_usage table. The rollup window function's
   dirty_keys CTE handles legacy NULL rows via an OR branch, supported
   by partial index idx_task_usage_created_at_legacy.

3. Refresh stale documentation in cmd/backfill_task_usage_daily/main.go
   header to describe the current recompute/replace semantics, idempotent
   re-runnability, and the actual migration numbering (072..077).

Tests:
- TestRollupTaskUsageDaily_InvalidationOnReassign: verifies usage moves
  between runtime buckets after ReassignTasksToRuntime-style update.
- TestRollupTaskUsageDaily_InvalidationOnIssueDelete: verifies daily
  bucket is cleared after issue delete cascades through atq → task_usage.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Co-authored-by: multica-agent <github@multica.ai>

* fix(server): close dirty-queue race + move legacy partial index to its own concurrent migration

Round-3 review feedback on PR #2256:

1. Blocker: dirty-queue invalidations could be silently lost under
   concurrency. ON CONFLICT DO NOTHING let a late trigger see the row
   already enqueued, no-op, and then the rollup drain (WHERE
   enqueued_at < p_to) would delete the original row — losing the
   late invalidation. Switched all three trigger enqueue paths to
   ON CONFLICT DO UPDATE SET enqueued_at = GREATEST(existing,
   EXCLUDED.enqueued_at), so any invalidation arriving during a
   rollup tick keeps enqueued_at > p_to (p_to = now() - 5min) and
   survives the post-tick drain.

2. High: idx_task_usage_created_at_legacy (partial index on hot
   task_usage table) was being created in the regular 077 migration
   without CONCURRENTLY. Moved to new migration 078 with
   CREATE INDEX CONCURRENTLY, matching the pattern of 074/075.
   077's down migration leaves the index alone (it is owned by 078).

3. Minor: gofmt -w on runtime_rollup_test.go and
   backfill_task_usage_daily/main.go (tabs were lost in the original
   heredoc append). PR description rewritten to describe the current
   recompute/replace + dirty queue + feature flag design and the
   072..078 migration ordering.

Tests still green: TestRollupTaskUsageDaily_* (including both new
invalidation regressions), TestGetRuntimeUsage_*, TestWorkspaceUsage_*.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Co-authored-by: multica-agent <github@multica.ai>

* fix(server): unify workspace_id source via agent in rollup window function

Round-4 review feedback (J) on PR #2256:

M1 (must-fix): The dirty queue triggers resolved workspace_id via
`agent.workspace_id`, but the window function's `dirty_from_updates`
discovery and `recomputed` recompute join used `issue.workspace_id`.
There is no schema-level FK guaranteeing
`agent.workspace_id == issue.workspace_id`. Any divergence (future
cross-workspace task scenarios, data repairs, migration bugs) would
cause:

  - dirty queue rows with workspace_id from agent
  - recompute join filtering by workspace_id from issue
  - 0 matches in recompute → bucket erroneously hits the
    deleted_empty branch and the daily row is silently dropped
  - dirty_from_updates path attributing usage to the wrong workspace

Replaced both CTEs to JOIN agent (not issue) so trigger / discovery /
recompute share one workspace_id source. Comment in 077 explains the
constraint.

N1: Refreshed two stale references in
cmd/backfill_task_usage_daily/main.go (header now says "072..078";
stampWatermark warning now mentions migration 073, where the rollup
state table is actually introduced).

Test: New TestRollupTaskUsageDaily_WorkspaceMismatch constructs an
atq with agent.workspace_id != issue.workspace_id, asserts the bucket
lands under agent's workspace (not issue's), and re-asserts after a
runtime reassign in the foreign workspace. Acts as a canary if the
schema invariant changes.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Co-authored-by: multica-agent <github@multica.ai>

---------

Co-authored-by: Eve <eve@multica.ai>
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Co-authored-by: multica-agent <github@multica.ai>
Co-authored-by: Devv <devv@Devvs-Mac-mini.local>
2026-05-08 15:35:21 +08:00

590 lines
19 KiB
Go

package handler
import (
"context"
"crypto/rand"
"encoding/hex"
"encoding/json"
"errors"
"log/slog"
"net/http"
"github.com/go-chi/chi/v5"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"
"github.com/jackc/pgx/v5/pgtype"
"github.com/multica-ai/multica/server/internal/analytics"
"github.com/multica-ai/multica/server/internal/auth"
"github.com/multica-ai/multica/server/internal/daemonws"
"github.com/multica-ai/multica/server/internal/events"
"github.com/multica-ai/multica/server/internal/middleware"
"github.com/multica-ai/multica/server/internal/realtime"
"github.com/multica-ai/multica/server/internal/service"
"github.com/multica-ai/multica/server/internal/storage"
"github.com/multica-ai/multica/server/internal/util"
db "github.com/multica-ai/multica/server/pkg/db/generated"
)
// randomID returns a random 16-byte hex string used as a request ID for
// in-memory stores (model list, local skills, CLI update, etc.).
func randomID() string {
b := make([]byte, 16)
rand.Read(b)
return hex.EncodeToString(b)
}
type txStarter interface {
Begin(ctx context.Context) (pgx.Tx, error)
}
type dbExecutor interface {
Exec(ctx context.Context, sql string, arguments ...any) (pgconn.CommandTag, error)
Query(ctx context.Context, sql string, args ...any) (pgx.Rows, error)
QueryRow(ctx context.Context, sql string, args ...any) pgx.Row
}
type Config struct {
AllowSignup bool
AllowedEmails []string
AllowedEmailDomains []string
// UseDailyRollupForRuntimeUsage routes ListRuntimeUsage to the
// task_usage_daily rollup table when true. Default false: the read
// path stays on the raw task_usage stream so rollup-related issues
// (pg_cron not running, backfill not yet performed, watermark stuck)
// can never make the dashboard return empty/stale data. Operators
// flip this on per environment AFTER:
// 1) migrations 072..076 applied,
// 2) backfill_task_usage_daily ran successfully,
// 3) cron job scheduled and task_usage_rollup_lag_seconds() < 900.
UseDailyRollupForRuntimeUsage bool
}
type Handler struct {
Queries *db.Queries
DB dbExecutor
TxStarter txStarter
Hub *realtime.Hub
DaemonHub *daemonws.Hub
Bus *events.Bus
TaskService *service.TaskService
AutopilotService *service.AutopilotService
EmailService *service.EmailService
UpdateStore UpdateStore
ModelListStore ModelListStore
LocalSkillListStore LocalSkillListStore
LocalSkillImportStore LocalSkillImportStore
LivenessStore LivenessStore
HeartbeatScheduler HeartbeatScheduler
Storage storage.Storage
CFSigner *auth.CloudFrontSigner
Analytics analytics.Client
PATCache *auth.PATCache
DaemonTokenCache *auth.DaemonTokenCache
cfg Config
}
func New(queries *db.Queries, txStarter txStarter, hub *realtime.Hub, bus *events.Bus, emailService *service.EmailService, store storage.Storage, cfSigner *auth.CloudFrontSigner, analyticsClient analytics.Client, cfg Config, daemonHubs ...*daemonws.Hub) *Handler {
var executor dbExecutor
if candidate, ok := txStarter.(dbExecutor); ok {
executor = candidate
}
if analyticsClient == nil {
analyticsClient = analytics.NoopClient{}
}
var daemonHub *daemonws.Hub
if len(daemonHubs) > 0 {
daemonHub = daemonHubs[0]
}
taskSvc := service.NewTaskService(queries, txStarter, hub, bus, daemonHub)
return &Handler{
Queries: queries,
DB: executor,
TxStarter: txStarter,
Hub: hub,
DaemonHub: daemonHub,
Bus: bus,
TaskService: taskSvc,
AutopilotService: service.NewAutopilotService(queries, txStarter, bus, taskSvc),
EmailService: emailService,
UpdateStore: NewInMemoryUpdateStore(),
ModelListStore: NewInMemoryModelListStore(),
LocalSkillListStore: NewInMemoryLocalSkillListStore(),
LocalSkillImportStore: NewInMemoryLocalSkillImportStore(),
LivenessStore: NewNoopLivenessStore(),
HeartbeatScheduler: NewPassthroughHeartbeatScheduler(queries),
Storage: store,
CFSigner: cfSigner,
Analytics: analyticsClient,
cfg: cfg,
}
}
func writeJSON(w http.ResponseWriter, status int, v any) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(status)
json.NewEncoder(w).Encode(v)
}
func writeError(w http.ResponseWriter, status int, msg string) {
writeJSON(w, status, map[string]string{"error": msg})
}
// Thin wrappers around util functions.
//
// parseUUID is intentionally the panicking variant: any handler call site
// reachable here is expected to feed a UUID that is either (a) a sqlc round-trip
// of a DB-sourced value, or (b) a raw request input that has already been
// validated upstream. A panic here means an unguarded user-input string slipped
// in — that is a real bug we want surfaced loudly (chi's middleware.Recoverer
// converts it to a 500) instead of silently corrupting data via a zero UUID.
//
// For unvalidated user input at request boundaries, use parseUUIDOrBadRequest
// (writes 400) — never feed raw chi.URLParam / request-body strings into
// parseUUID directly when the call writes to the database.
func parseUUID(s string) pgtype.UUID { return util.MustParseUUID(s) }
func uuidToString(u pgtype.UUID) string { return util.UUIDToString(u) }
func textToPtr(t pgtype.Text) *string { return util.TextToPtr(t) }
func ptrToText(s *string) pgtype.Text { return util.PtrToText(s) }
func strToText(s string) pgtype.Text { return util.StrToText(s) }
func timestampToString(t pgtype.Timestamptz) string { return util.TimestampToString(t) }
func timestampToPtr(t pgtype.Timestamptz) *string { return util.TimestampToPtr(t) }
func uuidToPtr(u pgtype.UUID) *string { return util.UUIDToPtr(u) }
func int8ToPtr(v pgtype.Int8) *int64 { return util.Int8ToPtr(v) }
// parseUUIDOrBadRequest validates a UUID string sourced from user input
// (URL params, request body, headers). On invalid input it writes a 400
// response and returns ok=false; callers must return immediately.
//
// Use this anywhere a malformed UUID would otherwise reach a write query
// (DELETE / UPDATE) — the silent zero-UUID behavior of the old ParseUUID
// caused real silent-data-loss bugs (#1661).
func parseUUIDOrBadRequest(w http.ResponseWriter, s, fieldName string) (pgtype.UUID, bool) {
u, err := util.ParseUUID(s)
if err != nil {
writeError(w, http.StatusBadRequest, "invalid "+fieldName)
return pgtype.UUID{}, false
}
return u, true
}
func parseUUIDSliceOrBadRequest(w http.ResponseWriter, ids []string, fieldName string) ([]pgtype.UUID, bool) {
uuids := make([]pgtype.UUID, len(ids))
for i, id := range ids {
u, err := util.ParseUUID(id)
if err != nil {
writeError(w, http.StatusBadRequest, "invalid "+fieldName)
return nil, false
}
uuids[i] = u
}
return uuids, true
}
// publish sends a domain event through the event bus.
func (h *Handler) publish(eventType, workspaceID, actorType, actorID string, payload any) {
h.Bus.Publish(events.Event{
Type: eventType,
WorkspaceID: workspaceID,
ActorType: actorType,
ActorID: actorID,
Payload: payload,
})
}
// publishTask is publish() plus a TaskID hint so the realtime layer can route
// the event to the per-task scope rather than the whole workspace.
func (h *Handler) publishTask(eventType, workspaceID, actorType, actorID, taskID string, payload any) {
h.Bus.Publish(events.Event{
Type: eventType,
WorkspaceID: workspaceID,
ActorType: actorType,
ActorID: actorID,
TaskID: taskID,
Payload: payload,
})
}
// publishChat is publish() plus a ChatSessionID hint so the realtime layer
// can route the event to the per-chat-session scope.
func (h *Handler) publishChat(eventType, workspaceID, actorType, actorID, chatSessionID string, payload any) {
h.Bus.Publish(events.Event{
Type: eventType,
WorkspaceID: workspaceID,
ActorType: actorType,
ActorID: actorID,
ChatSessionID: chatSessionID,
Payload: payload,
})
}
func isNotFound(err error) bool {
return errors.Is(err, pgx.ErrNoRows)
}
func isUniqueViolation(err error) bool {
var pgErr *pgconn.PgError
return errors.As(err, &pgErr) && pgErr.Code == "23505"
}
func requestUserID(r *http.Request) string {
return r.Header.Get("X-User-ID")
}
// resolveActor determines whether the request is from an agent or a human member.
// If X-Agent-ID and X-Task-ID headers are both set, validates that the task
// belongs to the claimed agent (defense-in-depth against manual header spoofing).
// If only X-Agent-ID is set, validates that the agent belongs to the workspace.
// Returns ("agent", agentID) on success, ("member", userID) otherwise.
func (h *Handler) resolveActor(r *http.Request, userID, workspaceID string) (actorType, actorID string) {
agentID := r.Header.Get("X-Agent-ID")
if agentID == "" {
return "member", userID
}
agentUUID, err := util.ParseUUID(agentID)
if err != nil {
slog.Debug("resolveActor: X-Agent-ID is not a valid UUID, falling back to member", "agent_id", agentID)
return "member", userID
}
// Validate the agent exists in the target workspace.
agent, err := h.Queries.GetAgent(r.Context(), agentUUID)
if err != nil || uuidToString(agent.WorkspaceID) != workspaceID {
slog.Debug("resolveActor: X-Agent-ID rejected, agent not found or workspace mismatch", "agent_id", agentID, "workspace_id", workspaceID)
return "member", userID
}
// When X-Task-ID is provided, cross-check that the task belongs to this agent.
if taskID := r.Header.Get("X-Task-ID"); taskID != "" {
taskUUID, err := util.ParseUUID(taskID)
if err != nil {
slog.Debug("resolveActor: X-Task-ID is not a valid UUID, falling back to member", "task_id", taskID)
return "member", userID
}
task, err := h.Queries.GetAgentTask(r.Context(), taskUUID)
if err != nil || uuidToString(task.AgentID) != agentID {
slog.Debug("resolveActor: X-Task-ID rejected, task not found or agent mismatch", "agent_id", agentID, "task_id", taskID)
return "member", userID
}
}
return "agent", agentID
}
func requireUserID(w http.ResponseWriter, r *http.Request) (string, bool) {
userID := requestUserID(r)
if userID == "" {
writeError(w, http.StatusUnauthorized, "user not authenticated")
return "", false
}
return userID, true
}
// resolveWorkspaceID returns the workspace UUID for this request. Delegates
// to middleware.ResolveWorkspaceIDFromRequest so middleware-protected routes
// and middleware-less routes (e.g. /api/upload-file) share identical
// resolution behavior — including slug → UUID translation via the DB.
//
// Returns "" when no workspace identifier was provided or a slug was provided
// but doesn't match any workspace.
func (h *Handler) resolveWorkspaceID(r *http.Request) string {
return middleware.ResolveWorkspaceIDFromRequest(r, h.Queries)
}
// ctxMember returns the workspace member from context (set by workspace middleware).
func ctxMember(ctx context.Context) (db.Member, bool) {
return middleware.MemberFromContext(ctx)
}
// ctxWorkspaceID returns the workspace ID from context (set by workspace middleware).
func ctxWorkspaceID(ctx context.Context) string {
return middleware.WorkspaceIDFromContext(ctx)
}
// workspaceIDFromURL returns the workspace ID from context (preferred) or chi URL param (fallback).
func workspaceIDFromURL(r *http.Request, param string) string {
if id := middleware.WorkspaceIDFromContext(r.Context()); id != "" {
return id
}
return chi.URLParam(r, param)
}
// workspaceMember returns the member from middleware context, or falls back to a DB
// lookup when the handler is called directly (e.g. in tests).
func (h *Handler) workspaceMember(w http.ResponseWriter, r *http.Request, workspaceID string) (db.Member, bool) {
if m, ok := ctxMember(r.Context()); ok {
return m, true
}
return h.requireWorkspaceMember(w, r, workspaceID, "workspace not found")
}
func roleAllowed(role string, roles ...string) bool {
for _, candidate := range roles {
if role == candidate {
return true
}
}
return false
}
func countOwners(members []db.Member) int {
owners := 0
for _, member := range members {
if member.Role == "owner" {
owners++
}
}
return owners
}
func (h *Handler) getWorkspaceMember(ctx context.Context, userID, workspaceID string) (db.Member, error) {
userUUID, err := util.ParseUUID(userID)
if err != nil {
return db.Member{}, err
}
wsUUID, err := util.ParseUUID(workspaceID)
if err != nil {
return db.Member{}, err
}
return h.Queries.GetMemberByUserAndWorkspace(ctx, db.GetMemberByUserAndWorkspaceParams{
UserID: userUUID,
WorkspaceID: wsUUID,
})
}
func (h *Handler) requireWorkspaceMember(w http.ResponseWriter, r *http.Request, workspaceID, notFoundMsg string) (db.Member, bool) {
if workspaceID == "" {
writeError(w, http.StatusBadRequest, "workspace_id is required")
return db.Member{}, false
}
userID, ok := requireUserID(w, r)
if !ok {
return db.Member{}, false
}
member, err := h.getWorkspaceMember(r.Context(), userID, workspaceID)
if err != nil {
writeError(w, http.StatusNotFound, notFoundMsg)
return db.Member{}, false
}
return member, true
}
func (h *Handler) requireWorkspaceRole(w http.ResponseWriter, r *http.Request, workspaceID, notFoundMsg string, roles ...string) (db.Member, bool) {
member, ok := h.requireWorkspaceMember(w, r, workspaceID, notFoundMsg)
if !ok {
return db.Member{}, false
}
if !roleAllowed(member.Role, roles...) {
writeError(w, http.StatusForbidden, "insufficient permissions")
return db.Member{}, false
}
return member, true
}
// isWorkspaceEntity checks whether a user_id belongs to the given workspace,
// as either a member or an agent depending on userType.
func (h *Handler) isWorkspaceEntity(ctx context.Context, userType, userID, workspaceID string) bool {
switch userType {
case "member":
_, err := h.getWorkspaceMember(ctx, userID, workspaceID)
return err == nil
case "agent":
userUUID, err := util.ParseUUID(userID)
if err != nil {
return false
}
wsUUID, err := util.ParseUUID(workspaceID)
if err != nil {
return false
}
_, err = h.Queries.GetAgentInWorkspace(ctx, db.GetAgentInWorkspaceParams{
ID: userUUID,
WorkspaceID: wsUUID,
})
return err == nil
default:
return false
}
}
func (h *Handler) loadIssueForUser(w http.ResponseWriter, r *http.Request, issueID string) (db.Issue, bool) {
if _, ok := requireUserID(w, r); !ok {
return db.Issue{}, false
}
workspaceID := h.resolveWorkspaceID(r)
if workspaceID == "" {
writeError(w, http.StatusBadRequest, "workspace_id is required")
return db.Issue{}, false
}
// Try identifier format first (e.g., "JIA-42"). resolveIssueByIdentifier
// silently returns false for non-identifier strings, falling through to
// the UUID path below.
if issue, ok := h.resolveIssueByIdentifier(r.Context(), issueID, workspaceID); ok {
return issue, true
}
issueUUID, err := util.ParseUUID(issueID)
if err != nil {
// Not a valid UUID and didn't match identifier format → 404 (consistent
// with previous silent-zero behavior, which would also have produced 404).
writeError(w, http.StatusNotFound, "issue not found")
return db.Issue{}, false
}
wsUUID, err := util.ParseUUID(workspaceID)
if err != nil {
writeError(w, http.StatusBadRequest, "invalid workspace_id")
return db.Issue{}, false
}
issue, err := h.Queries.GetIssueInWorkspace(r.Context(), db.GetIssueInWorkspaceParams{
ID: issueUUID,
WorkspaceID: wsUUID,
})
if err != nil {
writeError(w, http.StatusNotFound, "issue not found")
return db.Issue{}, false
}
return issue, true
}
// resolveIssueByIdentifier tries to look up an issue by "PREFIX-NUMBER" format.
func (h *Handler) resolveIssueByIdentifier(ctx context.Context, id, workspaceID string) (db.Issue, bool) {
parts := splitIdentifier(id)
if parts == nil {
return db.Issue{}, false
}
if workspaceID == "" {
return db.Issue{}, false
}
wsUUID, err := util.ParseUUID(workspaceID)
if err != nil {
return db.Issue{}, false
}
issue, err := h.Queries.GetIssueByNumber(ctx, db.GetIssueByNumberParams{
WorkspaceID: wsUUID,
Number: parts.number,
})
if err != nil {
return db.Issue{}, false
}
return issue, true
}
type identifierParts struct {
prefix string
number int32
}
func splitIdentifier(id string) *identifierParts {
idx := -1
for i := len(id) - 1; i >= 0; i-- {
if id[i] == '-' {
idx = i
break
}
}
if idx <= 0 || idx >= len(id)-1 {
return nil
}
numStr := id[idx+1:]
num := 0
for _, c := range numStr {
if c < '0' || c > '9' {
return nil
}
num = num*10 + int(c-'0')
}
if num <= 0 {
return nil
}
return &identifierParts{prefix: id[:idx], number: int32(num)}
}
// getIssuePrefix fetches the issue_prefix for a workspace.
// Falls back to generating a prefix from the workspace name if the stored
// prefix is empty (e.g. workspaces created before the prefix was introduced).
func (h *Handler) getIssuePrefix(ctx context.Context, workspaceID pgtype.UUID) string {
ws, err := h.Queries.GetWorkspace(ctx, workspaceID)
if err != nil {
return ""
}
if ws.IssuePrefix != "" {
return ws.IssuePrefix
}
return generateIssuePrefix(ws.Name)
}
func (h *Handler) loadAgentForUser(w http.ResponseWriter, r *http.Request, agentID string) (db.Agent, bool) {
if _, ok := requireUserID(w, r); !ok {
return db.Agent{}, false
}
workspaceID := h.resolveWorkspaceID(r)
if workspaceID == "" {
writeError(w, http.StatusBadRequest, "workspace_id is required")
return db.Agent{}, false
}
agentUUID, ok := parseUUIDOrBadRequest(w, agentID, "agent id")
if !ok {
return db.Agent{}, false
}
wsUUID, ok := parseUUIDOrBadRequest(w, workspaceID, "workspace id")
if !ok {
return db.Agent{}, false
}
agent, err := h.Queries.GetAgentInWorkspace(r.Context(), db.GetAgentInWorkspaceParams{
ID: agentUUID,
WorkspaceID: wsUUID,
})
if err != nil {
writeError(w, http.StatusNotFound, "agent not found")
return db.Agent{}, false
}
return agent, true
}
func (h *Handler) loadInboxItemForUser(w http.ResponseWriter, r *http.Request, itemID string) (db.InboxItem, bool) {
userID, ok := requireUserID(w, r)
if !ok {
return db.InboxItem{}, false
}
workspaceID := h.resolveWorkspaceID(r)
if workspaceID == "" {
writeError(w, http.StatusBadRequest, "workspace_id is required")
return db.InboxItem{}, false
}
itemUUID, ok := parseUUIDOrBadRequest(w, itemID, "inbox item id")
if !ok {
return db.InboxItem{}, false
}
wsUUID, ok := parseUUIDOrBadRequest(w, workspaceID, "workspace id")
if !ok {
return db.InboxItem{}, false
}
item, err := h.Queries.GetInboxItemInWorkspace(r.Context(), db.GetInboxItemInWorkspaceParams{
ID: itemUUID,
WorkspaceID: wsUUID,
})
if err != nil {
writeError(w, http.StatusNotFound, "inbox item not found")
return db.InboxItem{}, false
}
if item.RecipientType != "member" || uuidToString(item.RecipientID) != userID {
writeError(w, http.StatusNotFound, "inbox item not found")
return db.InboxItem{}, false
}
return item, true
}