mirror of
https://github.com/multica-ai/multica.git
synced 2026-06-17 11:48:42 +02:00
* feat(runtimes): cascade-archive agents on runtime delete (MUL-2667) Replace the bare 409 "cannot delete runtime: it has active agents" with a structured response carrying the blocking agent list, and wire a cascade endpoint that archives those agents, cancels their tasks, pauses dangling autopilots and deletes the runtime in a single transaction. The unified DeleteRuntimeDialog opens directly in cascade mode when the runtime has bound agents, pivots from light to cascade if the strict DELETE refuses with runtime_has_active_agents, and re-prompts when the cascade refuses with runtime_delete_plan_changed (live agent set drifted while the dialog was open). The online-local self-healing rule is preserved at the affordance level (kebab hidden, Diagnostics button disabled with tooltip) and re-checked at confirm time as defence in depth. Co-authored-by: multica-agent <github@multica.ai> * fix(runtimes): close cascade race + i18n delete dialog (PR #3266 review) - Acquire FOR UPDATE on the runtime row at the top of the cascade tx so FK-validated agent INSERTs/UPDATEs that would point at this runtime block until commit, and lock each currently-active agent row via ListActiveAgentsByRuntimeForUpdate so a concurrent archive/move of an existing active row also blocks. - Switch the bulk archive from runtime-keyed (ArchiveAgentsByRuntime) to ID-keyed (ArchiveAgentsByIDs), narrowed to the user-confirmed expected_active_agent_ids set. Combined with the runtime row lock, this guarantees no agent outside the confirmed plan can be silently archived between plan-compare and archive even at read-committed. - Wire delete-runtime-dialog.tsx to runtimes locale via useT(); add detail.delete_dialog.{light,cascade} keys (EN with _one/_other plurals, zh-Hans _other) covering titles, descriptions, warning, notices, checkbox, buttons, table headers, presence labels, and toasts. Resolves the i18next/no-literal-string CI failure. - Locale parity test passes (51 tests). All 4 dialog test cases pass unmodified (EN copy preserves original wording). Full views vitest: 91 files / 792 tests green; full server go test: green. Co-authored-by: multica-agent <github@multica.ai> --------- Co-authored-by: multica-agent <github@multica.ai>
2602 lines
81 KiB
Go
2602 lines
81 KiB
Go
// Code generated by sqlc. DO NOT EDIT.
|
|
// versions:
|
|
// sqlc v1.31.1
|
|
// source: agent.sql
|
|
|
|
package db
|
|
|
|
import (
|
|
"context"
|
|
|
|
"github.com/jackc/pgx/v5/pgtype"
|
|
)
|
|
|
|
const archiveAgent = `-- name: ArchiveAgent :one
|
|
UPDATE agent SET archived_at = now(), archived_by = $2, updated_at = now()
|
|
WHERE id = $1
|
|
RETURNING id, workspace_id, name, avatar_url, runtime_mode, runtime_config, visibility, status, max_concurrent_tasks, owner_id, created_at, updated_at, description, runtime_id, instructions, archived_at, archived_by, custom_env, custom_args, mcp_config, model, thinking_level, skills_local
|
|
`
|
|
|
|
type ArchiveAgentParams struct {
|
|
ID pgtype.UUID `json:"id"`
|
|
ArchivedBy pgtype.UUID `json:"archived_by"`
|
|
}
|
|
|
|
func (q *Queries) ArchiveAgent(ctx context.Context, arg ArchiveAgentParams) (Agent, error) {
|
|
row := q.db.QueryRow(ctx, archiveAgent, arg.ID, arg.ArchivedBy)
|
|
var i Agent
|
|
err := row.Scan(
|
|
&i.ID,
|
|
&i.WorkspaceID,
|
|
&i.Name,
|
|
&i.AvatarUrl,
|
|
&i.RuntimeMode,
|
|
&i.RuntimeConfig,
|
|
&i.Visibility,
|
|
&i.Status,
|
|
&i.MaxConcurrentTasks,
|
|
&i.OwnerID,
|
|
&i.CreatedAt,
|
|
&i.UpdatedAt,
|
|
&i.Description,
|
|
&i.RuntimeID,
|
|
&i.Instructions,
|
|
&i.ArchivedAt,
|
|
&i.ArchivedBy,
|
|
&i.CustomEnv,
|
|
&i.CustomArgs,
|
|
&i.McpConfig,
|
|
&i.Model,
|
|
&i.ThinkingLevel,
|
|
&i.SkillsLocal,
|
|
)
|
|
return i, err
|
|
}
|
|
|
|
const archiveAgentsByIDs = `-- name: ArchiveAgentsByIDs :many
|
|
UPDATE agent
|
|
SET archived_at = now(), archived_by = $1, updated_at = now()
|
|
WHERE id = ANY($2::uuid[]) AND archived_at IS NULL
|
|
RETURNING id, workspace_id, name, avatar_url, runtime_mode, runtime_config, visibility, status, max_concurrent_tasks, owner_id, created_at, updated_at, description, runtime_id, instructions, archived_at, archived_by, custom_env, custom_args, mcp_config, model, thinking_level, skills_local
|
|
`
|
|
|
|
type ArchiveAgentsByIDsParams struct {
|
|
ArchivedBy pgtype.UUID `json:"archived_by"`
|
|
AgentIds []pgtype.UUID `json:"agent_ids"`
|
|
}
|
|
|
|
// Narrow archive that only touches the explicit ID list. Used by the
|
|
// cascade-delete endpoint so the user's expected_active_agent_ids list
|
|
// is the authoritative bound on what gets archived: any agent that
|
|
// appeared on the runtime after the user opened the dialog is filtered
|
|
// out here so it can't be silently archived even in the (vanishingly
|
|
// rare) case where a row-level race slips past the runtime FOR UPDATE
|
|
// lock. Returns the affected rows so the caller can broadcast
|
|
// agent:archived per agent.
|
|
func (q *Queries) ArchiveAgentsByIDs(ctx context.Context, arg ArchiveAgentsByIDsParams) ([]Agent, error) {
|
|
rows, err := q.db.Query(ctx, archiveAgentsByIDs, arg.ArchivedBy, arg.AgentIds)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
items := []Agent{}
|
|
for rows.Next() {
|
|
var i Agent
|
|
if err := rows.Scan(
|
|
&i.ID,
|
|
&i.WorkspaceID,
|
|
&i.Name,
|
|
&i.AvatarUrl,
|
|
&i.RuntimeMode,
|
|
&i.RuntimeConfig,
|
|
&i.Visibility,
|
|
&i.Status,
|
|
&i.MaxConcurrentTasks,
|
|
&i.OwnerID,
|
|
&i.CreatedAt,
|
|
&i.UpdatedAt,
|
|
&i.Description,
|
|
&i.RuntimeID,
|
|
&i.Instructions,
|
|
&i.ArchivedAt,
|
|
&i.ArchivedBy,
|
|
&i.CustomEnv,
|
|
&i.CustomArgs,
|
|
&i.McpConfig,
|
|
&i.Model,
|
|
&i.ThinkingLevel,
|
|
&i.SkillsLocal,
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
items = append(items, i)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
const archiveAgentsByRuntime = `-- name: ArchiveAgentsByRuntime :many
|
|
UPDATE agent
|
|
SET archived_at = now(), archived_by = $1, updated_at = now()
|
|
WHERE runtime_id = ANY($2::uuid[]) AND archived_at IS NULL
|
|
RETURNING id, workspace_id, name, avatar_url, runtime_mode, runtime_config, visibility, status, max_concurrent_tasks, owner_id, created_at, updated_at, description, runtime_id, instructions, archived_at, archived_by, custom_env, custom_args, mcp_config, model, thinking_level, skills_local
|
|
`
|
|
|
|
type ArchiveAgentsByRuntimeParams struct {
|
|
ArchivedBy pgtype.UUID `json:"archived_by"`
|
|
RuntimeIds []pgtype.UUID `json:"runtime_ids"`
|
|
}
|
|
|
|
// Bulk-archives every active agent bound to any runtime in the given set.
|
|
// Used when revoking a leaving member's runtimes so agents pinned to those
|
|
// runtimes can no longer be assigned new work. Returns the affected rows so
|
|
// the caller can broadcast agent:archived per agent.
|
|
func (q *Queries) ArchiveAgentsByRuntime(ctx context.Context, arg ArchiveAgentsByRuntimeParams) ([]Agent, error) {
|
|
rows, err := q.db.Query(ctx, archiveAgentsByRuntime, arg.ArchivedBy, arg.RuntimeIds)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
items := []Agent{}
|
|
for rows.Next() {
|
|
var i Agent
|
|
if err := rows.Scan(
|
|
&i.ID,
|
|
&i.WorkspaceID,
|
|
&i.Name,
|
|
&i.AvatarUrl,
|
|
&i.RuntimeMode,
|
|
&i.RuntimeConfig,
|
|
&i.Visibility,
|
|
&i.Status,
|
|
&i.MaxConcurrentTasks,
|
|
&i.OwnerID,
|
|
&i.CreatedAt,
|
|
&i.UpdatedAt,
|
|
&i.Description,
|
|
&i.RuntimeID,
|
|
&i.Instructions,
|
|
&i.ArchivedAt,
|
|
&i.ArchivedBy,
|
|
&i.CustomEnv,
|
|
&i.CustomArgs,
|
|
&i.McpConfig,
|
|
&i.Model,
|
|
&i.ThinkingLevel,
|
|
&i.SkillsLocal,
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
items = append(items, i)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
const cancelAgentTask = `-- name: CancelAgentTask :one
|
|
UPDATE agent_task_queue
|
|
SET status = 'cancelled', completed_at = now()
|
|
WHERE id = $1 AND status IN ('queued', 'dispatched', 'running')
|
|
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task
|
|
`
|
|
|
|
func (q *Queries) CancelAgentTask(ctx context.Context, id pgtype.UUID) (AgentTaskQueue, error) {
|
|
row := q.db.QueryRow(ctx, cancelAgentTask, id)
|
|
var i AgentTaskQueue
|
|
err := row.Scan(
|
|
&i.ID,
|
|
&i.AgentID,
|
|
&i.IssueID,
|
|
&i.Status,
|
|
&i.Priority,
|
|
&i.DispatchedAt,
|
|
&i.StartedAt,
|
|
&i.CompletedAt,
|
|
&i.Result,
|
|
&i.Error,
|
|
&i.CreatedAt,
|
|
&i.Context,
|
|
&i.RuntimeID,
|
|
&i.SessionID,
|
|
&i.WorkDir,
|
|
&i.TriggerCommentID,
|
|
&i.ChatSessionID,
|
|
&i.AutopilotRunID,
|
|
&i.Attempt,
|
|
&i.MaxAttempts,
|
|
&i.ParentTaskID,
|
|
&i.FailureReason,
|
|
&i.TriggerSummary,
|
|
&i.ForceFreshSession,
|
|
&i.IsLeaderTask,
|
|
)
|
|
return i, err
|
|
}
|
|
|
|
const cancelAgentTasksByAgent = `-- name: CancelAgentTasksByAgent :many
|
|
UPDATE agent_task_queue
|
|
SET status = 'cancelled', completed_at = now()
|
|
WHERE agent_id = $1 AND status IN ('queued', 'dispatched', 'running')
|
|
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task
|
|
`
|
|
|
|
// Bulk-cancel every active (queued/dispatched/running) task for an agent.
|
|
// Returns the affected rows so callers can broadcast task:cancelled events.
|
|
// Mirrors the shape of CancelAgentTasksByIssue / CancelAgentTasksByIssueAndAgent
|
|
// (also :many + RETURNING + completed_at) so the three sibling cancel paths
|
|
// behave consistently.
|
|
func (q *Queries) CancelAgentTasksByAgent(ctx context.Context, agentID pgtype.UUID) ([]AgentTaskQueue, error) {
|
|
rows, err := q.db.Query(ctx, cancelAgentTasksByAgent, agentID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
items := []AgentTaskQueue{}
|
|
for rows.Next() {
|
|
var i AgentTaskQueue
|
|
if err := rows.Scan(
|
|
&i.ID,
|
|
&i.AgentID,
|
|
&i.IssueID,
|
|
&i.Status,
|
|
&i.Priority,
|
|
&i.DispatchedAt,
|
|
&i.StartedAt,
|
|
&i.CompletedAt,
|
|
&i.Result,
|
|
&i.Error,
|
|
&i.CreatedAt,
|
|
&i.Context,
|
|
&i.RuntimeID,
|
|
&i.SessionID,
|
|
&i.WorkDir,
|
|
&i.TriggerCommentID,
|
|
&i.ChatSessionID,
|
|
&i.AutopilotRunID,
|
|
&i.Attempt,
|
|
&i.MaxAttempts,
|
|
&i.ParentTaskID,
|
|
&i.FailureReason,
|
|
&i.TriggerSummary,
|
|
&i.ForceFreshSession,
|
|
&i.IsLeaderTask,
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
items = append(items, i)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
const cancelAgentTasksByChatSession = `-- name: CancelAgentTasksByChatSession :many
|
|
UPDATE agent_task_queue
|
|
SET status = 'cancelled', completed_at = now()
|
|
WHERE chat_session_id = $1 AND status IN ('queued', 'dispatched', 'running')
|
|
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task
|
|
`
|
|
|
|
// Cancels active tasks belonging to a chat session. Called from
|
|
// DeleteChatSession so the daemon doesn't keep running work whose result
|
|
// has nowhere to land. Must run BEFORE the chat_session row is deleted —
|
|
// the FK ON DELETE SET NULL would otherwise nullify chat_session_id and we
|
|
// could no longer reach those tasks.
|
|
func (q *Queries) CancelAgentTasksByChatSession(ctx context.Context, chatSessionID pgtype.UUID) ([]AgentTaskQueue, error) {
|
|
rows, err := q.db.Query(ctx, cancelAgentTasksByChatSession, chatSessionID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
items := []AgentTaskQueue{}
|
|
for rows.Next() {
|
|
var i AgentTaskQueue
|
|
if err := rows.Scan(
|
|
&i.ID,
|
|
&i.AgentID,
|
|
&i.IssueID,
|
|
&i.Status,
|
|
&i.Priority,
|
|
&i.DispatchedAt,
|
|
&i.StartedAt,
|
|
&i.CompletedAt,
|
|
&i.Result,
|
|
&i.Error,
|
|
&i.CreatedAt,
|
|
&i.Context,
|
|
&i.RuntimeID,
|
|
&i.SessionID,
|
|
&i.WorkDir,
|
|
&i.TriggerCommentID,
|
|
&i.ChatSessionID,
|
|
&i.AutopilotRunID,
|
|
&i.Attempt,
|
|
&i.MaxAttempts,
|
|
&i.ParentTaskID,
|
|
&i.FailureReason,
|
|
&i.TriggerSummary,
|
|
&i.ForceFreshSession,
|
|
&i.IsLeaderTask,
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
items = append(items, i)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
const cancelAgentTasksByIssue = `-- name: CancelAgentTasksByIssue :many
|
|
UPDATE agent_task_queue
|
|
SET status = 'cancelled', completed_at = now()
|
|
WHERE issue_id = $1 AND status IN ('queued', 'dispatched', 'running')
|
|
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task
|
|
`
|
|
|
|
// Cancels every active task on the issue and returns the affected rows so the
|
|
// caller can reconcile each agent's status and broadcast task:cancelled events
|
|
// (#1587). Prior :exec form silently dropped that info, so internal cancel
|
|
// paths (issue status flips to cancelled/done, etc.) left agents stuck at
|
|
// status="working" with no self-correction.
|
|
func (q *Queries) CancelAgentTasksByIssue(ctx context.Context, issueID pgtype.UUID) ([]AgentTaskQueue, error) {
|
|
rows, err := q.db.Query(ctx, cancelAgentTasksByIssue, issueID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
items := []AgentTaskQueue{}
|
|
for rows.Next() {
|
|
var i AgentTaskQueue
|
|
if err := rows.Scan(
|
|
&i.ID,
|
|
&i.AgentID,
|
|
&i.IssueID,
|
|
&i.Status,
|
|
&i.Priority,
|
|
&i.DispatchedAt,
|
|
&i.StartedAt,
|
|
&i.CompletedAt,
|
|
&i.Result,
|
|
&i.Error,
|
|
&i.CreatedAt,
|
|
&i.Context,
|
|
&i.RuntimeID,
|
|
&i.SessionID,
|
|
&i.WorkDir,
|
|
&i.TriggerCommentID,
|
|
&i.ChatSessionID,
|
|
&i.AutopilotRunID,
|
|
&i.Attempt,
|
|
&i.MaxAttempts,
|
|
&i.ParentTaskID,
|
|
&i.FailureReason,
|
|
&i.TriggerSummary,
|
|
&i.ForceFreshSession,
|
|
&i.IsLeaderTask,
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
items = append(items, i)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
const cancelAgentTasksByIssueAndAgent = `-- name: CancelAgentTasksByIssueAndAgent :many
|
|
UPDATE agent_task_queue
|
|
SET status = 'cancelled', completed_at = now()
|
|
WHERE issue_id = $1 AND agent_id = $2 AND status IN ('queued', 'dispatched', 'running')
|
|
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task
|
|
`
|
|
|
|
type CancelAgentTasksByIssueAndAgentParams struct {
|
|
IssueID pgtype.UUID `json:"issue_id"`
|
|
AgentID pgtype.UUID `json:"agent_id"`
|
|
}
|
|
|
|
// Cancels active tasks for a single (issue, agent) pair without touching
|
|
// tasks belonging to other agents on the same issue. Used by the manual
|
|
// rerun flow so re-running the assignee doesn't collateral-cancel a
|
|
// still-running @-mention agent on the same issue.
|
|
func (q *Queries) CancelAgentTasksByIssueAndAgent(ctx context.Context, arg CancelAgentTasksByIssueAndAgentParams) ([]AgentTaskQueue, error) {
|
|
rows, err := q.db.Query(ctx, cancelAgentTasksByIssueAndAgent, arg.IssueID, arg.AgentID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
items := []AgentTaskQueue{}
|
|
for rows.Next() {
|
|
var i AgentTaskQueue
|
|
if err := rows.Scan(
|
|
&i.ID,
|
|
&i.AgentID,
|
|
&i.IssueID,
|
|
&i.Status,
|
|
&i.Priority,
|
|
&i.DispatchedAt,
|
|
&i.StartedAt,
|
|
&i.CompletedAt,
|
|
&i.Result,
|
|
&i.Error,
|
|
&i.CreatedAt,
|
|
&i.Context,
|
|
&i.RuntimeID,
|
|
&i.SessionID,
|
|
&i.WorkDir,
|
|
&i.TriggerCommentID,
|
|
&i.ChatSessionID,
|
|
&i.AutopilotRunID,
|
|
&i.Attempt,
|
|
&i.MaxAttempts,
|
|
&i.ParentTaskID,
|
|
&i.FailureReason,
|
|
&i.TriggerSummary,
|
|
&i.ForceFreshSession,
|
|
&i.IsLeaderTask,
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
items = append(items, i)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
const cancelAgentTasksByTriggerComment = `-- name: CancelAgentTasksByTriggerComment :many
|
|
UPDATE agent_task_queue
|
|
SET status = 'cancelled', completed_at = now()
|
|
WHERE trigger_comment_id = $1 AND status IN ('queued', 'dispatched', 'running')
|
|
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task
|
|
`
|
|
|
|
// Cancels active tasks whose trigger is the given comment. Called when a
|
|
// comment is deleted so the agent does not run with the now-deleted content
|
|
// already embedded in its prompt. Must run BEFORE the comment row is deleted
|
|
// because the FK ON DELETE SET NULL would otherwise nullify trigger_comment_id
|
|
// and we'd lose the ability to find the affected tasks.
|
|
func (q *Queries) CancelAgentTasksByTriggerComment(ctx context.Context, triggerCommentID pgtype.UUID) ([]AgentTaskQueue, error) {
|
|
rows, err := q.db.Query(ctx, cancelAgentTasksByTriggerComment, triggerCommentID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
items := []AgentTaskQueue{}
|
|
for rows.Next() {
|
|
var i AgentTaskQueue
|
|
if err := rows.Scan(
|
|
&i.ID,
|
|
&i.AgentID,
|
|
&i.IssueID,
|
|
&i.Status,
|
|
&i.Priority,
|
|
&i.DispatchedAt,
|
|
&i.StartedAt,
|
|
&i.CompletedAt,
|
|
&i.Result,
|
|
&i.Error,
|
|
&i.CreatedAt,
|
|
&i.Context,
|
|
&i.RuntimeID,
|
|
&i.SessionID,
|
|
&i.WorkDir,
|
|
&i.TriggerCommentID,
|
|
&i.ChatSessionID,
|
|
&i.AutopilotRunID,
|
|
&i.Attempt,
|
|
&i.MaxAttempts,
|
|
&i.ParentTaskID,
|
|
&i.FailureReason,
|
|
&i.TriggerSummary,
|
|
&i.ForceFreshSession,
|
|
&i.IsLeaderTask,
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
items = append(items, i)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
const claimAgentTask = `-- name: ClaimAgentTask :one
|
|
UPDATE agent_task_queue
|
|
SET status = 'dispatched', dispatched_at = now()
|
|
WHERE id = (
|
|
SELECT atq.id FROM agent_task_queue atq
|
|
WHERE atq.agent_id = $1 AND atq.status = 'queued'
|
|
AND NOT EXISTS (
|
|
SELECT 1 FROM agent_task_queue active
|
|
WHERE active.agent_id = atq.agent_id
|
|
AND active.status IN ('dispatched', 'running')
|
|
AND (
|
|
(atq.issue_id IS NOT NULL AND active.issue_id = atq.issue_id)
|
|
OR (atq.chat_session_id IS NOT NULL AND active.chat_session_id = atq.chat_session_id)
|
|
OR (
|
|
atq.issue_id IS NULL
|
|
AND atq.chat_session_id IS NULL
|
|
AND atq.autopilot_run_id IS NULL
|
|
AND active.issue_id IS NULL
|
|
AND active.chat_session_id IS NULL
|
|
AND active.autopilot_run_id IS NULL
|
|
)
|
|
)
|
|
)
|
|
ORDER BY atq.priority DESC, atq.created_at ASC
|
|
LIMIT 1
|
|
FOR UPDATE SKIP LOCKED
|
|
)
|
|
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task
|
|
`
|
|
|
|
// Claims the next queued task for an agent, enforcing per-(issue, agent) serialization:
|
|
// a task is only claimable when no other task for the same issue AND same agent is
|
|
// already dispatched or running. This allows different agents to work on the same
|
|
// issue in parallel while preventing a single agent from running duplicate tasks.
|
|
// Chat tasks (issue_id IS NULL) use chat_session_id for serialization instead.
|
|
// Quick-create tasks have no issue / chat / autopilot link, so they serialize on
|
|
// "any other quick-create-shaped task" (all four FKs NULL) for the same agent —
|
|
// otherwise a user mashing the create button could fire concurrent quick-creates
|
|
// whose completion lookup would race over "most recent issue by this agent".
|
|
func (q *Queries) ClaimAgentTask(ctx context.Context, agentID pgtype.UUID) (AgentTaskQueue, error) {
|
|
row := q.db.QueryRow(ctx, claimAgentTask, agentID)
|
|
var i AgentTaskQueue
|
|
err := row.Scan(
|
|
&i.ID,
|
|
&i.AgentID,
|
|
&i.IssueID,
|
|
&i.Status,
|
|
&i.Priority,
|
|
&i.DispatchedAt,
|
|
&i.StartedAt,
|
|
&i.CompletedAt,
|
|
&i.Result,
|
|
&i.Error,
|
|
&i.CreatedAt,
|
|
&i.Context,
|
|
&i.RuntimeID,
|
|
&i.SessionID,
|
|
&i.WorkDir,
|
|
&i.TriggerCommentID,
|
|
&i.ChatSessionID,
|
|
&i.AutopilotRunID,
|
|
&i.Attempt,
|
|
&i.MaxAttempts,
|
|
&i.ParentTaskID,
|
|
&i.FailureReason,
|
|
&i.TriggerSummary,
|
|
&i.ForceFreshSession,
|
|
&i.IsLeaderTask,
|
|
)
|
|
return i, err
|
|
}
|
|
|
|
const clearAgentMcpConfig = `-- name: ClearAgentMcpConfig :one
|
|
UPDATE agent SET mcp_config = NULL, updated_at = now()
|
|
WHERE id = $1
|
|
RETURNING id, workspace_id, name, avatar_url, runtime_mode, runtime_config, visibility, status, max_concurrent_tasks, owner_id, created_at, updated_at, description, runtime_id, instructions, archived_at, archived_by, custom_env, custom_args, mcp_config, model, thinking_level, skills_local
|
|
`
|
|
|
|
func (q *Queries) ClearAgentMcpConfig(ctx context.Context, id pgtype.UUID) (Agent, error) {
|
|
row := q.db.QueryRow(ctx, clearAgentMcpConfig, id)
|
|
var i Agent
|
|
err := row.Scan(
|
|
&i.ID,
|
|
&i.WorkspaceID,
|
|
&i.Name,
|
|
&i.AvatarUrl,
|
|
&i.RuntimeMode,
|
|
&i.RuntimeConfig,
|
|
&i.Visibility,
|
|
&i.Status,
|
|
&i.MaxConcurrentTasks,
|
|
&i.OwnerID,
|
|
&i.CreatedAt,
|
|
&i.UpdatedAt,
|
|
&i.Description,
|
|
&i.RuntimeID,
|
|
&i.Instructions,
|
|
&i.ArchivedAt,
|
|
&i.ArchivedBy,
|
|
&i.CustomEnv,
|
|
&i.CustomArgs,
|
|
&i.McpConfig,
|
|
&i.Model,
|
|
&i.ThinkingLevel,
|
|
&i.SkillsLocal,
|
|
)
|
|
return i, err
|
|
}
|
|
|
|
const clearAgentThinkingLevel = `-- name: ClearAgentThinkingLevel :one
|
|
UPDATE agent SET thinking_level = NULL, updated_at = now()
|
|
WHERE id = $1
|
|
RETURNING id, workspace_id, name, avatar_url, runtime_mode, runtime_config, visibility, status, max_concurrent_tasks, owner_id, created_at, updated_at, description, runtime_id, instructions, archived_at, archived_by, custom_env, custom_args, mcp_config, model, thinking_level, skills_local
|
|
`
|
|
|
|
// Explicit NULL-clear for thinking_level. COALESCE-based UpdateAgent cannot
|
|
// set the column back to NULL, so the API layer routes "user picked Default"
|
|
// through this dedicated query.
|
|
func (q *Queries) ClearAgentThinkingLevel(ctx context.Context, id pgtype.UUID) (Agent, error) {
|
|
row := q.db.QueryRow(ctx, clearAgentThinkingLevel, id)
|
|
var i Agent
|
|
err := row.Scan(
|
|
&i.ID,
|
|
&i.WorkspaceID,
|
|
&i.Name,
|
|
&i.AvatarUrl,
|
|
&i.RuntimeMode,
|
|
&i.RuntimeConfig,
|
|
&i.Visibility,
|
|
&i.Status,
|
|
&i.MaxConcurrentTasks,
|
|
&i.OwnerID,
|
|
&i.CreatedAt,
|
|
&i.UpdatedAt,
|
|
&i.Description,
|
|
&i.RuntimeID,
|
|
&i.Instructions,
|
|
&i.ArchivedAt,
|
|
&i.ArchivedBy,
|
|
&i.CustomEnv,
|
|
&i.CustomArgs,
|
|
&i.McpConfig,
|
|
&i.Model,
|
|
&i.ThinkingLevel,
|
|
&i.SkillsLocal,
|
|
)
|
|
return i, err
|
|
}
|
|
|
|
const completeAgentTask = `-- name: CompleteAgentTask :one
|
|
UPDATE agent_task_queue
|
|
SET status = 'completed', completed_at = now(), result = $2, session_id = $3, work_dir = $4
|
|
WHERE id = $1 AND status = 'running'
|
|
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task
|
|
`
|
|
|
|
type CompleteAgentTaskParams struct {
|
|
ID pgtype.UUID `json:"id"`
|
|
Result []byte `json:"result"`
|
|
SessionID pgtype.Text `json:"session_id"`
|
|
WorkDir pgtype.Text `json:"work_dir"`
|
|
}
|
|
|
|
func (q *Queries) CompleteAgentTask(ctx context.Context, arg CompleteAgentTaskParams) (AgentTaskQueue, error) {
|
|
row := q.db.QueryRow(ctx, completeAgentTask,
|
|
arg.ID,
|
|
arg.Result,
|
|
arg.SessionID,
|
|
arg.WorkDir,
|
|
)
|
|
var i AgentTaskQueue
|
|
err := row.Scan(
|
|
&i.ID,
|
|
&i.AgentID,
|
|
&i.IssueID,
|
|
&i.Status,
|
|
&i.Priority,
|
|
&i.DispatchedAt,
|
|
&i.StartedAt,
|
|
&i.CompletedAt,
|
|
&i.Result,
|
|
&i.Error,
|
|
&i.CreatedAt,
|
|
&i.Context,
|
|
&i.RuntimeID,
|
|
&i.SessionID,
|
|
&i.WorkDir,
|
|
&i.TriggerCommentID,
|
|
&i.ChatSessionID,
|
|
&i.AutopilotRunID,
|
|
&i.Attempt,
|
|
&i.MaxAttempts,
|
|
&i.ParentTaskID,
|
|
&i.FailureReason,
|
|
&i.TriggerSummary,
|
|
&i.ForceFreshSession,
|
|
&i.IsLeaderTask,
|
|
)
|
|
return i, err
|
|
}
|
|
|
|
const countRunningTasks = `-- name: CountRunningTasks :one
|
|
SELECT count(*) FROM agent_task_queue
|
|
WHERE agent_id = $1 AND status IN ('dispatched', 'running')
|
|
`
|
|
|
|
func (q *Queries) CountRunningTasks(ctx context.Context, agentID pgtype.UUID) (int64, error) {
|
|
row := q.db.QueryRow(ctx, countRunningTasks, agentID)
|
|
var count int64
|
|
err := row.Scan(&count)
|
|
return count, err
|
|
}
|
|
|
|
const createAgent = `-- name: CreateAgent :one
|
|
INSERT INTO agent (
|
|
workspace_id, name, description, avatar_url, runtime_mode,
|
|
runtime_config, runtime_id, visibility, max_concurrent_tasks, owner_id,
|
|
instructions, custom_env, custom_args, mcp_config, model, thinking_level,
|
|
skills_local
|
|
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17)
|
|
RETURNING id, workspace_id, name, avatar_url, runtime_mode, runtime_config, visibility, status, max_concurrent_tasks, owner_id, created_at, updated_at, description, runtime_id, instructions, archived_at, archived_by, custom_env, custom_args, mcp_config, model, thinking_level, skills_local
|
|
`
|
|
|
|
type CreateAgentParams struct {
|
|
WorkspaceID pgtype.UUID `json:"workspace_id"`
|
|
Name string `json:"name"`
|
|
Description string `json:"description"`
|
|
AvatarUrl pgtype.Text `json:"avatar_url"`
|
|
RuntimeMode string `json:"runtime_mode"`
|
|
RuntimeConfig []byte `json:"runtime_config"`
|
|
RuntimeID pgtype.UUID `json:"runtime_id"`
|
|
Visibility string `json:"visibility"`
|
|
MaxConcurrentTasks int32 `json:"max_concurrent_tasks"`
|
|
OwnerID pgtype.UUID `json:"owner_id"`
|
|
Instructions string `json:"instructions"`
|
|
CustomEnv []byte `json:"custom_env"`
|
|
CustomArgs []byte `json:"custom_args"`
|
|
McpConfig []byte `json:"mcp_config"`
|
|
Model pgtype.Text `json:"model"`
|
|
ThinkingLevel pgtype.Text `json:"thinking_level"`
|
|
SkillsLocal string `json:"skills_local"`
|
|
}
|
|
|
|
func (q *Queries) CreateAgent(ctx context.Context, arg CreateAgentParams) (Agent, error) {
|
|
row := q.db.QueryRow(ctx, createAgent,
|
|
arg.WorkspaceID,
|
|
arg.Name,
|
|
arg.Description,
|
|
arg.AvatarUrl,
|
|
arg.RuntimeMode,
|
|
arg.RuntimeConfig,
|
|
arg.RuntimeID,
|
|
arg.Visibility,
|
|
arg.MaxConcurrentTasks,
|
|
arg.OwnerID,
|
|
arg.Instructions,
|
|
arg.CustomEnv,
|
|
arg.CustomArgs,
|
|
arg.McpConfig,
|
|
arg.Model,
|
|
arg.ThinkingLevel,
|
|
arg.SkillsLocal,
|
|
)
|
|
var i Agent
|
|
err := row.Scan(
|
|
&i.ID,
|
|
&i.WorkspaceID,
|
|
&i.Name,
|
|
&i.AvatarUrl,
|
|
&i.RuntimeMode,
|
|
&i.RuntimeConfig,
|
|
&i.Visibility,
|
|
&i.Status,
|
|
&i.MaxConcurrentTasks,
|
|
&i.OwnerID,
|
|
&i.CreatedAt,
|
|
&i.UpdatedAt,
|
|
&i.Description,
|
|
&i.RuntimeID,
|
|
&i.Instructions,
|
|
&i.ArchivedAt,
|
|
&i.ArchivedBy,
|
|
&i.CustomEnv,
|
|
&i.CustomArgs,
|
|
&i.McpConfig,
|
|
&i.Model,
|
|
&i.ThinkingLevel,
|
|
&i.SkillsLocal,
|
|
)
|
|
return i, err
|
|
}
|
|
|
|
const createAgentTask = `-- name: CreateAgentTask :one
|
|
INSERT INTO agent_task_queue (
|
|
agent_id, runtime_id, issue_id, status, priority, trigger_comment_id,
|
|
trigger_summary, force_fresh_session, is_leader_task
|
|
)
|
|
VALUES (
|
|
$1, $2, $3, 'queued', $4, $5,
|
|
$6,
|
|
COALESCE($7::boolean, FALSE),
|
|
COALESCE($8::boolean, FALSE)
|
|
)
|
|
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task
|
|
`
|
|
|
|
type CreateAgentTaskParams struct {
|
|
AgentID pgtype.UUID `json:"agent_id"`
|
|
RuntimeID pgtype.UUID `json:"runtime_id"`
|
|
IssueID pgtype.UUID `json:"issue_id"`
|
|
Priority int32 `json:"priority"`
|
|
TriggerCommentID pgtype.UUID `json:"trigger_comment_id"`
|
|
TriggerSummary pgtype.Text `json:"trigger_summary"`
|
|
ForceFreshSession pgtype.Bool `json:"force_fresh_session"`
|
|
IsLeaderTask pgtype.Bool `json:"is_leader_task"`
|
|
}
|
|
|
|
func (q *Queries) CreateAgentTask(ctx context.Context, arg CreateAgentTaskParams) (AgentTaskQueue, error) {
|
|
row := q.db.QueryRow(ctx, createAgentTask,
|
|
arg.AgentID,
|
|
arg.RuntimeID,
|
|
arg.IssueID,
|
|
arg.Priority,
|
|
arg.TriggerCommentID,
|
|
arg.TriggerSummary,
|
|
arg.ForceFreshSession,
|
|
arg.IsLeaderTask,
|
|
)
|
|
var i AgentTaskQueue
|
|
err := row.Scan(
|
|
&i.ID,
|
|
&i.AgentID,
|
|
&i.IssueID,
|
|
&i.Status,
|
|
&i.Priority,
|
|
&i.DispatchedAt,
|
|
&i.StartedAt,
|
|
&i.CompletedAt,
|
|
&i.Result,
|
|
&i.Error,
|
|
&i.CreatedAt,
|
|
&i.Context,
|
|
&i.RuntimeID,
|
|
&i.SessionID,
|
|
&i.WorkDir,
|
|
&i.TriggerCommentID,
|
|
&i.ChatSessionID,
|
|
&i.AutopilotRunID,
|
|
&i.Attempt,
|
|
&i.MaxAttempts,
|
|
&i.ParentTaskID,
|
|
&i.FailureReason,
|
|
&i.TriggerSummary,
|
|
&i.ForceFreshSession,
|
|
&i.IsLeaderTask,
|
|
)
|
|
return i, err
|
|
}
|
|
|
|
const createQuickCreateTask = `-- name: CreateQuickCreateTask :one
|
|
INSERT INTO agent_task_queue (agent_id, runtime_id, issue_id, status, priority, context)
|
|
VALUES ($1, $2, NULL, 'queued', $3, $4)
|
|
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task
|
|
`
|
|
|
|
type CreateQuickCreateTaskParams struct {
|
|
AgentID pgtype.UUID `json:"agent_id"`
|
|
RuntimeID pgtype.UUID `json:"runtime_id"`
|
|
Priority int32 `json:"priority"`
|
|
Context []byte `json:"context"`
|
|
}
|
|
|
|
// Quick-create tasks have no issue / chat / autopilot link; the entire job
|
|
// description (prompt, requester, workspace) lives in context JSONB. The
|
|
// daemon detects this variant via context.type == "quick_create".
|
|
func (q *Queries) CreateQuickCreateTask(ctx context.Context, arg CreateQuickCreateTaskParams) (AgentTaskQueue, error) {
|
|
row := q.db.QueryRow(ctx, createQuickCreateTask,
|
|
arg.AgentID,
|
|
arg.RuntimeID,
|
|
arg.Priority,
|
|
arg.Context,
|
|
)
|
|
var i AgentTaskQueue
|
|
err := row.Scan(
|
|
&i.ID,
|
|
&i.AgentID,
|
|
&i.IssueID,
|
|
&i.Status,
|
|
&i.Priority,
|
|
&i.DispatchedAt,
|
|
&i.StartedAt,
|
|
&i.CompletedAt,
|
|
&i.Result,
|
|
&i.Error,
|
|
&i.CreatedAt,
|
|
&i.Context,
|
|
&i.RuntimeID,
|
|
&i.SessionID,
|
|
&i.WorkDir,
|
|
&i.TriggerCommentID,
|
|
&i.ChatSessionID,
|
|
&i.AutopilotRunID,
|
|
&i.Attempt,
|
|
&i.MaxAttempts,
|
|
&i.ParentTaskID,
|
|
&i.FailureReason,
|
|
&i.TriggerSummary,
|
|
&i.ForceFreshSession,
|
|
&i.IsLeaderTask,
|
|
)
|
|
return i, err
|
|
}
|
|
|
|
const createRetryTask = `-- name: CreateRetryTask :one
|
|
INSERT INTO agent_task_queue (
|
|
agent_id, runtime_id, issue_id, chat_session_id, autopilot_run_id,
|
|
status, priority, trigger_comment_id, trigger_summary, context,
|
|
session_id, work_dir,
|
|
attempt, max_attempts, parent_task_id, force_fresh_session, is_leader_task
|
|
)
|
|
SELECT
|
|
p.agent_id, p.runtime_id, p.issue_id, p.chat_session_id, p.autopilot_run_id,
|
|
'queued', p.priority, p.trigger_comment_id, p.trigger_summary, p.context,
|
|
CASE WHEN p.failure_reason IS NOT DISTINCT FROM 'codex_semantic_inactivity' THEN NULL ELSE p.session_id END,
|
|
CASE WHEN p.failure_reason IS NOT DISTINCT FROM 'codex_semantic_inactivity' THEN NULL ELSE p.work_dir END,
|
|
p.attempt + 1, p.max_attempts, p.id,
|
|
p.failure_reason IS NOT DISTINCT FROM 'codex_semantic_inactivity',
|
|
p.is_leader_task
|
|
FROM agent_task_queue p
|
|
WHERE p.id = $1
|
|
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task
|
|
`
|
|
|
|
// Clones a parent task into a fresh queued attempt. Carries forward the
|
|
// agent's resume context (session_id/work_dir) so the child can continue
|
|
// the conversation when the backend supports it. Resume-unsafe failures are
|
|
// retried as fresh sessions so the child does not inherit a stuck agent
|
|
// conversation. Keep the CASE WHEN predicates in sync with
|
|
// resumeUnsafeFailureReason and the resume lookup blacklists. attempt is
|
|
// incremented; max_attempts, trigger_comment_id, and is_leader_task are
|
|
// inherited so the retried task keeps the same squad-role provenance as its
|
|
// parent and the self-trigger guard in shouldEnqueueSquadLeaderOnComment
|
|
// continues to recognise it as a leader task.
|
|
func (q *Queries) CreateRetryTask(ctx context.Context, id pgtype.UUID) (AgentTaskQueue, error) {
|
|
row := q.db.QueryRow(ctx, createRetryTask, id)
|
|
var i AgentTaskQueue
|
|
err := row.Scan(
|
|
&i.ID,
|
|
&i.AgentID,
|
|
&i.IssueID,
|
|
&i.Status,
|
|
&i.Priority,
|
|
&i.DispatchedAt,
|
|
&i.StartedAt,
|
|
&i.CompletedAt,
|
|
&i.Result,
|
|
&i.Error,
|
|
&i.CreatedAt,
|
|
&i.Context,
|
|
&i.RuntimeID,
|
|
&i.SessionID,
|
|
&i.WorkDir,
|
|
&i.TriggerCommentID,
|
|
&i.ChatSessionID,
|
|
&i.AutopilotRunID,
|
|
&i.Attempt,
|
|
&i.MaxAttempts,
|
|
&i.ParentTaskID,
|
|
&i.FailureReason,
|
|
&i.TriggerSummary,
|
|
&i.ForceFreshSession,
|
|
&i.IsLeaderTask,
|
|
)
|
|
return i, err
|
|
}
|
|
|
|
const expireStaleQueuedTasks = `-- name: ExpireStaleQueuedTasks :many
|
|
WITH victims AS (
|
|
SELECT id FROM agent_task_queue
|
|
WHERE status = 'queued'
|
|
AND created_at < now() - make_interval(secs => $1::double precision)
|
|
ORDER BY created_at ASC
|
|
LIMIT $2::int
|
|
FOR UPDATE SKIP LOCKED
|
|
)
|
|
UPDATE agent_task_queue t
|
|
SET status = 'failed',
|
|
completed_at = now(),
|
|
error = 'task expired in queue',
|
|
failure_reason = 'queued_expired'
|
|
FROM victims v
|
|
WHERE t.id = v.id
|
|
AND t.status = 'queued'
|
|
AND t.created_at < now() - make_interval(secs => $1::double precision)
|
|
RETURNING t.id, t.agent_id, t.issue_id, t.status, t.priority, t.dispatched_at, t.started_at, t.completed_at, t.result, t.error, t.created_at, t.context, t.runtime_id, t.session_id, t.work_dir, t.trigger_comment_id, t.chat_session_id, t.autopilot_run_id, t.attempt, t.max_attempts, t.parent_task_id, t.failure_reason, t.trigger_summary, t.force_fresh_session, t.is_leader_task
|
|
`
|
|
|
|
type ExpireStaleQueuedTasksParams struct {
|
|
TtlSecs float64 `json:"ttl_secs"`
|
|
MaxPerTick int32 `json:"max_per_tick"`
|
|
}
|
|
|
|
// Fails tasks that have been sitting in 'queued' for longer than the TTL.
|
|
// This is the cleanup arm of the MUL-1899 "queued backlog" fix: even with the
|
|
// new dispatch-time admission gate that refuses to enqueue when the runtime
|
|
// is offline, we still need to drain the historical 87k+ doomed rows and
|
|
// handle edge cases where a runtime goes offline AFTER a task is already
|
|
// queued (the admission check protects new enqueues, not in-flight queue
|
|
// depth).
|
|
//
|
|
// Concurrency safety: the daemon's claim path may race with this sweeper to
|
|
// transition the same row out of 'queued'. We protect against that two
|
|
// ways:
|
|
// 1. The CTE selects victims with FOR UPDATE SKIP LOCKED so a row that is
|
|
// currently being claimed (or otherwise locked) is skipped — no lock
|
|
// contention with the dispatch path, and we won't queue up behind it.
|
|
// 2. The outer UPDATE re-checks status='queued' AND the TTL predicate at
|
|
// apply time. If a daemon claimed the row between selection and update
|
|
// (e.g. lock released after the claim transaction commits), the row is
|
|
// already 'dispatched'/'running' and the WHERE clause filters it out
|
|
// so we cannot clobber an in-flight task.
|
|
//
|
|
// Capped via LIMIT inside the CTE so a single sweep tick cannot monopolise
|
|
// the DB when the backlog is large — the sweeper drains the rest on
|
|
// subsequent ticks.
|
|
func (q *Queries) ExpireStaleQueuedTasks(ctx context.Context, arg ExpireStaleQueuedTasksParams) ([]AgentTaskQueue, error) {
|
|
rows, err := q.db.Query(ctx, expireStaleQueuedTasks, arg.TtlSecs, arg.MaxPerTick)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
items := []AgentTaskQueue{}
|
|
for rows.Next() {
|
|
var i AgentTaskQueue
|
|
if err := rows.Scan(
|
|
&i.ID,
|
|
&i.AgentID,
|
|
&i.IssueID,
|
|
&i.Status,
|
|
&i.Priority,
|
|
&i.DispatchedAt,
|
|
&i.StartedAt,
|
|
&i.CompletedAt,
|
|
&i.Result,
|
|
&i.Error,
|
|
&i.CreatedAt,
|
|
&i.Context,
|
|
&i.RuntimeID,
|
|
&i.SessionID,
|
|
&i.WorkDir,
|
|
&i.TriggerCommentID,
|
|
&i.ChatSessionID,
|
|
&i.AutopilotRunID,
|
|
&i.Attempt,
|
|
&i.MaxAttempts,
|
|
&i.ParentTaskID,
|
|
&i.FailureReason,
|
|
&i.TriggerSummary,
|
|
&i.ForceFreshSession,
|
|
&i.IsLeaderTask,
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
items = append(items, i)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
const failAgentTask = `-- name: FailAgentTask :one
|
|
UPDATE agent_task_queue
|
|
SET status = 'failed',
|
|
completed_at = now(),
|
|
error = $2,
|
|
failure_reason = COALESCE($3, 'agent_error'),
|
|
session_id = COALESCE($4, session_id),
|
|
work_dir = COALESCE($5, work_dir)
|
|
WHERE id = $1 AND status IN ('dispatched', 'running')
|
|
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task
|
|
`
|
|
|
|
type FailAgentTaskParams struct {
|
|
ID pgtype.UUID `json:"id"`
|
|
Error pgtype.Text `json:"error"`
|
|
FailureReason pgtype.Text `json:"failure_reason"`
|
|
SessionID pgtype.Text `json:"session_id"`
|
|
WorkDir pgtype.Text `json:"work_dir"`
|
|
}
|
|
|
|
// Marks a task as failed. session_id and work_dir are merged via COALESCE so
|
|
// if the agent already established a real session before failing (e.g. it
|
|
// crashed mid-conversation, was cancelled, or hit a tool error) the resume
|
|
// pointer is preserved on the task row. The next chat task can then fall
|
|
// back to GetLastChatTaskSession and continue the conversation instead of
|
|
// silently starting over.
|
|
//
|
|
// failure_reason is a coarse classifier consumed by the auto-retry path;
|
|
// 'agent_error' is the safe default when the daemon doesn't supply one.
|
|
func (q *Queries) FailAgentTask(ctx context.Context, arg FailAgentTaskParams) (AgentTaskQueue, error) {
|
|
row := q.db.QueryRow(ctx, failAgentTask,
|
|
arg.ID,
|
|
arg.Error,
|
|
arg.FailureReason,
|
|
arg.SessionID,
|
|
arg.WorkDir,
|
|
)
|
|
var i AgentTaskQueue
|
|
err := row.Scan(
|
|
&i.ID,
|
|
&i.AgentID,
|
|
&i.IssueID,
|
|
&i.Status,
|
|
&i.Priority,
|
|
&i.DispatchedAt,
|
|
&i.StartedAt,
|
|
&i.CompletedAt,
|
|
&i.Result,
|
|
&i.Error,
|
|
&i.CreatedAt,
|
|
&i.Context,
|
|
&i.RuntimeID,
|
|
&i.SessionID,
|
|
&i.WorkDir,
|
|
&i.TriggerCommentID,
|
|
&i.ChatSessionID,
|
|
&i.AutopilotRunID,
|
|
&i.Attempt,
|
|
&i.MaxAttempts,
|
|
&i.ParentTaskID,
|
|
&i.FailureReason,
|
|
&i.TriggerSummary,
|
|
&i.ForceFreshSession,
|
|
&i.IsLeaderTask,
|
|
)
|
|
return i, err
|
|
}
|
|
|
|
const failStaleTasks = `-- name: FailStaleTasks :many
|
|
UPDATE agent_task_queue
|
|
SET status = 'failed', completed_at = now(), error = 'task timed out',
|
|
failure_reason = 'timeout'
|
|
WHERE (status = 'dispatched' AND dispatched_at < now() - make_interval(secs => $1::double precision))
|
|
OR (status = 'running' AND started_at < now() - make_interval(secs => $2::double precision))
|
|
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task
|
|
`
|
|
|
|
type FailStaleTasksParams struct {
|
|
DispatchTimeoutSecs float64 `json:"dispatch_timeout_secs"`
|
|
RunningTimeoutSecs float64 `json:"running_timeout_secs"`
|
|
}
|
|
|
|
// Fails tasks stuck in dispatched/running beyond the given thresholds.
|
|
// Handles cases where the daemon is alive but the task is orphaned
|
|
// (e.g. agent process hung, daemon failed to report completion).
|
|
func (q *Queries) FailStaleTasks(ctx context.Context, arg FailStaleTasksParams) ([]AgentTaskQueue, error) {
|
|
rows, err := q.db.Query(ctx, failStaleTasks, arg.DispatchTimeoutSecs, arg.RunningTimeoutSecs)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
items := []AgentTaskQueue{}
|
|
for rows.Next() {
|
|
var i AgentTaskQueue
|
|
if err := rows.Scan(
|
|
&i.ID,
|
|
&i.AgentID,
|
|
&i.IssueID,
|
|
&i.Status,
|
|
&i.Priority,
|
|
&i.DispatchedAt,
|
|
&i.StartedAt,
|
|
&i.CompletedAt,
|
|
&i.Result,
|
|
&i.Error,
|
|
&i.CreatedAt,
|
|
&i.Context,
|
|
&i.RuntimeID,
|
|
&i.SessionID,
|
|
&i.WorkDir,
|
|
&i.TriggerCommentID,
|
|
&i.ChatSessionID,
|
|
&i.AutopilotRunID,
|
|
&i.Attempt,
|
|
&i.MaxAttempts,
|
|
&i.ParentTaskID,
|
|
&i.FailureReason,
|
|
&i.TriggerSummary,
|
|
&i.ForceFreshSession,
|
|
&i.IsLeaderTask,
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
items = append(items, i)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
const getAgent = `-- name: GetAgent :one
|
|
SELECT id, workspace_id, name, avatar_url, runtime_mode, runtime_config, visibility, status, max_concurrent_tasks, owner_id, created_at, updated_at, description, runtime_id, instructions, archived_at, archived_by, custom_env, custom_args, mcp_config, model, thinking_level, skills_local FROM agent
|
|
WHERE id = $1
|
|
`
|
|
|
|
func (q *Queries) GetAgent(ctx context.Context, id pgtype.UUID) (Agent, error) {
|
|
row := q.db.QueryRow(ctx, getAgent, id)
|
|
var i Agent
|
|
err := row.Scan(
|
|
&i.ID,
|
|
&i.WorkspaceID,
|
|
&i.Name,
|
|
&i.AvatarUrl,
|
|
&i.RuntimeMode,
|
|
&i.RuntimeConfig,
|
|
&i.Visibility,
|
|
&i.Status,
|
|
&i.MaxConcurrentTasks,
|
|
&i.OwnerID,
|
|
&i.CreatedAt,
|
|
&i.UpdatedAt,
|
|
&i.Description,
|
|
&i.RuntimeID,
|
|
&i.Instructions,
|
|
&i.ArchivedAt,
|
|
&i.ArchivedBy,
|
|
&i.CustomEnv,
|
|
&i.CustomArgs,
|
|
&i.McpConfig,
|
|
&i.Model,
|
|
&i.ThinkingLevel,
|
|
&i.SkillsLocal,
|
|
)
|
|
return i, err
|
|
}
|
|
|
|
const getAgentInWorkspace = `-- name: GetAgentInWorkspace :one
|
|
SELECT id, workspace_id, name, avatar_url, runtime_mode, runtime_config, visibility, status, max_concurrent_tasks, owner_id, created_at, updated_at, description, runtime_id, instructions, archived_at, archived_by, custom_env, custom_args, mcp_config, model, thinking_level, skills_local FROM agent
|
|
WHERE id = $1 AND workspace_id = $2
|
|
`
|
|
|
|
type GetAgentInWorkspaceParams struct {
|
|
ID pgtype.UUID `json:"id"`
|
|
WorkspaceID pgtype.UUID `json:"workspace_id"`
|
|
}
|
|
|
|
func (q *Queries) GetAgentInWorkspace(ctx context.Context, arg GetAgentInWorkspaceParams) (Agent, error) {
|
|
row := q.db.QueryRow(ctx, getAgentInWorkspace, arg.ID, arg.WorkspaceID)
|
|
var i Agent
|
|
err := row.Scan(
|
|
&i.ID,
|
|
&i.WorkspaceID,
|
|
&i.Name,
|
|
&i.AvatarUrl,
|
|
&i.RuntimeMode,
|
|
&i.RuntimeConfig,
|
|
&i.Visibility,
|
|
&i.Status,
|
|
&i.MaxConcurrentTasks,
|
|
&i.OwnerID,
|
|
&i.CreatedAt,
|
|
&i.UpdatedAt,
|
|
&i.Description,
|
|
&i.RuntimeID,
|
|
&i.Instructions,
|
|
&i.ArchivedAt,
|
|
&i.ArchivedBy,
|
|
&i.CustomEnv,
|
|
&i.CustomArgs,
|
|
&i.McpConfig,
|
|
&i.Model,
|
|
&i.ThinkingLevel,
|
|
&i.SkillsLocal,
|
|
)
|
|
return i, err
|
|
}
|
|
|
|
const getAgentTask = `-- name: GetAgentTask :one
|
|
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task FROM agent_task_queue
|
|
WHERE id = $1
|
|
`
|
|
|
|
func (q *Queries) GetAgentTask(ctx context.Context, id pgtype.UUID) (AgentTaskQueue, error) {
|
|
row := q.db.QueryRow(ctx, getAgentTask, id)
|
|
var i AgentTaskQueue
|
|
err := row.Scan(
|
|
&i.ID,
|
|
&i.AgentID,
|
|
&i.IssueID,
|
|
&i.Status,
|
|
&i.Priority,
|
|
&i.DispatchedAt,
|
|
&i.StartedAt,
|
|
&i.CompletedAt,
|
|
&i.Result,
|
|
&i.Error,
|
|
&i.CreatedAt,
|
|
&i.Context,
|
|
&i.RuntimeID,
|
|
&i.SessionID,
|
|
&i.WorkDir,
|
|
&i.TriggerCommentID,
|
|
&i.ChatSessionID,
|
|
&i.AutopilotRunID,
|
|
&i.Attempt,
|
|
&i.MaxAttempts,
|
|
&i.ParentTaskID,
|
|
&i.FailureReason,
|
|
&i.TriggerSummary,
|
|
&i.ForceFreshSession,
|
|
&i.IsLeaderTask,
|
|
)
|
|
return i, err
|
|
}
|
|
|
|
const getLastTaskSession = `-- name: GetLastTaskSession :one
|
|
SELECT session_id, work_dir, runtime_id FROM agent_task_queue
|
|
WHERE agent_id = $1 AND issue_id = $2
|
|
AND (
|
|
status = 'completed'
|
|
OR (
|
|
status = 'failed'
|
|
AND COALESCE(failure_reason, '') NOT IN ('iteration_limit', 'agent_fallback_message', 'api_invalid_request', 'codex_semantic_inactivity')
|
|
AND NOT (COALESCE(error, '') ILIKE '%400%' AND COALESCE(error, '') ILIKE '%invalid_request_error%')
|
|
)
|
|
)
|
|
AND session_id IS NOT NULL
|
|
ORDER BY COALESCE(completed_at, started_at, dispatched_at, created_at) DESC
|
|
LIMIT 1
|
|
`
|
|
|
|
type GetLastTaskSessionParams struct {
|
|
AgentID pgtype.UUID `json:"agent_id"`
|
|
IssueID pgtype.UUID `json:"issue_id"`
|
|
}
|
|
|
|
type GetLastTaskSessionRow struct {
|
|
SessionID pgtype.Text `json:"session_id"`
|
|
WorkDir pgtype.Text `json:"work_dir"`
|
|
RuntimeID pgtype.UUID `json:"runtime_id"`
|
|
}
|
|
|
|
// Returns the session_id and work_dir from the most recent task for a given
|
|
// (agent_id, issue_id) pair, used for session resumption on the auto-retry
|
|
// path. We accept both 'completed' and 'failed' tasks: a failed task may
|
|
// have established a real agent session before crashing (orphaned by a
|
|
// daemon restart, runtime offline, or sweeper timeout), and the daemon pins
|
|
// the resume pointer mid-flight via UpdateAgentTaskSession. Without this,
|
|
// an auto-retry of a mid-run failure would silently start a fresh
|
|
// conversation and lose the in-flight context — exactly what MUL-1128's B
|
|
// branch is meant to fix.
|
|
//
|
|
// Manual rerun (TaskService.RerunIssue) does NOT take this path: it sets
|
|
// force_fresh_session=true on the new task, and the daemon claim handler
|
|
// skips this lookup entirely. The user already judged the prior output bad;
|
|
// resuming the same conversation would replay a poisoned state.
|
|
//
|
|
// Tasks that ended in a known "poisoned" terminal state are also excluded
|
|
// here so even auto-retry does not inherit the bad session. The daemon
|
|
// classifies these failures (iteration_limit, agent_fallback_message,
|
|
// api_invalid_request, codex_semantic_inactivity) when it detects either an
|
|
// agent fallback marker in the output, an upstream API 400 that means the
|
|
// conversation history itself is unprocessable (oversized image, malformed
|
|
// base64, etc.), or a Codex semantic inactivity timeout whose recorded
|
|
// session may replay the same stuck state.
|
|
//
|
|
// The error-text ILIKE clause is defense-in-depth for the api_invalid_request
|
|
// shape: a legacy row tagged 'agent_error' (pre-MUL-1921), a deploy-window
|
|
// row that the old code wrote between migration and rollout, or a future
|
|
// error format that escapes the daemon classifier all still get filtered
|
|
// here as long as the canonical Anthropic 400 marker is present in the
|
|
// error text. Migration 079 backfills the failure_reason column itself,
|
|
// so observability stays accurate; this clause guarantees session resume
|
|
// never picks up a bad session even when failure_reason hasn't caught up.
|
|
func (q *Queries) GetLastTaskSession(ctx context.Context, arg GetLastTaskSessionParams) (GetLastTaskSessionRow, error) {
|
|
row := q.db.QueryRow(ctx, getLastTaskSession, arg.AgentID, arg.IssueID)
|
|
var i GetLastTaskSessionRow
|
|
err := row.Scan(&i.SessionID, &i.WorkDir, &i.RuntimeID)
|
|
return i, err
|
|
}
|
|
|
|
const getLatestTaskIsLeaderForIssueAndAgent = `-- name: GetLatestTaskIsLeaderForIssueAndAgent :one
|
|
SELECT is_leader_task FROM agent_task_queue
|
|
WHERE issue_id = $1 AND agent_id = $2
|
|
ORDER BY created_at DESC
|
|
LIMIT 1
|
|
`
|
|
|
|
type GetLatestTaskIsLeaderForIssueAndAgentParams struct {
|
|
IssueID pgtype.UUID `json:"issue_id"`
|
|
AgentID pgtype.UUID `json:"agent_id"`
|
|
}
|
|
|
|
// Returns the is_leader_task flag of the agent's most recent task on this
|
|
// issue, or NULL if the agent has never had a task on this issue. Used by
|
|
// the squad-leader self-trigger guard to tell whether the agent's last
|
|
// activity on the issue was in the leader role or the worker role (an
|
|
// agent that holds both roles in a squad would otherwise be skipped by
|
|
// the role-blind authorID == leaderID check).
|
|
func (q *Queries) GetLatestTaskIsLeaderForIssueAndAgent(ctx context.Context, arg GetLatestTaskIsLeaderForIssueAndAgentParams) (bool, error) {
|
|
row := q.db.QueryRow(ctx, getLatestTaskIsLeaderForIssueAndAgent, arg.IssueID, arg.AgentID)
|
|
var is_leader_task bool
|
|
err := row.Scan(&is_leader_task)
|
|
return is_leader_task, err
|
|
}
|
|
|
|
const getWorkspaceAgentActivity30d = `-- name: GetWorkspaceAgentActivity30d :many
|
|
SELECT
|
|
atq.agent_id,
|
|
DATE_TRUNC('day', atq.completed_at)::timestamptz AS bucket,
|
|
COUNT(*)::int AS task_count,
|
|
COUNT(*) FILTER (WHERE atq.status = 'failed')::int AS failed_count
|
|
FROM agent_task_queue atq
|
|
JOIN agent a ON a.id = atq.agent_id
|
|
WHERE a.workspace_id = $1
|
|
AND atq.completed_at IS NOT NULL
|
|
AND atq.completed_at > now() - INTERVAL '30 days'
|
|
GROUP BY atq.agent_id, bucket
|
|
ORDER BY atq.agent_id, bucket
|
|
`
|
|
|
|
type GetWorkspaceAgentActivity30dRow struct {
|
|
AgentID pgtype.UUID `json:"agent_id"`
|
|
Bucket pgtype.Timestamptz `json:"bucket"`
|
|
TaskCount int32 `json:"task_count"`
|
|
FailedCount int32 `json:"failed_count"`
|
|
}
|
|
|
|
// Returns per-agent daily activity buckets for the last 30 days. Single
|
|
// workspace-wide read backs both surfaces:
|
|
// - Agents list ACTIVITY column — uses only the trailing 7 buckets
|
|
// - Agent detail "Last 30 days" panel — uses the full 30
|
|
//
|
|
// 30 days contains 7 days, so one fetch + a client-side .slice(-7) wins
|
|
// over fetching twice. Days with no completion produce no row; the
|
|
// front-end zero-fills.
|
|
//
|
|
// Anchored on completed_at (not created_at) because the sparkline answers
|
|
// "what did this agent produce?" not "what was queued at it?". A task that's
|
|
// still in flight has no completed_at and contributes nothing here — that's
|
|
// correct: in-flight tasks are surfaced via the live presence indicator,
|
|
// not the historical trend.
|
|
func (q *Queries) GetWorkspaceAgentActivity30d(ctx context.Context, workspaceID pgtype.UUID) ([]GetWorkspaceAgentActivity30dRow, error) {
|
|
rows, err := q.db.Query(ctx, getWorkspaceAgentActivity30d, workspaceID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
items := []GetWorkspaceAgentActivity30dRow{}
|
|
for rows.Next() {
|
|
var i GetWorkspaceAgentActivity30dRow
|
|
if err := rows.Scan(
|
|
&i.AgentID,
|
|
&i.Bucket,
|
|
&i.TaskCount,
|
|
&i.FailedCount,
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
items = append(items, i)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
const getWorkspaceAgentRunCounts = `-- name: GetWorkspaceAgentRunCounts :many
|
|
SELECT
|
|
atq.agent_id,
|
|
COUNT(*)::int AS run_count
|
|
FROM agent_task_queue atq
|
|
JOIN agent a ON a.id = atq.agent_id
|
|
WHERE a.workspace_id = $1
|
|
AND atq.created_at > now() - INTERVAL '30 days'
|
|
GROUP BY atq.agent_id
|
|
`
|
|
|
|
type GetWorkspaceAgentRunCountsRow struct {
|
|
AgentID pgtype.UUID `json:"agent_id"`
|
|
RunCount int32 `json:"run_count"`
|
|
}
|
|
|
|
// Total task runs per agent over the trailing 30 days, used by the Agents
|
|
// list RUNS column. 30-day window keeps the count meaningful (a long-dormant
|
|
// agent shouldn't show "5,420 runs from 2 years ago") and keeps the scan
|
|
// bounded as the workspace ages.
|
|
func (q *Queries) GetWorkspaceAgentRunCounts(ctx context.Context, workspaceID pgtype.UUID) ([]GetWorkspaceAgentRunCountsRow, error) {
|
|
rows, err := q.db.Query(ctx, getWorkspaceAgentRunCounts, workspaceID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
items := []GetWorkspaceAgentRunCountsRow{}
|
|
for rows.Next() {
|
|
var i GetWorkspaceAgentRunCountsRow
|
|
if err := rows.Scan(&i.AgentID, &i.RunCount); err != nil {
|
|
return nil, err
|
|
}
|
|
items = append(items, i)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
const hasActiveTaskForIssue = `-- name: HasActiveTaskForIssue :one
|
|
SELECT count(*) > 0 AS has_active FROM agent_task_queue
|
|
WHERE issue_id = $1 AND status IN ('queued', 'dispatched', 'running')
|
|
`
|
|
|
|
// Returns true if there is any queued, dispatched, or running task for the issue.
|
|
func (q *Queries) HasActiveTaskForIssue(ctx context.Context, issueID pgtype.UUID) (bool, error) {
|
|
row := q.db.QueryRow(ctx, hasActiveTaskForIssue, issueID)
|
|
var has_active bool
|
|
err := row.Scan(&has_active)
|
|
return has_active, err
|
|
}
|
|
|
|
const hasPendingTaskForIssue = `-- name: HasPendingTaskForIssue :one
|
|
SELECT count(*) > 0 AS has_pending FROM agent_task_queue
|
|
WHERE issue_id = $1 AND status IN ('queued', 'dispatched')
|
|
`
|
|
|
|
// Returns true if there is a queued or dispatched (but not yet running) task for the issue.
|
|
// Used by the coalescing queue: allow enqueue when a task is running (so
|
|
// the agent picks up new comments on the next cycle) but skip if a pending
|
|
// task already exists (natural dedup).
|
|
func (q *Queries) HasPendingTaskForIssue(ctx context.Context, issueID pgtype.UUID) (bool, error) {
|
|
row := q.db.QueryRow(ctx, hasPendingTaskForIssue, issueID)
|
|
var has_pending bool
|
|
err := row.Scan(&has_pending)
|
|
return has_pending, err
|
|
}
|
|
|
|
const hasPendingTaskForIssueAndAgent = `-- name: HasPendingTaskForIssueAndAgent :one
|
|
SELECT count(*) > 0 AS has_pending FROM agent_task_queue
|
|
WHERE issue_id = $1 AND agent_id = $2 AND status IN ('queued', 'dispatched')
|
|
`
|
|
|
|
type HasPendingTaskForIssueAndAgentParams struct {
|
|
IssueID pgtype.UUID `json:"issue_id"`
|
|
AgentID pgtype.UUID `json:"agent_id"`
|
|
}
|
|
|
|
// Returns true if a specific agent already has a queued or dispatched task
|
|
// for the given issue. Used by @mention trigger dedup.
|
|
func (q *Queries) HasPendingTaskForIssueAndAgent(ctx context.Context, arg HasPendingTaskForIssueAndAgentParams) (bool, error) {
|
|
row := q.db.QueryRow(ctx, hasPendingTaskForIssueAndAgent, arg.IssueID, arg.AgentID)
|
|
var has_pending bool
|
|
err := row.Scan(&has_pending)
|
|
return has_pending, err
|
|
}
|
|
|
|
const linkTaskToIssue = `-- name: LinkTaskToIssue :exec
|
|
UPDATE agent_task_queue
|
|
SET issue_id = $2
|
|
WHERE id = $1 AND issue_id IS NULL
|
|
`
|
|
|
|
type LinkTaskToIssueParams struct {
|
|
ID pgtype.UUID `json:"id"`
|
|
IssueID pgtype.UUID `json:"issue_id"`
|
|
}
|
|
|
|
// Attaches the issue a quick-create task produced back to the task row, once
|
|
// the agent has finished and the issue exists. Guarded by `issue_id IS NULL`
|
|
// so this never overwrites an issue id that was set at task creation (only
|
|
// quick-create tasks land here unset). Fixes the activity row staying on
|
|
// "Creating issue" forever after completion.
|
|
func (q *Queries) LinkTaskToIssue(ctx context.Context, arg LinkTaskToIssueParams) error {
|
|
_, err := q.db.Exec(ctx, linkTaskToIssue, arg.ID, arg.IssueID)
|
|
return err
|
|
}
|
|
|
|
const listActiveAgentsByRuntime = `-- name: ListActiveAgentsByRuntime :many
|
|
SELECT id, workspace_id, name, avatar_url, runtime_mode, runtime_config, visibility, status, max_concurrent_tasks, owner_id, created_at, updated_at, description, runtime_id, instructions, archived_at, archived_by, custom_env, custom_args, mcp_config, model, thinking_level, skills_local FROM agent
|
|
WHERE runtime_id = $1 AND archived_at IS NULL
|
|
ORDER BY name ASC
|
|
`
|
|
|
|
// Returns every non-archived agent bound to a runtime. Backs the cascade
|
|
// delete dialog: when DELETE /api/runtimes/:id refuses with
|
|
// runtime_has_active_agents, the response carries this list so the front-end
|
|
// can render exactly the agents that will be archived if the user confirms,
|
|
// and so the cascade endpoint's expected_active_agent_ids check has a stable
|
|
// snapshot to compare against. Ordered by name for a deterministic display.
|
|
func (q *Queries) ListActiveAgentsByRuntime(ctx context.Context, runtimeID pgtype.UUID) ([]Agent, error) {
|
|
rows, err := q.db.Query(ctx, listActiveAgentsByRuntime, runtimeID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
items := []Agent{}
|
|
for rows.Next() {
|
|
var i Agent
|
|
if err := rows.Scan(
|
|
&i.ID,
|
|
&i.WorkspaceID,
|
|
&i.Name,
|
|
&i.AvatarUrl,
|
|
&i.RuntimeMode,
|
|
&i.RuntimeConfig,
|
|
&i.Visibility,
|
|
&i.Status,
|
|
&i.MaxConcurrentTasks,
|
|
&i.OwnerID,
|
|
&i.CreatedAt,
|
|
&i.UpdatedAt,
|
|
&i.Description,
|
|
&i.RuntimeID,
|
|
&i.Instructions,
|
|
&i.ArchivedAt,
|
|
&i.ArchivedBy,
|
|
&i.CustomEnv,
|
|
&i.CustomArgs,
|
|
&i.McpConfig,
|
|
&i.Model,
|
|
&i.ThinkingLevel,
|
|
&i.SkillsLocal,
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
items = append(items, i)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
const listActiveAgentsByRuntimeForUpdate = `-- name: ListActiveAgentsByRuntimeForUpdate :many
|
|
SELECT id, workspace_id, name, avatar_url, runtime_mode, runtime_config, visibility, status, max_concurrent_tasks, owner_id, created_at, updated_at, description, runtime_id, instructions, archived_at, archived_by, custom_env, custom_args, mcp_config, model, thinking_level, skills_local FROM agent
|
|
WHERE runtime_id = $1 AND archived_at IS NULL
|
|
ORDER BY name ASC
|
|
FOR UPDATE
|
|
`
|
|
|
|
// FOR UPDATE variant used inside the cascade-delete transaction. Locks
|
|
// each currently-active agent row so a concurrent archive/move of one
|
|
// of those rows blocks until our transaction commits. Pair with
|
|
// LockAgentRuntime, which holds the runtime row exclusively to also
|
|
// block FK-validated INSERTs / runtime_id updates that would otherwise
|
|
// add a new agent to the runtime mid-cascade. Together they guarantee
|
|
// that the set we compared against expected_active_agent_ids is exactly
|
|
// the set ArchiveAgentsByIDs will operate on — no race window.
|
|
func (q *Queries) ListActiveAgentsByRuntimeForUpdate(ctx context.Context, runtimeID pgtype.UUID) ([]Agent, error) {
|
|
rows, err := q.db.Query(ctx, listActiveAgentsByRuntimeForUpdate, runtimeID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
items := []Agent{}
|
|
for rows.Next() {
|
|
var i Agent
|
|
if err := rows.Scan(
|
|
&i.ID,
|
|
&i.WorkspaceID,
|
|
&i.Name,
|
|
&i.AvatarUrl,
|
|
&i.RuntimeMode,
|
|
&i.RuntimeConfig,
|
|
&i.Visibility,
|
|
&i.Status,
|
|
&i.MaxConcurrentTasks,
|
|
&i.OwnerID,
|
|
&i.CreatedAt,
|
|
&i.UpdatedAt,
|
|
&i.Description,
|
|
&i.RuntimeID,
|
|
&i.Instructions,
|
|
&i.ArchivedAt,
|
|
&i.ArchivedBy,
|
|
&i.CustomEnv,
|
|
&i.CustomArgs,
|
|
&i.McpConfig,
|
|
&i.Model,
|
|
&i.ThinkingLevel,
|
|
&i.SkillsLocal,
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
items = append(items, i)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
const listActiveTasksByIssue = `-- name: ListActiveTasksByIssue :many
|
|
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task FROM agent_task_queue
|
|
WHERE issue_id = $1 AND status IN ('queued', 'dispatched', 'running')
|
|
ORDER BY created_at DESC
|
|
`
|
|
|
|
// Backs the issue-detail "agent live" banner. Includes 'queued' so the
|
|
// banner shows up the moment a task is enqueued — not only after a runtime
|
|
// claims it. The queued window can be long when the runtime is offline or
|
|
// busy on a prior task, and a silent UI during that window looks like the
|
|
// platform never received the trigger.
|
|
func (q *Queries) ListActiveTasksByIssue(ctx context.Context, issueID pgtype.UUID) ([]AgentTaskQueue, error) {
|
|
rows, err := q.db.Query(ctx, listActiveTasksByIssue, issueID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
items := []AgentTaskQueue{}
|
|
for rows.Next() {
|
|
var i AgentTaskQueue
|
|
if err := rows.Scan(
|
|
&i.ID,
|
|
&i.AgentID,
|
|
&i.IssueID,
|
|
&i.Status,
|
|
&i.Priority,
|
|
&i.DispatchedAt,
|
|
&i.StartedAt,
|
|
&i.CompletedAt,
|
|
&i.Result,
|
|
&i.Error,
|
|
&i.CreatedAt,
|
|
&i.Context,
|
|
&i.RuntimeID,
|
|
&i.SessionID,
|
|
&i.WorkDir,
|
|
&i.TriggerCommentID,
|
|
&i.ChatSessionID,
|
|
&i.AutopilotRunID,
|
|
&i.Attempt,
|
|
&i.MaxAttempts,
|
|
&i.ParentTaskID,
|
|
&i.FailureReason,
|
|
&i.TriggerSummary,
|
|
&i.ForceFreshSession,
|
|
&i.IsLeaderTask,
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
items = append(items, i)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
const listAgentTasks = `-- name: ListAgentTasks :many
|
|
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task FROM agent_task_queue
|
|
WHERE agent_id = $1
|
|
ORDER BY created_at DESC
|
|
`
|
|
|
|
func (q *Queries) ListAgentTasks(ctx context.Context, agentID pgtype.UUID) ([]AgentTaskQueue, error) {
|
|
rows, err := q.db.Query(ctx, listAgentTasks, agentID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
items := []AgentTaskQueue{}
|
|
for rows.Next() {
|
|
var i AgentTaskQueue
|
|
if err := rows.Scan(
|
|
&i.ID,
|
|
&i.AgentID,
|
|
&i.IssueID,
|
|
&i.Status,
|
|
&i.Priority,
|
|
&i.DispatchedAt,
|
|
&i.StartedAt,
|
|
&i.CompletedAt,
|
|
&i.Result,
|
|
&i.Error,
|
|
&i.CreatedAt,
|
|
&i.Context,
|
|
&i.RuntimeID,
|
|
&i.SessionID,
|
|
&i.WorkDir,
|
|
&i.TriggerCommentID,
|
|
&i.ChatSessionID,
|
|
&i.AutopilotRunID,
|
|
&i.Attempt,
|
|
&i.MaxAttempts,
|
|
&i.ParentTaskID,
|
|
&i.FailureReason,
|
|
&i.TriggerSummary,
|
|
&i.ForceFreshSession,
|
|
&i.IsLeaderTask,
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
items = append(items, i)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
const listAgents = `-- name: ListAgents :many
|
|
SELECT id, workspace_id, name, avatar_url, runtime_mode, runtime_config, visibility, status, max_concurrent_tasks, owner_id, created_at, updated_at, description, runtime_id, instructions, archived_at, archived_by, custom_env, custom_args, mcp_config, model, thinking_level, skills_local FROM agent
|
|
WHERE workspace_id = $1 AND archived_at IS NULL
|
|
ORDER BY created_at ASC
|
|
`
|
|
|
|
func (q *Queries) ListAgents(ctx context.Context, workspaceID pgtype.UUID) ([]Agent, error) {
|
|
rows, err := q.db.Query(ctx, listAgents, workspaceID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
items := []Agent{}
|
|
for rows.Next() {
|
|
var i Agent
|
|
if err := rows.Scan(
|
|
&i.ID,
|
|
&i.WorkspaceID,
|
|
&i.Name,
|
|
&i.AvatarUrl,
|
|
&i.RuntimeMode,
|
|
&i.RuntimeConfig,
|
|
&i.Visibility,
|
|
&i.Status,
|
|
&i.MaxConcurrentTasks,
|
|
&i.OwnerID,
|
|
&i.CreatedAt,
|
|
&i.UpdatedAt,
|
|
&i.Description,
|
|
&i.RuntimeID,
|
|
&i.Instructions,
|
|
&i.ArchivedAt,
|
|
&i.ArchivedBy,
|
|
&i.CustomEnv,
|
|
&i.CustomArgs,
|
|
&i.McpConfig,
|
|
&i.Model,
|
|
&i.ThinkingLevel,
|
|
&i.SkillsLocal,
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
items = append(items, i)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
const listAllAgents = `-- name: ListAllAgents :many
|
|
SELECT id, workspace_id, name, avatar_url, runtime_mode, runtime_config, visibility, status, max_concurrent_tasks, owner_id, created_at, updated_at, description, runtime_id, instructions, archived_at, archived_by, custom_env, custom_args, mcp_config, model, thinking_level, skills_local FROM agent
|
|
WHERE workspace_id = $1
|
|
ORDER BY created_at ASC
|
|
`
|
|
|
|
func (q *Queries) ListAllAgents(ctx context.Context, workspaceID pgtype.UUID) ([]Agent, error) {
|
|
rows, err := q.db.Query(ctx, listAllAgents, workspaceID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
items := []Agent{}
|
|
for rows.Next() {
|
|
var i Agent
|
|
if err := rows.Scan(
|
|
&i.ID,
|
|
&i.WorkspaceID,
|
|
&i.Name,
|
|
&i.AvatarUrl,
|
|
&i.RuntimeMode,
|
|
&i.RuntimeConfig,
|
|
&i.Visibility,
|
|
&i.Status,
|
|
&i.MaxConcurrentTasks,
|
|
&i.OwnerID,
|
|
&i.CreatedAt,
|
|
&i.UpdatedAt,
|
|
&i.Description,
|
|
&i.RuntimeID,
|
|
&i.Instructions,
|
|
&i.ArchivedAt,
|
|
&i.ArchivedBy,
|
|
&i.CustomEnv,
|
|
&i.CustomArgs,
|
|
&i.McpConfig,
|
|
&i.Model,
|
|
&i.ThinkingLevel,
|
|
&i.SkillsLocal,
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
items = append(items, i)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
const listPendingTasksByRuntime = `-- name: ListPendingTasksByRuntime :many
|
|
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task FROM agent_task_queue
|
|
WHERE runtime_id = $1 AND status IN ('queued', 'dispatched')
|
|
ORDER BY priority DESC, created_at ASC
|
|
`
|
|
|
|
func (q *Queries) ListPendingTasksByRuntime(ctx context.Context, runtimeID pgtype.UUID) ([]AgentTaskQueue, error) {
|
|
rows, err := q.db.Query(ctx, listPendingTasksByRuntime, runtimeID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
items := []AgentTaskQueue{}
|
|
for rows.Next() {
|
|
var i AgentTaskQueue
|
|
if err := rows.Scan(
|
|
&i.ID,
|
|
&i.AgentID,
|
|
&i.IssueID,
|
|
&i.Status,
|
|
&i.Priority,
|
|
&i.DispatchedAt,
|
|
&i.StartedAt,
|
|
&i.CompletedAt,
|
|
&i.Result,
|
|
&i.Error,
|
|
&i.CreatedAt,
|
|
&i.Context,
|
|
&i.RuntimeID,
|
|
&i.SessionID,
|
|
&i.WorkDir,
|
|
&i.TriggerCommentID,
|
|
&i.ChatSessionID,
|
|
&i.AutopilotRunID,
|
|
&i.Attempt,
|
|
&i.MaxAttempts,
|
|
&i.ParentTaskID,
|
|
&i.FailureReason,
|
|
&i.TriggerSummary,
|
|
&i.ForceFreshSession,
|
|
&i.IsLeaderTask,
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
items = append(items, i)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
const listQueuedClaimCandidatesByRuntime = `-- name: ListQueuedClaimCandidatesByRuntime :many
|
|
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task FROM agent_task_queue
|
|
WHERE runtime_id = $1 AND status = 'queued'
|
|
ORDER BY priority DESC, created_at ASC
|
|
`
|
|
|
|
// Returns rows the runtime can attempt to claim. Status is restricted to
|
|
// 'queued' (in contrast to ListPendingTasksByRuntime which also includes
|
|
// 'dispatched') because dispatched rows are by definition already owned
|
|
// and cannot be re-claimed — including them in the candidate list pads
|
|
// the result with rows that always lose the per-(issue, agent) race in
|
|
// ClaimAgentTask, wasting CPU and a SELECT every poll cycle when the
|
|
// runtime is busy on a long-running task. Backed by the partial index
|
|
// idx_agent_task_queue_claim_candidates so the warm path is cheap.
|
|
func (q *Queries) ListQueuedClaimCandidatesByRuntime(ctx context.Context, runtimeID pgtype.UUID) ([]AgentTaskQueue, error) {
|
|
rows, err := q.db.Query(ctx, listQueuedClaimCandidatesByRuntime, runtimeID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
items := []AgentTaskQueue{}
|
|
for rows.Next() {
|
|
var i AgentTaskQueue
|
|
if err := rows.Scan(
|
|
&i.ID,
|
|
&i.AgentID,
|
|
&i.IssueID,
|
|
&i.Status,
|
|
&i.Priority,
|
|
&i.DispatchedAt,
|
|
&i.StartedAt,
|
|
&i.CompletedAt,
|
|
&i.Result,
|
|
&i.Error,
|
|
&i.CreatedAt,
|
|
&i.Context,
|
|
&i.RuntimeID,
|
|
&i.SessionID,
|
|
&i.WorkDir,
|
|
&i.TriggerCommentID,
|
|
&i.ChatSessionID,
|
|
&i.AutopilotRunID,
|
|
&i.Attempt,
|
|
&i.MaxAttempts,
|
|
&i.ParentTaskID,
|
|
&i.FailureReason,
|
|
&i.TriggerSummary,
|
|
&i.ForceFreshSession,
|
|
&i.IsLeaderTask,
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
items = append(items, i)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
const listTasksByIssue = `-- name: ListTasksByIssue :many
|
|
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task FROM agent_task_queue
|
|
WHERE issue_id = $1
|
|
ORDER BY created_at DESC
|
|
`
|
|
|
|
func (q *Queries) ListTasksByIssue(ctx context.Context, issueID pgtype.UUID) ([]AgentTaskQueue, error) {
|
|
rows, err := q.db.Query(ctx, listTasksByIssue, issueID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
items := []AgentTaskQueue{}
|
|
for rows.Next() {
|
|
var i AgentTaskQueue
|
|
if err := rows.Scan(
|
|
&i.ID,
|
|
&i.AgentID,
|
|
&i.IssueID,
|
|
&i.Status,
|
|
&i.Priority,
|
|
&i.DispatchedAt,
|
|
&i.StartedAt,
|
|
&i.CompletedAt,
|
|
&i.Result,
|
|
&i.Error,
|
|
&i.CreatedAt,
|
|
&i.Context,
|
|
&i.RuntimeID,
|
|
&i.SessionID,
|
|
&i.WorkDir,
|
|
&i.TriggerCommentID,
|
|
&i.ChatSessionID,
|
|
&i.AutopilotRunID,
|
|
&i.Attempt,
|
|
&i.MaxAttempts,
|
|
&i.ParentTaskID,
|
|
&i.FailureReason,
|
|
&i.TriggerSummary,
|
|
&i.ForceFreshSession,
|
|
&i.IsLeaderTask,
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
items = append(items, i)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
const listWorkspaceAgentTaskSnapshot = `-- name: ListWorkspaceAgentTaskSnapshot :many
|
|
SELECT atq.id, atq.agent_id, atq.issue_id, atq.status, atq.priority, atq.dispatched_at, atq.started_at, atq.completed_at, atq.result, atq.error, atq.created_at, atq.context, atq.runtime_id, atq.session_id, atq.work_dir, atq.trigger_comment_id, atq.chat_session_id, atq.autopilot_run_id, atq.attempt, atq.max_attempts, atq.parent_task_id, atq.failure_reason, atq.trigger_summary, atq.force_fresh_session, atq.is_leader_task FROM agent_task_queue atq
|
|
JOIN agent a ON a.id = atq.agent_id
|
|
WHERE a.workspace_id = $1
|
|
AND atq.status IN ('queued', 'dispatched', 'running')
|
|
|
|
UNION ALL
|
|
|
|
SELECT t.id, t.agent_id, t.issue_id, t.status, t.priority, t.dispatched_at, t.started_at, t.completed_at, t.result, t.error, t.created_at, t.context, t.runtime_id, t.session_id, t.work_dir, t.trigger_comment_id, t.chat_session_id, t.autopilot_run_id, t.attempt, t.max_attempts, t.parent_task_id, t.failure_reason, t.trigger_summary, t.force_fresh_session, t.is_leader_task FROM (
|
|
SELECT DISTINCT ON (atq.agent_id) atq.id, atq.agent_id, atq.issue_id, atq.status, atq.priority, atq.dispatched_at, atq.started_at, atq.completed_at, atq.result, atq.error, atq.created_at, atq.context, atq.runtime_id, atq.session_id, atq.work_dir, atq.trigger_comment_id, atq.chat_session_id, atq.autopilot_run_id, atq.attempt, atq.max_attempts, atq.parent_task_id, atq.failure_reason, atq.trigger_summary, atq.force_fresh_session, atq.is_leader_task
|
|
FROM agent_task_queue atq
|
|
JOIN agent a ON a.id = atq.agent_id
|
|
WHERE a.workspace_id = $1
|
|
AND atq.status IN ('completed', 'failed')
|
|
ORDER BY atq.agent_id, atq.completed_at DESC NULLS LAST
|
|
) t
|
|
`
|
|
|
|
// Returns the tasks needed to derive each agent's current presence:
|
|
// - All active tasks (queued / dispatched / running) — for working signal + counts
|
|
// - Each agent's most recent OUTCOME task (completed / failed) — for sticky
|
|
// failed signal
|
|
//
|
|
// The front-end picks "active wins, else latest outcome" — see derive-presence.ts.
|
|
//
|
|
// Cancelled tasks are excluded from the outcome half on purpose: cancel is a
|
|
// procedural signal ("attempt aborted"), not an outcome. It tells us nothing
|
|
// about whether the agent works, so it must NOT be allowed to mask a prior
|
|
// failure. Concretely: if an agent fails and then the user cancels the queued
|
|
// retry (or the parent issue closes and cascades cancels), the failed signal
|
|
// has to stay red. Only a real success (completed) or a fresh attempt (active)
|
|
// clears it.
|
|
//
|
|
// No UI windows in SQL: stickiness is decided by "is the latest outcome a
|
|
// failure?", not a 2-minute clock. JOINs agent because agent_task_queue has
|
|
// no workspace_id column.
|
|
func (q *Queries) ListWorkspaceAgentTaskSnapshot(ctx context.Context, workspaceID pgtype.UUID) ([]AgentTaskQueue, error) {
|
|
rows, err := q.db.Query(ctx, listWorkspaceAgentTaskSnapshot, workspaceID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
items := []AgentTaskQueue{}
|
|
for rows.Next() {
|
|
var i AgentTaskQueue
|
|
if err := rows.Scan(
|
|
&i.ID,
|
|
&i.AgentID,
|
|
&i.IssueID,
|
|
&i.Status,
|
|
&i.Priority,
|
|
&i.DispatchedAt,
|
|
&i.StartedAt,
|
|
&i.CompletedAt,
|
|
&i.Result,
|
|
&i.Error,
|
|
&i.CreatedAt,
|
|
&i.Context,
|
|
&i.RuntimeID,
|
|
&i.SessionID,
|
|
&i.WorkDir,
|
|
&i.TriggerCommentID,
|
|
&i.ChatSessionID,
|
|
&i.AutopilotRunID,
|
|
&i.Attempt,
|
|
&i.MaxAttempts,
|
|
&i.ParentTaskID,
|
|
&i.FailureReason,
|
|
&i.TriggerSummary,
|
|
&i.ForceFreshSession,
|
|
&i.IsLeaderTask,
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
items = append(items, i)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
const reclaimStaleDispatchedTaskForRuntime = `-- name: ReclaimStaleDispatchedTaskForRuntime :one
|
|
UPDATE agent_task_queue
|
|
SET dispatched_at = now()
|
|
WHERE id = (
|
|
SELECT atq.id FROM agent_task_queue atq
|
|
WHERE atq.runtime_id = $1
|
|
AND atq.status = 'dispatched'
|
|
AND atq.started_at IS NULL
|
|
AND atq.dispatched_at < now() - make_interval(secs => $2::double precision)
|
|
ORDER BY atq.priority DESC, atq.dispatched_at ASC
|
|
LIMIT 1
|
|
FOR UPDATE SKIP LOCKED
|
|
)
|
|
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task
|
|
`
|
|
|
|
type ReclaimStaleDispatchedTaskForRuntimeParams struct {
|
|
RuntimeID pgtype.UUID `json:"runtime_id"`
|
|
ClaimRecoverySecs float64 `json:"claim_recovery_secs"`
|
|
}
|
|
|
|
// Re-delivers a task whose previous claim likely succeeded server-side but
|
|
// whose response never reached the daemon. The task is still in `dispatched`
|
|
// with no `started_at`, so the daemon has not acknowledged it via StartTask.
|
|
// Refresh dispatched_at so the server-side dispatch timeout measures from the
|
|
// recovered delivery attempt.
|
|
func (q *Queries) ReclaimStaleDispatchedTaskForRuntime(ctx context.Context, arg ReclaimStaleDispatchedTaskForRuntimeParams) (AgentTaskQueue, error) {
|
|
row := q.db.QueryRow(ctx, reclaimStaleDispatchedTaskForRuntime, arg.RuntimeID, arg.ClaimRecoverySecs)
|
|
var i AgentTaskQueue
|
|
err := row.Scan(
|
|
&i.ID,
|
|
&i.AgentID,
|
|
&i.IssueID,
|
|
&i.Status,
|
|
&i.Priority,
|
|
&i.DispatchedAt,
|
|
&i.StartedAt,
|
|
&i.CompletedAt,
|
|
&i.Result,
|
|
&i.Error,
|
|
&i.CreatedAt,
|
|
&i.Context,
|
|
&i.RuntimeID,
|
|
&i.SessionID,
|
|
&i.WorkDir,
|
|
&i.TriggerCommentID,
|
|
&i.ChatSessionID,
|
|
&i.AutopilotRunID,
|
|
&i.Attempt,
|
|
&i.MaxAttempts,
|
|
&i.ParentTaskID,
|
|
&i.FailureReason,
|
|
&i.TriggerSummary,
|
|
&i.ForceFreshSession,
|
|
&i.IsLeaderTask,
|
|
)
|
|
return i, err
|
|
}
|
|
|
|
const recoverOrphanedTasksForRuntime = `-- name: RecoverOrphanedTasksForRuntime :many
|
|
UPDATE agent_task_queue
|
|
SET status = 'failed',
|
|
completed_at = now(),
|
|
error = 'daemon restarted while task was in flight',
|
|
failure_reason = 'runtime_recovery'
|
|
WHERE runtime_id = $1 AND status IN ('dispatched', 'running')
|
|
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task
|
|
`
|
|
|
|
// Called by the daemon at startup. Atomically fails any dispatched/running
|
|
// task that the prior incarnation of this runtime owned but did not
|
|
// finalize. Returns the failed rows so callers can hand them to the
|
|
// auto-retry path.
|
|
func (q *Queries) RecoverOrphanedTasksForRuntime(ctx context.Context, runtimeID pgtype.UUID) ([]AgentTaskQueue, error) {
|
|
rows, err := q.db.Query(ctx, recoverOrphanedTasksForRuntime, runtimeID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
items := []AgentTaskQueue{}
|
|
for rows.Next() {
|
|
var i AgentTaskQueue
|
|
if err := rows.Scan(
|
|
&i.ID,
|
|
&i.AgentID,
|
|
&i.IssueID,
|
|
&i.Status,
|
|
&i.Priority,
|
|
&i.DispatchedAt,
|
|
&i.StartedAt,
|
|
&i.CompletedAt,
|
|
&i.Result,
|
|
&i.Error,
|
|
&i.CreatedAt,
|
|
&i.Context,
|
|
&i.RuntimeID,
|
|
&i.SessionID,
|
|
&i.WorkDir,
|
|
&i.TriggerCommentID,
|
|
&i.ChatSessionID,
|
|
&i.AutopilotRunID,
|
|
&i.Attempt,
|
|
&i.MaxAttempts,
|
|
&i.ParentTaskID,
|
|
&i.FailureReason,
|
|
&i.TriggerSummary,
|
|
&i.ForceFreshSession,
|
|
&i.IsLeaderTask,
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
items = append(items, i)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
const refreshAgentStatusFromTasks = `-- name: RefreshAgentStatusFromTasks :one
|
|
UPDATE agent AS a
|
|
SET status = CASE WHEN EXISTS (
|
|
SELECT 1 FROM agent_task_queue q
|
|
WHERE q.agent_id = a.id AND q.status IN ('dispatched', 'running')
|
|
) THEN 'working' ELSE 'idle' END,
|
|
updated_at = now()
|
|
WHERE a.id = $1
|
|
RETURNING id, workspace_id, name, avatar_url, runtime_mode, runtime_config, visibility, status, max_concurrent_tasks, owner_id, created_at, updated_at, description, runtime_id, instructions, archived_at, archived_by, custom_env, custom_args, mcp_config, model, thinking_level, skills_local
|
|
`
|
|
|
|
func (q *Queries) RefreshAgentStatusFromTasks(ctx context.Context, id pgtype.UUID) (Agent, error) {
|
|
row := q.db.QueryRow(ctx, refreshAgentStatusFromTasks, id)
|
|
var i Agent
|
|
err := row.Scan(
|
|
&i.ID,
|
|
&i.WorkspaceID,
|
|
&i.Name,
|
|
&i.AvatarUrl,
|
|
&i.RuntimeMode,
|
|
&i.RuntimeConfig,
|
|
&i.Visibility,
|
|
&i.Status,
|
|
&i.MaxConcurrentTasks,
|
|
&i.OwnerID,
|
|
&i.CreatedAt,
|
|
&i.UpdatedAt,
|
|
&i.Description,
|
|
&i.RuntimeID,
|
|
&i.Instructions,
|
|
&i.ArchivedAt,
|
|
&i.ArchivedBy,
|
|
&i.CustomEnv,
|
|
&i.CustomArgs,
|
|
&i.McpConfig,
|
|
&i.Model,
|
|
&i.ThinkingLevel,
|
|
&i.SkillsLocal,
|
|
)
|
|
return i, err
|
|
}
|
|
|
|
const restoreAgent = `-- name: RestoreAgent :one
|
|
UPDATE agent SET archived_at = NULL, archived_by = NULL, updated_at = now()
|
|
WHERE id = $1
|
|
RETURNING id, workspace_id, name, avatar_url, runtime_mode, runtime_config, visibility, status, max_concurrent_tasks, owner_id, created_at, updated_at, description, runtime_id, instructions, archived_at, archived_by, custom_env, custom_args, mcp_config, model, thinking_level, skills_local
|
|
`
|
|
|
|
func (q *Queries) RestoreAgent(ctx context.Context, id pgtype.UUID) (Agent, error) {
|
|
row := q.db.QueryRow(ctx, restoreAgent, id)
|
|
var i Agent
|
|
err := row.Scan(
|
|
&i.ID,
|
|
&i.WorkspaceID,
|
|
&i.Name,
|
|
&i.AvatarUrl,
|
|
&i.RuntimeMode,
|
|
&i.RuntimeConfig,
|
|
&i.Visibility,
|
|
&i.Status,
|
|
&i.MaxConcurrentTasks,
|
|
&i.OwnerID,
|
|
&i.CreatedAt,
|
|
&i.UpdatedAt,
|
|
&i.Description,
|
|
&i.RuntimeID,
|
|
&i.Instructions,
|
|
&i.ArchivedAt,
|
|
&i.ArchivedBy,
|
|
&i.CustomEnv,
|
|
&i.CustomArgs,
|
|
&i.McpConfig,
|
|
&i.Model,
|
|
&i.ThinkingLevel,
|
|
&i.SkillsLocal,
|
|
)
|
|
return i, err
|
|
}
|
|
|
|
const startAgentTask = `-- name: StartAgentTask :one
|
|
UPDATE agent_task_queue
|
|
SET status = 'running', started_at = now()
|
|
WHERE id = $1 AND status = 'dispatched'
|
|
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session, is_leader_task
|
|
`
|
|
|
|
func (q *Queries) StartAgentTask(ctx context.Context, id pgtype.UUID) (AgentTaskQueue, error) {
|
|
row := q.db.QueryRow(ctx, startAgentTask, id)
|
|
var i AgentTaskQueue
|
|
err := row.Scan(
|
|
&i.ID,
|
|
&i.AgentID,
|
|
&i.IssueID,
|
|
&i.Status,
|
|
&i.Priority,
|
|
&i.DispatchedAt,
|
|
&i.StartedAt,
|
|
&i.CompletedAt,
|
|
&i.Result,
|
|
&i.Error,
|
|
&i.CreatedAt,
|
|
&i.Context,
|
|
&i.RuntimeID,
|
|
&i.SessionID,
|
|
&i.WorkDir,
|
|
&i.TriggerCommentID,
|
|
&i.ChatSessionID,
|
|
&i.AutopilotRunID,
|
|
&i.Attempt,
|
|
&i.MaxAttempts,
|
|
&i.ParentTaskID,
|
|
&i.FailureReason,
|
|
&i.TriggerSummary,
|
|
&i.ForceFreshSession,
|
|
&i.IsLeaderTask,
|
|
)
|
|
return i, err
|
|
}
|
|
|
|
const updateAgent = `-- name: UpdateAgent :one
|
|
UPDATE agent SET
|
|
name = COALESCE($2, name),
|
|
description = COALESCE($3, description),
|
|
avatar_url = COALESCE($4, avatar_url),
|
|
runtime_config = COALESCE($5, runtime_config),
|
|
runtime_mode = COALESCE($6, runtime_mode),
|
|
runtime_id = COALESCE($7, runtime_id),
|
|
visibility = COALESCE($8, visibility),
|
|
status = COALESCE($9, status),
|
|
max_concurrent_tasks = COALESCE($10, max_concurrent_tasks),
|
|
instructions = COALESCE($11, instructions),
|
|
custom_env = COALESCE($12, custom_env),
|
|
custom_args = COALESCE($13, custom_args),
|
|
mcp_config = COALESCE($14, mcp_config),
|
|
model = COALESCE($15, model),
|
|
thinking_level = COALESCE($16, thinking_level),
|
|
skills_local = COALESCE($17, skills_local),
|
|
updated_at = now()
|
|
WHERE id = $1
|
|
RETURNING id, workspace_id, name, avatar_url, runtime_mode, runtime_config, visibility, status, max_concurrent_tasks, owner_id, created_at, updated_at, description, runtime_id, instructions, archived_at, archived_by, custom_env, custom_args, mcp_config, model, thinking_level, skills_local
|
|
`
|
|
|
|
type UpdateAgentParams struct {
|
|
ID pgtype.UUID `json:"id"`
|
|
Name pgtype.Text `json:"name"`
|
|
Description pgtype.Text `json:"description"`
|
|
AvatarUrl pgtype.Text `json:"avatar_url"`
|
|
RuntimeConfig []byte `json:"runtime_config"`
|
|
RuntimeMode pgtype.Text `json:"runtime_mode"`
|
|
RuntimeID pgtype.UUID `json:"runtime_id"`
|
|
Visibility pgtype.Text `json:"visibility"`
|
|
Status pgtype.Text `json:"status"`
|
|
MaxConcurrentTasks pgtype.Int4 `json:"max_concurrent_tasks"`
|
|
Instructions pgtype.Text `json:"instructions"`
|
|
CustomEnv []byte `json:"custom_env"`
|
|
CustomArgs []byte `json:"custom_args"`
|
|
McpConfig []byte `json:"mcp_config"`
|
|
Model pgtype.Text `json:"model"`
|
|
ThinkingLevel pgtype.Text `json:"thinking_level"`
|
|
SkillsLocal pgtype.Text `json:"skills_local"`
|
|
}
|
|
|
|
func (q *Queries) UpdateAgent(ctx context.Context, arg UpdateAgentParams) (Agent, error) {
|
|
row := q.db.QueryRow(ctx, updateAgent,
|
|
arg.ID,
|
|
arg.Name,
|
|
arg.Description,
|
|
arg.AvatarUrl,
|
|
arg.RuntimeConfig,
|
|
arg.RuntimeMode,
|
|
arg.RuntimeID,
|
|
arg.Visibility,
|
|
arg.Status,
|
|
arg.MaxConcurrentTasks,
|
|
arg.Instructions,
|
|
arg.CustomEnv,
|
|
arg.CustomArgs,
|
|
arg.McpConfig,
|
|
arg.Model,
|
|
arg.ThinkingLevel,
|
|
arg.SkillsLocal,
|
|
)
|
|
var i Agent
|
|
err := row.Scan(
|
|
&i.ID,
|
|
&i.WorkspaceID,
|
|
&i.Name,
|
|
&i.AvatarUrl,
|
|
&i.RuntimeMode,
|
|
&i.RuntimeConfig,
|
|
&i.Visibility,
|
|
&i.Status,
|
|
&i.MaxConcurrentTasks,
|
|
&i.OwnerID,
|
|
&i.CreatedAt,
|
|
&i.UpdatedAt,
|
|
&i.Description,
|
|
&i.RuntimeID,
|
|
&i.Instructions,
|
|
&i.ArchivedAt,
|
|
&i.ArchivedBy,
|
|
&i.CustomEnv,
|
|
&i.CustomArgs,
|
|
&i.McpConfig,
|
|
&i.Model,
|
|
&i.ThinkingLevel,
|
|
&i.SkillsLocal,
|
|
)
|
|
return i, err
|
|
}
|
|
|
|
const updateAgentCustomEnv = `-- name: UpdateAgentCustomEnv :one
|
|
UPDATE agent
|
|
SET custom_env = $2, updated_at = now()
|
|
WHERE id = $1
|
|
RETURNING id, workspace_id, name, avatar_url, runtime_mode, runtime_config, visibility, status, max_concurrent_tasks, owner_id, created_at, updated_at, description, runtime_id, instructions, archived_at, archived_by, custom_env, custom_args, mcp_config, model, thinking_level, skills_local
|
|
`
|
|
|
|
type UpdateAgentCustomEnvParams struct {
|
|
ID pgtype.UUID `json:"id"`
|
|
CustomEnv []byte `json:"custom_env"`
|
|
}
|
|
|
|
// Replaces an agent's custom_env map wholesale. Used by the dedicated
|
|
// env-management endpoint (POST/PUT /api/agents/{id}/env), which is the
|
|
// only post-creation write path for env. UpdateAgent has been stripped
|
|
// of custom_env handling so all env mutations flow through here and the
|
|
// handler's audit-log + **** sentinel guard.
|
|
func (q *Queries) UpdateAgentCustomEnv(ctx context.Context, arg UpdateAgentCustomEnvParams) (Agent, error) {
|
|
row := q.db.QueryRow(ctx, updateAgentCustomEnv, arg.ID, arg.CustomEnv)
|
|
var i Agent
|
|
err := row.Scan(
|
|
&i.ID,
|
|
&i.WorkspaceID,
|
|
&i.Name,
|
|
&i.AvatarUrl,
|
|
&i.RuntimeMode,
|
|
&i.RuntimeConfig,
|
|
&i.Visibility,
|
|
&i.Status,
|
|
&i.MaxConcurrentTasks,
|
|
&i.OwnerID,
|
|
&i.CreatedAt,
|
|
&i.UpdatedAt,
|
|
&i.Description,
|
|
&i.RuntimeID,
|
|
&i.Instructions,
|
|
&i.ArchivedAt,
|
|
&i.ArchivedBy,
|
|
&i.CustomEnv,
|
|
&i.CustomArgs,
|
|
&i.McpConfig,
|
|
&i.Model,
|
|
&i.ThinkingLevel,
|
|
&i.SkillsLocal,
|
|
)
|
|
return i, err
|
|
}
|
|
|
|
const updateAgentStatus = `-- name: UpdateAgentStatus :one
|
|
UPDATE agent SET status = $2, updated_at = now()
|
|
WHERE id = $1
|
|
RETURNING id, workspace_id, name, avatar_url, runtime_mode, runtime_config, visibility, status, max_concurrent_tasks, owner_id, created_at, updated_at, description, runtime_id, instructions, archived_at, archived_by, custom_env, custom_args, mcp_config, model, thinking_level, skills_local
|
|
`
|
|
|
|
type UpdateAgentStatusParams struct {
|
|
ID pgtype.UUID `json:"id"`
|
|
Status string `json:"status"`
|
|
}
|
|
|
|
func (q *Queries) UpdateAgentStatus(ctx context.Context, arg UpdateAgentStatusParams) (Agent, error) {
|
|
row := q.db.QueryRow(ctx, updateAgentStatus, arg.ID, arg.Status)
|
|
var i Agent
|
|
err := row.Scan(
|
|
&i.ID,
|
|
&i.WorkspaceID,
|
|
&i.Name,
|
|
&i.AvatarUrl,
|
|
&i.RuntimeMode,
|
|
&i.RuntimeConfig,
|
|
&i.Visibility,
|
|
&i.Status,
|
|
&i.MaxConcurrentTasks,
|
|
&i.OwnerID,
|
|
&i.CreatedAt,
|
|
&i.UpdatedAt,
|
|
&i.Description,
|
|
&i.RuntimeID,
|
|
&i.Instructions,
|
|
&i.ArchivedAt,
|
|
&i.ArchivedBy,
|
|
&i.CustomEnv,
|
|
&i.CustomArgs,
|
|
&i.McpConfig,
|
|
&i.Model,
|
|
&i.ThinkingLevel,
|
|
&i.SkillsLocal,
|
|
)
|
|
return i, err
|
|
}
|
|
|
|
const updateAgentTaskSession = `-- name: UpdateAgentTaskSession :exec
|
|
UPDATE agent_task_queue
|
|
SET session_id = COALESCE($2, session_id),
|
|
work_dir = COALESCE($3, work_dir)
|
|
WHERE id = $1 AND status IN ('dispatched', 'running')
|
|
`
|
|
|
|
type UpdateAgentTaskSessionParams struct {
|
|
ID pgtype.UUID `json:"id"`
|
|
SessionID pgtype.Text `json:"session_id"`
|
|
WorkDir pgtype.Text `json:"work_dir"`
|
|
}
|
|
|
|
// Pins the resume pointer mid-flight so a daemon crash leaves a usable
|
|
// session_id/work_dir on the task row. No-op if the task is no longer
|
|
// in dispatched/running.
|
|
func (q *Queries) UpdateAgentTaskSession(ctx context.Context, arg UpdateAgentTaskSessionParams) error {
|
|
_, err := q.db.Exec(ctx, updateAgentTaskSession, arg.ID, arg.SessionID, arg.WorkDir)
|
|
return err
|
|
}
|