Files
multica/server/pkg/db/generated/agent.sql.go
Bohan Jiang 341ce7bfa5 feat: support local working directory for projects (MUL-2618 v1) (#3283)
* feat(project): add local_directory project_resource type (MUL-2662)

Adds a second project_resource type alongside github_repo so a project
can be pinned to an existing directory on a specific daemon (the v1 of
the local-working-directory flow tracked in MUL-2618). The ref schema is
{ local_path, daemon_id, label? }; local_path must be absolute and
daemon_id is required. The same (daemon_id, local_path) pair is allowed
on multiple projects by design — no UNIQUE constraint is added.

Implementation reuses the existing project_resource API surface: the new
type is wired through the validator switch with no migration, no new
events, and no daemon-handler changes (daemon already passes through
arbitrary resource types via ProjectResources). The CLI gains
--local-path / --daemon-id / --ref-label shortcuts so
`multica project resource add --type local_directory` mirrors the
existing `--type github_repo --url ...` ergonomics; the generic --ref
flag still works for both types.

Tests cover the full CRUD lifecycle, the same-path-across-projects
allowance, the same-path-same-project conflict, the validator rejections
(missing/blank/relative path, missing daemon_id, wrong payload type),
and the cross-platform isAbsoluteLocalPath helper.

Co-authored-by: multica-agent <github@multica.ai>

* feat(project): add update endpoint + label-shadow guard for project_resource (MUL-2662)

Addresses the Elon review on PR #3263:

- Add PUT /api/projects/{id}/resources/{resourceId} with sqlc query,
  matching handler, CLI `project resource update`, and a new
  EventProjectResourceUpdated WS event. resource_type stays immutable;
  ref/label/position are all individually optional.
- Catch same-project (daemon_id, local_path) collisions where only the
  embedded label differs — the row-level UNIQUE only matches the full
  ref JSON, so a label typo would otherwise let the same working
  directory bind twice.
- Tests cover the update lifecycle (label-only / ref / clear / 404 /
  invalid path) and the label-shadow conflict on both create and
  update; the in-place rename still succeeds because the conflict
  scan ignores the row being edited.

Incidental: regenerating sqlc picked up a missing skills_local scan in
UpdateAgentCustomEnv that drifted in from #3200.

Co-authored-by: multica-agent <github@multica.ai>

* fix(project): close bundled-create label-shadow gap + merge resource_ref on CLI update (MUL-2662)

Two follow-ups from MUL-2662 review round 2:

- CreateProject inline resources path now dedupes local_directory entries on
  (daemon_id, local_path) before opening the transaction. The DB-level
  UNIQUE(project_id, resource_type, resource_ref) constraint only fires on a
  full JSON match, so two rows with the same target but different `label`
  would otherwise slip past. Standalone POST/PUT already cover this via
  findLocalDirectoryConflict; bundled create was the missing surface.
- `multica project resource update` now seeds resource_ref from the existing
  row before applying per-type shortcut flags, so `--default-branch-hint x`
  on its own no longer constructs a payload missing `url` (which the server
  400s on). Local_directory partial edits get the same merge behavior.

Co-authored-by: multica-agent <github@multica.ai>

* feat(desktop): local_directory project_resource UI (MUL-2665) (#3273)

* feat(desktop): local_directory project_resource UI (MUL-2665)

First UI surface for the local-working-directory flow tracked in MUL-2618.
Lets users on the desktop pin a project to an existing folder on this
machine; web stays read-only since the per-daemon check can't be done in
the browser.

What's new for the renderer:

- ProjectResourcesSection grows a desktop-only "Add local directory"
  button next to the existing GitHub-repo popover. Clicking it opens
  Electron's native folder picker, validates the path through a new
  IPC pair (existence + r/w), and submits a project_resource of
  resource_type=local_directory with daemon_id pulled live from
  daemonAPI.getStatus.
- LocalDirectoryRow renders the rename pencil + path tooltip, and
  greys out when ref.daemon_id != this machine's daemon_id (with a
  "only available on the machine that registered this directory"
  tooltip). Delete stays enabled so users can drop stale registrations
  from any device.
- LocalDirectoryHint sits above the issue-detail comment composer and
  shows "Agent will work in-place at {label} ({path})" when the issue's
  project has a local_directory matching this daemon. Hidden on web.
- TaskStatusPill picks up a new "waiting_for_directory_release" stage
  that the daemon will publish when it dequeues a task but can't
  acquire the path lock. The render is in place now so the daemon
  sibling subtask can wire the status string without an additional UI
  PR.

Plumbing:

- @multica/core/types gains LocalDirectoryResourceRef +
  UpdateProjectResourceRequest, and the api client gets the matching
  PUT method backed by the server endpoint that landed in
  2ac3faebb (MUL-2662). A useUpdateProjectResource hook drives the
  in-place label edit.
- New Electron handlers under apps/desktop/src/main/local-directory.ts:
    local-directory:pick     -> dialog.showOpenDialog (openDirectory)
    local-directory:validate -> stat + access(R_OK + W_OK)
  exposed through the preload as desktopAPI.pickDirectory /
  validateLocalDirectory. View code talks to them via a thin
  packages/views/platform helper that returns reason=unsupported on
  web instead of crashing.
- useLocalDaemonStatus exposes the local daemon's id, device name, and
  running flag from daemonAPI.onStatusChange so the renderer can do the
  cross-device match without coupling to the desktop preload typings.

Tests:

- pickStageKeys gets a unit test covering the new stage and proving
  the directory-release status outranks availability hints.
- LocalDirectoryHint tests cover the four render branches (no project,
  no daemon, foreign daemon, matching daemon).
- i18n parity stays green; new keys added under projects.resources.*
  and chat.status_pill.stages.waiting_for_directory_release in both
  locales.

Out of scope (will land separately):
- The daemon-side waiting/lock signal that flips the pill into the
  new state.
- Adding local_directory to the create-project modal's bulk
  attach flow.
- Docs page refresh for project-resources.mdx — left for the
  MUL-2618 umbrella sweep.

Co-authored-by: multica-agent <github@multica.ai>

* fix(desktop): hide rename for foreign daemon local_directory rows (MUL-2618)

Address review nit on #3273: the rename pencil was gated only by
`canEdit`, so a foreign / unknown-daemon row still showed it even
though the spec says cross-device rows are disabled. Gate rename on
`!mismatch` so it disappears on those rows; delete stays available
so a stale registration can still be dropped from any device.

Co-authored-by: multica-agent <github@multica.ai>

---------

Co-authored-by: multica-agent <github@multica.ai>

* feat(daemon): local_directory execution + path mutex + GC exception (MUL-2663) (#3274)

* feat(daemon): local_directory execution + path mutex + GC exception (MUL-2663)

Wires up the daemon side of the local_directory project_resource introduced
in MUL-2662. When a task is dispatched against a project whose resources
include a local_directory pinned to this daemon's UUID, the daemon now:

  - Validates the path (absolute, exists, daemon process can read+write,
    not in the system-root / $HOME blacklist) and fails the task fast on
    any precondition violation, with a user-readable reason.
  - Serialises concurrent tasks on the same on-disk path via a
    daemon-local LocalPathLocker keyed by symlink-resolved realpath. The
    lock is held for the entire task lifetime (claim → context write →
    agent → result report).
  - When the lock is contended, the daemon flips the row to a new
    waiting_local_directory status on the server (carrying a wait_reason
    like "<path> (held by task <short id>)") so the UI can render
    "等待本地目录释放" instead of leaving the row silently in dispatched
    past the sweeper timeout. The status accepts being woken into running
    once the lock is acquired.
  - Sets execenv.WorkDir to the user's path (no copy, no mount). envRoot
    still lives under workspacesRoot/<wsID>/ and hosts output/, logs/, and
    .gc_meta.json — the daemon's logbook for the run.
  - Stamps GCMeta.LocalDirectory=true so the GC loop never RemoveAlls
    envRoot for these tasks (gcActionClean → gcActionCleanArtifacts,
    gcActionOrphan → gcActionSkip). The user's directory was never under
    envRoot to begin with, so this is defense in depth.
  - Skips execenv.Reuse for local_directory tasks because the prior
    WorkDir is the user's path and reusing it through that code path
    loses the envRoot association the GC loop needs. Prepare is cheap
    here (no clone, no copy), so always running it is fine.

Server-side protocol changes:

  - New CHECK value 'waiting_local_directory' on agent_task_queue.status
    plus a wait_reason TEXT column (migration 109).
  - All cancel / active / counted-as-running / orphan-recovery queries
    expanded to include the new status; FailStaleTasks intentionally
    excludes it (the daemon owns the wait).
  - New SQL MarkAgentTaskWaitingLocalDirectory(id, reason) and a relaxed
    StartAgentTask that accepts both dispatched and
    waiting_local_directory as preconditions (and clears wait_reason on
    the way through).
  - New POST /api/daemon/tasks/{taskId}/wait-local-directory endpoint,
    TaskService.MarkTaskWaitingLocalDirectory broadcaster, and matching
    daemon Client.MarkTaskWaitingLocalDirectory.

Tests cover: path blacklist + R/W enforcement, mutex serialisation +
ctx-cancelled wait, lock handover between two tasks, GC never returns
gcActionClean / gcActionOrphan for local_directory rows (with negative
control for the standard path), and Prepare/Cleanup correctly substitute
+ protect the user's WorkDir.

The desktop UI side (UI for adding a local_directory resource, surfacing
the "等待本地目录" badge) is MUL-2665; the agent-task lifecycle changes
(no branch switch, dirty-tree tolerant, auto-commit) are MUL-2664.

This PR targets the shared MUL-2618 v1 feature branch agent/j/912b8cb1,
not main; the whole v1 will be merged to main together when complete.

Co-authored-by: multica-agent <github@multica.ai>

* fix(daemon): tighten local_directory status, symlink, cancel handling (MUL-2618)

Address the 3 must-fix items from Elon's review of PR #3274.

1. Status string unified. The server / daemon publish
   `waiting_local_directory`; align views, locales, and the
   pickStageKeys test (PR #3273 had used `waiting_for_directory_release`
   on a placeholder string). Without this, the daemon's wait state
   never reached the pill once the two siblings merged.

2. validateLocalPath now also runs the blacklist against the
   symlink-resolved realpath, with macOS's `/etc` -> `/private/etc`
   redirect handled via `isBlacklistedRealPath` which compares
   canonical forms. Without this, a symlink such as
   `/Users/me/proj/home -> /Users/me` slipped the literal $HOME check
   while every daemon write still landed in the user's home. Tests
   cover symlink-to-home, symlink-to-system-root, and the negative
   case (symlink to a regular subdirectory).

3. acquireLocalDirectoryLockIfNeeded now spins up a cancellation
   watcher inside `onWait` (lazy — the fast path stays free) so the
   gap between dispatch and StartTask responds to server-side cancel
   or row deletion. If the watcher fires while the daemon is parked
   on the path mutex, the lock-wait context is cancelled, Acquire
   returns promptly, and the helper exits silently the same way the
   run-phase poller does. New TestAcquireLocalDirectoryLock_CancelDuringWait
   exercises the path end-to-end with a fake server.

Co-authored-by: multica-agent <github@multica.ai>

* fix(daemon): unconditional canonical blacklist + Windows drive-root generalisation (MUL-2618)

- validateLocalPath now always runs isBlacklistedRealPath on the
  symlink-resolved path, not only when it differs from absPath. The old
  guard let users type the canonical form of an OS-symlinked banned root
  (e.g. /private/tmp, /private/etc, /private/var on macOS) straight
  through, since EvalSymlinks is a no-op on already-canonical input.
- Windows drive-root rejection moved off the static C/D/E/F enumeration
  onto filepath.VolumeName via a new isDriveRoot helper, so removable /
  network drives mounted at G:..Z: and UNC \\server\share roots are also
  blocked. systemRootBlacklist keeps the well-known C:\ trees only.
- Tests: macOS-only case exercises direct /private/{tmp,etc,var}; a
  new TestIsDriveRoot covers the Windows generalisation (skipped on
  POSIX runners by runtime guard).

Co-authored-by: multica-agent <github@multica.ai>

---------

Co-authored-by: multica-agent <github@multica.ai>

* feat(views): wire waiting_local_directory end-to-end in issue UI + presence (MUL-2618)

Connect the daemon-emitted `task:waiting_local_directory` and `task:running`
events through to issue execution log, sticky agent banner, activity indicator,
and agent presence so a parked task is no longer invisible on the issue page.

- Add `waiting_local_directory` to `AgentTask.status` and the typed
  `task:running` / `task:waiting_local_directory` WS event payloads.
- Chat realtime sync writes both new statuses into the pending-task cache so
  the chat StatusPill flips out of a stale `dispatched` frame.
- ExecutionLogSection: count `waiting_local_directory` as active, add tone +
  status label, treat parked tasks the same as dispatched for time anchor /
  transcript visibility / terminate-confirm note.
- AgentLiveCard: subscribe to both new events, rank the parked state between
  dispatched and queued, and surface a "is waiting for the local directory"
  banner with the muted "Clock" treatment used for queued.
- IssueAgentActivityIndicator: route parked tasks into the queued bucket so
  the hover stack and chip stay visible.
- derive-presence: parked tasks count toward `queuedCount` so the agent
  workload chip stays out of `idle` while the daemon waits on the path lock.
- Locales: add `agent_live.is_waiting_local_directory` and
  `execution_log.status_waiting_local_directory` (en + zh-Hans).

Co-authored-by: multica-agent <github@multica.ai>

* feat(project): enforce one local_directory per (project, daemon) (MUL-2618)

The daemon-side resolver picks the first matching local_directory by
daemon_id, so allowing two rows on the same daemon — even at different
paths — let the agent silently write into whichever sorted first. Tighten
the invariant top to bottom:

- server: `findLocalDirectoryConflict` rejects any second row sharing a
  daemon_id, regardless of `local_path` or label. Bundled-create surface in
  `CreateProject` runs the same daemon-scoped dedupe up front.
- daemon: `findLocalDirectoryAssignment` fails fast when it finds more than
  one row pinned to the current daemon (older API client / direct DB
  writes can still produce that state — refuse to guess).
- desktop UI: hide the "Add local directory" action once the current
  daemon owns a row on this project, with a hint and a defensive toast on
  the call path; foreign-daemon rows stay visible read-only as before.
- Tests:
  * daemon: new `two local_directory rows on this daemon fail fast` /
    `local_directory rows on different daemons coexist` cases.
  * handler: rewrite the legacy `LabelShadow` cases as
    `DaemonScopedConflict` / `BundledLocalDirectoryDaemonConflict` —
    asserts 409 on same-daemon different-path, 201 on per-daemon bundles.
- Locales: en + zh-Hans copy for the new hint + toast.

Co-authored-by: multica-agent <github@multica.ai>

* chore(sqlc): drop stale skills_local in UpdateAgentCustomEnv (MUL-2618)

Follow-up to the main-merge in 0f8e8ca7: the auto-merge preserved most
of main's skills_local revert but kept the column reference inside the
UpdateAgentCustomEnv scanner because that block hadn't been touched by
either side. Re-running `sqlc generate` regenerates the file without
skills_local in this query, matching the rest of the file and the
post-revert schema.

Co-authored-by: multica-agent <github@multica.ai>

* feat(create-project): binary source picker — repos OR local directory

Turn the create-project dialog's "Repos" pill into a binary Source
picker. A project's source is mutually exclusive: either a set of
GitHub repos (worktree mode, default) or a single local working
directory (local mode, desktop-only). Mirrors the constraint the
backend will enforce next.

Behavior:
- Pill shows the active mode's selection (GitHub icon + repo count, or
  folder icon + local label/path).
- Popover has a 2-tab segmented control at the top; the Local tab is
  hidden entirely on web (local_directory needs a daemon_id).
- Local tab requires the daemon online — amber notice + disabled picker
  when offline, re-renders automatically via useLocalDaemonStatus.
- Switching tabs preserves the other side's stash, but handleSubmit
  only emits the resource matching the active sourceMode, so abandoned
  picks never leak into the created project.

Backend mutual-exclusion validation + the resources-section
conditional-add-button still to come — this PR just unblocks the
dialog so it can be demoed.

* fix(mobile): cover waiting_local_directory in run row status maps (MUL-2618)

---------

Co-authored-by: multica-agent <github@multica.ai>
Co-authored-by: Multica J <j@multica.ai>
2026-05-27 13:44:31 +08:00

2674 lines
85 KiB
Go

// Code generated by sqlc. DO NOT EDIT.
// versions:
// sqlc v1.31.1
// source: agent.sql
package db
import (
"context"
"github.com/jackc/pgx/v5/pgtype"
)
const archiveAgent = `-- name: ArchiveAgent :one
UPDATE agent SET archived_at = now(), archived_by = $2, updated_at = now()
WHERE id = $1
RETURNING id, workspace_id, name, avatar_url, runtime_mode, runtime_config, visibility, status, max_concurrent_tasks, owner_id, created_at, updated_at, description, runtime_id, instructions, archived_at, archived_by, custom_env, custom_args, mcp_config, model, thinking_level
`
type ArchiveAgentParams struct {
ID pgtype.UUID `json:"id"`
ArchivedBy pgtype.UUID `json:"archived_by"`
}
func (q *Queries) ArchiveAgent(ctx context.Context, arg ArchiveAgentParams) (Agent, error) {
row := q.db.QueryRow(ctx, archiveAgent, arg.ID, arg.ArchivedBy)
var i Agent
err := row.Scan(
&i.ID,
&i.WorkspaceID,
&i.Name,
&i.AvatarUrl,
&i.RuntimeMode,
&i.RuntimeConfig,
&i.Visibility,
&i.Status,
&i.MaxConcurrentTasks,
&i.OwnerID,
&i.CreatedAt,
&i.UpdatedAt,
&i.Description,
&i.RuntimeID,
&i.Instructions,
&i.ArchivedAt,
&i.ArchivedBy,
&i.CustomEnv,
&i.CustomArgs,
&i.McpConfig,
&i.Model,
&i.ThinkingLevel,
)
return i, err
}
const archiveAgentsByIDs = `-- name: ArchiveAgentsByIDs :many
UPDATE agent
SET archived_at = now(), archived_by = $1, updated_at = now()
WHERE id = ANY($2::uuid[]) AND archived_at IS NULL
RETURNING id, workspace_id, name, avatar_url, runtime_mode, runtime_config, visibility, status, max_concurrent_tasks, owner_id, created_at, updated_at, description, runtime_id, instructions, archived_at, archived_by, custom_env, custom_args, mcp_config, model, thinking_level
`
type ArchiveAgentsByIDsParams struct {
ArchivedBy pgtype.UUID `json:"archived_by"`
AgentIds []pgtype.UUID `json:"agent_ids"`
}
// Narrow archive that only touches the explicit ID list. Used by the
// cascade-delete endpoint so the user's expected_active_agent_ids list
// is the authoritative bound on what gets archived: any agent that
// appeared on the runtime after the user opened the dialog is filtered
// out here so it can't be silently archived even in the (vanishingly
// rare) case where a row-level race slips past the runtime FOR UPDATE
// lock. Returns the affected rows so the caller can broadcast
// agent:archived per agent.
func (q *Queries) ArchiveAgentsByIDs(ctx context.Context, arg ArchiveAgentsByIDsParams) ([]Agent, error) {
rows, err := q.db.Query(ctx, archiveAgentsByIDs, arg.ArchivedBy, arg.AgentIds)
if err != nil {
return nil, err
}
defer rows.Close()
items := []Agent{}
for rows.Next() {
var i Agent
if err := rows.Scan(
&i.ID,
&i.WorkspaceID,
&i.Name,
&i.AvatarUrl,
&i.RuntimeMode,
&i.RuntimeConfig,
&i.Visibility,
&i.Status,
&i.MaxConcurrentTasks,
&i.OwnerID,
&i.CreatedAt,
&i.UpdatedAt,
&i.Description,
&i.RuntimeID,
&i.Instructions,
&i.ArchivedAt,
&i.ArchivedBy,
&i.CustomEnv,
&i.CustomArgs,
&i.McpConfig,
&i.Model,
&i.ThinkingLevel,
); err != nil {
return nil, err
}
items = append(items, i)
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const archiveAgentsByRuntime = `-- name: ArchiveAgentsByRuntime :many
UPDATE agent
SET archived_at = now(), archived_by = $1, updated_at = now()
WHERE runtime_id = ANY($2::uuid[]) AND archived_at IS NULL
RETURNING id, workspace_id, name, avatar_url, runtime_mode, runtime_config, visibility, status, max_concurrent_tasks, owner_id, created_at, updated_at, description, runtime_id, instructions, archived_at, archived_by, custom_env, custom_args, mcp_config, model, thinking_level
`
type ArchiveAgentsByRuntimeParams struct {
ArchivedBy pgtype.UUID `json:"archived_by"`
RuntimeIds []pgtype.UUID `json:"runtime_ids"`
}
// Bulk-archives every active agent bound to any runtime in the given set.
// Used when revoking a leaving member's runtimes so agents pinned to those
// runtimes can no longer be assigned new work. Returns the affected rows so
// the caller can broadcast agent:archived per agent.
func (q *Queries) ArchiveAgentsByRuntime(ctx context.Context, arg ArchiveAgentsByRuntimeParams) ([]Agent, error) {
rows, err := q.db.Query(ctx, archiveAgentsByRuntime, arg.ArchivedBy, arg.RuntimeIds)
if err != nil {
return nil, err
}
defer rows.Close()
items := []Agent{}
for rows.Next() {
var i Agent
if err := rows.Scan(
&i.ID,
&i.WorkspaceID,
&i.Name,
&i.AvatarUrl,
&i.RuntimeMode,
&i.RuntimeConfig,
&i.Visibility,
&i.Status,
&i.MaxConcurrentTasks,
&i.OwnerID,
&i.CreatedAt,
&i.UpdatedAt,
&i.Description,
&i.RuntimeID,
&i.Instructions,
&i.ArchivedAt,
&i.ArchivedBy,
&i.CustomEnv,
&i.CustomArgs,
&i.McpConfig,
&i.Model,
&i.ThinkingLevel,
); err != nil {
return nil, err
}
items = append(items, i)
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const cancelAgentTask = `-- name: CancelAgentTask :one
UPDATE agent_task_queue
SET status = 'cancelled', completed_at = now()
WHERE id = $1 AND status IN ('queued', 'dispatched', 'running', 'waiting_local_directory')
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason
`
func (q *Queries) CancelAgentTask(ctx context.Context, id pgtype.UUID) (AgentTaskQueue, error) {
row := q.db.QueryRow(ctx, cancelAgentTask, id)
var i AgentTaskQueue
err := row.Scan(
&i.ID,
&i.AgentID,
&i.IssueID,
&i.Status,
&i.Priority,
&i.DispatchedAt,
&i.StartedAt,
&i.CompletedAt,
&i.Result,
&i.Error,
&i.CreatedAt,
&i.Context,
&i.RuntimeID,
&i.SessionID,
&i.WorkDir,
&i.TriggerCommentID,
&i.ChatSessionID,
&i.AutopilotRunID,
&i.Attempt,
&i.MaxAttempts,
&i.ParentTaskID,
&i.FailureReason,
&i.TriggerSummary,
&i.ForceFreshSession,
&i.IsLeaderTask,
&i.WaitReason,
)
return i, err
}
const cancelAgentTasksByAgent = `-- name: CancelAgentTasksByAgent :many
UPDATE agent_task_queue
SET status = 'cancelled', completed_at = now()
WHERE agent_id = $1 AND status IN ('queued', 'dispatched', 'running', 'waiting_local_directory')
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason
`
// Bulk-cancel every active (queued/dispatched/running) task for an agent.
// Returns the affected rows so callers can broadcast task:cancelled events.
// Mirrors the shape of CancelAgentTasksByIssue / CancelAgentTasksByIssueAndAgent
// (also :many + RETURNING + completed_at) so the three sibling cancel paths
// behave consistently.
func (q *Queries) CancelAgentTasksByAgent(ctx context.Context, agentID pgtype.UUID) ([]AgentTaskQueue, error) {
rows, err := q.db.Query(ctx, cancelAgentTasksByAgent, agentID)
if err != nil {
return nil, err
}
defer rows.Close()
items := []AgentTaskQueue{}
for rows.Next() {
var i AgentTaskQueue
if err := rows.Scan(
&i.ID,
&i.AgentID,
&i.IssueID,
&i.Status,
&i.Priority,
&i.DispatchedAt,
&i.StartedAt,
&i.CompletedAt,
&i.Result,
&i.Error,
&i.CreatedAt,
&i.Context,
&i.RuntimeID,
&i.SessionID,
&i.WorkDir,
&i.TriggerCommentID,
&i.ChatSessionID,
&i.AutopilotRunID,
&i.Attempt,
&i.MaxAttempts,
&i.ParentTaskID,
&i.FailureReason,
&i.TriggerSummary,
&i.ForceFreshSession,
&i.IsLeaderTask,
&i.WaitReason,
); err != nil {
return nil, err
}
items = append(items, i)
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const cancelAgentTasksByChatSession = `-- name: CancelAgentTasksByChatSession :many
UPDATE agent_task_queue
SET status = 'cancelled', completed_at = now()
WHERE chat_session_id = $1 AND status IN ('queued', 'dispatched', 'running', 'waiting_local_directory')
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason
`
// Cancels active tasks belonging to a chat session. Called from
// DeleteChatSession so the daemon doesn't keep running work whose result
// has nowhere to land. Must run BEFORE the chat_session row is deleted —
// the FK ON DELETE SET NULL would otherwise nullify chat_session_id and we
// could no longer reach those tasks.
func (q *Queries) CancelAgentTasksByChatSession(ctx context.Context, chatSessionID pgtype.UUID) ([]AgentTaskQueue, error) {
rows, err := q.db.Query(ctx, cancelAgentTasksByChatSession, chatSessionID)
if err != nil {
return nil, err
}
defer rows.Close()
items := []AgentTaskQueue{}
for rows.Next() {
var i AgentTaskQueue
if err := rows.Scan(
&i.ID,
&i.AgentID,
&i.IssueID,
&i.Status,
&i.Priority,
&i.DispatchedAt,
&i.StartedAt,
&i.CompletedAt,
&i.Result,
&i.Error,
&i.CreatedAt,
&i.Context,
&i.RuntimeID,
&i.SessionID,
&i.WorkDir,
&i.TriggerCommentID,
&i.ChatSessionID,
&i.AutopilotRunID,
&i.Attempt,
&i.MaxAttempts,
&i.ParentTaskID,
&i.FailureReason,
&i.TriggerSummary,
&i.ForceFreshSession,
&i.IsLeaderTask,
&i.WaitReason,
); err != nil {
return nil, err
}
items = append(items, i)
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const cancelAgentTasksByIssue = `-- name: CancelAgentTasksByIssue :many
UPDATE agent_task_queue
SET status = 'cancelled', completed_at = now()
WHERE issue_id = $1 AND status IN ('queued', 'dispatched', 'running', 'waiting_local_directory')
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason
`
// Cancels every active task on the issue and returns the affected rows so the
// caller can reconcile each agent's status and broadcast task:cancelled events
// (#1587). Prior :exec form silently dropped that info, so internal cancel
// paths (issue status flips to cancelled/done, etc.) left agents stuck at
// status="working" with no self-correction.
func (q *Queries) CancelAgentTasksByIssue(ctx context.Context, issueID pgtype.UUID) ([]AgentTaskQueue, error) {
rows, err := q.db.Query(ctx, cancelAgentTasksByIssue, issueID)
if err != nil {
return nil, err
}
defer rows.Close()
items := []AgentTaskQueue{}
for rows.Next() {
var i AgentTaskQueue
if err := rows.Scan(
&i.ID,
&i.AgentID,
&i.IssueID,
&i.Status,
&i.Priority,
&i.DispatchedAt,
&i.StartedAt,
&i.CompletedAt,
&i.Result,
&i.Error,
&i.CreatedAt,
&i.Context,
&i.RuntimeID,
&i.SessionID,
&i.WorkDir,
&i.TriggerCommentID,
&i.ChatSessionID,
&i.AutopilotRunID,
&i.Attempt,
&i.MaxAttempts,
&i.ParentTaskID,
&i.FailureReason,
&i.TriggerSummary,
&i.ForceFreshSession,
&i.IsLeaderTask,
&i.WaitReason,
); err != nil {
return nil, err
}
items = append(items, i)
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const cancelAgentTasksByIssueAndAgent = `-- name: CancelAgentTasksByIssueAndAgent :many
UPDATE agent_task_queue
SET status = 'cancelled', completed_at = now()
WHERE issue_id = $1 AND agent_id = $2 AND status IN ('queued', 'dispatched', 'running', 'waiting_local_directory')
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason
`
type CancelAgentTasksByIssueAndAgentParams struct {
IssueID pgtype.UUID `json:"issue_id"`
AgentID pgtype.UUID `json:"agent_id"`
}
// Cancels active tasks for a single (issue, agent) pair without touching
// tasks belonging to other agents on the same issue. Used by the manual
// rerun flow so re-running the assignee doesn't collateral-cancel a
// still-running @-mention agent on the same issue.
func (q *Queries) CancelAgentTasksByIssueAndAgent(ctx context.Context, arg CancelAgentTasksByIssueAndAgentParams) ([]AgentTaskQueue, error) {
rows, err := q.db.Query(ctx, cancelAgentTasksByIssueAndAgent, arg.IssueID, arg.AgentID)
if err != nil {
return nil, err
}
defer rows.Close()
items := []AgentTaskQueue{}
for rows.Next() {
var i AgentTaskQueue
if err := rows.Scan(
&i.ID,
&i.AgentID,
&i.IssueID,
&i.Status,
&i.Priority,
&i.DispatchedAt,
&i.StartedAt,
&i.CompletedAt,
&i.Result,
&i.Error,
&i.CreatedAt,
&i.Context,
&i.RuntimeID,
&i.SessionID,
&i.WorkDir,
&i.TriggerCommentID,
&i.ChatSessionID,
&i.AutopilotRunID,
&i.Attempt,
&i.MaxAttempts,
&i.ParentTaskID,
&i.FailureReason,
&i.TriggerSummary,
&i.ForceFreshSession,
&i.IsLeaderTask,
&i.WaitReason,
); err != nil {
return nil, err
}
items = append(items, i)
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const cancelAgentTasksByTriggerComment = `-- name: CancelAgentTasksByTriggerComment :many
UPDATE agent_task_queue
SET status = 'cancelled', completed_at = now()
WHERE trigger_comment_id = $1 AND status IN ('queued', 'dispatched', 'running', 'waiting_local_directory')
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason
`
// Cancels active tasks whose trigger is the given comment. Called when a
// comment is deleted so the agent does not run with the now-deleted content
// already embedded in its prompt. Must run BEFORE the comment row is deleted
// because the FK ON DELETE SET NULL would otherwise nullify trigger_comment_id
// and we'd lose the ability to find the affected tasks.
func (q *Queries) CancelAgentTasksByTriggerComment(ctx context.Context, triggerCommentID pgtype.UUID) ([]AgentTaskQueue, error) {
rows, err := q.db.Query(ctx, cancelAgentTasksByTriggerComment, triggerCommentID)
if err != nil {
return nil, err
}
defer rows.Close()
items := []AgentTaskQueue{}
for rows.Next() {
var i AgentTaskQueue
if err := rows.Scan(
&i.ID,
&i.AgentID,
&i.IssueID,
&i.Status,
&i.Priority,
&i.DispatchedAt,
&i.StartedAt,
&i.CompletedAt,
&i.Result,
&i.Error,
&i.CreatedAt,
&i.Context,
&i.RuntimeID,
&i.SessionID,
&i.WorkDir,
&i.TriggerCommentID,
&i.ChatSessionID,
&i.AutopilotRunID,
&i.Attempt,
&i.MaxAttempts,
&i.ParentTaskID,
&i.FailureReason,
&i.TriggerSummary,
&i.ForceFreshSession,
&i.IsLeaderTask,
&i.WaitReason,
); err != nil {
return nil, err
}
items = append(items, i)
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const claimAgentTask = `-- name: ClaimAgentTask :one
UPDATE agent_task_queue
SET status = 'dispatched', dispatched_at = now()
WHERE id = (
SELECT atq.id FROM agent_task_queue atq
WHERE atq.agent_id = $1 AND atq.status = 'queued'
AND NOT EXISTS (
SELECT 1 FROM agent_task_queue active
WHERE active.agent_id = atq.agent_id
AND active.status IN ('dispatched', 'running', 'waiting_local_directory')
AND (
(atq.issue_id IS NOT NULL AND active.issue_id = atq.issue_id)
OR (atq.chat_session_id IS NOT NULL AND active.chat_session_id = atq.chat_session_id)
OR (
atq.issue_id IS NULL
AND atq.chat_session_id IS NULL
AND atq.autopilot_run_id IS NULL
AND active.issue_id IS NULL
AND active.chat_session_id IS NULL
AND active.autopilot_run_id IS NULL
)
)
)
ORDER BY atq.priority DESC, atq.created_at ASC
LIMIT 1
FOR UPDATE SKIP LOCKED
)
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason
`
// Claims the next queued task for an agent, enforcing per-(issue, agent) serialization:
// a task is only claimable when no other task for the same issue AND same agent is
// already dispatched or running. This allows different agents to work on the same
// issue in parallel while preventing a single agent from running duplicate tasks.
// Chat tasks (issue_id IS NULL) use chat_session_id for serialization instead.
// Quick-create tasks have no issue / chat / autopilot link, so they serialize on
// "any other quick-create-shaped task" (all four FKs NULL) for the same agent —
// otherwise a user mashing the create button could fire concurrent quick-creates
// whose completion lookup would race over "most recent issue by this agent".
func (q *Queries) ClaimAgentTask(ctx context.Context, agentID pgtype.UUID) (AgentTaskQueue, error) {
row := q.db.QueryRow(ctx, claimAgentTask, agentID)
var i AgentTaskQueue
err := row.Scan(
&i.ID,
&i.AgentID,
&i.IssueID,
&i.Status,
&i.Priority,
&i.DispatchedAt,
&i.StartedAt,
&i.CompletedAt,
&i.Result,
&i.Error,
&i.CreatedAt,
&i.Context,
&i.RuntimeID,
&i.SessionID,
&i.WorkDir,
&i.TriggerCommentID,
&i.ChatSessionID,
&i.AutopilotRunID,
&i.Attempt,
&i.MaxAttempts,
&i.ParentTaskID,
&i.FailureReason,
&i.TriggerSummary,
&i.ForceFreshSession,
&i.IsLeaderTask,
&i.WaitReason,
)
return i, err
}
const clearAgentMcpConfig = `-- name: ClearAgentMcpConfig :one
UPDATE agent SET mcp_config = NULL, updated_at = now()
WHERE id = $1
RETURNING id, workspace_id, name, avatar_url, runtime_mode, runtime_config, visibility, status, max_concurrent_tasks, owner_id, created_at, updated_at, description, runtime_id, instructions, archived_at, archived_by, custom_env, custom_args, mcp_config, model, thinking_level
`
func (q *Queries) ClearAgentMcpConfig(ctx context.Context, id pgtype.UUID) (Agent, error) {
row := q.db.QueryRow(ctx, clearAgentMcpConfig, id)
var i Agent
err := row.Scan(
&i.ID,
&i.WorkspaceID,
&i.Name,
&i.AvatarUrl,
&i.RuntimeMode,
&i.RuntimeConfig,
&i.Visibility,
&i.Status,
&i.MaxConcurrentTasks,
&i.OwnerID,
&i.CreatedAt,
&i.UpdatedAt,
&i.Description,
&i.RuntimeID,
&i.Instructions,
&i.ArchivedAt,
&i.ArchivedBy,
&i.CustomEnv,
&i.CustomArgs,
&i.McpConfig,
&i.Model,
&i.ThinkingLevel,
)
return i, err
}
const clearAgentThinkingLevel = `-- name: ClearAgentThinkingLevel :one
UPDATE agent SET thinking_level = NULL, updated_at = now()
WHERE id = $1
RETURNING id, workspace_id, name, avatar_url, runtime_mode, runtime_config, visibility, status, max_concurrent_tasks, owner_id, created_at, updated_at, description, runtime_id, instructions, archived_at, archived_by, custom_env, custom_args, mcp_config, model, thinking_level
`
// Explicit NULL-clear for thinking_level. COALESCE-based UpdateAgent cannot
// set the column back to NULL, so the API layer routes "user picked Default"
// through this dedicated query.
func (q *Queries) ClearAgentThinkingLevel(ctx context.Context, id pgtype.UUID) (Agent, error) {
row := q.db.QueryRow(ctx, clearAgentThinkingLevel, id)
var i Agent
err := row.Scan(
&i.ID,
&i.WorkspaceID,
&i.Name,
&i.AvatarUrl,
&i.RuntimeMode,
&i.RuntimeConfig,
&i.Visibility,
&i.Status,
&i.MaxConcurrentTasks,
&i.OwnerID,
&i.CreatedAt,
&i.UpdatedAt,
&i.Description,
&i.RuntimeID,
&i.Instructions,
&i.ArchivedAt,
&i.ArchivedBy,
&i.CustomEnv,
&i.CustomArgs,
&i.McpConfig,
&i.Model,
&i.ThinkingLevel,
)
return i, err
}
const completeAgentTask = `-- name: CompleteAgentTask :one
UPDATE agent_task_queue
SET status = 'completed', completed_at = now(), result = $2, session_id = $3, work_dir = $4
WHERE id = $1 AND status = 'running'
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason
`
type CompleteAgentTaskParams struct {
ID pgtype.UUID `json:"id"`
Result []byte `json:"result"`
SessionID pgtype.Text `json:"session_id"`
WorkDir pgtype.Text `json:"work_dir"`
}
func (q *Queries) CompleteAgentTask(ctx context.Context, arg CompleteAgentTaskParams) (AgentTaskQueue, error) {
row := q.db.QueryRow(ctx, completeAgentTask,
arg.ID,
arg.Result,
arg.SessionID,
arg.WorkDir,
)
var i AgentTaskQueue
err := row.Scan(
&i.ID,
&i.AgentID,
&i.IssueID,
&i.Status,
&i.Priority,
&i.DispatchedAt,
&i.StartedAt,
&i.CompletedAt,
&i.Result,
&i.Error,
&i.CreatedAt,
&i.Context,
&i.RuntimeID,
&i.SessionID,
&i.WorkDir,
&i.TriggerCommentID,
&i.ChatSessionID,
&i.AutopilotRunID,
&i.Attempt,
&i.MaxAttempts,
&i.ParentTaskID,
&i.FailureReason,
&i.TriggerSummary,
&i.ForceFreshSession,
&i.IsLeaderTask,
&i.WaitReason,
)
return i, err
}
const countRunningTasks = `-- name: CountRunningTasks :one
SELECT count(*) FROM agent_task_queue
WHERE agent_id = $1 AND status IN ('dispatched', 'running', 'waiting_local_directory')
`
func (q *Queries) CountRunningTasks(ctx context.Context, agentID pgtype.UUID) (int64, error) {
row := q.db.QueryRow(ctx, countRunningTasks, agentID)
var count int64
err := row.Scan(&count)
return count, err
}
const createAgent = `-- name: CreateAgent :one
INSERT INTO agent (
workspace_id, name, description, avatar_url, runtime_mode,
runtime_config, runtime_id, visibility, max_concurrent_tasks, owner_id,
instructions, custom_env, custom_args, mcp_config, model, thinking_level
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16)
RETURNING id, workspace_id, name, avatar_url, runtime_mode, runtime_config, visibility, status, max_concurrent_tasks, owner_id, created_at, updated_at, description, runtime_id, instructions, archived_at, archived_by, custom_env, custom_args, mcp_config, model, thinking_level
`
type CreateAgentParams struct {
WorkspaceID pgtype.UUID `json:"workspace_id"`
Name string `json:"name"`
Description string `json:"description"`
AvatarUrl pgtype.Text `json:"avatar_url"`
RuntimeMode string `json:"runtime_mode"`
RuntimeConfig []byte `json:"runtime_config"`
RuntimeID pgtype.UUID `json:"runtime_id"`
Visibility string `json:"visibility"`
MaxConcurrentTasks int32 `json:"max_concurrent_tasks"`
OwnerID pgtype.UUID `json:"owner_id"`
Instructions string `json:"instructions"`
CustomEnv []byte `json:"custom_env"`
CustomArgs []byte `json:"custom_args"`
McpConfig []byte `json:"mcp_config"`
Model pgtype.Text `json:"model"`
ThinkingLevel pgtype.Text `json:"thinking_level"`
}
func (q *Queries) CreateAgent(ctx context.Context, arg CreateAgentParams) (Agent, error) {
row := q.db.QueryRow(ctx, createAgent,
arg.WorkspaceID,
arg.Name,
arg.Description,
arg.AvatarUrl,
arg.RuntimeMode,
arg.RuntimeConfig,
arg.RuntimeID,
arg.Visibility,
arg.MaxConcurrentTasks,
arg.OwnerID,
arg.Instructions,
arg.CustomEnv,
arg.CustomArgs,
arg.McpConfig,
arg.Model,
arg.ThinkingLevel,
)
var i Agent
err := row.Scan(
&i.ID,
&i.WorkspaceID,
&i.Name,
&i.AvatarUrl,
&i.RuntimeMode,
&i.RuntimeConfig,
&i.Visibility,
&i.Status,
&i.MaxConcurrentTasks,
&i.OwnerID,
&i.CreatedAt,
&i.UpdatedAt,
&i.Description,
&i.RuntimeID,
&i.Instructions,
&i.ArchivedAt,
&i.ArchivedBy,
&i.CustomEnv,
&i.CustomArgs,
&i.McpConfig,
&i.Model,
&i.ThinkingLevel,
)
return i, err
}
const createAgentTask = `-- name: CreateAgentTask :one
INSERT INTO agent_task_queue (
agent_id, runtime_id, issue_id, status, priority, trigger_comment_id,
trigger_summary, force_fresh_session, is_leader_task
)
VALUES (
$1, $2, $3, 'queued', $4, $5,
$6,
COALESCE($7::boolean, FALSE),
COALESCE($8::boolean, FALSE)
)
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason
`
type CreateAgentTaskParams struct {
AgentID pgtype.UUID `json:"agent_id"`
RuntimeID pgtype.UUID `json:"runtime_id"`
IssueID pgtype.UUID `json:"issue_id"`
Priority int32 `json:"priority"`
TriggerCommentID pgtype.UUID `json:"trigger_comment_id"`
TriggerSummary pgtype.Text `json:"trigger_summary"`
ForceFreshSession pgtype.Bool `json:"force_fresh_session"`
IsLeaderTask pgtype.Bool `json:"is_leader_task"`
}
func (q *Queries) CreateAgentTask(ctx context.Context, arg CreateAgentTaskParams) (AgentTaskQueue, error) {
row := q.db.QueryRow(ctx, createAgentTask,
arg.AgentID,
arg.RuntimeID,
arg.IssueID,
arg.Priority,
arg.TriggerCommentID,
arg.TriggerSummary,
arg.ForceFreshSession,
arg.IsLeaderTask,
)
var i AgentTaskQueue
err := row.Scan(
&i.ID,
&i.AgentID,
&i.IssueID,
&i.Status,
&i.Priority,
&i.DispatchedAt,
&i.StartedAt,
&i.CompletedAt,
&i.Result,
&i.Error,
&i.CreatedAt,
&i.Context,
&i.RuntimeID,
&i.SessionID,
&i.WorkDir,
&i.TriggerCommentID,
&i.ChatSessionID,
&i.AutopilotRunID,
&i.Attempt,
&i.MaxAttempts,
&i.ParentTaskID,
&i.FailureReason,
&i.TriggerSummary,
&i.ForceFreshSession,
&i.IsLeaderTask,
&i.WaitReason,
)
return i, err
}
const createQuickCreateTask = `-- name: CreateQuickCreateTask :one
INSERT INTO agent_task_queue (agent_id, runtime_id, issue_id, status, priority, context)
VALUES ($1, $2, NULL, 'queued', $3, $4)
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason
`
type CreateQuickCreateTaskParams struct {
AgentID pgtype.UUID `json:"agent_id"`
RuntimeID pgtype.UUID `json:"runtime_id"`
Priority int32 `json:"priority"`
Context []byte `json:"context"`
}
// Quick-create tasks have no issue / chat / autopilot link; the entire job
// description (prompt, requester, workspace) lives in context JSONB. The
// daemon detects this variant via context.type == "quick_create".
func (q *Queries) CreateQuickCreateTask(ctx context.Context, arg CreateQuickCreateTaskParams) (AgentTaskQueue, error) {
row := q.db.QueryRow(ctx, createQuickCreateTask,
arg.AgentID,
arg.RuntimeID,
arg.Priority,
arg.Context,
)
var i AgentTaskQueue
err := row.Scan(
&i.ID,
&i.AgentID,
&i.IssueID,
&i.Status,
&i.Priority,
&i.DispatchedAt,
&i.StartedAt,
&i.CompletedAt,
&i.Result,
&i.Error,
&i.CreatedAt,
&i.Context,
&i.RuntimeID,
&i.SessionID,
&i.WorkDir,
&i.TriggerCommentID,
&i.ChatSessionID,
&i.AutopilotRunID,
&i.Attempt,
&i.MaxAttempts,
&i.ParentTaskID,
&i.FailureReason,
&i.TriggerSummary,
&i.ForceFreshSession,
&i.IsLeaderTask,
&i.WaitReason,
)
return i, err
}
const createRetryTask = `-- name: CreateRetryTask :one
INSERT INTO agent_task_queue (
agent_id, runtime_id, issue_id, chat_session_id, autopilot_run_id,
status, priority, trigger_comment_id, trigger_summary, context,
session_id, work_dir,
attempt, max_attempts, parent_task_id, force_fresh_session, is_leader_task
)
SELECT
p.agent_id, p.runtime_id, p.issue_id, p.chat_session_id, p.autopilot_run_id,
'queued', p.priority, p.trigger_comment_id, p.trigger_summary, p.context,
CASE WHEN p.failure_reason IS NOT DISTINCT FROM 'codex_semantic_inactivity' THEN NULL ELSE p.session_id END,
CASE WHEN p.failure_reason IS NOT DISTINCT FROM 'codex_semantic_inactivity' THEN NULL ELSE p.work_dir END,
p.attempt + 1, p.max_attempts, p.id,
p.failure_reason IS NOT DISTINCT FROM 'codex_semantic_inactivity',
p.is_leader_task
FROM agent_task_queue p
WHERE p.id = $1
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason
`
// Clones a parent task into a fresh queued attempt. Carries forward the
// agent's resume context (session_id/work_dir) so the child can continue
// the conversation when the backend supports it. Resume-unsafe failures are
// retried as fresh sessions so the child does not inherit a stuck agent
// conversation. Keep the CASE WHEN predicates in sync with
// resumeUnsafeFailureReason and the resume lookup blacklists. attempt is
// incremented; max_attempts, trigger_comment_id, and is_leader_task are
// inherited so the retried task keeps the same squad-role provenance as its
// parent and the self-trigger guard in shouldEnqueueSquadLeaderOnComment
// continues to recognise it as a leader task.
func (q *Queries) CreateRetryTask(ctx context.Context, id pgtype.UUID) (AgentTaskQueue, error) {
row := q.db.QueryRow(ctx, createRetryTask, id)
var i AgentTaskQueue
err := row.Scan(
&i.ID,
&i.AgentID,
&i.IssueID,
&i.Status,
&i.Priority,
&i.DispatchedAt,
&i.StartedAt,
&i.CompletedAt,
&i.Result,
&i.Error,
&i.CreatedAt,
&i.Context,
&i.RuntimeID,
&i.SessionID,
&i.WorkDir,
&i.TriggerCommentID,
&i.ChatSessionID,
&i.AutopilotRunID,
&i.Attempt,
&i.MaxAttempts,
&i.ParentTaskID,
&i.FailureReason,
&i.TriggerSummary,
&i.ForceFreshSession,
&i.IsLeaderTask,
&i.WaitReason,
)
return i, err
}
const expireStaleQueuedTasks = `-- name: ExpireStaleQueuedTasks :many
WITH victims AS (
SELECT id FROM agent_task_queue
WHERE status = 'queued'
AND created_at < now() - make_interval(secs => $1::double precision)
ORDER BY created_at ASC
LIMIT $2::int
FOR UPDATE SKIP LOCKED
)
UPDATE agent_task_queue t
SET status = 'failed',
completed_at = now(),
error = 'task expired in queue',
failure_reason = 'queued_expired'
FROM victims v
WHERE t.id = v.id
AND t.status = 'queued'
AND t.created_at < now() - make_interval(secs => $1::double precision)
RETURNING t.id, t.agent_id, t.issue_id, t.status, t.priority, t.dispatched_at, t.started_at, t.completed_at, t.result, t.error, t.created_at, t.context, t.runtime_id, t.session_id, t.work_dir, t.trigger_comment_id, t.chat_session_id, t.autopilot_run_id, t.attempt, t.max_attempts, t.parent_task_id, t.failure_reason, t.trigger_summary, t.force_fresh_session, t.is_leader_task, t.wait_reason
`
type ExpireStaleQueuedTasksParams struct {
TtlSecs float64 `json:"ttl_secs"`
MaxPerTick int32 `json:"max_per_tick"`
}
// Fails tasks that have been sitting in 'queued' for longer than the TTL.
// This is the cleanup arm of the MUL-1899 "queued backlog" fix: even with the
// new dispatch-time admission gate that refuses to enqueue when the runtime
// is offline, we still need to drain the historical 87k+ doomed rows and
// handle edge cases where a runtime goes offline AFTER a task is already
// queued (the admission check protects new enqueues, not in-flight queue
// depth).
//
// Concurrency safety: the daemon's claim path may race with this sweeper to
// transition the same row out of 'queued'. We protect against that two
// ways:
// 1. The CTE selects victims with FOR UPDATE SKIP LOCKED so a row that is
// currently being claimed (or otherwise locked) is skipped — no lock
// contention with the dispatch path, and we won't queue up behind it.
// 2. The outer UPDATE re-checks status='queued' AND the TTL predicate at
// apply time. If a daemon claimed the row between selection and update
// (e.g. lock released after the claim transaction commits), the row is
// already 'dispatched'/'running' and the WHERE clause filters it out
// so we cannot clobber an in-flight task.
//
// Capped via LIMIT inside the CTE so a single sweep tick cannot monopolise
// the DB when the backlog is large — the sweeper drains the rest on
// subsequent ticks.
func (q *Queries) ExpireStaleQueuedTasks(ctx context.Context, arg ExpireStaleQueuedTasksParams) ([]AgentTaskQueue, error) {
rows, err := q.db.Query(ctx, expireStaleQueuedTasks, arg.TtlSecs, arg.MaxPerTick)
if err != nil {
return nil, err
}
defer rows.Close()
items := []AgentTaskQueue{}
for rows.Next() {
var i AgentTaskQueue
if err := rows.Scan(
&i.ID,
&i.AgentID,
&i.IssueID,
&i.Status,
&i.Priority,
&i.DispatchedAt,
&i.StartedAt,
&i.CompletedAt,
&i.Result,
&i.Error,
&i.CreatedAt,
&i.Context,
&i.RuntimeID,
&i.SessionID,
&i.WorkDir,
&i.TriggerCommentID,
&i.ChatSessionID,
&i.AutopilotRunID,
&i.Attempt,
&i.MaxAttempts,
&i.ParentTaskID,
&i.FailureReason,
&i.TriggerSummary,
&i.ForceFreshSession,
&i.IsLeaderTask,
&i.WaitReason,
); err != nil {
return nil, err
}
items = append(items, i)
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const failAgentTask = `-- name: FailAgentTask :one
UPDATE agent_task_queue
SET status = 'failed',
completed_at = now(),
error = $2,
failure_reason = COALESCE($3, 'agent_error'),
session_id = COALESCE($4, session_id),
work_dir = COALESCE($5, work_dir)
WHERE id = $1 AND status IN ('dispatched', 'running', 'waiting_local_directory')
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason
`
type FailAgentTaskParams struct {
ID pgtype.UUID `json:"id"`
Error pgtype.Text `json:"error"`
FailureReason pgtype.Text `json:"failure_reason"`
SessionID pgtype.Text `json:"session_id"`
WorkDir pgtype.Text `json:"work_dir"`
}
// Marks a task as failed. session_id and work_dir are merged via COALESCE so
// if the agent already established a real session before failing (e.g. it
// crashed mid-conversation, was cancelled, or hit a tool error) the resume
// pointer is preserved on the task row. The next chat task can then fall
// back to GetLastChatTaskSession and continue the conversation instead of
// silently starting over.
//
// failure_reason is a coarse classifier consumed by the auto-retry path;
// 'agent_error' is the safe default when the daemon doesn't supply one.
func (q *Queries) FailAgentTask(ctx context.Context, arg FailAgentTaskParams) (AgentTaskQueue, error) {
row := q.db.QueryRow(ctx, failAgentTask,
arg.ID,
arg.Error,
arg.FailureReason,
arg.SessionID,
arg.WorkDir,
)
var i AgentTaskQueue
err := row.Scan(
&i.ID,
&i.AgentID,
&i.IssueID,
&i.Status,
&i.Priority,
&i.DispatchedAt,
&i.StartedAt,
&i.CompletedAt,
&i.Result,
&i.Error,
&i.CreatedAt,
&i.Context,
&i.RuntimeID,
&i.SessionID,
&i.WorkDir,
&i.TriggerCommentID,
&i.ChatSessionID,
&i.AutopilotRunID,
&i.Attempt,
&i.MaxAttempts,
&i.ParentTaskID,
&i.FailureReason,
&i.TriggerSummary,
&i.ForceFreshSession,
&i.IsLeaderTask,
&i.WaitReason,
)
return i, err
}
const failStaleTasks = `-- name: FailStaleTasks :many
UPDATE agent_task_queue
SET status = 'failed', completed_at = now(), error = 'task timed out',
failure_reason = 'timeout'
WHERE (status = 'dispatched' AND dispatched_at < now() - make_interval(secs => $1::double precision))
OR (status = 'running' AND started_at < now() - make_interval(secs => $2::double precision))
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason
`
type FailStaleTasksParams struct {
DispatchTimeoutSecs float64 `json:"dispatch_timeout_secs"`
RunningTimeoutSecs float64 `json:"running_timeout_secs"`
}
// Fails tasks stuck in dispatched/running beyond the given thresholds.
// Handles cases where the daemon is alive but the task is orphaned
// (e.g. agent process hung, daemon failed to report completion).
// waiting_local_directory rows are intentionally excluded: the daemon owns
// the wait (with its own ctx-driven timeout) and a legitimate queue ahead
// of this task can exceed the dispatch / running timeouts without being
// "stuck". If the daemon dies, RecoverOrphanedTasksForRuntime reclaims
// those rows at restart.
func (q *Queries) FailStaleTasks(ctx context.Context, arg FailStaleTasksParams) ([]AgentTaskQueue, error) {
rows, err := q.db.Query(ctx, failStaleTasks, arg.DispatchTimeoutSecs, arg.RunningTimeoutSecs)
if err != nil {
return nil, err
}
defer rows.Close()
items := []AgentTaskQueue{}
for rows.Next() {
var i AgentTaskQueue
if err := rows.Scan(
&i.ID,
&i.AgentID,
&i.IssueID,
&i.Status,
&i.Priority,
&i.DispatchedAt,
&i.StartedAt,
&i.CompletedAt,
&i.Result,
&i.Error,
&i.CreatedAt,
&i.Context,
&i.RuntimeID,
&i.SessionID,
&i.WorkDir,
&i.TriggerCommentID,
&i.ChatSessionID,
&i.AutopilotRunID,
&i.Attempt,
&i.MaxAttempts,
&i.ParentTaskID,
&i.FailureReason,
&i.TriggerSummary,
&i.ForceFreshSession,
&i.IsLeaderTask,
&i.WaitReason,
); err != nil {
return nil, err
}
items = append(items, i)
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const getAgent = `-- name: GetAgent :one
SELECT id, workspace_id, name, avatar_url, runtime_mode, runtime_config, visibility, status, max_concurrent_tasks, owner_id, created_at, updated_at, description, runtime_id, instructions, archived_at, archived_by, custom_env, custom_args, mcp_config, model, thinking_level FROM agent
WHERE id = $1
`
func (q *Queries) GetAgent(ctx context.Context, id pgtype.UUID) (Agent, error) {
row := q.db.QueryRow(ctx, getAgent, id)
var i Agent
err := row.Scan(
&i.ID,
&i.WorkspaceID,
&i.Name,
&i.AvatarUrl,
&i.RuntimeMode,
&i.RuntimeConfig,
&i.Visibility,
&i.Status,
&i.MaxConcurrentTasks,
&i.OwnerID,
&i.CreatedAt,
&i.UpdatedAt,
&i.Description,
&i.RuntimeID,
&i.Instructions,
&i.ArchivedAt,
&i.ArchivedBy,
&i.CustomEnv,
&i.CustomArgs,
&i.McpConfig,
&i.Model,
&i.ThinkingLevel,
)
return i, err
}
const getAgentInWorkspace = `-- name: GetAgentInWorkspace :one
SELECT id, workspace_id, name, avatar_url, runtime_mode, runtime_config, visibility, status, max_concurrent_tasks, owner_id, created_at, updated_at, description, runtime_id, instructions, archived_at, archived_by, custom_env, custom_args, mcp_config, model, thinking_level FROM agent
WHERE id = $1 AND workspace_id = $2
`
type GetAgentInWorkspaceParams struct {
ID pgtype.UUID `json:"id"`
WorkspaceID pgtype.UUID `json:"workspace_id"`
}
func (q *Queries) GetAgentInWorkspace(ctx context.Context, arg GetAgentInWorkspaceParams) (Agent, error) {
row := q.db.QueryRow(ctx, getAgentInWorkspace, arg.ID, arg.WorkspaceID)
var i Agent
err := row.Scan(
&i.ID,
&i.WorkspaceID,
&i.Name,
&i.AvatarUrl,
&i.RuntimeMode,
&i.RuntimeConfig,
&i.Visibility,
&i.Status,
&i.MaxConcurrentTasks,
&i.OwnerID,
&i.CreatedAt,
&i.UpdatedAt,
&i.Description,
&i.RuntimeID,
&i.Instructions,
&i.ArchivedAt,
&i.ArchivedBy,
&i.CustomEnv,
&i.CustomArgs,
&i.McpConfig,
&i.Model,
&i.ThinkingLevel,
)
return i, err
}
const getAgentTask = `-- name: GetAgentTask :one
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason FROM agent_task_queue
WHERE id = $1
`
func (q *Queries) GetAgentTask(ctx context.Context, id pgtype.UUID) (AgentTaskQueue, error) {
row := q.db.QueryRow(ctx, getAgentTask, id)
var i AgentTaskQueue
err := row.Scan(
&i.ID,
&i.AgentID,
&i.IssueID,
&i.Status,
&i.Priority,
&i.DispatchedAt,
&i.StartedAt,
&i.CompletedAt,
&i.Result,
&i.Error,
&i.CreatedAt,
&i.Context,
&i.RuntimeID,
&i.SessionID,
&i.WorkDir,
&i.TriggerCommentID,
&i.ChatSessionID,
&i.AutopilotRunID,
&i.Attempt,
&i.MaxAttempts,
&i.ParentTaskID,
&i.FailureReason,
&i.TriggerSummary,
&i.ForceFreshSession,
&i.IsLeaderTask,
&i.WaitReason,
)
return i, err
}
const getLastTaskSession = `-- name: GetLastTaskSession :one
SELECT session_id, work_dir, runtime_id FROM agent_task_queue
WHERE agent_id = $1 AND issue_id = $2
AND (
status = 'completed'
OR (
status = 'failed'
AND COALESCE(failure_reason, '') NOT IN ('iteration_limit', 'agent_fallback_message', 'api_invalid_request', 'codex_semantic_inactivity')
AND NOT (COALESCE(error, '') ILIKE '%400%' AND COALESCE(error, '') ILIKE '%invalid_request_error%')
)
)
AND session_id IS NOT NULL
ORDER BY COALESCE(completed_at, started_at, dispatched_at, created_at) DESC
LIMIT 1
`
type GetLastTaskSessionParams struct {
AgentID pgtype.UUID `json:"agent_id"`
IssueID pgtype.UUID `json:"issue_id"`
}
type GetLastTaskSessionRow struct {
SessionID pgtype.Text `json:"session_id"`
WorkDir pgtype.Text `json:"work_dir"`
RuntimeID pgtype.UUID `json:"runtime_id"`
}
// Returns the session_id and work_dir from the most recent task for a given
// (agent_id, issue_id) pair, used for session resumption on the auto-retry
// path. We accept both 'completed' and 'failed' tasks: a failed task may
// have established a real agent session before crashing (orphaned by a
// daemon restart, runtime offline, or sweeper timeout), and the daemon pins
// the resume pointer mid-flight via UpdateAgentTaskSession. Without this,
// an auto-retry of a mid-run failure would silently start a fresh
// conversation and lose the in-flight context — exactly what MUL-1128's B
// branch is meant to fix.
//
// Manual rerun (TaskService.RerunIssue) does NOT take this path: it sets
// force_fresh_session=true on the new task, and the daemon claim handler
// skips this lookup entirely. The user already judged the prior output bad;
// resuming the same conversation would replay a poisoned state.
//
// Tasks that ended in a known "poisoned" terminal state are also excluded
// here so even auto-retry does not inherit the bad session. The daemon
// classifies these failures (iteration_limit, agent_fallback_message,
// api_invalid_request, codex_semantic_inactivity) when it detects either an
// agent fallback marker in the output, an upstream API 400 that means the
// conversation history itself is unprocessable (oversized image, malformed
// base64, etc.), or a Codex semantic inactivity timeout whose recorded
// session may replay the same stuck state.
//
// The error-text ILIKE clause is defense-in-depth for the api_invalid_request
// shape: a legacy row tagged 'agent_error' (pre-MUL-1921), a deploy-window
// row that the old code wrote between migration and rollout, or a future
// error format that escapes the daemon classifier all still get filtered
// here as long as the canonical Anthropic 400 marker is present in the
// error text. Migration 079 backfills the failure_reason column itself,
// so observability stays accurate; this clause guarantees session resume
// never picks up a bad session even when failure_reason hasn't caught up.
func (q *Queries) GetLastTaskSession(ctx context.Context, arg GetLastTaskSessionParams) (GetLastTaskSessionRow, error) {
row := q.db.QueryRow(ctx, getLastTaskSession, arg.AgentID, arg.IssueID)
var i GetLastTaskSessionRow
err := row.Scan(&i.SessionID, &i.WorkDir, &i.RuntimeID)
return i, err
}
const getLatestTaskIsLeaderForIssueAndAgent = `-- name: GetLatestTaskIsLeaderForIssueAndAgent :one
SELECT is_leader_task FROM agent_task_queue
WHERE issue_id = $1 AND agent_id = $2
ORDER BY created_at DESC
LIMIT 1
`
type GetLatestTaskIsLeaderForIssueAndAgentParams struct {
IssueID pgtype.UUID `json:"issue_id"`
AgentID pgtype.UUID `json:"agent_id"`
}
// Returns the is_leader_task flag of the agent's most recent task on this
// issue, or NULL if the agent has never had a task on this issue. Used by
// the squad-leader self-trigger guard to tell whether the agent's last
// activity on the issue was in the leader role or the worker role (an
// agent that holds both roles in a squad would otherwise be skipped by
// the role-blind authorID == leaderID check).
func (q *Queries) GetLatestTaskIsLeaderForIssueAndAgent(ctx context.Context, arg GetLatestTaskIsLeaderForIssueAndAgentParams) (bool, error) {
row := q.db.QueryRow(ctx, getLatestTaskIsLeaderForIssueAndAgent, arg.IssueID, arg.AgentID)
var is_leader_task bool
err := row.Scan(&is_leader_task)
return is_leader_task, err
}
const getWorkspaceAgentActivity30d = `-- name: GetWorkspaceAgentActivity30d :many
SELECT
atq.agent_id,
DATE_TRUNC('day', atq.completed_at)::timestamptz AS bucket,
COUNT(*)::int AS task_count,
COUNT(*) FILTER (WHERE atq.status = 'failed')::int AS failed_count
FROM agent_task_queue atq
JOIN agent a ON a.id = atq.agent_id
WHERE a.workspace_id = $1
AND atq.completed_at IS NOT NULL
AND atq.completed_at > now() - INTERVAL '30 days'
GROUP BY atq.agent_id, bucket
ORDER BY atq.agent_id, bucket
`
type GetWorkspaceAgentActivity30dRow struct {
AgentID pgtype.UUID `json:"agent_id"`
Bucket pgtype.Timestamptz `json:"bucket"`
TaskCount int32 `json:"task_count"`
FailedCount int32 `json:"failed_count"`
}
// Returns per-agent daily activity buckets for the last 30 days. Single
// workspace-wide read backs both surfaces:
// - Agents list ACTIVITY column — uses only the trailing 7 buckets
// - Agent detail "Last 30 days" panel — uses the full 30
//
// 30 days contains 7 days, so one fetch + a client-side .slice(-7) wins
// over fetching twice. Days with no completion produce no row; the
// front-end zero-fills.
//
// Anchored on completed_at (not created_at) because the sparkline answers
// "what did this agent produce?" not "what was queued at it?". A task that's
// still in flight has no completed_at and contributes nothing here — that's
// correct: in-flight tasks are surfaced via the live presence indicator,
// not the historical trend.
func (q *Queries) GetWorkspaceAgentActivity30d(ctx context.Context, workspaceID pgtype.UUID) ([]GetWorkspaceAgentActivity30dRow, error) {
rows, err := q.db.Query(ctx, getWorkspaceAgentActivity30d, workspaceID)
if err != nil {
return nil, err
}
defer rows.Close()
items := []GetWorkspaceAgentActivity30dRow{}
for rows.Next() {
var i GetWorkspaceAgentActivity30dRow
if err := rows.Scan(
&i.AgentID,
&i.Bucket,
&i.TaskCount,
&i.FailedCount,
); err != nil {
return nil, err
}
items = append(items, i)
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const getWorkspaceAgentRunCounts = `-- name: GetWorkspaceAgentRunCounts :many
SELECT
atq.agent_id,
COUNT(*)::int AS run_count
FROM agent_task_queue atq
JOIN agent a ON a.id = atq.agent_id
WHERE a.workspace_id = $1
AND atq.created_at > now() - INTERVAL '30 days'
GROUP BY atq.agent_id
`
type GetWorkspaceAgentRunCountsRow struct {
AgentID pgtype.UUID `json:"agent_id"`
RunCount int32 `json:"run_count"`
}
// Total task runs per agent over the trailing 30 days, used by the Agents
// list RUNS column. 30-day window keeps the count meaningful (a long-dormant
// agent shouldn't show "5,420 runs from 2 years ago") and keeps the scan
// bounded as the workspace ages.
func (q *Queries) GetWorkspaceAgentRunCounts(ctx context.Context, workspaceID pgtype.UUID) ([]GetWorkspaceAgentRunCountsRow, error) {
rows, err := q.db.Query(ctx, getWorkspaceAgentRunCounts, workspaceID)
if err != nil {
return nil, err
}
defer rows.Close()
items := []GetWorkspaceAgentRunCountsRow{}
for rows.Next() {
var i GetWorkspaceAgentRunCountsRow
if err := rows.Scan(&i.AgentID, &i.RunCount); err != nil {
return nil, err
}
items = append(items, i)
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const hasActiveTaskForIssue = `-- name: HasActiveTaskForIssue :one
SELECT count(*) > 0 AS has_active FROM agent_task_queue
WHERE issue_id = $1 AND status IN ('queued', 'dispatched', 'running', 'waiting_local_directory')
`
// Returns true if there is any queued, dispatched, waiting_local_directory,
// or running task for the issue.
func (q *Queries) HasActiveTaskForIssue(ctx context.Context, issueID pgtype.UUID) (bool, error) {
row := q.db.QueryRow(ctx, hasActiveTaskForIssue, issueID)
var has_active bool
err := row.Scan(&has_active)
return has_active, err
}
const hasPendingTaskForIssue = `-- name: HasPendingTaskForIssue :one
SELECT count(*) > 0 AS has_pending FROM agent_task_queue
WHERE issue_id = $1 AND status IN ('queued', 'dispatched')
`
// Returns true if there is a queued or dispatched (but not yet running) task for the issue.
// Used by the coalescing queue: allow enqueue when a task is running (so
// the agent picks up new comments on the next cycle) but skip if a pending
// task already exists (natural dedup).
func (q *Queries) HasPendingTaskForIssue(ctx context.Context, issueID pgtype.UUID) (bool, error) {
row := q.db.QueryRow(ctx, hasPendingTaskForIssue, issueID)
var has_pending bool
err := row.Scan(&has_pending)
return has_pending, err
}
const hasPendingTaskForIssueAndAgent = `-- name: HasPendingTaskForIssueAndAgent :one
SELECT count(*) > 0 AS has_pending FROM agent_task_queue
WHERE issue_id = $1 AND agent_id = $2 AND status IN ('queued', 'dispatched')
`
type HasPendingTaskForIssueAndAgentParams struct {
IssueID pgtype.UUID `json:"issue_id"`
AgentID pgtype.UUID `json:"agent_id"`
}
// Returns true if a specific agent already has a queued or dispatched task
// for the given issue. Used by @mention trigger dedup.
func (q *Queries) HasPendingTaskForIssueAndAgent(ctx context.Context, arg HasPendingTaskForIssueAndAgentParams) (bool, error) {
row := q.db.QueryRow(ctx, hasPendingTaskForIssueAndAgent, arg.IssueID, arg.AgentID)
var has_pending bool
err := row.Scan(&has_pending)
return has_pending, err
}
const linkTaskToIssue = `-- name: LinkTaskToIssue :exec
UPDATE agent_task_queue
SET issue_id = $2
WHERE id = $1 AND issue_id IS NULL
`
type LinkTaskToIssueParams struct {
ID pgtype.UUID `json:"id"`
IssueID pgtype.UUID `json:"issue_id"`
}
// Attaches the issue a quick-create task produced back to the task row, once
// the agent has finished and the issue exists. Guarded by `issue_id IS NULL`
// so this never overwrites an issue id that was set at task creation (only
// quick-create tasks land here unset). Fixes the activity row staying on
// "Creating issue" forever after completion.
func (q *Queries) LinkTaskToIssue(ctx context.Context, arg LinkTaskToIssueParams) error {
_, err := q.db.Exec(ctx, linkTaskToIssue, arg.ID, arg.IssueID)
return err
}
const listActiveAgentsByRuntime = `-- name: ListActiveAgentsByRuntime :many
SELECT id, workspace_id, name, avatar_url, runtime_mode, runtime_config, visibility, status, max_concurrent_tasks, owner_id, created_at, updated_at, description, runtime_id, instructions, archived_at, archived_by, custom_env, custom_args, mcp_config, model, thinking_level FROM agent
WHERE runtime_id = $1 AND archived_at IS NULL
ORDER BY name ASC
`
// Returns every non-archived agent bound to a runtime. Backs the cascade
// delete dialog: when DELETE /api/runtimes/:id refuses with
// runtime_has_active_agents, the response carries this list so the front-end
// can render exactly the agents that will be archived if the user confirms,
// and so the cascade endpoint's expected_active_agent_ids check has a stable
// snapshot to compare against. Ordered by name for a deterministic display.
func (q *Queries) ListActiveAgentsByRuntime(ctx context.Context, runtimeID pgtype.UUID) ([]Agent, error) {
rows, err := q.db.Query(ctx, listActiveAgentsByRuntime, runtimeID)
if err != nil {
return nil, err
}
defer rows.Close()
items := []Agent{}
for rows.Next() {
var i Agent
if err := rows.Scan(
&i.ID,
&i.WorkspaceID,
&i.Name,
&i.AvatarUrl,
&i.RuntimeMode,
&i.RuntimeConfig,
&i.Visibility,
&i.Status,
&i.MaxConcurrentTasks,
&i.OwnerID,
&i.CreatedAt,
&i.UpdatedAt,
&i.Description,
&i.RuntimeID,
&i.Instructions,
&i.ArchivedAt,
&i.ArchivedBy,
&i.CustomEnv,
&i.CustomArgs,
&i.McpConfig,
&i.Model,
&i.ThinkingLevel,
); err != nil {
return nil, err
}
items = append(items, i)
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const listActiveAgentsByRuntimeForUpdate = `-- name: ListActiveAgentsByRuntimeForUpdate :many
SELECT id, workspace_id, name, avatar_url, runtime_mode, runtime_config, visibility, status, max_concurrent_tasks, owner_id, created_at, updated_at, description, runtime_id, instructions, archived_at, archived_by, custom_env, custom_args, mcp_config, model, thinking_level FROM agent
WHERE runtime_id = $1 AND archived_at IS NULL
ORDER BY name ASC
FOR UPDATE
`
// FOR UPDATE variant used inside the cascade-delete transaction. Locks
// each currently-active agent row so a concurrent archive/move of one
// of those rows blocks until our transaction commits. Pair with
// LockAgentRuntime, which holds the runtime row exclusively to also
// block FK-validated INSERTs / runtime_id updates that would otherwise
// add a new agent to the runtime mid-cascade. Together they guarantee
// that the set we compared against expected_active_agent_ids is exactly
// the set ArchiveAgentsByIDs will operate on — no race window.
func (q *Queries) ListActiveAgentsByRuntimeForUpdate(ctx context.Context, runtimeID pgtype.UUID) ([]Agent, error) {
rows, err := q.db.Query(ctx, listActiveAgentsByRuntimeForUpdate, runtimeID)
if err != nil {
return nil, err
}
defer rows.Close()
items := []Agent{}
for rows.Next() {
var i Agent
if err := rows.Scan(
&i.ID,
&i.WorkspaceID,
&i.Name,
&i.AvatarUrl,
&i.RuntimeMode,
&i.RuntimeConfig,
&i.Visibility,
&i.Status,
&i.MaxConcurrentTasks,
&i.OwnerID,
&i.CreatedAt,
&i.UpdatedAt,
&i.Description,
&i.RuntimeID,
&i.Instructions,
&i.ArchivedAt,
&i.ArchivedBy,
&i.CustomEnv,
&i.CustomArgs,
&i.McpConfig,
&i.Model,
&i.ThinkingLevel,
); err != nil {
return nil, err
}
items = append(items, i)
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const listActiveTasksByIssue = `-- name: ListActiveTasksByIssue :many
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason FROM agent_task_queue
WHERE issue_id = $1 AND status IN ('queued', 'dispatched', 'running', 'waiting_local_directory')
ORDER BY created_at DESC
`
// Backs the issue-detail "agent live" banner. Includes 'queued' so the
// banner shows up the moment a task is enqueued — not only after a runtime
// claims it. The queued window can be long when the runtime is offline or
// busy on a prior task, and a silent UI during that window looks like the
// platform never received the trigger.
func (q *Queries) ListActiveTasksByIssue(ctx context.Context, issueID pgtype.UUID) ([]AgentTaskQueue, error) {
rows, err := q.db.Query(ctx, listActiveTasksByIssue, issueID)
if err != nil {
return nil, err
}
defer rows.Close()
items := []AgentTaskQueue{}
for rows.Next() {
var i AgentTaskQueue
if err := rows.Scan(
&i.ID,
&i.AgentID,
&i.IssueID,
&i.Status,
&i.Priority,
&i.DispatchedAt,
&i.StartedAt,
&i.CompletedAt,
&i.Result,
&i.Error,
&i.CreatedAt,
&i.Context,
&i.RuntimeID,
&i.SessionID,
&i.WorkDir,
&i.TriggerCommentID,
&i.ChatSessionID,
&i.AutopilotRunID,
&i.Attempt,
&i.MaxAttempts,
&i.ParentTaskID,
&i.FailureReason,
&i.TriggerSummary,
&i.ForceFreshSession,
&i.IsLeaderTask,
&i.WaitReason,
); err != nil {
return nil, err
}
items = append(items, i)
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const listAgentTasks = `-- name: ListAgentTasks :many
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason FROM agent_task_queue
WHERE agent_id = $1
ORDER BY created_at DESC
`
func (q *Queries) ListAgentTasks(ctx context.Context, agentID pgtype.UUID) ([]AgentTaskQueue, error) {
rows, err := q.db.Query(ctx, listAgentTasks, agentID)
if err != nil {
return nil, err
}
defer rows.Close()
items := []AgentTaskQueue{}
for rows.Next() {
var i AgentTaskQueue
if err := rows.Scan(
&i.ID,
&i.AgentID,
&i.IssueID,
&i.Status,
&i.Priority,
&i.DispatchedAt,
&i.StartedAt,
&i.CompletedAt,
&i.Result,
&i.Error,
&i.CreatedAt,
&i.Context,
&i.RuntimeID,
&i.SessionID,
&i.WorkDir,
&i.TriggerCommentID,
&i.ChatSessionID,
&i.AutopilotRunID,
&i.Attempt,
&i.MaxAttempts,
&i.ParentTaskID,
&i.FailureReason,
&i.TriggerSummary,
&i.ForceFreshSession,
&i.IsLeaderTask,
&i.WaitReason,
); err != nil {
return nil, err
}
items = append(items, i)
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const listAgents = `-- name: ListAgents :many
SELECT id, workspace_id, name, avatar_url, runtime_mode, runtime_config, visibility, status, max_concurrent_tasks, owner_id, created_at, updated_at, description, runtime_id, instructions, archived_at, archived_by, custom_env, custom_args, mcp_config, model, thinking_level FROM agent
WHERE workspace_id = $1 AND archived_at IS NULL
ORDER BY created_at ASC
`
func (q *Queries) ListAgents(ctx context.Context, workspaceID pgtype.UUID) ([]Agent, error) {
rows, err := q.db.Query(ctx, listAgents, workspaceID)
if err != nil {
return nil, err
}
defer rows.Close()
items := []Agent{}
for rows.Next() {
var i Agent
if err := rows.Scan(
&i.ID,
&i.WorkspaceID,
&i.Name,
&i.AvatarUrl,
&i.RuntimeMode,
&i.RuntimeConfig,
&i.Visibility,
&i.Status,
&i.MaxConcurrentTasks,
&i.OwnerID,
&i.CreatedAt,
&i.UpdatedAt,
&i.Description,
&i.RuntimeID,
&i.Instructions,
&i.ArchivedAt,
&i.ArchivedBy,
&i.CustomEnv,
&i.CustomArgs,
&i.McpConfig,
&i.Model,
&i.ThinkingLevel,
); err != nil {
return nil, err
}
items = append(items, i)
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const listAllAgents = `-- name: ListAllAgents :many
SELECT id, workspace_id, name, avatar_url, runtime_mode, runtime_config, visibility, status, max_concurrent_tasks, owner_id, created_at, updated_at, description, runtime_id, instructions, archived_at, archived_by, custom_env, custom_args, mcp_config, model, thinking_level FROM agent
WHERE workspace_id = $1
ORDER BY created_at ASC
`
func (q *Queries) ListAllAgents(ctx context.Context, workspaceID pgtype.UUID) ([]Agent, error) {
rows, err := q.db.Query(ctx, listAllAgents, workspaceID)
if err != nil {
return nil, err
}
defer rows.Close()
items := []Agent{}
for rows.Next() {
var i Agent
if err := rows.Scan(
&i.ID,
&i.WorkspaceID,
&i.Name,
&i.AvatarUrl,
&i.RuntimeMode,
&i.RuntimeConfig,
&i.Visibility,
&i.Status,
&i.MaxConcurrentTasks,
&i.OwnerID,
&i.CreatedAt,
&i.UpdatedAt,
&i.Description,
&i.RuntimeID,
&i.Instructions,
&i.ArchivedAt,
&i.ArchivedBy,
&i.CustomEnv,
&i.CustomArgs,
&i.McpConfig,
&i.Model,
&i.ThinkingLevel,
); err != nil {
return nil, err
}
items = append(items, i)
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const listPendingTasksByRuntime = `-- name: ListPendingTasksByRuntime :many
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason FROM agent_task_queue
WHERE runtime_id = $1 AND status IN ('queued', 'dispatched')
ORDER BY priority DESC, created_at ASC
`
func (q *Queries) ListPendingTasksByRuntime(ctx context.Context, runtimeID pgtype.UUID) ([]AgentTaskQueue, error) {
rows, err := q.db.Query(ctx, listPendingTasksByRuntime, runtimeID)
if err != nil {
return nil, err
}
defer rows.Close()
items := []AgentTaskQueue{}
for rows.Next() {
var i AgentTaskQueue
if err := rows.Scan(
&i.ID,
&i.AgentID,
&i.IssueID,
&i.Status,
&i.Priority,
&i.DispatchedAt,
&i.StartedAt,
&i.CompletedAt,
&i.Result,
&i.Error,
&i.CreatedAt,
&i.Context,
&i.RuntimeID,
&i.SessionID,
&i.WorkDir,
&i.TriggerCommentID,
&i.ChatSessionID,
&i.AutopilotRunID,
&i.Attempt,
&i.MaxAttempts,
&i.ParentTaskID,
&i.FailureReason,
&i.TriggerSummary,
&i.ForceFreshSession,
&i.IsLeaderTask,
&i.WaitReason,
); err != nil {
return nil, err
}
items = append(items, i)
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const listQueuedClaimCandidatesByRuntime = `-- name: ListQueuedClaimCandidatesByRuntime :many
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason FROM agent_task_queue
WHERE runtime_id = $1 AND status = 'queued'
ORDER BY priority DESC, created_at ASC
`
// Returns rows the runtime can attempt to claim. Status is restricted to
// 'queued' (in contrast to ListPendingTasksByRuntime which also includes
// 'dispatched') because dispatched rows are by definition already owned
// and cannot be re-claimed — including them in the candidate list pads
// the result with rows that always lose the per-(issue, agent) race in
// ClaimAgentTask, wasting CPU and a SELECT every poll cycle when the
// runtime is busy on a long-running task. Backed by the partial index
// idx_agent_task_queue_claim_candidates so the warm path is cheap.
func (q *Queries) ListQueuedClaimCandidatesByRuntime(ctx context.Context, runtimeID pgtype.UUID) ([]AgentTaskQueue, error) {
rows, err := q.db.Query(ctx, listQueuedClaimCandidatesByRuntime, runtimeID)
if err != nil {
return nil, err
}
defer rows.Close()
items := []AgentTaskQueue{}
for rows.Next() {
var i AgentTaskQueue
if err := rows.Scan(
&i.ID,
&i.AgentID,
&i.IssueID,
&i.Status,
&i.Priority,
&i.DispatchedAt,
&i.StartedAt,
&i.CompletedAt,
&i.Result,
&i.Error,
&i.CreatedAt,
&i.Context,
&i.RuntimeID,
&i.SessionID,
&i.WorkDir,
&i.TriggerCommentID,
&i.ChatSessionID,
&i.AutopilotRunID,
&i.Attempt,
&i.MaxAttempts,
&i.ParentTaskID,
&i.FailureReason,
&i.TriggerSummary,
&i.ForceFreshSession,
&i.IsLeaderTask,
&i.WaitReason,
); err != nil {
return nil, err
}
items = append(items, i)
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const listTasksByIssue = `-- name: ListTasksByIssue :many
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason FROM agent_task_queue
WHERE issue_id = $1
ORDER BY created_at DESC
`
func (q *Queries) ListTasksByIssue(ctx context.Context, issueID pgtype.UUID) ([]AgentTaskQueue, error) {
rows, err := q.db.Query(ctx, listTasksByIssue, issueID)
if err != nil {
return nil, err
}
defer rows.Close()
items := []AgentTaskQueue{}
for rows.Next() {
var i AgentTaskQueue
if err := rows.Scan(
&i.ID,
&i.AgentID,
&i.IssueID,
&i.Status,
&i.Priority,
&i.DispatchedAt,
&i.StartedAt,
&i.CompletedAt,
&i.Result,
&i.Error,
&i.CreatedAt,
&i.Context,
&i.RuntimeID,
&i.SessionID,
&i.WorkDir,
&i.TriggerCommentID,
&i.ChatSessionID,
&i.AutopilotRunID,
&i.Attempt,
&i.MaxAttempts,
&i.ParentTaskID,
&i.FailureReason,
&i.TriggerSummary,
&i.ForceFreshSession,
&i.IsLeaderTask,
&i.WaitReason,
); err != nil {
return nil, err
}
items = append(items, i)
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const listWorkspaceAgentTaskSnapshot = `-- name: ListWorkspaceAgentTaskSnapshot :many
SELECT atq.id, atq.agent_id, atq.issue_id, atq.status, atq.priority, atq.dispatched_at, atq.started_at, atq.completed_at, atq.result, atq.error, atq.created_at, atq.context, atq.runtime_id, atq.session_id, atq.work_dir, atq.trigger_comment_id, atq.chat_session_id, atq.autopilot_run_id, atq.attempt, atq.max_attempts, atq.parent_task_id, atq.failure_reason, atq.trigger_summary, atq.force_fresh_session, atq.is_leader_task, atq.wait_reason FROM agent_task_queue atq
JOIN agent a ON a.id = atq.agent_id
WHERE a.workspace_id = $1
AND atq.status IN ('queued', 'dispatched', 'running', 'waiting_local_directory')
UNION ALL
SELECT t.id, t.agent_id, t.issue_id, t.status, t.priority, t.dispatched_at, t.started_at, t.completed_at, t.result, t.error, t.created_at, t.context, t.runtime_id, t.session_id, t.work_dir, t.trigger_comment_id, t.chat_session_id, t.autopilot_run_id, t.attempt, t.max_attempts, t.parent_task_id, t.failure_reason, t.trigger_summary, t.force_fresh_session, t.is_leader_task, t.wait_reason FROM (
SELECT DISTINCT ON (atq.agent_id) atq.id, atq.agent_id, atq.issue_id, atq.status, atq.priority, atq.dispatched_at, atq.started_at, atq.completed_at, atq.result, atq.error, atq.created_at, atq.context, atq.runtime_id, atq.session_id, atq.work_dir, atq.trigger_comment_id, atq.chat_session_id, atq.autopilot_run_id, atq.attempt, atq.max_attempts, atq.parent_task_id, atq.failure_reason, atq.trigger_summary, atq.force_fresh_session, atq.is_leader_task, atq.wait_reason
FROM agent_task_queue atq
JOIN agent a ON a.id = atq.agent_id
WHERE a.workspace_id = $1
AND atq.status IN ('completed', 'failed')
ORDER BY atq.agent_id, atq.completed_at DESC NULLS LAST
) t
`
// Returns the tasks needed to derive each agent's current presence:
// - All active tasks (queued / dispatched / running) — for working signal + counts
// - Each agent's most recent OUTCOME task (completed / failed) — for sticky
// failed signal
//
// The front-end picks "active wins, else latest outcome" — see derive-presence.ts.
//
// Cancelled tasks are excluded from the outcome half on purpose: cancel is a
// procedural signal ("attempt aborted"), not an outcome. It tells us nothing
// about whether the agent works, so it must NOT be allowed to mask a prior
// failure. Concretely: if an agent fails and then the user cancels the queued
// retry (or the parent issue closes and cascades cancels), the failed signal
// has to stay red. Only a real success (completed) or a fresh attempt (active)
// clears it.
//
// No UI windows in SQL: stickiness is decided by "is the latest outcome a
// failure?", not a 2-minute clock. JOINs agent because agent_task_queue has
// no workspace_id column.
func (q *Queries) ListWorkspaceAgentTaskSnapshot(ctx context.Context, workspaceID pgtype.UUID) ([]AgentTaskQueue, error) {
rows, err := q.db.Query(ctx, listWorkspaceAgentTaskSnapshot, workspaceID)
if err != nil {
return nil, err
}
defer rows.Close()
items := []AgentTaskQueue{}
for rows.Next() {
var i AgentTaskQueue
if err := rows.Scan(
&i.ID,
&i.AgentID,
&i.IssueID,
&i.Status,
&i.Priority,
&i.DispatchedAt,
&i.StartedAt,
&i.CompletedAt,
&i.Result,
&i.Error,
&i.CreatedAt,
&i.Context,
&i.RuntimeID,
&i.SessionID,
&i.WorkDir,
&i.TriggerCommentID,
&i.ChatSessionID,
&i.AutopilotRunID,
&i.Attempt,
&i.MaxAttempts,
&i.ParentTaskID,
&i.FailureReason,
&i.TriggerSummary,
&i.ForceFreshSession,
&i.IsLeaderTask,
&i.WaitReason,
); err != nil {
return nil, err
}
items = append(items, i)
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const markAgentTaskWaitingLocalDirectory = `-- name: MarkAgentTaskWaitingLocalDirectory :one
UPDATE agent_task_queue
SET status = 'waiting_local_directory', wait_reason = $2
WHERE id = $1 AND status = 'dispatched'
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason
`
type MarkAgentTaskWaitingLocalDirectoryParams struct {
ID pgtype.UUID `json:"id"`
WaitReason pgtype.Text `json:"wait_reason"`
}
// Transitions a freshly-dispatched task into 'waiting_local_directory' while
// the daemon waits for another in-flight task to release the path lock on a
// project_resource of type local_directory. wait_reason carries a short
// human-readable hint (typically the contested path) that the UI surfaces
// alongside the status.
//
// The CHECK only allows the transition from 'dispatched' so a daemon can't
// mark an already-running or terminal task as waiting; the StartAgentTask
// mutation handles the reverse transition once the lock is acquired.
func (q *Queries) MarkAgentTaskWaitingLocalDirectory(ctx context.Context, arg MarkAgentTaskWaitingLocalDirectoryParams) (AgentTaskQueue, error) {
row := q.db.QueryRow(ctx, markAgentTaskWaitingLocalDirectory, arg.ID, arg.WaitReason)
var i AgentTaskQueue
err := row.Scan(
&i.ID,
&i.AgentID,
&i.IssueID,
&i.Status,
&i.Priority,
&i.DispatchedAt,
&i.StartedAt,
&i.CompletedAt,
&i.Result,
&i.Error,
&i.CreatedAt,
&i.Context,
&i.RuntimeID,
&i.SessionID,
&i.WorkDir,
&i.TriggerCommentID,
&i.ChatSessionID,
&i.AutopilotRunID,
&i.Attempt,
&i.MaxAttempts,
&i.ParentTaskID,
&i.FailureReason,
&i.TriggerSummary,
&i.ForceFreshSession,
&i.IsLeaderTask,
&i.WaitReason,
)
return i, err
}
const reclaimStaleDispatchedTaskForRuntime = `-- name: ReclaimStaleDispatchedTaskForRuntime :one
UPDATE agent_task_queue
SET dispatched_at = now()
WHERE id = (
SELECT atq.id FROM agent_task_queue atq
WHERE atq.runtime_id = $1
AND atq.status = 'dispatched'
AND atq.started_at IS NULL
AND atq.dispatched_at < now() - make_interval(secs => $2::double precision)
ORDER BY atq.priority DESC, atq.dispatched_at ASC
LIMIT 1
FOR UPDATE SKIP LOCKED
)
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason
`
type ReclaimStaleDispatchedTaskForRuntimeParams struct {
RuntimeID pgtype.UUID `json:"runtime_id"`
ClaimRecoverySecs float64 `json:"claim_recovery_secs"`
}
// Re-delivers a task whose previous claim likely succeeded server-side but
// whose response never reached the daemon. The task is still in `dispatched`
// with no `started_at`, so the daemon has not acknowledged it via StartTask.
// Refresh dispatched_at so the server-side dispatch timeout measures from the
// recovered delivery attempt.
func (q *Queries) ReclaimStaleDispatchedTaskForRuntime(ctx context.Context, arg ReclaimStaleDispatchedTaskForRuntimeParams) (AgentTaskQueue, error) {
row := q.db.QueryRow(ctx, reclaimStaleDispatchedTaskForRuntime, arg.RuntimeID, arg.ClaimRecoverySecs)
var i AgentTaskQueue
err := row.Scan(
&i.ID,
&i.AgentID,
&i.IssueID,
&i.Status,
&i.Priority,
&i.DispatchedAt,
&i.StartedAt,
&i.CompletedAt,
&i.Result,
&i.Error,
&i.CreatedAt,
&i.Context,
&i.RuntimeID,
&i.SessionID,
&i.WorkDir,
&i.TriggerCommentID,
&i.ChatSessionID,
&i.AutopilotRunID,
&i.Attempt,
&i.MaxAttempts,
&i.ParentTaskID,
&i.FailureReason,
&i.TriggerSummary,
&i.ForceFreshSession,
&i.IsLeaderTask,
&i.WaitReason,
)
return i, err
}
const recoverOrphanedTasksForRuntime = `-- name: RecoverOrphanedTasksForRuntime :many
UPDATE agent_task_queue
SET status = 'failed',
completed_at = now(),
error = 'daemon restarted while task was in flight',
failure_reason = 'runtime_recovery',
wait_reason = NULL
WHERE runtime_id = $1 AND status IN ('dispatched', 'running', 'waiting_local_directory')
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason
`
// Called by the daemon at startup. Atomically fails any dispatched/running/
// waiting_local_directory task that the prior incarnation of this runtime
// owned but did not finalize. Returns the failed rows so callers can hand
// them to the auto-retry path. waiting_local_directory rows are included
// because the daemon holding the path lock is the same process that just
// died — without us, the row would sit waiting forever.
func (q *Queries) RecoverOrphanedTasksForRuntime(ctx context.Context, runtimeID pgtype.UUID) ([]AgentTaskQueue, error) {
rows, err := q.db.Query(ctx, recoverOrphanedTasksForRuntime, runtimeID)
if err != nil {
return nil, err
}
defer rows.Close()
items := []AgentTaskQueue{}
for rows.Next() {
var i AgentTaskQueue
if err := rows.Scan(
&i.ID,
&i.AgentID,
&i.IssueID,
&i.Status,
&i.Priority,
&i.DispatchedAt,
&i.StartedAt,
&i.CompletedAt,
&i.Result,
&i.Error,
&i.CreatedAt,
&i.Context,
&i.RuntimeID,
&i.SessionID,
&i.WorkDir,
&i.TriggerCommentID,
&i.ChatSessionID,
&i.AutopilotRunID,
&i.Attempt,
&i.MaxAttempts,
&i.ParentTaskID,
&i.FailureReason,
&i.TriggerSummary,
&i.ForceFreshSession,
&i.IsLeaderTask,
&i.WaitReason,
); err != nil {
return nil, err
}
items = append(items, i)
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const refreshAgentStatusFromTasks = `-- name: RefreshAgentStatusFromTasks :one
UPDATE agent AS a
SET status = CASE WHEN EXISTS (
SELECT 1 FROM agent_task_queue q
WHERE q.agent_id = a.id AND q.status IN ('dispatched', 'running', 'waiting_local_directory')
) THEN 'working' ELSE 'idle' END,
updated_at = now()
WHERE a.id = $1
RETURNING id, workspace_id, name, avatar_url, runtime_mode, runtime_config, visibility, status, max_concurrent_tasks, owner_id, created_at, updated_at, description, runtime_id, instructions, archived_at, archived_by, custom_env, custom_args, mcp_config, model, thinking_level
`
func (q *Queries) RefreshAgentStatusFromTasks(ctx context.Context, id pgtype.UUID) (Agent, error) {
row := q.db.QueryRow(ctx, refreshAgentStatusFromTasks, id)
var i Agent
err := row.Scan(
&i.ID,
&i.WorkspaceID,
&i.Name,
&i.AvatarUrl,
&i.RuntimeMode,
&i.RuntimeConfig,
&i.Visibility,
&i.Status,
&i.MaxConcurrentTasks,
&i.OwnerID,
&i.CreatedAt,
&i.UpdatedAt,
&i.Description,
&i.RuntimeID,
&i.Instructions,
&i.ArchivedAt,
&i.ArchivedBy,
&i.CustomEnv,
&i.CustomArgs,
&i.McpConfig,
&i.Model,
&i.ThinkingLevel,
)
return i, err
}
const restoreAgent = `-- name: RestoreAgent :one
UPDATE agent SET archived_at = NULL, archived_by = NULL, updated_at = now()
WHERE id = $1
RETURNING id, workspace_id, name, avatar_url, runtime_mode, runtime_config, visibility, status, max_concurrent_tasks, owner_id, created_at, updated_at, description, runtime_id, instructions, archived_at, archived_by, custom_env, custom_args, mcp_config, model, thinking_level
`
func (q *Queries) RestoreAgent(ctx context.Context, id pgtype.UUID) (Agent, error) {
row := q.db.QueryRow(ctx, restoreAgent, id)
var i Agent
err := row.Scan(
&i.ID,
&i.WorkspaceID,
&i.Name,
&i.AvatarUrl,
&i.RuntimeMode,
&i.RuntimeConfig,
&i.Visibility,
&i.Status,
&i.MaxConcurrentTasks,
&i.OwnerID,
&i.CreatedAt,
&i.UpdatedAt,
&i.Description,
&i.RuntimeID,
&i.Instructions,
&i.ArchivedAt,
&i.ArchivedBy,
&i.CustomEnv,
&i.CustomArgs,
&i.McpConfig,
&i.Model,
&i.ThinkingLevel,
)
return i, err
}
const startAgentTask = `-- name: StartAgentTask :one
UPDATE agent_task_queue
SET status = 'running', started_at = now(), wait_reason = NULL
WHERE id = $1 AND status IN ('dispatched', 'waiting_local_directory')
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task, wait_reason
`
// Transitions a task to running. Accepts either 'dispatched' (the normal
// claim → run flow) or 'waiting_local_directory' (the daemon held the row in
// a wait state while another task owned the local_directory path lock; once
// the lock was acquired the daemon flips here). wait_reason is cleared on
// the transition so a future read can't conflate "currently waiting" with
// "previously waited".
func (q *Queries) StartAgentTask(ctx context.Context, id pgtype.UUID) (AgentTaskQueue, error) {
row := q.db.QueryRow(ctx, startAgentTask, id)
var i AgentTaskQueue
err := row.Scan(
&i.ID,
&i.AgentID,
&i.IssueID,
&i.Status,
&i.Priority,
&i.DispatchedAt,
&i.StartedAt,
&i.CompletedAt,
&i.Result,
&i.Error,
&i.CreatedAt,
&i.Context,
&i.RuntimeID,
&i.SessionID,
&i.WorkDir,
&i.TriggerCommentID,
&i.ChatSessionID,
&i.AutopilotRunID,
&i.Attempt,
&i.MaxAttempts,
&i.ParentTaskID,
&i.FailureReason,
&i.TriggerSummary,
&i.ForceFreshSession,
&i.IsLeaderTask,
&i.WaitReason,
)
return i, err
}
const updateAgent = `-- name: UpdateAgent :one
UPDATE agent SET
name = COALESCE($2, name),
description = COALESCE($3, description),
avatar_url = COALESCE($4, avatar_url),
runtime_config = COALESCE($5, runtime_config),
runtime_mode = COALESCE($6, runtime_mode),
runtime_id = COALESCE($7, runtime_id),
visibility = COALESCE($8, visibility),
status = COALESCE($9, status),
max_concurrent_tasks = COALESCE($10, max_concurrent_tasks),
instructions = COALESCE($11, instructions),
custom_env = COALESCE($12, custom_env),
custom_args = COALESCE($13, custom_args),
mcp_config = COALESCE($14, mcp_config),
model = COALESCE($15, model),
thinking_level = COALESCE($16, thinking_level),
updated_at = now()
WHERE id = $1
RETURNING id, workspace_id, name, avatar_url, runtime_mode, runtime_config, visibility, status, max_concurrent_tasks, owner_id, created_at, updated_at, description, runtime_id, instructions, archived_at, archived_by, custom_env, custom_args, mcp_config, model, thinking_level
`
type UpdateAgentParams struct {
ID pgtype.UUID `json:"id"`
Name pgtype.Text `json:"name"`
Description pgtype.Text `json:"description"`
AvatarUrl pgtype.Text `json:"avatar_url"`
RuntimeConfig []byte `json:"runtime_config"`
RuntimeMode pgtype.Text `json:"runtime_mode"`
RuntimeID pgtype.UUID `json:"runtime_id"`
Visibility pgtype.Text `json:"visibility"`
Status pgtype.Text `json:"status"`
MaxConcurrentTasks pgtype.Int4 `json:"max_concurrent_tasks"`
Instructions pgtype.Text `json:"instructions"`
CustomEnv []byte `json:"custom_env"`
CustomArgs []byte `json:"custom_args"`
McpConfig []byte `json:"mcp_config"`
Model pgtype.Text `json:"model"`
ThinkingLevel pgtype.Text `json:"thinking_level"`
}
func (q *Queries) UpdateAgent(ctx context.Context, arg UpdateAgentParams) (Agent, error) {
row := q.db.QueryRow(ctx, updateAgent,
arg.ID,
arg.Name,
arg.Description,
arg.AvatarUrl,
arg.RuntimeConfig,
arg.RuntimeMode,
arg.RuntimeID,
arg.Visibility,
arg.Status,
arg.MaxConcurrentTasks,
arg.Instructions,
arg.CustomEnv,
arg.CustomArgs,
arg.McpConfig,
arg.Model,
arg.ThinkingLevel,
)
var i Agent
err := row.Scan(
&i.ID,
&i.WorkspaceID,
&i.Name,
&i.AvatarUrl,
&i.RuntimeMode,
&i.RuntimeConfig,
&i.Visibility,
&i.Status,
&i.MaxConcurrentTasks,
&i.OwnerID,
&i.CreatedAt,
&i.UpdatedAt,
&i.Description,
&i.RuntimeID,
&i.Instructions,
&i.ArchivedAt,
&i.ArchivedBy,
&i.CustomEnv,
&i.CustomArgs,
&i.McpConfig,
&i.Model,
&i.ThinkingLevel,
)
return i, err
}
const updateAgentCustomEnv = `-- name: UpdateAgentCustomEnv :one
UPDATE agent
SET custom_env = $2, updated_at = now()
WHERE id = $1
RETURNING id, workspace_id, name, avatar_url, runtime_mode, runtime_config, visibility, status, max_concurrent_tasks, owner_id, created_at, updated_at, description, runtime_id, instructions, archived_at, archived_by, custom_env, custom_args, mcp_config, model, thinking_level
`
type UpdateAgentCustomEnvParams struct {
ID pgtype.UUID `json:"id"`
CustomEnv []byte `json:"custom_env"`
}
// Replaces an agent's custom_env map wholesale. Used by the dedicated
// env-management endpoint (POST/PUT /api/agents/{id}/env), which is the
// only post-creation write path for env. UpdateAgent has been stripped
// of custom_env handling so all env mutations flow through here and the
// handler's audit-log + **** sentinel guard.
func (q *Queries) UpdateAgentCustomEnv(ctx context.Context, arg UpdateAgentCustomEnvParams) (Agent, error) {
row := q.db.QueryRow(ctx, updateAgentCustomEnv, arg.ID, arg.CustomEnv)
var i Agent
err := row.Scan(
&i.ID,
&i.WorkspaceID,
&i.Name,
&i.AvatarUrl,
&i.RuntimeMode,
&i.RuntimeConfig,
&i.Visibility,
&i.Status,
&i.MaxConcurrentTasks,
&i.OwnerID,
&i.CreatedAt,
&i.UpdatedAt,
&i.Description,
&i.RuntimeID,
&i.Instructions,
&i.ArchivedAt,
&i.ArchivedBy,
&i.CustomEnv,
&i.CustomArgs,
&i.McpConfig,
&i.Model,
&i.ThinkingLevel,
)
return i, err
}
const updateAgentStatus = `-- name: UpdateAgentStatus :one
UPDATE agent SET status = $2, updated_at = now()
WHERE id = $1
RETURNING id, workspace_id, name, avatar_url, runtime_mode, runtime_config, visibility, status, max_concurrent_tasks, owner_id, created_at, updated_at, description, runtime_id, instructions, archived_at, archived_by, custom_env, custom_args, mcp_config, model, thinking_level
`
type UpdateAgentStatusParams struct {
ID pgtype.UUID `json:"id"`
Status string `json:"status"`
}
func (q *Queries) UpdateAgentStatus(ctx context.Context, arg UpdateAgentStatusParams) (Agent, error) {
row := q.db.QueryRow(ctx, updateAgentStatus, arg.ID, arg.Status)
var i Agent
err := row.Scan(
&i.ID,
&i.WorkspaceID,
&i.Name,
&i.AvatarUrl,
&i.RuntimeMode,
&i.RuntimeConfig,
&i.Visibility,
&i.Status,
&i.MaxConcurrentTasks,
&i.OwnerID,
&i.CreatedAt,
&i.UpdatedAt,
&i.Description,
&i.RuntimeID,
&i.Instructions,
&i.ArchivedAt,
&i.ArchivedBy,
&i.CustomEnv,
&i.CustomArgs,
&i.McpConfig,
&i.Model,
&i.ThinkingLevel,
)
return i, err
}
const updateAgentTaskSession = `-- name: UpdateAgentTaskSession :exec
UPDATE agent_task_queue
SET session_id = COALESCE($2, session_id),
work_dir = COALESCE($3, work_dir)
WHERE id = $1 AND status IN ('dispatched', 'running')
`
type UpdateAgentTaskSessionParams struct {
ID pgtype.UUID `json:"id"`
SessionID pgtype.Text `json:"session_id"`
WorkDir pgtype.Text `json:"work_dir"`
}
// Pins the resume pointer mid-flight so a daemon crash leaves a usable
// session_id/work_dir on the task row. No-op if the task is no longer
// in dispatched/running. waiting_local_directory tasks have no session yet
// so this query intentionally skips them.
func (q *Queries) UpdateAgentTaskSession(ctx context.Context, arg UpdateAgentTaskSessionParams) error {
_, err := q.db.Exec(ctx, updateAgentTaskSession, arg.ID, arg.SessionID, arg.WorkDir)
return err
}