mirror of
https://github.com/multica-ai/multica.git
synced 2026-07-05 13:29:44 +02:00
Each @mention or assignee comment now creates its own queued task instead of folding into a single pending task per (issue, agent). The previous HasPendingTaskForIssueAndAgent short-circuit in enqueueMentionedAgentTasks and shouldEnqueueOnComment is gone; ClaimAgentTask already enforces per-(issue, agent) serial execution at claim time, so multiple queued rows drain one-by-one without overlap. Why: see docs/rfcs/0001-mention-dedup-policy.md. Coalescing dropped the trigger comment for every mention after the first, gave no UI feedback, and collapsed distinct intents (different threads, different requests). Drops the unused HasPendingTaskForIssue and HasPendingTaskForIssueAndAgent queries and regenerates sqlc. Adds TestRepeatedMentionsEnqueueSeparateTasks pinning the new contract: two back-to-back @mentions queue two tasks with distinct trigger_comment_ids. Co-authored-by: multica-agent <github@multica.ai>
1977 lines
60 KiB
Go
1977 lines
60 KiB
Go
// Code generated by sqlc. DO NOT EDIT.
|
|
// versions:
|
|
// sqlc v1.30.0
|
|
// source: agent.sql
|
|
|
|
package db
|
|
|
|
import (
|
|
"context"
|
|
|
|
"github.com/jackc/pgx/v5/pgtype"
|
|
)
|
|
|
|
// archiveAgent is the SQL behind ArchiveAgent: it stamps archived_at and
// updated_at with now(), records archived_by ($2) on the agent whose id is $1,
// and returns the full updated row.
const archiveAgent = `-- name: ArchiveAgent :one
UPDATE agent SET archived_at = now(), archived_by = $2, updated_at = now()
WHERE id = $1
RETURNING id, workspace_id, name, avatar_url, runtime_mode, runtime_config, visibility, status, max_concurrent_tasks, owner_id, created_at, updated_at, description, runtime_id, instructions, archived_at, archived_by, custom_env, custom_args, mcp_config, model
`
|
|
|
|
type ArchiveAgentParams struct {
|
|
ID pgtype.UUID `json:"id"`
|
|
ArchivedBy pgtype.UUID `json:"archived_by"`
|
|
}
|
|
|
|
func (q *Queries) ArchiveAgent(ctx context.Context, arg ArchiveAgentParams) (Agent, error) {
|
|
row := q.db.QueryRow(ctx, archiveAgent, arg.ID, arg.ArchivedBy)
|
|
var i Agent
|
|
err := row.Scan(
|
|
&i.ID,
|
|
&i.WorkspaceID,
|
|
&i.Name,
|
|
&i.AvatarUrl,
|
|
&i.RuntimeMode,
|
|
&i.RuntimeConfig,
|
|
&i.Visibility,
|
|
&i.Status,
|
|
&i.MaxConcurrentTasks,
|
|
&i.OwnerID,
|
|
&i.CreatedAt,
|
|
&i.UpdatedAt,
|
|
&i.Description,
|
|
&i.RuntimeID,
|
|
&i.Instructions,
|
|
&i.ArchivedAt,
|
|
&i.ArchivedBy,
|
|
&i.CustomEnv,
|
|
&i.CustomArgs,
|
|
&i.McpConfig,
|
|
&i.Model,
|
|
)
|
|
return i, err
|
|
}
|
|
|
|
// cancelAgentTask is the SQL behind CancelAgentTask: it marks the task with
// id $1 as 'cancelled' and stamps completed_at, but only while the task is
// still queued, dispatched, or running. The updated row is returned.
const cancelAgentTask = `-- name: CancelAgentTask :one
UPDATE agent_task_queue
SET status = 'cancelled', completed_at = now()
WHERE id = $1 AND status IN ('queued', 'dispatched', 'running')
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session
`
|
|
|
|
func (q *Queries) CancelAgentTask(ctx context.Context, id pgtype.UUID) (AgentTaskQueue, error) {
|
|
row := q.db.QueryRow(ctx, cancelAgentTask, id)
|
|
var i AgentTaskQueue
|
|
err := row.Scan(
|
|
&i.ID,
|
|
&i.AgentID,
|
|
&i.IssueID,
|
|
&i.Status,
|
|
&i.Priority,
|
|
&i.DispatchedAt,
|
|
&i.StartedAt,
|
|
&i.CompletedAt,
|
|
&i.Result,
|
|
&i.Error,
|
|
&i.CreatedAt,
|
|
&i.Context,
|
|
&i.RuntimeID,
|
|
&i.SessionID,
|
|
&i.WorkDir,
|
|
&i.TriggerCommentID,
|
|
&i.ChatSessionID,
|
|
&i.AutopilotRunID,
|
|
&i.Attempt,
|
|
&i.MaxAttempts,
|
|
&i.ParentTaskID,
|
|
&i.FailureReason,
|
|
&i.TriggerSummary,
|
|
&i.ForceFreshSession,
|
|
)
|
|
return i, err
|
|
}
|
|
|
|
// cancelAgentTasksByAgent is the SQL behind CancelAgentTasksByAgent: it marks
// every queued, dispatched, or running task of agent $1 as 'cancelled',
// stamps completed_at, and returns the affected rows.
const cancelAgentTasksByAgent = `-- name: CancelAgentTasksByAgent :many
UPDATE agent_task_queue
SET status = 'cancelled', completed_at = now()
WHERE agent_id = $1 AND status IN ('queued', 'dispatched', 'running')
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session
`
|
|
|
|
// Bulk-cancel every active (queued/dispatched/running) task for an agent.
|
|
// Returns the affected rows so callers can broadcast task:cancelled events.
|
|
// Mirrors the shape of CancelAgentTasksByIssue / CancelAgentTasksByIssueAndAgent
|
|
// (also :many + RETURNING + completed_at) so the three sibling cancel paths
|
|
// behave consistently.
|
|
func (q *Queries) CancelAgentTasksByAgent(ctx context.Context, agentID pgtype.UUID) ([]AgentTaskQueue, error) {
|
|
rows, err := q.db.Query(ctx, cancelAgentTasksByAgent, agentID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
items := []AgentTaskQueue{}
|
|
for rows.Next() {
|
|
var i AgentTaskQueue
|
|
if err := rows.Scan(
|
|
&i.ID,
|
|
&i.AgentID,
|
|
&i.IssueID,
|
|
&i.Status,
|
|
&i.Priority,
|
|
&i.DispatchedAt,
|
|
&i.StartedAt,
|
|
&i.CompletedAt,
|
|
&i.Result,
|
|
&i.Error,
|
|
&i.CreatedAt,
|
|
&i.Context,
|
|
&i.RuntimeID,
|
|
&i.SessionID,
|
|
&i.WorkDir,
|
|
&i.TriggerCommentID,
|
|
&i.ChatSessionID,
|
|
&i.AutopilotRunID,
|
|
&i.Attempt,
|
|
&i.MaxAttempts,
|
|
&i.ParentTaskID,
|
|
&i.FailureReason,
|
|
&i.TriggerSummary,
|
|
&i.ForceFreshSession,
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
items = append(items, i)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
// cancelAgentTasksByChatSession is the SQL behind
// CancelAgentTasksByChatSession: it marks every queued, dispatched, or running
// task attached to chat session $1 as 'cancelled', stamps completed_at, and
// returns the affected rows.
const cancelAgentTasksByChatSession = `-- name: CancelAgentTasksByChatSession :many
UPDATE agent_task_queue
SET status = 'cancelled', completed_at = now()
WHERE chat_session_id = $1 AND status IN ('queued', 'dispatched', 'running')
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session
`
|
|
|
|
// Cancels active tasks belonging to a chat session. Called from
|
|
// DeleteChatSession so the daemon doesn't keep running work whose result
|
|
// has nowhere to land. Must run BEFORE the chat_session row is deleted —
|
|
// the FK ON DELETE SET NULL would otherwise nullify chat_session_id and we
|
|
// could no longer reach those tasks.
|
|
func (q *Queries) CancelAgentTasksByChatSession(ctx context.Context, chatSessionID pgtype.UUID) ([]AgentTaskQueue, error) {
|
|
rows, err := q.db.Query(ctx, cancelAgentTasksByChatSession, chatSessionID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
items := []AgentTaskQueue{}
|
|
for rows.Next() {
|
|
var i AgentTaskQueue
|
|
if err := rows.Scan(
|
|
&i.ID,
|
|
&i.AgentID,
|
|
&i.IssueID,
|
|
&i.Status,
|
|
&i.Priority,
|
|
&i.DispatchedAt,
|
|
&i.StartedAt,
|
|
&i.CompletedAt,
|
|
&i.Result,
|
|
&i.Error,
|
|
&i.CreatedAt,
|
|
&i.Context,
|
|
&i.RuntimeID,
|
|
&i.SessionID,
|
|
&i.WorkDir,
|
|
&i.TriggerCommentID,
|
|
&i.ChatSessionID,
|
|
&i.AutopilotRunID,
|
|
&i.Attempt,
|
|
&i.MaxAttempts,
|
|
&i.ParentTaskID,
|
|
&i.FailureReason,
|
|
&i.TriggerSummary,
|
|
&i.ForceFreshSession,
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
items = append(items, i)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
// cancelAgentTasksByIssue is the SQL behind CancelAgentTasksByIssue: it marks
// every queued, dispatched, or running task on issue $1 as 'cancelled',
// stamps completed_at, and returns the affected rows.
const cancelAgentTasksByIssue = `-- name: CancelAgentTasksByIssue :many
UPDATE agent_task_queue
SET status = 'cancelled', completed_at = now()
WHERE issue_id = $1 AND status IN ('queued', 'dispatched', 'running')
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session
`
|
|
|
|
// Cancels every active task on the issue and returns the affected rows so the
|
|
// caller can reconcile each agent's status and broadcast task:cancelled events
|
|
// (#1587). Prior :exec form silently dropped that info, so internal cancel
|
|
// paths (issue status flips to cancelled/done, etc.) left agents stuck at
|
|
// status="working" with no self-correction.
|
|
func (q *Queries) CancelAgentTasksByIssue(ctx context.Context, issueID pgtype.UUID) ([]AgentTaskQueue, error) {
|
|
rows, err := q.db.Query(ctx, cancelAgentTasksByIssue, issueID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
items := []AgentTaskQueue{}
|
|
for rows.Next() {
|
|
var i AgentTaskQueue
|
|
if err := rows.Scan(
|
|
&i.ID,
|
|
&i.AgentID,
|
|
&i.IssueID,
|
|
&i.Status,
|
|
&i.Priority,
|
|
&i.DispatchedAt,
|
|
&i.StartedAt,
|
|
&i.CompletedAt,
|
|
&i.Result,
|
|
&i.Error,
|
|
&i.CreatedAt,
|
|
&i.Context,
|
|
&i.RuntimeID,
|
|
&i.SessionID,
|
|
&i.WorkDir,
|
|
&i.TriggerCommentID,
|
|
&i.ChatSessionID,
|
|
&i.AutopilotRunID,
|
|
&i.Attempt,
|
|
&i.MaxAttempts,
|
|
&i.ParentTaskID,
|
|
&i.FailureReason,
|
|
&i.TriggerSummary,
|
|
&i.ForceFreshSession,
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
items = append(items, i)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
// cancelAgentTasksByIssueAndAgent is the SQL behind
// CancelAgentTasksByIssueAndAgent: it marks every queued, dispatched, or
// running task for issue $1 and agent $2 as 'cancelled', stamps completed_at,
// and returns the affected rows.
const cancelAgentTasksByIssueAndAgent = `-- name: CancelAgentTasksByIssueAndAgent :many
UPDATE agent_task_queue
SET status = 'cancelled', completed_at = now()
WHERE issue_id = $1 AND agent_id = $2 AND status IN ('queued', 'dispatched', 'running')
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session
`
|
|
|
|
type CancelAgentTasksByIssueAndAgentParams struct {
|
|
IssueID pgtype.UUID `json:"issue_id"`
|
|
AgentID pgtype.UUID `json:"agent_id"`
|
|
}
|
|
|
|
// Cancels active tasks for a single (issue, agent) pair without touching
|
|
// tasks belonging to other agents on the same issue. Used by the manual
|
|
// rerun flow so re-running the assignee doesn't collateral-cancel a
|
|
// still-running @-mention agent on the same issue.
|
|
func (q *Queries) CancelAgentTasksByIssueAndAgent(ctx context.Context, arg CancelAgentTasksByIssueAndAgentParams) ([]AgentTaskQueue, error) {
|
|
rows, err := q.db.Query(ctx, cancelAgentTasksByIssueAndAgent, arg.IssueID, arg.AgentID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
items := []AgentTaskQueue{}
|
|
for rows.Next() {
|
|
var i AgentTaskQueue
|
|
if err := rows.Scan(
|
|
&i.ID,
|
|
&i.AgentID,
|
|
&i.IssueID,
|
|
&i.Status,
|
|
&i.Priority,
|
|
&i.DispatchedAt,
|
|
&i.StartedAt,
|
|
&i.CompletedAt,
|
|
&i.Result,
|
|
&i.Error,
|
|
&i.CreatedAt,
|
|
&i.Context,
|
|
&i.RuntimeID,
|
|
&i.SessionID,
|
|
&i.WorkDir,
|
|
&i.TriggerCommentID,
|
|
&i.ChatSessionID,
|
|
&i.AutopilotRunID,
|
|
&i.Attempt,
|
|
&i.MaxAttempts,
|
|
&i.ParentTaskID,
|
|
&i.FailureReason,
|
|
&i.TriggerSummary,
|
|
&i.ForceFreshSession,
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
items = append(items, i)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
// cancelAgentTasksByTriggerComment is the SQL behind
// CancelAgentTasksByTriggerComment: it marks every queued, dispatched, or
// running task whose trigger_comment_id is $1 as 'cancelled', stamps
// completed_at, and returns the affected rows.
const cancelAgentTasksByTriggerComment = `-- name: CancelAgentTasksByTriggerComment :many
UPDATE agent_task_queue
SET status = 'cancelled', completed_at = now()
WHERE trigger_comment_id = $1 AND status IN ('queued', 'dispatched', 'running')
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session
`
|
|
|
|
// Cancels active tasks whose trigger is the given comment. Called when a
|
|
// comment is deleted so the agent does not run with the now-deleted content
|
|
// already embedded in its prompt. Must run BEFORE the comment row is deleted
|
|
// because the FK ON DELETE SET NULL would otherwise nullify trigger_comment_id
|
|
// and we'd lose the ability to find the affected tasks.
|
|
func (q *Queries) CancelAgentTasksByTriggerComment(ctx context.Context, triggerCommentID pgtype.UUID) ([]AgentTaskQueue, error) {
|
|
rows, err := q.db.Query(ctx, cancelAgentTasksByTriggerComment, triggerCommentID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
items := []AgentTaskQueue{}
|
|
for rows.Next() {
|
|
var i AgentTaskQueue
|
|
if err := rows.Scan(
|
|
&i.ID,
|
|
&i.AgentID,
|
|
&i.IssueID,
|
|
&i.Status,
|
|
&i.Priority,
|
|
&i.DispatchedAt,
|
|
&i.StartedAt,
|
|
&i.CompletedAt,
|
|
&i.Result,
|
|
&i.Error,
|
|
&i.CreatedAt,
|
|
&i.Context,
|
|
&i.RuntimeID,
|
|
&i.SessionID,
|
|
&i.WorkDir,
|
|
&i.TriggerCommentID,
|
|
&i.ChatSessionID,
|
|
&i.AutopilotRunID,
|
|
&i.Attempt,
|
|
&i.MaxAttempts,
|
|
&i.ParentTaskID,
|
|
&i.FailureReason,
|
|
&i.TriggerSummary,
|
|
&i.ForceFreshSession,
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
items = append(items, i)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
// claimAgentTask is the SQL behind ClaimAgentTask. It picks one queued task
// for agent $1 (highest priority first, then oldest created_at, locked with
// FOR UPDATE SKIP LOCKED), flips it to 'dispatched', stamps dispatched_at, and
// returns the row.
//
// A queued task is not eligible while the same agent has a dispatched or
// running task that shares its issue_id, shares its chat_session_id, or, when
// the candidate has NULL issue_id, chat_session_id and autopilot_run_id, is
// itself NULL on those three columns.
const claimAgentTask = `-- name: ClaimAgentTask :one
UPDATE agent_task_queue
SET status = 'dispatched', dispatched_at = now()
WHERE id = (
SELECT atq.id FROM agent_task_queue atq
WHERE atq.agent_id = $1 AND atq.status = 'queued'
AND NOT EXISTS (
SELECT 1 FROM agent_task_queue active
WHERE active.agent_id = atq.agent_id
AND active.status IN ('dispatched', 'running')
AND (
(atq.issue_id IS NOT NULL AND active.issue_id = atq.issue_id)
OR (atq.chat_session_id IS NOT NULL AND active.chat_session_id = atq.chat_session_id)
OR (
atq.issue_id IS NULL
AND atq.chat_session_id IS NULL
AND atq.autopilot_run_id IS NULL
AND active.issue_id IS NULL
AND active.chat_session_id IS NULL
AND active.autopilot_run_id IS NULL
)
)
)
ORDER BY atq.priority DESC, atq.created_at ASC
LIMIT 1
FOR UPDATE SKIP LOCKED
)
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session
`
|
|
|
|
// Claims the next queued task for an agent, enforcing per-(issue, agent) serialization:
|
|
// a task is only claimable when no other task for the same issue AND same agent is
|
|
// already dispatched or running. This allows different agents to work on the same
|
|
// issue in parallel while preventing a single agent from running duplicate tasks.
|
|
// Chat tasks (issue_id IS NULL) use chat_session_id for serialization instead.
|
|
// Quick-create tasks have no issue / chat / autopilot link, so they serialize on
|
|
// "any other quick-create-shaped task" (all four FKs NULL) for the same agent —
|
|
// otherwise a user mashing the create button could fire concurrent quick-creates
|
|
// whose completion lookup would race over "most recent issue by this agent".
|
|
func (q *Queries) ClaimAgentTask(ctx context.Context, agentID pgtype.UUID) (AgentTaskQueue, error) {
|
|
row := q.db.QueryRow(ctx, claimAgentTask, agentID)
|
|
var i AgentTaskQueue
|
|
err := row.Scan(
|
|
&i.ID,
|
|
&i.AgentID,
|
|
&i.IssueID,
|
|
&i.Status,
|
|
&i.Priority,
|
|
&i.DispatchedAt,
|
|
&i.StartedAt,
|
|
&i.CompletedAt,
|
|
&i.Result,
|
|
&i.Error,
|
|
&i.CreatedAt,
|
|
&i.Context,
|
|
&i.RuntimeID,
|
|
&i.SessionID,
|
|
&i.WorkDir,
|
|
&i.TriggerCommentID,
|
|
&i.ChatSessionID,
|
|
&i.AutopilotRunID,
|
|
&i.Attempt,
|
|
&i.MaxAttempts,
|
|
&i.ParentTaskID,
|
|
&i.FailureReason,
|
|
&i.TriggerSummary,
|
|
&i.ForceFreshSession,
|
|
)
|
|
return i, err
|
|
}
|
|
|
|
// clearAgentMcpConfig is the SQL behind ClearAgentMcpConfig: it sets
// mcp_config to NULL and bumps updated_at on the agent whose id is $1, then
// returns the full updated row.
const clearAgentMcpConfig = `-- name: ClearAgentMcpConfig :one
UPDATE agent SET mcp_config = NULL, updated_at = now()
WHERE id = $1
RETURNING id, workspace_id, name, avatar_url, runtime_mode, runtime_config, visibility, status, max_concurrent_tasks, owner_id, created_at, updated_at, description, runtime_id, instructions, archived_at, archived_by, custom_env, custom_args, mcp_config, model
`
|
|
|
|
func (q *Queries) ClearAgentMcpConfig(ctx context.Context, id pgtype.UUID) (Agent, error) {
|
|
row := q.db.QueryRow(ctx, clearAgentMcpConfig, id)
|
|
var i Agent
|
|
err := row.Scan(
|
|
&i.ID,
|
|
&i.WorkspaceID,
|
|
&i.Name,
|
|
&i.AvatarUrl,
|
|
&i.RuntimeMode,
|
|
&i.RuntimeConfig,
|
|
&i.Visibility,
|
|
&i.Status,
|
|
&i.MaxConcurrentTasks,
|
|
&i.OwnerID,
|
|
&i.CreatedAt,
|
|
&i.UpdatedAt,
|
|
&i.Description,
|
|
&i.RuntimeID,
|
|
&i.Instructions,
|
|
&i.ArchivedAt,
|
|
&i.ArchivedBy,
|
|
&i.CustomEnv,
|
|
&i.CustomArgs,
|
|
&i.McpConfig,
|
|
&i.Model,
|
|
)
|
|
return i, err
|
|
}
|
|
|
|
// completeAgentTask is the SQL behind CompleteAgentTask: for the task with id
// $1 that is currently 'running', it sets status to 'completed', stamps
// completed_at, stores result ($2), session_id ($3) and work_dir ($4), and
// returns the updated row.
const completeAgentTask = `-- name: CompleteAgentTask :one
UPDATE agent_task_queue
SET status = 'completed', completed_at = now(), result = $2, session_id = $3, work_dir = $4
WHERE id = $1 AND status = 'running'
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session
`
|
|
|
|
type CompleteAgentTaskParams struct {
|
|
ID pgtype.UUID `json:"id"`
|
|
Result []byte `json:"result"`
|
|
SessionID pgtype.Text `json:"session_id"`
|
|
WorkDir pgtype.Text `json:"work_dir"`
|
|
}
|
|
|
|
func (q *Queries) CompleteAgentTask(ctx context.Context, arg CompleteAgentTaskParams) (AgentTaskQueue, error) {
|
|
row := q.db.QueryRow(ctx, completeAgentTask,
|
|
arg.ID,
|
|
arg.Result,
|
|
arg.SessionID,
|
|
arg.WorkDir,
|
|
)
|
|
var i AgentTaskQueue
|
|
err := row.Scan(
|
|
&i.ID,
|
|
&i.AgentID,
|
|
&i.IssueID,
|
|
&i.Status,
|
|
&i.Priority,
|
|
&i.DispatchedAt,
|
|
&i.StartedAt,
|
|
&i.CompletedAt,
|
|
&i.Result,
|
|
&i.Error,
|
|
&i.CreatedAt,
|
|
&i.Context,
|
|
&i.RuntimeID,
|
|
&i.SessionID,
|
|
&i.WorkDir,
|
|
&i.TriggerCommentID,
|
|
&i.ChatSessionID,
|
|
&i.AutopilotRunID,
|
|
&i.Attempt,
|
|
&i.MaxAttempts,
|
|
&i.ParentTaskID,
|
|
&i.FailureReason,
|
|
&i.TriggerSummary,
|
|
&i.ForceFreshSession,
|
|
)
|
|
return i, err
|
|
}
|
|
|
|
// countRunningTasks is the SQL behind CountRunningTasks: it counts the tasks
// of agent $1 whose status is 'dispatched' or 'running'.
const countRunningTasks = `-- name: CountRunningTasks :one
SELECT count(*) FROM agent_task_queue
WHERE agent_id = $1 AND status IN ('dispatched', 'running')
`
|
|
|
|
func (q *Queries) CountRunningTasks(ctx context.Context, agentID pgtype.UUID) (int64, error) {
|
|
row := q.db.QueryRow(ctx, countRunningTasks, agentID)
|
|
var count int64
|
|
err := row.Scan(&count)
|
|
return count, err
|
|
}
|
|
|
|
// createAgent is the SQL behind CreateAgent: it inserts one agent row from
// the fifteen positional parameters and returns the full inserted row.
const createAgent = `-- name: CreateAgent :one
INSERT INTO agent (
workspace_id, name, description, avatar_url, runtime_mode,
runtime_config, runtime_id, visibility, max_concurrent_tasks, owner_id,
instructions, custom_env, custom_args, mcp_config, model
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15)
RETURNING id, workspace_id, name, avatar_url, runtime_mode, runtime_config, visibility, status, max_concurrent_tasks, owner_id, created_at, updated_at, description, runtime_id, instructions, archived_at, archived_by, custom_env, custom_args, mcp_config, model
`
|
|
|
|
type CreateAgentParams struct {
|
|
WorkspaceID pgtype.UUID `json:"workspace_id"`
|
|
Name string `json:"name"`
|
|
Description string `json:"description"`
|
|
AvatarUrl pgtype.Text `json:"avatar_url"`
|
|
RuntimeMode string `json:"runtime_mode"`
|
|
RuntimeConfig []byte `json:"runtime_config"`
|
|
RuntimeID pgtype.UUID `json:"runtime_id"`
|
|
Visibility string `json:"visibility"`
|
|
MaxConcurrentTasks int32 `json:"max_concurrent_tasks"`
|
|
OwnerID pgtype.UUID `json:"owner_id"`
|
|
Instructions string `json:"instructions"`
|
|
CustomEnv []byte `json:"custom_env"`
|
|
CustomArgs []byte `json:"custom_args"`
|
|
McpConfig []byte `json:"mcp_config"`
|
|
Model pgtype.Text `json:"model"`
|
|
}
|
|
|
|
func (q *Queries) CreateAgent(ctx context.Context, arg CreateAgentParams) (Agent, error) {
|
|
row := q.db.QueryRow(ctx, createAgent,
|
|
arg.WorkspaceID,
|
|
arg.Name,
|
|
arg.Description,
|
|
arg.AvatarUrl,
|
|
arg.RuntimeMode,
|
|
arg.RuntimeConfig,
|
|
arg.RuntimeID,
|
|
arg.Visibility,
|
|
arg.MaxConcurrentTasks,
|
|
arg.OwnerID,
|
|
arg.Instructions,
|
|
arg.CustomEnv,
|
|
arg.CustomArgs,
|
|
arg.McpConfig,
|
|
arg.Model,
|
|
)
|
|
var i Agent
|
|
err := row.Scan(
|
|
&i.ID,
|
|
&i.WorkspaceID,
|
|
&i.Name,
|
|
&i.AvatarUrl,
|
|
&i.RuntimeMode,
|
|
&i.RuntimeConfig,
|
|
&i.Visibility,
|
|
&i.Status,
|
|
&i.MaxConcurrentTasks,
|
|
&i.OwnerID,
|
|
&i.CreatedAt,
|
|
&i.UpdatedAt,
|
|
&i.Description,
|
|
&i.RuntimeID,
|
|
&i.Instructions,
|
|
&i.ArchivedAt,
|
|
&i.ArchivedBy,
|
|
&i.CustomEnv,
|
|
&i.CustomArgs,
|
|
&i.McpConfig,
|
|
&i.Model,
|
|
)
|
|
return i, err
|
|
}
|
|
|
|
// createAgentTask is the SQL behind CreateAgentTask: it inserts a task with
// status 'queued' for agent $1 / runtime $2 / issue $3 with priority $4,
// trigger_comment_id $5 and trigger_summary $6. force_fresh_session is $7,
// falling back to FALSE when $7 is NULL. The inserted row is returned.
const createAgentTask = `-- name: CreateAgentTask :one
INSERT INTO agent_task_queue (
agent_id, runtime_id, issue_id, status, priority, trigger_comment_id,
trigger_summary, force_fresh_session
)
VALUES (
$1, $2, $3, 'queued', $4, $5,
$6,
COALESCE($7::boolean, FALSE)
)
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session
`
|
|
|
|
type CreateAgentTaskParams struct {
|
|
AgentID pgtype.UUID `json:"agent_id"`
|
|
RuntimeID pgtype.UUID `json:"runtime_id"`
|
|
IssueID pgtype.UUID `json:"issue_id"`
|
|
Priority int32 `json:"priority"`
|
|
TriggerCommentID pgtype.UUID `json:"trigger_comment_id"`
|
|
TriggerSummary pgtype.Text `json:"trigger_summary"`
|
|
ForceFreshSession pgtype.Bool `json:"force_fresh_session"`
|
|
}
|
|
|
|
func (q *Queries) CreateAgentTask(ctx context.Context, arg CreateAgentTaskParams) (AgentTaskQueue, error) {
|
|
row := q.db.QueryRow(ctx, createAgentTask,
|
|
arg.AgentID,
|
|
arg.RuntimeID,
|
|
arg.IssueID,
|
|
arg.Priority,
|
|
arg.TriggerCommentID,
|
|
arg.TriggerSummary,
|
|
arg.ForceFreshSession,
|
|
)
|
|
var i AgentTaskQueue
|
|
err := row.Scan(
|
|
&i.ID,
|
|
&i.AgentID,
|
|
&i.IssueID,
|
|
&i.Status,
|
|
&i.Priority,
|
|
&i.DispatchedAt,
|
|
&i.StartedAt,
|
|
&i.CompletedAt,
|
|
&i.Result,
|
|
&i.Error,
|
|
&i.CreatedAt,
|
|
&i.Context,
|
|
&i.RuntimeID,
|
|
&i.SessionID,
|
|
&i.WorkDir,
|
|
&i.TriggerCommentID,
|
|
&i.ChatSessionID,
|
|
&i.AutopilotRunID,
|
|
&i.Attempt,
|
|
&i.MaxAttempts,
|
|
&i.ParentTaskID,
|
|
&i.FailureReason,
|
|
&i.TriggerSummary,
|
|
&i.ForceFreshSession,
|
|
)
|
|
return i, err
|
|
}
|
|
|
|
// createQuickCreateTask is the SQL behind CreateQuickCreateTask: it inserts a
// 'queued' task for agent $1 / runtime $2 with a NULL issue_id, priority $3
// and context $4, and returns the inserted row.
const createQuickCreateTask = `-- name: CreateQuickCreateTask :one
INSERT INTO agent_task_queue (agent_id, runtime_id, issue_id, status, priority, context)
VALUES ($1, $2, NULL, 'queued', $3, $4)
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session
`
|
|
|
|
type CreateQuickCreateTaskParams struct {
|
|
AgentID pgtype.UUID `json:"agent_id"`
|
|
RuntimeID pgtype.UUID `json:"runtime_id"`
|
|
Priority int32 `json:"priority"`
|
|
Context []byte `json:"context"`
|
|
}
|
|
|
|
// Quick-create tasks have no issue / chat / autopilot link; the entire job
|
|
// description (prompt, requester, workspace) lives in context JSONB. The
|
|
// daemon detects this variant via context.type == "quick_create".
|
|
func (q *Queries) CreateQuickCreateTask(ctx context.Context, arg CreateQuickCreateTaskParams) (AgentTaskQueue, error) {
|
|
row := q.db.QueryRow(ctx, createQuickCreateTask,
|
|
arg.AgentID,
|
|
arg.RuntimeID,
|
|
arg.Priority,
|
|
arg.Context,
|
|
)
|
|
var i AgentTaskQueue
|
|
err := row.Scan(
|
|
&i.ID,
|
|
&i.AgentID,
|
|
&i.IssueID,
|
|
&i.Status,
|
|
&i.Priority,
|
|
&i.DispatchedAt,
|
|
&i.StartedAt,
|
|
&i.CompletedAt,
|
|
&i.Result,
|
|
&i.Error,
|
|
&i.CreatedAt,
|
|
&i.Context,
|
|
&i.RuntimeID,
|
|
&i.SessionID,
|
|
&i.WorkDir,
|
|
&i.TriggerCommentID,
|
|
&i.ChatSessionID,
|
|
&i.AutopilotRunID,
|
|
&i.Attempt,
|
|
&i.MaxAttempts,
|
|
&i.ParentTaskID,
|
|
&i.FailureReason,
|
|
&i.TriggerSummary,
|
|
&i.ForceFreshSession,
|
|
)
|
|
return i, err
|
|
}
|
|
|
|
// createRetryTask is the SQL behind CreateRetryTask: it inserts a new
// 'queued' task copied from the parent task with id $1. The copy keeps the
// parent's agent, runtime, issue, chat session, autopilot run, priority,
// trigger comment/summary, context, session_id, work_dir and max_attempts,
// sets attempt to the parent's attempt + 1, and points parent_task_id at the
// parent. The inserted row is returned.
const createRetryTask = `-- name: CreateRetryTask :one
INSERT INTO agent_task_queue (
agent_id, runtime_id, issue_id, chat_session_id, autopilot_run_id,
status, priority, trigger_comment_id, trigger_summary, context,
session_id, work_dir,
attempt, max_attempts, parent_task_id
)
SELECT
p.agent_id, p.runtime_id, p.issue_id, p.chat_session_id, p.autopilot_run_id,
'queued', p.priority, p.trigger_comment_id, p.trigger_summary, p.context,
p.session_id, p.work_dir,
p.attempt + 1, p.max_attempts, p.id
FROM agent_task_queue p
WHERE p.id = $1
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session
`
|
|
|
|
// Clones a parent task into a fresh queued attempt. Carries forward the
|
|
// agent's resume context (session_id/work_dir) so the child can continue
|
|
// the conversation when the backend supports it. attempt is incremented;
|
|
// max_attempts and trigger_comment_id are inherited.
|
|
func (q *Queries) CreateRetryTask(ctx context.Context, id pgtype.UUID) (AgentTaskQueue, error) {
|
|
row := q.db.QueryRow(ctx, createRetryTask, id)
|
|
var i AgentTaskQueue
|
|
err := row.Scan(
|
|
&i.ID,
|
|
&i.AgentID,
|
|
&i.IssueID,
|
|
&i.Status,
|
|
&i.Priority,
|
|
&i.DispatchedAt,
|
|
&i.StartedAt,
|
|
&i.CompletedAt,
|
|
&i.Result,
|
|
&i.Error,
|
|
&i.CreatedAt,
|
|
&i.Context,
|
|
&i.RuntimeID,
|
|
&i.SessionID,
|
|
&i.WorkDir,
|
|
&i.TriggerCommentID,
|
|
&i.ChatSessionID,
|
|
&i.AutopilotRunID,
|
|
&i.Attempt,
|
|
&i.MaxAttempts,
|
|
&i.ParentTaskID,
|
|
&i.FailureReason,
|
|
&i.TriggerSummary,
|
|
&i.ForceFreshSession,
|
|
)
|
|
return i, err
|
|
}
|
|
|
|
// failAgentTask is the SQL behind FailAgentTask: for the task with id $1 that
// is 'dispatched' or 'running', it sets status to 'failed', stamps
// completed_at and stores error ($2). failure_reason is $3, or 'agent_error'
// when $3 is NULL. session_id ($4) and work_dir ($5) overwrite the stored
// values only when non-NULL. The updated row is returned.
const failAgentTask = `-- name: FailAgentTask :one
UPDATE agent_task_queue
SET status = 'failed',
completed_at = now(),
error = $2,
failure_reason = COALESCE($3, 'agent_error'),
session_id = COALESCE($4, session_id),
work_dir = COALESCE($5, work_dir)
WHERE id = $1 AND status IN ('dispatched', 'running')
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session
`
|
|
|
|
type FailAgentTaskParams struct {
|
|
ID pgtype.UUID `json:"id"`
|
|
Error pgtype.Text `json:"error"`
|
|
FailureReason pgtype.Text `json:"failure_reason"`
|
|
SessionID pgtype.Text `json:"session_id"`
|
|
WorkDir pgtype.Text `json:"work_dir"`
|
|
}
|
|
|
|
// Marks a task as failed. session_id and work_dir are merged via COALESCE so
|
|
// if the agent already established a real session before failing (e.g. it
|
|
// crashed mid-conversation, was cancelled, or hit a tool error) the resume
|
|
// pointer is preserved on the task row. The next chat task can then fall
|
|
// back to GetLastChatTaskSession and continue the conversation instead of
|
|
// silently starting over.
|
|
//
|
|
// failure_reason is a coarse classifier consumed by the auto-retry path;
|
|
// 'agent_error' is the safe default when the daemon doesn't supply one.
|
|
func (q *Queries) FailAgentTask(ctx context.Context, arg FailAgentTaskParams) (AgentTaskQueue, error) {
|
|
row := q.db.QueryRow(ctx, failAgentTask,
|
|
arg.ID,
|
|
arg.Error,
|
|
arg.FailureReason,
|
|
arg.SessionID,
|
|
arg.WorkDir,
|
|
)
|
|
var i AgentTaskQueue
|
|
err := row.Scan(
|
|
&i.ID,
|
|
&i.AgentID,
|
|
&i.IssueID,
|
|
&i.Status,
|
|
&i.Priority,
|
|
&i.DispatchedAt,
|
|
&i.StartedAt,
|
|
&i.CompletedAt,
|
|
&i.Result,
|
|
&i.Error,
|
|
&i.CreatedAt,
|
|
&i.Context,
|
|
&i.RuntimeID,
|
|
&i.SessionID,
|
|
&i.WorkDir,
|
|
&i.TriggerCommentID,
|
|
&i.ChatSessionID,
|
|
&i.AutopilotRunID,
|
|
&i.Attempt,
|
|
&i.MaxAttempts,
|
|
&i.ParentTaskID,
|
|
&i.FailureReason,
|
|
&i.TriggerSummary,
|
|
&i.ForceFreshSession,
|
|
)
|
|
return i, err
|
|
}
|
|
|
|
// failStaleTasks is the SQL behind FailStaleTasks: it marks as 'failed'
// (error 'task timed out', failure_reason 'timeout', completed_at = now())
// every task that has been 'dispatched' for more than $1 seconds or 'running'
// for more than $2 seconds, and returns the affected rows.
const failStaleTasks = `-- name: FailStaleTasks :many
UPDATE agent_task_queue
SET status = 'failed', completed_at = now(), error = 'task timed out',
failure_reason = 'timeout'
WHERE (status = 'dispatched' AND dispatched_at < now() - make_interval(secs => $1::double precision))
OR (status = 'running' AND started_at < now() - make_interval(secs => $2::double precision))
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session
`
|
|
|
|
type FailStaleTasksParams struct {
|
|
DispatchTimeoutSecs float64 `json:"dispatch_timeout_secs"`
|
|
RunningTimeoutSecs float64 `json:"running_timeout_secs"`
|
|
}
|
|
|
|
// Fails tasks stuck in dispatched/running beyond the given thresholds.
|
|
// Handles cases where the daemon is alive but the task is orphaned
|
|
// (e.g. agent process hung, daemon failed to report completion).
|
|
func (q *Queries) FailStaleTasks(ctx context.Context, arg FailStaleTasksParams) ([]AgentTaskQueue, error) {
|
|
rows, err := q.db.Query(ctx, failStaleTasks, arg.DispatchTimeoutSecs, arg.RunningTimeoutSecs)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
items := []AgentTaskQueue{}
|
|
for rows.Next() {
|
|
var i AgentTaskQueue
|
|
if err := rows.Scan(
|
|
&i.ID,
|
|
&i.AgentID,
|
|
&i.IssueID,
|
|
&i.Status,
|
|
&i.Priority,
|
|
&i.DispatchedAt,
|
|
&i.StartedAt,
|
|
&i.CompletedAt,
|
|
&i.Result,
|
|
&i.Error,
|
|
&i.CreatedAt,
|
|
&i.Context,
|
|
&i.RuntimeID,
|
|
&i.SessionID,
|
|
&i.WorkDir,
|
|
&i.TriggerCommentID,
|
|
&i.ChatSessionID,
|
|
&i.AutopilotRunID,
|
|
&i.Attempt,
|
|
&i.MaxAttempts,
|
|
&i.ParentTaskID,
|
|
&i.FailureReason,
|
|
&i.TriggerSummary,
|
|
&i.ForceFreshSession,
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
items = append(items, i)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
// getAgent is the SQL behind GetAgent: it selects the full agent row whose id
// is $1.
const getAgent = `-- name: GetAgent :one
SELECT id, workspace_id, name, avatar_url, runtime_mode, runtime_config, visibility, status, max_concurrent_tasks, owner_id, created_at, updated_at, description, runtime_id, instructions, archived_at, archived_by, custom_env, custom_args, mcp_config, model FROM agent
WHERE id = $1
`
|
|
|
|
func (q *Queries) GetAgent(ctx context.Context, id pgtype.UUID) (Agent, error) {
|
|
row := q.db.QueryRow(ctx, getAgent, id)
|
|
var i Agent
|
|
err := row.Scan(
|
|
&i.ID,
|
|
&i.WorkspaceID,
|
|
&i.Name,
|
|
&i.AvatarUrl,
|
|
&i.RuntimeMode,
|
|
&i.RuntimeConfig,
|
|
&i.Visibility,
|
|
&i.Status,
|
|
&i.MaxConcurrentTasks,
|
|
&i.OwnerID,
|
|
&i.CreatedAt,
|
|
&i.UpdatedAt,
|
|
&i.Description,
|
|
&i.RuntimeID,
|
|
&i.Instructions,
|
|
&i.ArchivedAt,
|
|
&i.ArchivedBy,
|
|
&i.CustomEnv,
|
|
&i.CustomArgs,
|
|
&i.McpConfig,
|
|
&i.Model,
|
|
)
|
|
return i, err
|
|
}
|
|
|
|
const getAgentInWorkspace = `-- name: GetAgentInWorkspace :one
|
|
SELECT id, workspace_id, name, avatar_url, runtime_mode, runtime_config, visibility, status, max_concurrent_tasks, owner_id, created_at, updated_at, description, runtime_id, instructions, archived_at, archived_by, custom_env, custom_args, mcp_config, model FROM agent
|
|
WHERE id = $1 AND workspace_id = $2
|
|
`
|
|
|
|
type GetAgentInWorkspaceParams struct {
|
|
ID pgtype.UUID `json:"id"`
|
|
WorkspaceID pgtype.UUID `json:"workspace_id"`
|
|
}
|
|
|
|
func (q *Queries) GetAgentInWorkspace(ctx context.Context, arg GetAgentInWorkspaceParams) (Agent, error) {
|
|
row := q.db.QueryRow(ctx, getAgentInWorkspace, arg.ID, arg.WorkspaceID)
|
|
var i Agent
|
|
err := row.Scan(
|
|
&i.ID,
|
|
&i.WorkspaceID,
|
|
&i.Name,
|
|
&i.AvatarUrl,
|
|
&i.RuntimeMode,
|
|
&i.RuntimeConfig,
|
|
&i.Visibility,
|
|
&i.Status,
|
|
&i.MaxConcurrentTasks,
|
|
&i.OwnerID,
|
|
&i.CreatedAt,
|
|
&i.UpdatedAt,
|
|
&i.Description,
|
|
&i.RuntimeID,
|
|
&i.Instructions,
|
|
&i.ArchivedAt,
|
|
&i.ArchivedBy,
|
|
&i.CustomEnv,
|
|
&i.CustomArgs,
|
|
&i.McpConfig,
|
|
&i.Model,
|
|
)
|
|
return i, err
|
|
}
|
|
|
|
const getAgentTask = `-- name: GetAgentTask :one
|
|
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session FROM agent_task_queue
|
|
WHERE id = $1
|
|
`
|
|
|
|
func (q *Queries) GetAgentTask(ctx context.Context, id pgtype.UUID) (AgentTaskQueue, error) {
|
|
row := q.db.QueryRow(ctx, getAgentTask, id)
|
|
var i AgentTaskQueue
|
|
err := row.Scan(
|
|
&i.ID,
|
|
&i.AgentID,
|
|
&i.IssueID,
|
|
&i.Status,
|
|
&i.Priority,
|
|
&i.DispatchedAt,
|
|
&i.StartedAt,
|
|
&i.CompletedAt,
|
|
&i.Result,
|
|
&i.Error,
|
|
&i.CreatedAt,
|
|
&i.Context,
|
|
&i.RuntimeID,
|
|
&i.SessionID,
|
|
&i.WorkDir,
|
|
&i.TriggerCommentID,
|
|
&i.ChatSessionID,
|
|
&i.AutopilotRunID,
|
|
&i.Attempt,
|
|
&i.MaxAttempts,
|
|
&i.ParentTaskID,
|
|
&i.FailureReason,
|
|
&i.TriggerSummary,
|
|
&i.ForceFreshSession,
|
|
)
|
|
return i, err
|
|
}
|
|
|
|
const getLastTaskSession = `-- name: GetLastTaskSession :one
|
|
SELECT session_id, work_dir, runtime_id FROM agent_task_queue
|
|
WHERE agent_id = $1 AND issue_id = $2
|
|
AND (
|
|
status = 'completed'
|
|
OR (status = 'failed' AND COALESCE(failure_reason, '') NOT IN ('iteration_limit', 'agent_fallback_message'))
|
|
)
|
|
AND session_id IS NOT NULL
|
|
ORDER BY COALESCE(completed_at, started_at, dispatched_at, created_at) DESC
|
|
LIMIT 1
|
|
`
|
|
|
|
type GetLastTaskSessionParams struct {
|
|
AgentID pgtype.UUID `json:"agent_id"`
|
|
IssueID pgtype.UUID `json:"issue_id"`
|
|
}
|
|
|
|
type GetLastTaskSessionRow struct {
|
|
SessionID pgtype.Text `json:"session_id"`
|
|
WorkDir pgtype.Text `json:"work_dir"`
|
|
RuntimeID pgtype.UUID `json:"runtime_id"`
|
|
}
|
|
|
|
// Returns the session_id and work_dir from the most recent task for a given
|
|
// (agent_id, issue_id) pair, used for session resumption on the auto-retry
|
|
// path. We accept both 'completed' and 'failed' tasks: a failed task may
|
|
// have established a real agent session before crashing (orphaned by a
|
|
// daemon restart, runtime offline, or sweeper timeout), and the daemon pins
|
|
// the resume pointer mid-flight via UpdateAgentTaskSession. Without this,
|
|
// an auto-retry of a mid-run failure would silently start a fresh
|
|
// conversation and lose the in-flight context — exactly what MUL-1128's B
|
|
// branch is meant to fix.
|
|
//
|
|
// Manual rerun (TaskService.RerunIssue) does NOT take this path: it sets
|
|
// force_fresh_session=true on the new task, and the daemon claim handler
|
|
// skips this lookup entirely. The user already judged the prior output bad;
|
|
// resuming the same conversation would replay a poisoned state.
|
|
//
|
|
// Tasks that ended in a known "poisoned" terminal state are also excluded
|
|
// here so even auto-retry does not inherit the bad session. The daemon
|
|
// classifies these failures (iteration_limit, agent_fallback_message) when
|
|
// it detects the agent emitted a fallback marker instead of a real result.
|
|
func (q *Queries) GetLastTaskSession(ctx context.Context, arg GetLastTaskSessionParams) (GetLastTaskSessionRow, error) {
|
|
row := q.db.QueryRow(ctx, getLastTaskSession, arg.AgentID, arg.IssueID)
|
|
var i GetLastTaskSessionRow
|
|
err := row.Scan(&i.SessionID, &i.WorkDir, &i.RuntimeID)
|
|
return i, err
|
|
}
|
|
|
|
const getWorkspaceAgentActivity30d = `-- name: GetWorkspaceAgentActivity30d :many
|
|
SELECT
|
|
atq.agent_id,
|
|
DATE_TRUNC('day', atq.completed_at)::timestamptz AS bucket,
|
|
COUNT(*)::int AS task_count,
|
|
COUNT(*) FILTER (WHERE atq.status = 'failed')::int AS failed_count
|
|
FROM agent_task_queue atq
|
|
JOIN agent a ON a.id = atq.agent_id
|
|
WHERE a.workspace_id = $1
|
|
AND atq.completed_at IS NOT NULL
|
|
AND atq.completed_at > now() - INTERVAL '30 days'
|
|
GROUP BY atq.agent_id, bucket
|
|
ORDER BY atq.agent_id, bucket
|
|
`
|
|
|
|
type GetWorkspaceAgentActivity30dRow struct {
|
|
AgentID pgtype.UUID `json:"agent_id"`
|
|
Bucket pgtype.Timestamptz `json:"bucket"`
|
|
TaskCount int32 `json:"task_count"`
|
|
FailedCount int32 `json:"failed_count"`
|
|
}
|
|
|
|
// Returns per-agent daily activity buckets for the last 30 days. Single
|
|
// workspace-wide read backs both surfaces:
|
|
// - Agents list ACTIVITY column — uses only the trailing 7 buckets
|
|
// - Agent detail "Last 30 days" panel — uses the full 30
|
|
//
|
|
// 30 days contains 7 days, so one fetch + a client-side .slice(-7) wins
|
|
// over fetching twice. Days with no completion produce no row; the
|
|
// front-end zero-fills.
|
|
//
|
|
// Anchored on completed_at (not created_at) because the sparkline answers
|
|
// "what did this agent produce?" not "what was queued at it?". A task that's
|
|
// still in flight has no completed_at and contributes nothing here — that's
|
|
// correct: in-flight tasks are surfaced via the live presence indicator,
|
|
// not the historical trend.
|
|
func (q *Queries) GetWorkspaceAgentActivity30d(ctx context.Context, workspaceID pgtype.UUID) ([]GetWorkspaceAgentActivity30dRow, error) {
|
|
rows, err := q.db.Query(ctx, getWorkspaceAgentActivity30d, workspaceID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
items := []GetWorkspaceAgentActivity30dRow{}
|
|
for rows.Next() {
|
|
var i GetWorkspaceAgentActivity30dRow
|
|
if err := rows.Scan(
|
|
&i.AgentID,
|
|
&i.Bucket,
|
|
&i.TaskCount,
|
|
&i.FailedCount,
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
items = append(items, i)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
const getWorkspaceAgentRunCounts = `-- name: GetWorkspaceAgentRunCounts :many
|
|
SELECT
|
|
atq.agent_id,
|
|
COUNT(*)::int AS run_count
|
|
FROM agent_task_queue atq
|
|
JOIN agent a ON a.id = atq.agent_id
|
|
WHERE a.workspace_id = $1
|
|
AND atq.created_at > now() - INTERVAL '30 days'
|
|
GROUP BY atq.agent_id
|
|
`
|
|
|
|
type GetWorkspaceAgentRunCountsRow struct {
|
|
AgentID pgtype.UUID `json:"agent_id"`
|
|
RunCount int32 `json:"run_count"`
|
|
}
|
|
|
|
// Total task runs per agent over the trailing 30 days, used by the Agents
|
|
// list RUNS column. 30-day window keeps the count meaningful (a long-dormant
|
|
// agent shouldn't show "5,420 runs from 2 years ago") and keeps the scan
|
|
// bounded as the workspace ages.
|
|
func (q *Queries) GetWorkspaceAgentRunCounts(ctx context.Context, workspaceID pgtype.UUID) ([]GetWorkspaceAgentRunCountsRow, error) {
|
|
rows, err := q.db.Query(ctx, getWorkspaceAgentRunCounts, workspaceID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
items := []GetWorkspaceAgentRunCountsRow{}
|
|
for rows.Next() {
|
|
var i GetWorkspaceAgentRunCountsRow
|
|
if err := rows.Scan(&i.AgentID, &i.RunCount); err != nil {
|
|
return nil, err
|
|
}
|
|
items = append(items, i)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
const hasActiveTaskForIssue = `-- name: HasActiveTaskForIssue :one
|
|
SELECT count(*) > 0 AS has_active FROM agent_task_queue
|
|
WHERE issue_id = $1 AND status IN ('queued', 'dispatched', 'running')
|
|
`
|
|
|
|
// Returns true if there is any queued, dispatched, or running task for the issue.
|
|
func (q *Queries) HasActiveTaskForIssue(ctx context.Context, issueID pgtype.UUID) (bool, error) {
|
|
row := q.db.QueryRow(ctx, hasActiveTaskForIssue, issueID)
|
|
var has_active bool
|
|
err := row.Scan(&has_active)
|
|
return has_active, err
|
|
}
|
|
|
|
const linkTaskToIssue = `-- name: LinkTaskToIssue :exec
|
|
UPDATE agent_task_queue
|
|
SET issue_id = $2
|
|
WHERE id = $1 AND issue_id IS NULL
|
|
`
|
|
|
|
type LinkTaskToIssueParams struct {
|
|
ID pgtype.UUID `json:"id"`
|
|
IssueID pgtype.UUID `json:"issue_id"`
|
|
}
|
|
|
|
// Attaches the issue a quick-create task produced back to the task row, once
|
|
// the agent has finished and the issue exists. Guarded by `issue_id IS NULL`
|
|
// so this never overwrites an issue id that was set at task creation (only
|
|
// quick-create tasks land here unset). Fixes the activity row staying on
|
|
// "Creating issue" forever after completion.
|
|
func (q *Queries) LinkTaskToIssue(ctx context.Context, arg LinkTaskToIssueParams) error {
|
|
_, err := q.db.Exec(ctx, linkTaskToIssue, arg.ID, arg.IssueID)
|
|
return err
|
|
}
|
|
|
|
const listActiveTasksByIssue = `-- name: ListActiveTasksByIssue :many
|
|
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session FROM agent_task_queue
|
|
WHERE issue_id = $1 AND status IN ('queued', 'dispatched', 'running')
|
|
ORDER BY created_at DESC
|
|
`
|
|
|
|
// Backs the issue-detail "agent live" banner. Includes 'queued' so the
|
|
// banner shows up the moment a task is enqueued — not only after a runtime
|
|
// claims it. The queued window can be long when the runtime is offline or
|
|
// busy on a prior task, and a silent UI during that window looks like the
|
|
// platform never received the trigger.
|
|
func (q *Queries) ListActiveTasksByIssue(ctx context.Context, issueID pgtype.UUID) ([]AgentTaskQueue, error) {
|
|
rows, err := q.db.Query(ctx, listActiveTasksByIssue, issueID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
items := []AgentTaskQueue{}
|
|
for rows.Next() {
|
|
var i AgentTaskQueue
|
|
if err := rows.Scan(
|
|
&i.ID,
|
|
&i.AgentID,
|
|
&i.IssueID,
|
|
&i.Status,
|
|
&i.Priority,
|
|
&i.DispatchedAt,
|
|
&i.StartedAt,
|
|
&i.CompletedAt,
|
|
&i.Result,
|
|
&i.Error,
|
|
&i.CreatedAt,
|
|
&i.Context,
|
|
&i.RuntimeID,
|
|
&i.SessionID,
|
|
&i.WorkDir,
|
|
&i.TriggerCommentID,
|
|
&i.ChatSessionID,
|
|
&i.AutopilotRunID,
|
|
&i.Attempt,
|
|
&i.MaxAttempts,
|
|
&i.ParentTaskID,
|
|
&i.FailureReason,
|
|
&i.TriggerSummary,
|
|
&i.ForceFreshSession,
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
items = append(items, i)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
const listAgentTasks = `-- name: ListAgentTasks :many
|
|
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session FROM agent_task_queue
|
|
WHERE agent_id = $1
|
|
ORDER BY created_at DESC
|
|
`
|
|
|
|
func (q *Queries) ListAgentTasks(ctx context.Context, agentID pgtype.UUID) ([]AgentTaskQueue, error) {
|
|
rows, err := q.db.Query(ctx, listAgentTasks, agentID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
items := []AgentTaskQueue{}
|
|
for rows.Next() {
|
|
var i AgentTaskQueue
|
|
if err := rows.Scan(
|
|
&i.ID,
|
|
&i.AgentID,
|
|
&i.IssueID,
|
|
&i.Status,
|
|
&i.Priority,
|
|
&i.DispatchedAt,
|
|
&i.StartedAt,
|
|
&i.CompletedAt,
|
|
&i.Result,
|
|
&i.Error,
|
|
&i.CreatedAt,
|
|
&i.Context,
|
|
&i.RuntimeID,
|
|
&i.SessionID,
|
|
&i.WorkDir,
|
|
&i.TriggerCommentID,
|
|
&i.ChatSessionID,
|
|
&i.AutopilotRunID,
|
|
&i.Attempt,
|
|
&i.MaxAttempts,
|
|
&i.ParentTaskID,
|
|
&i.FailureReason,
|
|
&i.TriggerSummary,
|
|
&i.ForceFreshSession,
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
items = append(items, i)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
const listAgents = `-- name: ListAgents :many
|
|
SELECT id, workspace_id, name, avatar_url, runtime_mode, runtime_config, visibility, status, max_concurrent_tasks, owner_id, created_at, updated_at, description, runtime_id, instructions, archived_at, archived_by, custom_env, custom_args, mcp_config, model FROM agent
|
|
WHERE workspace_id = $1 AND archived_at IS NULL
|
|
ORDER BY created_at ASC
|
|
`
|
|
|
|
func (q *Queries) ListAgents(ctx context.Context, workspaceID pgtype.UUID) ([]Agent, error) {
|
|
rows, err := q.db.Query(ctx, listAgents, workspaceID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
items := []Agent{}
|
|
for rows.Next() {
|
|
var i Agent
|
|
if err := rows.Scan(
|
|
&i.ID,
|
|
&i.WorkspaceID,
|
|
&i.Name,
|
|
&i.AvatarUrl,
|
|
&i.RuntimeMode,
|
|
&i.RuntimeConfig,
|
|
&i.Visibility,
|
|
&i.Status,
|
|
&i.MaxConcurrentTasks,
|
|
&i.OwnerID,
|
|
&i.CreatedAt,
|
|
&i.UpdatedAt,
|
|
&i.Description,
|
|
&i.RuntimeID,
|
|
&i.Instructions,
|
|
&i.ArchivedAt,
|
|
&i.ArchivedBy,
|
|
&i.CustomEnv,
|
|
&i.CustomArgs,
|
|
&i.McpConfig,
|
|
&i.Model,
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
items = append(items, i)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
const listAllAgents = `-- name: ListAllAgents :many
|
|
SELECT id, workspace_id, name, avatar_url, runtime_mode, runtime_config, visibility, status, max_concurrent_tasks, owner_id, created_at, updated_at, description, runtime_id, instructions, archived_at, archived_by, custom_env, custom_args, mcp_config, model FROM agent
|
|
WHERE workspace_id = $1
|
|
ORDER BY created_at ASC
|
|
`
|
|
|
|
func (q *Queries) ListAllAgents(ctx context.Context, workspaceID pgtype.UUID) ([]Agent, error) {
|
|
rows, err := q.db.Query(ctx, listAllAgents, workspaceID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
items := []Agent{}
|
|
for rows.Next() {
|
|
var i Agent
|
|
if err := rows.Scan(
|
|
&i.ID,
|
|
&i.WorkspaceID,
|
|
&i.Name,
|
|
&i.AvatarUrl,
|
|
&i.RuntimeMode,
|
|
&i.RuntimeConfig,
|
|
&i.Visibility,
|
|
&i.Status,
|
|
&i.MaxConcurrentTasks,
|
|
&i.OwnerID,
|
|
&i.CreatedAt,
|
|
&i.UpdatedAt,
|
|
&i.Description,
|
|
&i.RuntimeID,
|
|
&i.Instructions,
|
|
&i.ArchivedAt,
|
|
&i.ArchivedBy,
|
|
&i.CustomEnv,
|
|
&i.CustomArgs,
|
|
&i.McpConfig,
|
|
&i.Model,
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
items = append(items, i)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
const listPendingTasksByRuntime = `-- name: ListPendingTasksByRuntime :many
|
|
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session FROM agent_task_queue
|
|
WHERE runtime_id = $1 AND status IN ('queued', 'dispatched')
|
|
ORDER BY priority DESC, created_at ASC
|
|
`
|
|
|
|
func (q *Queries) ListPendingTasksByRuntime(ctx context.Context, runtimeID pgtype.UUID) ([]AgentTaskQueue, error) {
|
|
rows, err := q.db.Query(ctx, listPendingTasksByRuntime, runtimeID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
items := []AgentTaskQueue{}
|
|
for rows.Next() {
|
|
var i AgentTaskQueue
|
|
if err := rows.Scan(
|
|
&i.ID,
|
|
&i.AgentID,
|
|
&i.IssueID,
|
|
&i.Status,
|
|
&i.Priority,
|
|
&i.DispatchedAt,
|
|
&i.StartedAt,
|
|
&i.CompletedAt,
|
|
&i.Result,
|
|
&i.Error,
|
|
&i.CreatedAt,
|
|
&i.Context,
|
|
&i.RuntimeID,
|
|
&i.SessionID,
|
|
&i.WorkDir,
|
|
&i.TriggerCommentID,
|
|
&i.ChatSessionID,
|
|
&i.AutopilotRunID,
|
|
&i.Attempt,
|
|
&i.MaxAttempts,
|
|
&i.ParentTaskID,
|
|
&i.FailureReason,
|
|
&i.TriggerSummary,
|
|
&i.ForceFreshSession,
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
items = append(items, i)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
const listQueuedClaimCandidatesByRuntime = `-- name: ListQueuedClaimCandidatesByRuntime :many
|
|
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session FROM agent_task_queue
|
|
WHERE runtime_id = $1 AND status = 'queued'
|
|
ORDER BY priority DESC, created_at ASC
|
|
`
|
|
|
|
// Returns rows the runtime can attempt to claim. Status is restricted to
|
|
// 'queued' (in contrast to ListPendingTasksByRuntime which also includes
|
|
// 'dispatched') because dispatched rows are by definition already owned
|
|
// and cannot be re-claimed — including them in the candidate list pads
|
|
// the result with rows that always lose the per-(issue, agent) race in
|
|
// ClaimAgentTask, wasting CPU and a SELECT every poll cycle when the
|
|
// runtime is busy on a long-running task. Backed by the partial index
|
|
// idx_agent_task_queue_claim_candidates so the warm path is cheap.
|
|
func (q *Queries) ListQueuedClaimCandidatesByRuntime(ctx context.Context, runtimeID pgtype.UUID) ([]AgentTaskQueue, error) {
|
|
rows, err := q.db.Query(ctx, listQueuedClaimCandidatesByRuntime, runtimeID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
items := []AgentTaskQueue{}
|
|
for rows.Next() {
|
|
var i AgentTaskQueue
|
|
if err := rows.Scan(
|
|
&i.ID,
|
|
&i.AgentID,
|
|
&i.IssueID,
|
|
&i.Status,
|
|
&i.Priority,
|
|
&i.DispatchedAt,
|
|
&i.StartedAt,
|
|
&i.CompletedAt,
|
|
&i.Result,
|
|
&i.Error,
|
|
&i.CreatedAt,
|
|
&i.Context,
|
|
&i.RuntimeID,
|
|
&i.SessionID,
|
|
&i.WorkDir,
|
|
&i.TriggerCommentID,
|
|
&i.ChatSessionID,
|
|
&i.AutopilotRunID,
|
|
&i.Attempt,
|
|
&i.MaxAttempts,
|
|
&i.ParentTaskID,
|
|
&i.FailureReason,
|
|
&i.TriggerSummary,
|
|
&i.ForceFreshSession,
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
items = append(items, i)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
const listTasksByIssue = `-- name: ListTasksByIssue :many
|
|
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session FROM agent_task_queue
|
|
WHERE issue_id = $1
|
|
ORDER BY created_at DESC
|
|
`
|
|
|
|
func (q *Queries) ListTasksByIssue(ctx context.Context, issueID pgtype.UUID) ([]AgentTaskQueue, error) {
|
|
rows, err := q.db.Query(ctx, listTasksByIssue, issueID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
items := []AgentTaskQueue{}
|
|
for rows.Next() {
|
|
var i AgentTaskQueue
|
|
if err := rows.Scan(
|
|
&i.ID,
|
|
&i.AgentID,
|
|
&i.IssueID,
|
|
&i.Status,
|
|
&i.Priority,
|
|
&i.DispatchedAt,
|
|
&i.StartedAt,
|
|
&i.CompletedAt,
|
|
&i.Result,
|
|
&i.Error,
|
|
&i.CreatedAt,
|
|
&i.Context,
|
|
&i.RuntimeID,
|
|
&i.SessionID,
|
|
&i.WorkDir,
|
|
&i.TriggerCommentID,
|
|
&i.ChatSessionID,
|
|
&i.AutopilotRunID,
|
|
&i.Attempt,
|
|
&i.MaxAttempts,
|
|
&i.ParentTaskID,
|
|
&i.FailureReason,
|
|
&i.TriggerSummary,
|
|
&i.ForceFreshSession,
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
items = append(items, i)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
const listWorkspaceAgentTaskSnapshot = `-- name: ListWorkspaceAgentTaskSnapshot :many
|
|
SELECT atq.id, atq.agent_id, atq.issue_id, atq.status, atq.priority, atq.dispatched_at, atq.started_at, atq.completed_at, atq.result, atq.error, atq.created_at, atq.context, atq.runtime_id, atq.session_id, atq.work_dir, atq.trigger_comment_id, atq.chat_session_id, atq.autopilot_run_id, atq.attempt, atq.max_attempts, atq.parent_task_id, atq.failure_reason, atq.trigger_summary, atq.force_fresh_session FROM agent_task_queue atq
|
|
JOIN agent a ON a.id = atq.agent_id
|
|
WHERE a.workspace_id = $1
|
|
AND atq.status IN ('queued', 'dispatched', 'running')
|
|
|
|
UNION ALL
|
|
|
|
SELECT t.id, t.agent_id, t.issue_id, t.status, t.priority, t.dispatched_at, t.started_at, t.completed_at, t.result, t.error, t.created_at, t.context, t.runtime_id, t.session_id, t.work_dir, t.trigger_comment_id, t.chat_session_id, t.autopilot_run_id, t.attempt, t.max_attempts, t.parent_task_id, t.failure_reason, t.trigger_summary, t.force_fresh_session FROM (
|
|
SELECT DISTINCT ON (atq.agent_id) atq.id, atq.agent_id, atq.issue_id, atq.status, atq.priority, atq.dispatched_at, atq.started_at, atq.completed_at, atq.result, atq.error, atq.created_at, atq.context, atq.runtime_id, atq.session_id, atq.work_dir, atq.trigger_comment_id, atq.chat_session_id, atq.autopilot_run_id, atq.attempt, atq.max_attempts, atq.parent_task_id, atq.failure_reason, atq.trigger_summary, atq.force_fresh_session
|
|
FROM agent_task_queue atq
|
|
JOIN agent a ON a.id = atq.agent_id
|
|
WHERE a.workspace_id = $1
|
|
AND atq.status IN ('completed', 'failed')
|
|
ORDER BY atq.agent_id, atq.completed_at DESC NULLS LAST
|
|
) t
|
|
`
|
|
|
|
// Returns the tasks needed to derive each agent's current presence:
|
|
// - All active tasks (queued / dispatched / running) — for working signal + counts
|
|
// - Each agent's most recent OUTCOME task (completed / failed) — for sticky
|
|
// failed signal
|
|
//
|
|
// The front-end picks "active wins, else latest outcome" — see derive-presence.ts.
|
|
//
|
|
// Cancelled tasks are excluded from the outcome half on purpose: cancel is a
|
|
// procedural signal ("attempt aborted"), not an outcome. It tells us nothing
|
|
// about whether the agent works, so it must NOT be allowed to mask a prior
|
|
// failure. Concretely: if an agent fails and then the user cancels the queued
|
|
// retry (or the parent issue closes and cascades cancels), the failed signal
|
|
// has to stay red. Only a real success (completed) or a fresh attempt (active)
|
|
// clears it.
|
|
//
|
|
// No UI windows in SQL: stickiness is decided by "is the latest outcome a
|
|
// failure?", not a 2-minute clock. JOINs agent because agent_task_queue has
|
|
// no workspace_id column.
|
|
func (q *Queries) ListWorkspaceAgentTaskSnapshot(ctx context.Context, workspaceID pgtype.UUID) ([]AgentTaskQueue, error) {
|
|
rows, err := q.db.Query(ctx, listWorkspaceAgentTaskSnapshot, workspaceID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
items := []AgentTaskQueue{}
|
|
for rows.Next() {
|
|
var i AgentTaskQueue
|
|
if err := rows.Scan(
|
|
&i.ID,
|
|
&i.AgentID,
|
|
&i.IssueID,
|
|
&i.Status,
|
|
&i.Priority,
|
|
&i.DispatchedAt,
|
|
&i.StartedAt,
|
|
&i.CompletedAt,
|
|
&i.Result,
|
|
&i.Error,
|
|
&i.CreatedAt,
|
|
&i.Context,
|
|
&i.RuntimeID,
|
|
&i.SessionID,
|
|
&i.WorkDir,
|
|
&i.TriggerCommentID,
|
|
&i.ChatSessionID,
|
|
&i.AutopilotRunID,
|
|
&i.Attempt,
|
|
&i.MaxAttempts,
|
|
&i.ParentTaskID,
|
|
&i.FailureReason,
|
|
&i.TriggerSummary,
|
|
&i.ForceFreshSession,
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
items = append(items, i)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
const recoverOrphanedTasksForRuntime = `-- name: RecoverOrphanedTasksForRuntime :many
|
|
UPDATE agent_task_queue
|
|
SET status = 'failed',
|
|
completed_at = now(),
|
|
error = 'daemon restarted while task was in flight',
|
|
failure_reason = 'runtime_recovery'
|
|
WHERE runtime_id = $1 AND status IN ('dispatched', 'running')
|
|
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session
|
|
`
|
|
|
|
// Called by the daemon at startup. Atomically fails any dispatched/running
|
|
// task that the prior incarnation of this runtime owned but did not
|
|
// finalize. Returns the failed rows so callers can hand them to the
|
|
// auto-retry path.
|
|
func (q *Queries) RecoverOrphanedTasksForRuntime(ctx context.Context, runtimeID pgtype.UUID) ([]AgentTaskQueue, error) {
|
|
rows, err := q.db.Query(ctx, recoverOrphanedTasksForRuntime, runtimeID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
items := []AgentTaskQueue{}
|
|
for rows.Next() {
|
|
var i AgentTaskQueue
|
|
if err := rows.Scan(
|
|
&i.ID,
|
|
&i.AgentID,
|
|
&i.IssueID,
|
|
&i.Status,
|
|
&i.Priority,
|
|
&i.DispatchedAt,
|
|
&i.StartedAt,
|
|
&i.CompletedAt,
|
|
&i.Result,
|
|
&i.Error,
|
|
&i.CreatedAt,
|
|
&i.Context,
|
|
&i.RuntimeID,
|
|
&i.SessionID,
|
|
&i.WorkDir,
|
|
&i.TriggerCommentID,
|
|
&i.ChatSessionID,
|
|
&i.AutopilotRunID,
|
|
&i.Attempt,
|
|
&i.MaxAttempts,
|
|
&i.ParentTaskID,
|
|
&i.FailureReason,
|
|
&i.TriggerSummary,
|
|
&i.ForceFreshSession,
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
items = append(items, i)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
const refreshAgentStatusFromTasks = `-- name: RefreshAgentStatusFromTasks :one
|
|
UPDATE agent AS a
|
|
SET status = CASE WHEN EXISTS (
|
|
SELECT 1 FROM agent_task_queue q
|
|
WHERE q.agent_id = a.id AND q.status IN ('dispatched', 'running')
|
|
) THEN 'working' ELSE 'idle' END,
|
|
updated_at = now()
|
|
WHERE a.id = $1
|
|
RETURNING id, workspace_id, name, avatar_url, runtime_mode, runtime_config, visibility, status, max_concurrent_tasks, owner_id, created_at, updated_at, description, runtime_id, instructions, archived_at, archived_by, custom_env, custom_args, mcp_config, model
|
|
`
|
|
|
|
func (q *Queries) RefreshAgentStatusFromTasks(ctx context.Context, id pgtype.UUID) (Agent, error) {
|
|
row := q.db.QueryRow(ctx, refreshAgentStatusFromTasks, id)
|
|
var i Agent
|
|
err := row.Scan(
|
|
&i.ID,
|
|
&i.WorkspaceID,
|
|
&i.Name,
|
|
&i.AvatarUrl,
|
|
&i.RuntimeMode,
|
|
&i.RuntimeConfig,
|
|
&i.Visibility,
|
|
&i.Status,
|
|
&i.MaxConcurrentTasks,
|
|
&i.OwnerID,
|
|
&i.CreatedAt,
|
|
&i.UpdatedAt,
|
|
&i.Description,
|
|
&i.RuntimeID,
|
|
&i.Instructions,
|
|
&i.ArchivedAt,
|
|
&i.ArchivedBy,
|
|
&i.CustomEnv,
|
|
&i.CustomArgs,
|
|
&i.McpConfig,
|
|
&i.Model,
|
|
)
|
|
return i, err
|
|
}
|
|
|
|
const restoreAgent = `-- name: RestoreAgent :one
|
|
UPDATE agent SET archived_at = NULL, archived_by = NULL, updated_at = now()
|
|
WHERE id = $1
|
|
RETURNING id, workspace_id, name, avatar_url, runtime_mode, runtime_config, visibility, status, max_concurrent_tasks, owner_id, created_at, updated_at, description, runtime_id, instructions, archived_at, archived_by, custom_env, custom_args, mcp_config, model
|
|
`
|
|
|
|
func (q *Queries) RestoreAgent(ctx context.Context, id pgtype.UUID) (Agent, error) {
|
|
row := q.db.QueryRow(ctx, restoreAgent, id)
|
|
var i Agent
|
|
err := row.Scan(
|
|
&i.ID,
|
|
&i.WorkspaceID,
|
|
&i.Name,
|
|
&i.AvatarUrl,
|
|
&i.RuntimeMode,
|
|
&i.RuntimeConfig,
|
|
&i.Visibility,
|
|
&i.Status,
|
|
&i.MaxConcurrentTasks,
|
|
&i.OwnerID,
|
|
&i.CreatedAt,
|
|
&i.UpdatedAt,
|
|
&i.Description,
|
|
&i.RuntimeID,
|
|
&i.Instructions,
|
|
&i.ArchivedAt,
|
|
&i.ArchivedBy,
|
|
&i.CustomEnv,
|
|
&i.CustomArgs,
|
|
&i.McpConfig,
|
|
&i.Model,
|
|
)
|
|
return i, err
|
|
}
|
|
|
|
const startAgentTask = `-- name: StartAgentTask :one
|
|
UPDATE agent_task_queue
|
|
SET status = 'running', started_at = now()
|
|
WHERE id = $1 AND status = 'dispatched'
|
|
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, trigger_summary, force_fresh_session
|
|
`
|
|
|
|
func (q *Queries) StartAgentTask(ctx context.Context, id pgtype.UUID) (AgentTaskQueue, error) {
|
|
row := q.db.QueryRow(ctx, startAgentTask, id)
|
|
var i AgentTaskQueue
|
|
err := row.Scan(
|
|
&i.ID,
|
|
&i.AgentID,
|
|
&i.IssueID,
|
|
&i.Status,
|
|
&i.Priority,
|
|
&i.DispatchedAt,
|
|
&i.StartedAt,
|
|
&i.CompletedAt,
|
|
&i.Result,
|
|
&i.Error,
|
|
&i.CreatedAt,
|
|
&i.Context,
|
|
&i.RuntimeID,
|
|
&i.SessionID,
|
|
&i.WorkDir,
|
|
&i.TriggerCommentID,
|
|
&i.ChatSessionID,
|
|
&i.AutopilotRunID,
|
|
&i.Attempt,
|
|
&i.MaxAttempts,
|
|
&i.ParentTaskID,
|
|
&i.FailureReason,
|
|
&i.TriggerSummary,
|
|
&i.ForceFreshSession,
|
|
)
|
|
return i, err
|
|
}
|
|
|
|
const updateAgent = `-- name: UpdateAgent :one
|
|
UPDATE agent SET
|
|
name = COALESCE($2, name),
|
|
description = COALESCE($3, description),
|
|
avatar_url = COALESCE($4, avatar_url),
|
|
runtime_config = COALESCE($5, runtime_config),
|
|
runtime_mode = COALESCE($6, runtime_mode),
|
|
runtime_id = COALESCE($7, runtime_id),
|
|
visibility = COALESCE($8, visibility),
|
|
status = COALESCE($9, status),
|
|
max_concurrent_tasks = COALESCE($10, max_concurrent_tasks),
|
|
instructions = COALESCE($11, instructions),
|
|
custom_env = COALESCE($12, custom_env),
|
|
custom_args = COALESCE($13, custom_args),
|
|
mcp_config = COALESCE($14, mcp_config),
|
|
model = COALESCE($15, model),
|
|
updated_at = now()
|
|
WHERE id = $1
|
|
RETURNING id, workspace_id, name, avatar_url, runtime_mode, runtime_config, visibility, status, max_concurrent_tasks, owner_id, created_at, updated_at, description, runtime_id, instructions, archived_at, archived_by, custom_env, custom_args, mcp_config, model
|
|
`
|
|
|
|
type UpdateAgentParams struct {
|
|
ID pgtype.UUID `json:"id"`
|
|
Name pgtype.Text `json:"name"`
|
|
Description pgtype.Text `json:"description"`
|
|
AvatarUrl pgtype.Text `json:"avatar_url"`
|
|
RuntimeConfig []byte `json:"runtime_config"`
|
|
RuntimeMode pgtype.Text `json:"runtime_mode"`
|
|
RuntimeID pgtype.UUID `json:"runtime_id"`
|
|
Visibility pgtype.Text `json:"visibility"`
|
|
Status pgtype.Text `json:"status"`
|
|
MaxConcurrentTasks pgtype.Int4 `json:"max_concurrent_tasks"`
|
|
Instructions pgtype.Text `json:"instructions"`
|
|
CustomEnv []byte `json:"custom_env"`
|
|
CustomArgs []byte `json:"custom_args"`
|
|
McpConfig []byte `json:"mcp_config"`
|
|
Model pgtype.Text `json:"model"`
|
|
}
|
|
|
|
func (q *Queries) UpdateAgent(ctx context.Context, arg UpdateAgentParams) (Agent, error) {
|
|
row := q.db.QueryRow(ctx, updateAgent,
|
|
arg.ID,
|
|
arg.Name,
|
|
arg.Description,
|
|
arg.AvatarUrl,
|
|
arg.RuntimeConfig,
|
|
arg.RuntimeMode,
|
|
arg.RuntimeID,
|
|
arg.Visibility,
|
|
arg.Status,
|
|
arg.MaxConcurrentTasks,
|
|
arg.Instructions,
|
|
arg.CustomEnv,
|
|
arg.CustomArgs,
|
|
arg.McpConfig,
|
|
arg.Model,
|
|
)
|
|
var i Agent
|
|
err := row.Scan(
|
|
&i.ID,
|
|
&i.WorkspaceID,
|
|
&i.Name,
|
|
&i.AvatarUrl,
|
|
&i.RuntimeMode,
|
|
&i.RuntimeConfig,
|
|
&i.Visibility,
|
|
&i.Status,
|
|
&i.MaxConcurrentTasks,
|
|
&i.OwnerID,
|
|
&i.CreatedAt,
|
|
&i.UpdatedAt,
|
|
&i.Description,
|
|
&i.RuntimeID,
|
|
&i.Instructions,
|
|
&i.ArchivedAt,
|
|
&i.ArchivedBy,
|
|
&i.CustomEnv,
|
|
&i.CustomArgs,
|
|
&i.McpConfig,
|
|
&i.Model,
|
|
)
|
|
return i, err
|
|
}
|
|
|
|
// updateAgentStatus is the SQL text run by UpdateAgentStatus. It sets
// agent.status to $2 and updated_at to now() for the row whose id is $1, and
// returns that row's columns in the order UpdateAgentStatus scans them.
const updateAgentStatus = `-- name: UpdateAgentStatus :one
UPDATE agent SET status = $2, updated_at = now()
WHERE id = $1
RETURNING id, workspace_id, name, avatar_url, runtime_mode, runtime_config, visibility, status, max_concurrent_tasks, owner_id, created_at, updated_at, description, runtime_id, instructions, archived_at, archived_by, custom_env, custom_args, mcp_config, model
`
|
|
|
|
type UpdateAgentStatusParams struct {
|
|
ID pgtype.UUID `json:"id"`
|
|
Status string `json:"status"`
|
|
}
|
|
|
|
func (q *Queries) UpdateAgentStatus(ctx context.Context, arg UpdateAgentStatusParams) (Agent, error) {
|
|
row := q.db.QueryRow(ctx, updateAgentStatus, arg.ID, arg.Status)
|
|
var i Agent
|
|
err := row.Scan(
|
|
&i.ID,
|
|
&i.WorkspaceID,
|
|
&i.Name,
|
|
&i.AvatarUrl,
|
|
&i.RuntimeMode,
|
|
&i.RuntimeConfig,
|
|
&i.Visibility,
|
|
&i.Status,
|
|
&i.MaxConcurrentTasks,
|
|
&i.OwnerID,
|
|
&i.CreatedAt,
|
|
&i.UpdatedAt,
|
|
&i.Description,
|
|
&i.RuntimeID,
|
|
&i.Instructions,
|
|
&i.ArchivedAt,
|
|
&i.ArchivedBy,
|
|
&i.CustomEnv,
|
|
&i.CustomArgs,
|
|
&i.McpConfig,
|
|
&i.Model,
|
|
)
|
|
return i, err
|
|
}
|
|
|
|
// updateAgentTaskSession is the SQL text run by UpdateAgentTaskSession. It
// overwrites session_id ($2) and work_dir ($3) on the agent_task_queue row
// whose id is $1, keeping the stored value wherever the parameter is NULL,
// and only touches rows whose status is 'dispatched' or 'running'.
const updateAgentTaskSession = `-- name: UpdateAgentTaskSession :exec
UPDATE agent_task_queue
SET session_id = COALESCE($2, session_id),
    work_dir = COALESCE($3, work_dir)
WHERE id = $1 AND status IN ('dispatched', 'running')
`
|
|
|
|
type UpdateAgentTaskSessionParams struct {
|
|
ID pgtype.UUID `json:"id"`
|
|
SessionID pgtype.Text `json:"session_id"`
|
|
WorkDir pgtype.Text `json:"work_dir"`
|
|
}
|
|
|
|
// Pins the resume pointer mid-flight so a daemon crash leaves a usable
|
|
// session_id/work_dir on the task row. No-op if the task is no longer
|
|
// in dispatched/running.
|
|
func (q *Queries) UpdateAgentTaskSession(ctx context.Context, arg UpdateAgentTaskSessionParams) error {
|
|
_, err := q.db.Exec(ctx, updateAgentTaskSession, arg.ID, arg.SessionID, arg.WorkDir)
|
|
return err
|
|
}
|