mirror of
https://github.com/multica-ai/multica.git
synced 2026-07-05 13:29:44 +02:00
* fix: polish quick-create UX (kind labeling, dark toast, placeholder)
Three small fixes shaken out from using the agent-create flow:
- AgentTaskResponse now carries a `kind` discriminator
("comment" | "autopilot" | "chat" | "quick_create" | "direct"), computed
from the existing FK shape with no extra DB access. The Activity row
uses it to label quick-create tasks as "Creating issue" instead of
falling through to the generic "Untracked" — once the agent finishes
and the new issue is linked, the row transitions to the normal
identifier+title display.
- Sonner Toaster reads `resolvedTheme` instead of `theme`, so toasts
follow the actual dark/light state. Forwarding "system" let sonner
pick its own answer from `prefers-color-scheme`, which in the Electron
renderer can disagree with next-themes' `html.dark` class — the toast
rendered light on a dark UI.
- Agent-create placeholder rephrased to a more conversational example
with a project reference: "let Bohan fix the inbox loading slowness
in the Web project". Drops the priority hint (priority isn't widely
used) and matches how people actually instruct the agent.
* fix(quick-create): link new issue back to task on completion
Addresses the review on PR #1831: completed quick-create tasks were
left with issue_id=NULL forever, so the activity row stayed on
"Creating issue" instead of transitioning to the normal MUL-XXX +
title rendering once the agent finished.
- Server: notifyQuickCreateCompleted now writes the resolved issue id
back to agent_task_queue.issue_id via a new LinkTaskToIssue query
(guarded by `issue_id IS NULL` so it only ever fills the unset
quick-create case). Best-effort: a write failure logs but doesn't
block the inbox notification.
- Frontend: defensive wording fallback — kind=quick_create rows in
terminal status (completed/failed/cancelled) now render as
"Quick create" instead of the active "Creating issue" label,
covering rows whose link write failed or whose agent never
produced an issue at all.
1868 lines
56 KiB
Go
1868 lines
56 KiB
Go
// Code generated by sqlc. DO NOT EDIT.
|
|
// versions:
|
|
// sqlc v1.30.0
|
|
// source: agent.sql
|
|
|
|
package db
|
|
|
|
import (
|
|
"context"
|
|
|
|
"github.com/jackc/pgx/v5/pgtype"
|
|
)
|
|
|
|
const archiveAgent = `-- name: ArchiveAgent :one
|
|
UPDATE agent SET archived_at = now(), archived_by = $2, updated_at = now()
|
|
WHERE id = $1
|
|
RETURNING id, workspace_id, name, avatar_url, runtime_mode, runtime_config, visibility, status, max_concurrent_tasks, owner_id, created_at, updated_at, description, runtime_id, instructions, archived_at, archived_by, custom_env, custom_args, mcp_config, model
|
|
`
|
|
|
|
type ArchiveAgentParams struct {
|
|
ID pgtype.UUID `json:"id"`
|
|
ArchivedBy pgtype.UUID `json:"archived_by"`
|
|
}
|
|
|
|
func (q *Queries) ArchiveAgent(ctx context.Context, arg ArchiveAgentParams) (Agent, error) {
|
|
row := q.db.QueryRow(ctx, archiveAgent, arg.ID, arg.ArchivedBy)
|
|
var i Agent
|
|
err := row.Scan(
|
|
&i.ID,
|
|
&i.WorkspaceID,
|
|
&i.Name,
|
|
&i.AvatarUrl,
|
|
&i.RuntimeMode,
|
|
&i.RuntimeConfig,
|
|
&i.Visibility,
|
|
&i.Status,
|
|
&i.MaxConcurrentTasks,
|
|
&i.OwnerID,
|
|
&i.CreatedAt,
|
|
&i.UpdatedAt,
|
|
&i.Description,
|
|
&i.RuntimeID,
|
|
&i.Instructions,
|
|
&i.ArchivedAt,
|
|
&i.ArchivedBy,
|
|
&i.CustomEnv,
|
|
&i.CustomArgs,
|
|
&i.McpConfig,
|
|
&i.Model,
|
|
)
|
|
return i, err
|
|
}
|
|
|
|
const cancelAgentTask = `-- name: CancelAgentTask :one
|
|
UPDATE agent_task_queue
|
|
SET status = 'cancelled', completed_at = now()
|
|
WHERE id = $1 AND status IN ('queued', 'dispatched', 'running')
|
|
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary
|
|
`
|
|
|
|
func (q *Queries) CancelAgentTask(ctx context.Context, id pgtype.UUID) (AgentTaskQueue, error) {
|
|
row := q.db.QueryRow(ctx, cancelAgentTask, id)
|
|
var i AgentTaskQueue
|
|
err := row.Scan(
|
|
&i.ID,
|
|
&i.AgentID,
|
|
&i.IssueID,
|
|
&i.Status,
|
|
&i.Priority,
|
|
&i.DispatchedAt,
|
|
&i.StartedAt,
|
|
&i.CompletedAt,
|
|
&i.Result,
|
|
&i.Error,
|
|
&i.CreatedAt,
|
|
&i.Context,
|
|
&i.RuntimeID,
|
|
&i.SessionID,
|
|
&i.WorkDir,
|
|
&i.TriggerCommentID,
|
|
&i.ChatSessionID,
|
|
&i.AutopilotRunID,
|
|
&i.Attempt,
|
|
&i.MaxAttempts,
|
|
&i.ParentTaskID,
|
|
&i.FailureReason,
|
|
&i.LastHeartbeatAt,
|
|
&i.TriggerSummary,
|
|
)
|
|
return i, err
|
|
}
|
|
|
|
const cancelAgentTasksByAgent = `-- name: CancelAgentTasksByAgent :many
|
|
UPDATE agent_task_queue
|
|
SET status = 'cancelled', completed_at = now()
|
|
WHERE agent_id = $1 AND status IN ('queued', 'dispatched', 'running')
|
|
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary
|
|
`
|
|
|
|
// Bulk-cancel every active (queued/dispatched/running) task for an agent.
|
|
// Returns the affected rows so callers can broadcast task:cancelled events.
|
|
// Mirrors the shape of CancelAgentTasksByIssue / CancelAgentTasksByIssueAndAgent
|
|
// (also :many + RETURNING + completed_at) so the three sibling cancel paths
|
|
// behave consistently.
|
|
func (q *Queries) CancelAgentTasksByAgent(ctx context.Context, agentID pgtype.UUID) ([]AgentTaskQueue, error) {
|
|
rows, err := q.db.Query(ctx, cancelAgentTasksByAgent, agentID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
items := []AgentTaskQueue{}
|
|
for rows.Next() {
|
|
var i AgentTaskQueue
|
|
if err := rows.Scan(
|
|
&i.ID,
|
|
&i.AgentID,
|
|
&i.IssueID,
|
|
&i.Status,
|
|
&i.Priority,
|
|
&i.DispatchedAt,
|
|
&i.StartedAt,
|
|
&i.CompletedAt,
|
|
&i.Result,
|
|
&i.Error,
|
|
&i.CreatedAt,
|
|
&i.Context,
|
|
&i.RuntimeID,
|
|
&i.SessionID,
|
|
&i.WorkDir,
|
|
&i.TriggerCommentID,
|
|
&i.ChatSessionID,
|
|
&i.AutopilotRunID,
|
|
&i.Attempt,
|
|
&i.MaxAttempts,
|
|
&i.ParentTaskID,
|
|
&i.FailureReason,
|
|
&i.LastHeartbeatAt,
|
|
&i.TriggerSummary,
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
items = append(items, i)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
const cancelAgentTasksByIssue = `-- name: CancelAgentTasksByIssue :many
|
|
UPDATE agent_task_queue
|
|
SET status = 'cancelled', completed_at = now()
|
|
WHERE issue_id = $1 AND status IN ('queued', 'dispatched', 'running')
|
|
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary
|
|
`
|
|
|
|
// Cancels every active task on the issue and returns the affected rows so the
|
|
// caller can reconcile each agent's status and broadcast task:cancelled events
|
|
// (#1587). Prior :exec form silently dropped that info, so internal cancel
|
|
// paths (issue status flips to cancelled/done, etc.) left agents stuck at
|
|
// status="working" with no self-correction.
|
|
func (q *Queries) CancelAgentTasksByIssue(ctx context.Context, issueID pgtype.UUID) ([]AgentTaskQueue, error) {
|
|
rows, err := q.db.Query(ctx, cancelAgentTasksByIssue, issueID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
items := []AgentTaskQueue{}
|
|
for rows.Next() {
|
|
var i AgentTaskQueue
|
|
if err := rows.Scan(
|
|
&i.ID,
|
|
&i.AgentID,
|
|
&i.IssueID,
|
|
&i.Status,
|
|
&i.Priority,
|
|
&i.DispatchedAt,
|
|
&i.StartedAt,
|
|
&i.CompletedAt,
|
|
&i.Result,
|
|
&i.Error,
|
|
&i.CreatedAt,
|
|
&i.Context,
|
|
&i.RuntimeID,
|
|
&i.SessionID,
|
|
&i.WorkDir,
|
|
&i.TriggerCommentID,
|
|
&i.ChatSessionID,
|
|
&i.AutopilotRunID,
|
|
&i.Attempt,
|
|
&i.MaxAttempts,
|
|
&i.ParentTaskID,
|
|
&i.FailureReason,
|
|
&i.LastHeartbeatAt,
|
|
&i.TriggerSummary,
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
items = append(items, i)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
const cancelAgentTasksByIssueAndAgent = `-- name: CancelAgentTasksByIssueAndAgent :many
|
|
UPDATE agent_task_queue
|
|
SET status = 'cancelled', completed_at = now()
|
|
WHERE issue_id = $1 AND agent_id = $2 AND status IN ('queued', 'dispatched', 'running')
|
|
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary
|
|
`
|
|
|
|
type CancelAgentTasksByIssueAndAgentParams struct {
|
|
IssueID pgtype.UUID `json:"issue_id"`
|
|
AgentID pgtype.UUID `json:"agent_id"`
|
|
}
|
|
|
|
// Cancels active tasks for a single (issue, agent) pair without touching
|
|
// tasks belonging to other agents on the same issue. Used by the manual
|
|
// rerun flow so re-running the assignee doesn't collateral-cancel a
|
|
// still-running @-mention agent on the same issue.
|
|
func (q *Queries) CancelAgentTasksByIssueAndAgent(ctx context.Context, arg CancelAgentTasksByIssueAndAgentParams) ([]AgentTaskQueue, error) {
|
|
rows, err := q.db.Query(ctx, cancelAgentTasksByIssueAndAgent, arg.IssueID, arg.AgentID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
items := []AgentTaskQueue{}
|
|
for rows.Next() {
|
|
var i AgentTaskQueue
|
|
if err := rows.Scan(
|
|
&i.ID,
|
|
&i.AgentID,
|
|
&i.IssueID,
|
|
&i.Status,
|
|
&i.Priority,
|
|
&i.DispatchedAt,
|
|
&i.StartedAt,
|
|
&i.CompletedAt,
|
|
&i.Result,
|
|
&i.Error,
|
|
&i.CreatedAt,
|
|
&i.Context,
|
|
&i.RuntimeID,
|
|
&i.SessionID,
|
|
&i.WorkDir,
|
|
&i.TriggerCommentID,
|
|
&i.ChatSessionID,
|
|
&i.AutopilotRunID,
|
|
&i.Attempt,
|
|
&i.MaxAttempts,
|
|
&i.ParentTaskID,
|
|
&i.FailureReason,
|
|
&i.LastHeartbeatAt,
|
|
&i.TriggerSummary,
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
items = append(items, i)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
const cancelAgentTasksByTriggerComment = `-- name: CancelAgentTasksByTriggerComment :many
|
|
UPDATE agent_task_queue
|
|
SET status = 'cancelled', completed_at = now()
|
|
WHERE trigger_comment_id = $1 AND status IN ('queued', 'dispatched', 'running')
|
|
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary
|
|
`
|
|
|
|
// Cancels active tasks whose trigger is the given comment. Called when a
|
|
// comment is deleted so the agent does not run with the now-deleted content
|
|
// already embedded in its prompt. Must run BEFORE the comment row is deleted
|
|
// because the FK ON DELETE SET NULL would otherwise nullify trigger_comment_id
|
|
// and we'd lose the ability to find the affected tasks.
|
|
func (q *Queries) CancelAgentTasksByTriggerComment(ctx context.Context, triggerCommentID pgtype.UUID) ([]AgentTaskQueue, error) {
|
|
rows, err := q.db.Query(ctx, cancelAgentTasksByTriggerComment, triggerCommentID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
items := []AgentTaskQueue{}
|
|
for rows.Next() {
|
|
var i AgentTaskQueue
|
|
if err := rows.Scan(
|
|
&i.ID,
|
|
&i.AgentID,
|
|
&i.IssueID,
|
|
&i.Status,
|
|
&i.Priority,
|
|
&i.DispatchedAt,
|
|
&i.StartedAt,
|
|
&i.CompletedAt,
|
|
&i.Result,
|
|
&i.Error,
|
|
&i.CreatedAt,
|
|
&i.Context,
|
|
&i.RuntimeID,
|
|
&i.SessionID,
|
|
&i.WorkDir,
|
|
&i.TriggerCommentID,
|
|
&i.ChatSessionID,
|
|
&i.AutopilotRunID,
|
|
&i.Attempt,
|
|
&i.MaxAttempts,
|
|
&i.ParentTaskID,
|
|
&i.FailureReason,
|
|
&i.LastHeartbeatAt,
|
|
&i.TriggerSummary,
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
items = append(items, i)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
const claimAgentTask = `-- name: ClaimAgentTask :one
|
|
UPDATE agent_task_queue
|
|
SET status = 'dispatched', dispatched_at = now()
|
|
WHERE id = (
|
|
SELECT atq.id FROM agent_task_queue atq
|
|
WHERE atq.agent_id = $1 AND atq.status = 'queued'
|
|
AND NOT EXISTS (
|
|
SELECT 1 FROM agent_task_queue active
|
|
WHERE active.agent_id = atq.agent_id
|
|
AND active.status IN ('dispatched', 'running')
|
|
AND (
|
|
(atq.issue_id IS NOT NULL AND active.issue_id = atq.issue_id)
|
|
OR (atq.chat_session_id IS NOT NULL AND active.chat_session_id = atq.chat_session_id)
|
|
OR (
|
|
atq.issue_id IS NULL
|
|
AND atq.chat_session_id IS NULL
|
|
AND atq.autopilot_run_id IS NULL
|
|
AND active.issue_id IS NULL
|
|
AND active.chat_session_id IS NULL
|
|
AND active.autopilot_run_id IS NULL
|
|
)
|
|
)
|
|
)
|
|
ORDER BY atq.priority DESC, atq.created_at ASC
|
|
LIMIT 1
|
|
FOR UPDATE SKIP LOCKED
|
|
)
|
|
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary
|
|
`
|
|
|
|
// Claims the next queued task for an agent, enforcing per-(issue, agent) serialization:
|
|
// a task is only claimable when no other task for the same issue AND same agent is
|
|
// already dispatched or running. This allows different agents to work on the same
|
|
// issue in parallel while preventing a single agent from running duplicate tasks.
|
|
// Chat tasks (issue_id IS NULL) use chat_session_id for serialization instead.
|
|
// Quick-create tasks have no issue / chat / autopilot link, so they serialize on
|
|
// "any other quick-create-shaped task" (all four FKs NULL) for the same agent —
|
|
// otherwise a user mashing the create button could fire concurrent quick-creates
|
|
// whose completion lookup would race over "most recent issue by this agent".
|
|
func (q *Queries) ClaimAgentTask(ctx context.Context, agentID pgtype.UUID) (AgentTaskQueue, error) {
|
|
row := q.db.QueryRow(ctx, claimAgentTask, agentID)
|
|
var i AgentTaskQueue
|
|
err := row.Scan(
|
|
&i.ID,
|
|
&i.AgentID,
|
|
&i.IssueID,
|
|
&i.Status,
|
|
&i.Priority,
|
|
&i.DispatchedAt,
|
|
&i.StartedAt,
|
|
&i.CompletedAt,
|
|
&i.Result,
|
|
&i.Error,
|
|
&i.CreatedAt,
|
|
&i.Context,
|
|
&i.RuntimeID,
|
|
&i.SessionID,
|
|
&i.WorkDir,
|
|
&i.TriggerCommentID,
|
|
&i.ChatSessionID,
|
|
&i.AutopilotRunID,
|
|
&i.Attempt,
|
|
&i.MaxAttempts,
|
|
&i.ParentTaskID,
|
|
&i.FailureReason,
|
|
&i.LastHeartbeatAt,
|
|
&i.TriggerSummary,
|
|
)
|
|
return i, err
|
|
}
|
|
|
|
const clearAgentMcpConfig = `-- name: ClearAgentMcpConfig :one
|
|
UPDATE agent SET mcp_config = NULL, updated_at = now()
|
|
WHERE id = $1
|
|
RETURNING id, workspace_id, name, avatar_url, runtime_mode, runtime_config, visibility, status, max_concurrent_tasks, owner_id, created_at, updated_at, description, runtime_id, instructions, archived_at, archived_by, custom_env, custom_args, mcp_config, model
|
|
`
|
|
|
|
func (q *Queries) ClearAgentMcpConfig(ctx context.Context, id pgtype.UUID) (Agent, error) {
|
|
row := q.db.QueryRow(ctx, clearAgentMcpConfig, id)
|
|
var i Agent
|
|
err := row.Scan(
|
|
&i.ID,
|
|
&i.WorkspaceID,
|
|
&i.Name,
|
|
&i.AvatarUrl,
|
|
&i.RuntimeMode,
|
|
&i.RuntimeConfig,
|
|
&i.Visibility,
|
|
&i.Status,
|
|
&i.MaxConcurrentTasks,
|
|
&i.OwnerID,
|
|
&i.CreatedAt,
|
|
&i.UpdatedAt,
|
|
&i.Description,
|
|
&i.RuntimeID,
|
|
&i.Instructions,
|
|
&i.ArchivedAt,
|
|
&i.ArchivedBy,
|
|
&i.CustomEnv,
|
|
&i.CustomArgs,
|
|
&i.McpConfig,
|
|
&i.Model,
|
|
)
|
|
return i, err
|
|
}
|
|
|
|
const completeAgentTask = `-- name: CompleteAgentTask :one
|
|
UPDATE agent_task_queue
|
|
SET status = 'completed', completed_at = now(), result = $2, session_id = $3, work_dir = $4
|
|
WHERE id = $1 AND status = 'running'
|
|
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary
|
|
`
|
|
|
|
type CompleteAgentTaskParams struct {
|
|
ID pgtype.UUID `json:"id"`
|
|
Result []byte `json:"result"`
|
|
SessionID pgtype.Text `json:"session_id"`
|
|
WorkDir pgtype.Text `json:"work_dir"`
|
|
}
|
|
|
|
func (q *Queries) CompleteAgentTask(ctx context.Context, arg CompleteAgentTaskParams) (AgentTaskQueue, error) {
|
|
row := q.db.QueryRow(ctx, completeAgentTask,
|
|
arg.ID,
|
|
arg.Result,
|
|
arg.SessionID,
|
|
arg.WorkDir,
|
|
)
|
|
var i AgentTaskQueue
|
|
err := row.Scan(
|
|
&i.ID,
|
|
&i.AgentID,
|
|
&i.IssueID,
|
|
&i.Status,
|
|
&i.Priority,
|
|
&i.DispatchedAt,
|
|
&i.StartedAt,
|
|
&i.CompletedAt,
|
|
&i.Result,
|
|
&i.Error,
|
|
&i.CreatedAt,
|
|
&i.Context,
|
|
&i.RuntimeID,
|
|
&i.SessionID,
|
|
&i.WorkDir,
|
|
&i.TriggerCommentID,
|
|
&i.ChatSessionID,
|
|
&i.AutopilotRunID,
|
|
&i.Attempt,
|
|
&i.MaxAttempts,
|
|
&i.ParentTaskID,
|
|
&i.FailureReason,
|
|
&i.LastHeartbeatAt,
|
|
&i.TriggerSummary,
|
|
)
|
|
return i, err
|
|
}
|
|
|
|
const countRunningTasks = `-- name: CountRunningTasks :one
|
|
SELECT count(*) FROM agent_task_queue
|
|
WHERE agent_id = $1 AND status IN ('dispatched', 'running')
|
|
`
|
|
|
|
func (q *Queries) CountRunningTasks(ctx context.Context, agentID pgtype.UUID) (int64, error) {
|
|
row := q.db.QueryRow(ctx, countRunningTasks, agentID)
|
|
var count int64
|
|
err := row.Scan(&count)
|
|
return count, err
|
|
}
|
|
|
|
const createAgent = `-- name: CreateAgent :one
|
|
INSERT INTO agent (
|
|
workspace_id, name, description, avatar_url, runtime_mode,
|
|
runtime_config, runtime_id, visibility, max_concurrent_tasks, owner_id,
|
|
instructions, custom_env, custom_args, mcp_config, model
|
|
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15)
|
|
RETURNING id, workspace_id, name, avatar_url, runtime_mode, runtime_config, visibility, status, max_concurrent_tasks, owner_id, created_at, updated_at, description, runtime_id, instructions, archived_at, archived_by, custom_env, custom_args, mcp_config, model
|
|
`
|
|
|
|
type CreateAgentParams struct {
|
|
WorkspaceID pgtype.UUID `json:"workspace_id"`
|
|
Name string `json:"name"`
|
|
Description string `json:"description"`
|
|
AvatarUrl pgtype.Text `json:"avatar_url"`
|
|
RuntimeMode string `json:"runtime_mode"`
|
|
RuntimeConfig []byte `json:"runtime_config"`
|
|
RuntimeID pgtype.UUID `json:"runtime_id"`
|
|
Visibility string `json:"visibility"`
|
|
MaxConcurrentTasks int32 `json:"max_concurrent_tasks"`
|
|
OwnerID pgtype.UUID `json:"owner_id"`
|
|
Instructions string `json:"instructions"`
|
|
CustomEnv []byte `json:"custom_env"`
|
|
CustomArgs []byte `json:"custom_args"`
|
|
McpConfig []byte `json:"mcp_config"`
|
|
Model pgtype.Text `json:"model"`
|
|
}
|
|
|
|
func (q *Queries) CreateAgent(ctx context.Context, arg CreateAgentParams) (Agent, error) {
|
|
row := q.db.QueryRow(ctx, createAgent,
|
|
arg.WorkspaceID,
|
|
arg.Name,
|
|
arg.Description,
|
|
arg.AvatarUrl,
|
|
arg.RuntimeMode,
|
|
arg.RuntimeConfig,
|
|
arg.RuntimeID,
|
|
arg.Visibility,
|
|
arg.MaxConcurrentTasks,
|
|
arg.OwnerID,
|
|
arg.Instructions,
|
|
arg.CustomEnv,
|
|
arg.CustomArgs,
|
|
arg.McpConfig,
|
|
arg.Model,
|
|
)
|
|
var i Agent
|
|
err := row.Scan(
|
|
&i.ID,
|
|
&i.WorkspaceID,
|
|
&i.Name,
|
|
&i.AvatarUrl,
|
|
&i.RuntimeMode,
|
|
&i.RuntimeConfig,
|
|
&i.Visibility,
|
|
&i.Status,
|
|
&i.MaxConcurrentTasks,
|
|
&i.OwnerID,
|
|
&i.CreatedAt,
|
|
&i.UpdatedAt,
|
|
&i.Description,
|
|
&i.RuntimeID,
|
|
&i.Instructions,
|
|
&i.ArchivedAt,
|
|
&i.ArchivedBy,
|
|
&i.CustomEnv,
|
|
&i.CustomArgs,
|
|
&i.McpConfig,
|
|
&i.Model,
|
|
)
|
|
return i, err
|
|
}
|
|
|
|
const createAgentTask = `-- name: CreateAgentTask :one
|
|
INSERT INTO agent_task_queue (agent_id, runtime_id, issue_id, status, priority, trigger_comment_id, trigger_summary)
|
|
VALUES ($1, $2, $3, 'queued', $4, $5, $6)
|
|
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary
|
|
`
|
|
|
|
type CreateAgentTaskParams struct {
|
|
AgentID pgtype.UUID `json:"agent_id"`
|
|
RuntimeID pgtype.UUID `json:"runtime_id"`
|
|
IssueID pgtype.UUID `json:"issue_id"`
|
|
Priority int32 `json:"priority"`
|
|
TriggerCommentID pgtype.UUID `json:"trigger_comment_id"`
|
|
TriggerSummary pgtype.Text `json:"trigger_summary"`
|
|
}
|
|
|
|
func (q *Queries) CreateAgentTask(ctx context.Context, arg CreateAgentTaskParams) (AgentTaskQueue, error) {
|
|
row := q.db.QueryRow(ctx, createAgentTask,
|
|
arg.AgentID,
|
|
arg.RuntimeID,
|
|
arg.IssueID,
|
|
arg.Priority,
|
|
arg.TriggerCommentID,
|
|
arg.TriggerSummary,
|
|
)
|
|
var i AgentTaskQueue
|
|
err := row.Scan(
|
|
&i.ID,
|
|
&i.AgentID,
|
|
&i.IssueID,
|
|
&i.Status,
|
|
&i.Priority,
|
|
&i.DispatchedAt,
|
|
&i.StartedAt,
|
|
&i.CompletedAt,
|
|
&i.Result,
|
|
&i.Error,
|
|
&i.CreatedAt,
|
|
&i.Context,
|
|
&i.RuntimeID,
|
|
&i.SessionID,
|
|
&i.WorkDir,
|
|
&i.TriggerCommentID,
|
|
&i.ChatSessionID,
|
|
&i.AutopilotRunID,
|
|
&i.Attempt,
|
|
&i.MaxAttempts,
|
|
&i.ParentTaskID,
|
|
&i.FailureReason,
|
|
&i.LastHeartbeatAt,
|
|
&i.TriggerSummary,
|
|
)
|
|
return i, err
|
|
}
|
|
|
|
const createQuickCreateTask = `-- name: CreateQuickCreateTask :one
|
|
INSERT INTO agent_task_queue (agent_id, runtime_id, issue_id, status, priority, context)
|
|
VALUES ($1, $2, NULL, 'queued', $3, $4)
|
|
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary
|
|
`
|
|
|
|
type CreateQuickCreateTaskParams struct {
|
|
AgentID pgtype.UUID `json:"agent_id"`
|
|
RuntimeID pgtype.UUID `json:"runtime_id"`
|
|
Priority int32 `json:"priority"`
|
|
Context []byte `json:"context"`
|
|
}
|
|
|
|
// Quick-create tasks have no issue / chat / autopilot link; the entire job
|
|
// description (prompt, requester, workspace) lives in context JSONB. The
|
|
// daemon detects this variant via context.type == "quick_create".
|
|
func (q *Queries) CreateQuickCreateTask(ctx context.Context, arg CreateQuickCreateTaskParams) (AgentTaskQueue, error) {
|
|
row := q.db.QueryRow(ctx, createQuickCreateTask,
|
|
arg.AgentID,
|
|
arg.RuntimeID,
|
|
arg.Priority,
|
|
arg.Context,
|
|
)
|
|
var i AgentTaskQueue
|
|
err := row.Scan(
|
|
&i.ID,
|
|
&i.AgentID,
|
|
&i.IssueID,
|
|
&i.Status,
|
|
&i.Priority,
|
|
&i.DispatchedAt,
|
|
&i.StartedAt,
|
|
&i.CompletedAt,
|
|
&i.Result,
|
|
&i.Error,
|
|
&i.CreatedAt,
|
|
&i.Context,
|
|
&i.RuntimeID,
|
|
&i.SessionID,
|
|
&i.WorkDir,
|
|
&i.TriggerCommentID,
|
|
&i.ChatSessionID,
|
|
&i.AutopilotRunID,
|
|
&i.Attempt,
|
|
&i.MaxAttempts,
|
|
&i.ParentTaskID,
|
|
&i.FailureReason,
|
|
&i.LastHeartbeatAt,
|
|
&i.TriggerSummary,
|
|
)
|
|
return i, err
|
|
}
|
|
|
|
const createRetryTask = `-- name: CreateRetryTask :one
|
|
INSERT INTO agent_task_queue (
|
|
agent_id, runtime_id, issue_id, chat_session_id, autopilot_run_id,
|
|
status, priority, trigger_comment_id, trigger_summary, context,
|
|
session_id, work_dir,
|
|
attempt, max_attempts, parent_task_id
|
|
)
|
|
SELECT
|
|
p.agent_id, p.runtime_id, p.issue_id, p.chat_session_id, p.autopilot_run_id,
|
|
'queued', p.priority, p.trigger_comment_id, p.trigger_summary, p.context,
|
|
p.session_id, p.work_dir,
|
|
p.attempt + 1, p.max_attempts, p.id
|
|
FROM agent_task_queue p
|
|
WHERE p.id = $1
|
|
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary
|
|
`
|
|
|
|
// Clones a parent task into a fresh queued attempt. Carries forward the
|
|
// agent's resume context (session_id/work_dir) so the child can continue
|
|
// the conversation when the backend supports it. attempt is incremented;
|
|
// max_attempts and trigger_comment_id are inherited.
|
|
func (q *Queries) CreateRetryTask(ctx context.Context, id pgtype.UUID) (AgentTaskQueue, error) {
|
|
row := q.db.QueryRow(ctx, createRetryTask, id)
|
|
var i AgentTaskQueue
|
|
err := row.Scan(
|
|
&i.ID,
|
|
&i.AgentID,
|
|
&i.IssueID,
|
|
&i.Status,
|
|
&i.Priority,
|
|
&i.DispatchedAt,
|
|
&i.StartedAt,
|
|
&i.CompletedAt,
|
|
&i.Result,
|
|
&i.Error,
|
|
&i.CreatedAt,
|
|
&i.Context,
|
|
&i.RuntimeID,
|
|
&i.SessionID,
|
|
&i.WorkDir,
|
|
&i.TriggerCommentID,
|
|
&i.ChatSessionID,
|
|
&i.AutopilotRunID,
|
|
&i.Attempt,
|
|
&i.MaxAttempts,
|
|
&i.ParentTaskID,
|
|
&i.FailureReason,
|
|
&i.LastHeartbeatAt,
|
|
&i.TriggerSummary,
|
|
)
|
|
return i, err
|
|
}
|
|
|
|
const failAgentTask = `-- name: FailAgentTask :one
|
|
UPDATE agent_task_queue
|
|
SET status = 'failed',
|
|
completed_at = now(),
|
|
error = $2,
|
|
failure_reason = COALESCE($3, 'agent_error'),
|
|
session_id = COALESCE($4, session_id),
|
|
work_dir = COALESCE($5, work_dir)
|
|
WHERE id = $1 AND status IN ('dispatched', 'running')
|
|
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary
|
|
`
|
|
|
|
type FailAgentTaskParams struct {
|
|
ID pgtype.UUID `json:"id"`
|
|
Error pgtype.Text `json:"error"`
|
|
FailureReason pgtype.Text `json:"failure_reason"`
|
|
SessionID pgtype.Text `json:"session_id"`
|
|
WorkDir pgtype.Text `json:"work_dir"`
|
|
}
|
|
|
|
// Marks a task as failed. session_id and work_dir are merged via COALESCE so
|
|
// if the agent already established a real session before failing (e.g. it
|
|
// crashed mid-conversation, was cancelled, or hit a tool error) the resume
|
|
// pointer is preserved on the task row. The next chat task can then fall
|
|
// back to GetLastChatTaskSession and continue the conversation instead of
|
|
// silently starting over.
|
|
//
|
|
// failure_reason is a coarse classifier consumed by the auto-retry path;
|
|
// 'agent_error' is the safe default when the daemon doesn't supply one.
|
|
func (q *Queries) FailAgentTask(ctx context.Context, arg FailAgentTaskParams) (AgentTaskQueue, error) {
|
|
row := q.db.QueryRow(ctx, failAgentTask,
|
|
arg.ID,
|
|
arg.Error,
|
|
arg.FailureReason,
|
|
arg.SessionID,
|
|
arg.WorkDir,
|
|
)
|
|
var i AgentTaskQueue
|
|
err := row.Scan(
|
|
&i.ID,
|
|
&i.AgentID,
|
|
&i.IssueID,
|
|
&i.Status,
|
|
&i.Priority,
|
|
&i.DispatchedAt,
|
|
&i.StartedAt,
|
|
&i.CompletedAt,
|
|
&i.Result,
|
|
&i.Error,
|
|
&i.CreatedAt,
|
|
&i.Context,
|
|
&i.RuntimeID,
|
|
&i.SessionID,
|
|
&i.WorkDir,
|
|
&i.TriggerCommentID,
|
|
&i.ChatSessionID,
|
|
&i.AutopilotRunID,
|
|
&i.Attempt,
|
|
&i.MaxAttempts,
|
|
&i.ParentTaskID,
|
|
&i.FailureReason,
|
|
&i.LastHeartbeatAt,
|
|
&i.TriggerSummary,
|
|
)
|
|
return i, err
|
|
}
|
|
|
|
const failStaleTasks = `-- name: FailStaleTasks :many
|
|
UPDATE agent_task_queue
|
|
SET status = 'failed', completed_at = now(), error = 'task timed out',
|
|
failure_reason = 'timeout'
|
|
WHERE (status = 'dispatched' AND dispatched_at < now() - make_interval(secs => $1::double precision))
|
|
OR (status = 'running' AND started_at < now() - make_interval(secs => $2::double precision))
|
|
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary
|
|
`
|
|
|
|
type FailStaleTasksParams struct {
|
|
DispatchTimeoutSecs float64 `json:"dispatch_timeout_secs"`
|
|
RunningTimeoutSecs float64 `json:"running_timeout_secs"`
|
|
}
|
|
|
|
// Fails tasks stuck in dispatched/running beyond the given thresholds.
|
|
// Handles cases where the daemon is alive but the task is orphaned
|
|
// (e.g. agent process hung, daemon failed to report completion).
|
|
func (q *Queries) FailStaleTasks(ctx context.Context, arg FailStaleTasksParams) ([]AgentTaskQueue, error) {
|
|
rows, err := q.db.Query(ctx, failStaleTasks, arg.DispatchTimeoutSecs, arg.RunningTimeoutSecs)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
items := []AgentTaskQueue{}
|
|
for rows.Next() {
|
|
var i AgentTaskQueue
|
|
if err := rows.Scan(
|
|
&i.ID,
|
|
&i.AgentID,
|
|
&i.IssueID,
|
|
&i.Status,
|
|
&i.Priority,
|
|
&i.DispatchedAt,
|
|
&i.StartedAt,
|
|
&i.CompletedAt,
|
|
&i.Result,
|
|
&i.Error,
|
|
&i.CreatedAt,
|
|
&i.Context,
|
|
&i.RuntimeID,
|
|
&i.SessionID,
|
|
&i.WorkDir,
|
|
&i.TriggerCommentID,
|
|
&i.ChatSessionID,
|
|
&i.AutopilotRunID,
|
|
&i.Attempt,
|
|
&i.MaxAttempts,
|
|
&i.ParentTaskID,
|
|
&i.FailureReason,
|
|
&i.LastHeartbeatAt,
|
|
&i.TriggerSummary,
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
items = append(items, i)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
const getAgent = `-- name: GetAgent :one
|
|
SELECT id, workspace_id, name, avatar_url, runtime_mode, runtime_config, visibility, status, max_concurrent_tasks, owner_id, created_at, updated_at, description, runtime_id, instructions, archived_at, archived_by, custom_env, custom_args, mcp_config, model FROM agent
|
|
WHERE id = $1
|
|
`
|
|
|
|
func (q *Queries) GetAgent(ctx context.Context, id pgtype.UUID) (Agent, error) {
|
|
row := q.db.QueryRow(ctx, getAgent, id)
|
|
var i Agent
|
|
err := row.Scan(
|
|
&i.ID,
|
|
&i.WorkspaceID,
|
|
&i.Name,
|
|
&i.AvatarUrl,
|
|
&i.RuntimeMode,
|
|
&i.RuntimeConfig,
|
|
&i.Visibility,
|
|
&i.Status,
|
|
&i.MaxConcurrentTasks,
|
|
&i.OwnerID,
|
|
&i.CreatedAt,
|
|
&i.UpdatedAt,
|
|
&i.Description,
|
|
&i.RuntimeID,
|
|
&i.Instructions,
|
|
&i.ArchivedAt,
|
|
&i.ArchivedBy,
|
|
&i.CustomEnv,
|
|
&i.CustomArgs,
|
|
&i.McpConfig,
|
|
&i.Model,
|
|
)
|
|
return i, err
|
|
}
|
|
|
|
const getAgentInWorkspace = `-- name: GetAgentInWorkspace :one
|
|
SELECT id, workspace_id, name, avatar_url, runtime_mode, runtime_config, visibility, status, max_concurrent_tasks, owner_id, created_at, updated_at, description, runtime_id, instructions, archived_at, archived_by, custom_env, custom_args, mcp_config, model FROM agent
|
|
WHERE id = $1 AND workspace_id = $2
|
|
`
|
|
|
|
type GetAgentInWorkspaceParams struct {
|
|
ID pgtype.UUID `json:"id"`
|
|
WorkspaceID pgtype.UUID `json:"workspace_id"`
|
|
}
|
|
|
|
func (q *Queries) GetAgentInWorkspace(ctx context.Context, arg GetAgentInWorkspaceParams) (Agent, error) {
|
|
row := q.db.QueryRow(ctx, getAgentInWorkspace, arg.ID, arg.WorkspaceID)
|
|
var i Agent
|
|
err := row.Scan(
|
|
&i.ID,
|
|
&i.WorkspaceID,
|
|
&i.Name,
|
|
&i.AvatarUrl,
|
|
&i.RuntimeMode,
|
|
&i.RuntimeConfig,
|
|
&i.Visibility,
|
|
&i.Status,
|
|
&i.MaxConcurrentTasks,
|
|
&i.OwnerID,
|
|
&i.CreatedAt,
|
|
&i.UpdatedAt,
|
|
&i.Description,
|
|
&i.RuntimeID,
|
|
&i.Instructions,
|
|
&i.ArchivedAt,
|
|
&i.ArchivedBy,
|
|
&i.CustomEnv,
|
|
&i.CustomArgs,
|
|
&i.McpConfig,
|
|
&i.Model,
|
|
)
|
|
return i, err
|
|
}
|
|
|
|
// getAgentTask selects every agent_task_queue column for the row whose id
// equals $1.
const getAgentTask = `-- name: GetAgentTask :one
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary FROM agent_task_queue
WHERE id = $1
`
|
|
|
|
func (q *Queries) GetAgentTask(ctx context.Context, id pgtype.UUID) (AgentTaskQueue, error) {
|
|
row := q.db.QueryRow(ctx, getAgentTask, id)
|
|
var i AgentTaskQueue
|
|
err := row.Scan(
|
|
&i.ID,
|
|
&i.AgentID,
|
|
&i.IssueID,
|
|
&i.Status,
|
|
&i.Priority,
|
|
&i.DispatchedAt,
|
|
&i.StartedAt,
|
|
&i.CompletedAt,
|
|
&i.Result,
|
|
&i.Error,
|
|
&i.CreatedAt,
|
|
&i.Context,
|
|
&i.RuntimeID,
|
|
&i.SessionID,
|
|
&i.WorkDir,
|
|
&i.TriggerCommentID,
|
|
&i.ChatSessionID,
|
|
&i.AutopilotRunID,
|
|
&i.Attempt,
|
|
&i.MaxAttempts,
|
|
&i.ParentTaskID,
|
|
&i.FailureReason,
|
|
&i.LastHeartbeatAt,
|
|
&i.TriggerSummary,
|
|
)
|
|
return i, err
|
|
}
|
|
|
|
// getLastTaskSession picks session_id and work_dir from the newest
// completed/failed task with a non-NULL session_id for agent $1 and issue $2.
// "Newest" is ordered by the first non-NULL of completed_at, started_at,
// dispatched_at, created_at.
const getLastTaskSession = `-- name: GetLastTaskSession :one
SELECT session_id, work_dir FROM agent_task_queue
WHERE agent_id = $1 AND issue_id = $2
AND status IN ('completed', 'failed')
AND session_id IS NOT NULL
ORDER BY COALESCE(completed_at, started_at, dispatched_at, created_at) DESC
LIMIT 1
`
|
|
|
|
type GetLastTaskSessionParams struct {
|
|
AgentID pgtype.UUID `json:"agent_id"`
|
|
IssueID pgtype.UUID `json:"issue_id"`
|
|
}
|
|
|
|
type GetLastTaskSessionRow struct {
|
|
SessionID pgtype.Text `json:"session_id"`
|
|
WorkDir pgtype.Text `json:"work_dir"`
|
|
}
|
|
|
|
// Returns the session_id and work_dir from the most recent task for a given
|
|
// (agent_id, issue_id) pair, used for session resumption. We accept both
|
|
// 'completed' and 'failed' tasks: a failed task may have established a real
|
|
// agent session before crashing (orphaned by a daemon restart, runtime offline,
|
|
// or sweeper timeout), and the daemon pins the resume pointer mid-flight via
|
|
// UpdateAgentTaskSession. Without this, an auto-retry / manual rerun of a
|
|
// mid-run failure would silently start a fresh conversation and lose the
|
|
// in-flight context — exactly what MUL-1128's B branch is meant to fix.
|
|
func (q *Queries) GetLastTaskSession(ctx context.Context, arg GetLastTaskSessionParams) (GetLastTaskSessionRow, error) {
|
|
row := q.db.QueryRow(ctx, getLastTaskSession, arg.AgentID, arg.IssueID)
|
|
var i GetLastTaskSessionRow
|
|
err := row.Scan(&i.SessionID, &i.WorkDir)
|
|
return i, err
|
|
}
|
|
|
|
// getWorkspaceAgentActivity30d counts, per agent and per day of completed_at,
// the tasks (and the failed subset) completed within the last 30 days by
// agents of workspace $1.
const getWorkspaceAgentActivity30d = `-- name: GetWorkspaceAgentActivity30d :many
SELECT
atq.agent_id,
DATE_TRUNC('day', atq.completed_at)::timestamptz AS bucket,
COUNT(*)::int AS task_count,
COUNT(*) FILTER (WHERE atq.status = 'failed')::int AS failed_count
FROM agent_task_queue atq
JOIN agent a ON a.id = atq.agent_id
WHERE a.workspace_id = $1
AND atq.completed_at IS NOT NULL
AND atq.completed_at > now() - INTERVAL '30 days'
GROUP BY atq.agent_id, bucket
ORDER BY atq.agent_id, bucket
`
|
|
|
|
type GetWorkspaceAgentActivity30dRow struct {
|
|
AgentID pgtype.UUID `json:"agent_id"`
|
|
Bucket pgtype.Timestamptz `json:"bucket"`
|
|
TaskCount int32 `json:"task_count"`
|
|
FailedCount int32 `json:"failed_count"`
|
|
}
|
|
|
|
// Returns per-agent daily activity buckets for the last 30 days. Single
|
|
// workspace-wide read backs both surfaces:
|
|
// - Agents list ACTIVITY column — uses only the trailing 7 buckets
|
|
// - Agent detail "Last 30 days" panel — uses the full 30
|
|
//
|
|
// 30 days contains 7 days, so one fetch + a client-side .slice(-7) wins
|
|
// over fetching twice. Days with no completion produce no row; the
|
|
// front-end zero-fills.
|
|
//
|
|
// Anchored on completed_at (not created_at) because the sparkline answers
|
|
// "what did this agent produce?" not "what was queued at it?". A task that's
|
|
// still in flight has no completed_at and contributes nothing here — that's
|
|
// correct: in-flight tasks are surfaced via the live presence indicator,
|
|
// not the historical trend.
|
|
func (q *Queries) GetWorkspaceAgentActivity30d(ctx context.Context, workspaceID pgtype.UUID) ([]GetWorkspaceAgentActivity30dRow, error) {
|
|
rows, err := q.db.Query(ctx, getWorkspaceAgentActivity30d, workspaceID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
items := []GetWorkspaceAgentActivity30dRow{}
|
|
for rows.Next() {
|
|
var i GetWorkspaceAgentActivity30dRow
|
|
if err := rows.Scan(
|
|
&i.AgentID,
|
|
&i.Bucket,
|
|
&i.TaskCount,
|
|
&i.FailedCount,
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
items = append(items, i)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
// getWorkspaceAgentRunCounts counts, per agent of workspace $1, the tasks
// whose created_at falls within the last 30 days.
const getWorkspaceAgentRunCounts = `-- name: GetWorkspaceAgentRunCounts :many
SELECT
atq.agent_id,
COUNT(*)::int AS run_count
FROM agent_task_queue atq
JOIN agent a ON a.id = atq.agent_id
WHERE a.workspace_id = $1
AND atq.created_at > now() - INTERVAL '30 days'
GROUP BY atq.agent_id
`
|
|
|
|
type GetWorkspaceAgentRunCountsRow struct {
|
|
AgentID pgtype.UUID `json:"agent_id"`
|
|
RunCount int32 `json:"run_count"`
|
|
}
|
|
|
|
// Total task runs per agent over the trailing 30 days, used by the Agents
|
|
// list RUNS column. 30-day window keeps the count meaningful (a long-dormant
|
|
// agent shouldn't show "5,420 runs from 2 years ago") and keeps the scan
|
|
// bounded as the workspace ages.
|
|
func (q *Queries) GetWorkspaceAgentRunCounts(ctx context.Context, workspaceID pgtype.UUID) ([]GetWorkspaceAgentRunCountsRow, error) {
|
|
rows, err := q.db.Query(ctx, getWorkspaceAgentRunCounts, workspaceID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
items := []GetWorkspaceAgentRunCountsRow{}
|
|
for rows.Next() {
|
|
var i GetWorkspaceAgentRunCountsRow
|
|
if err := rows.Scan(&i.AgentID, &i.RunCount); err != nil {
|
|
return nil, err
|
|
}
|
|
items = append(items, i)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
// hasActiveTaskForIssue yields a single boolean: whether issue $1 has at
// least one task in status queued, dispatched or running.
const hasActiveTaskForIssue = `-- name: HasActiveTaskForIssue :one
SELECT count(*) > 0 AS has_active FROM agent_task_queue
WHERE issue_id = $1 AND status IN ('queued', 'dispatched', 'running')
`
|
|
|
|
// Returns true if there is any queued, dispatched, or running task for the issue.
|
|
func (q *Queries) HasActiveTaskForIssue(ctx context.Context, issueID pgtype.UUID) (bool, error) {
|
|
row := q.db.QueryRow(ctx, hasActiveTaskForIssue, issueID)
|
|
var has_active bool
|
|
err := row.Scan(&has_active)
|
|
return has_active, err
|
|
}
|
|
|
|
// hasPendingTaskForIssue yields a single boolean: whether issue $1 has at
// least one task in status queued or dispatched.
const hasPendingTaskForIssue = `-- name: HasPendingTaskForIssue :one
SELECT count(*) > 0 AS has_pending FROM agent_task_queue
WHERE issue_id = $1 AND status IN ('queued', 'dispatched')
`
|
|
|
|
// Returns true if there is a queued or dispatched (but not yet running) task for the issue.
|
|
// Used by the coalescing queue: allow enqueue when a task is running (so
|
|
// the agent picks up new comments on the next cycle) but skip if a pending
|
|
// task already exists (natural dedup).
|
|
func (q *Queries) HasPendingTaskForIssue(ctx context.Context, issueID pgtype.UUID) (bool, error) {
|
|
row := q.db.QueryRow(ctx, hasPendingTaskForIssue, issueID)
|
|
var has_pending bool
|
|
err := row.Scan(&has_pending)
|
|
return has_pending, err
|
|
}
|
|
|
|
// hasPendingTaskForIssueAndAgent yields a single boolean: whether issue $1
// has a queued or dispatched task assigned to agent $2.
const hasPendingTaskForIssueAndAgent = `-- name: HasPendingTaskForIssueAndAgent :one
SELECT count(*) > 0 AS has_pending FROM agent_task_queue
WHERE issue_id = $1 AND agent_id = $2 AND status IN ('queued', 'dispatched')
`
|
|
|
|
type HasPendingTaskForIssueAndAgentParams struct {
|
|
IssueID pgtype.UUID `json:"issue_id"`
|
|
AgentID pgtype.UUID `json:"agent_id"`
|
|
}
|
|
|
|
// Returns true if a specific agent already has a queued or dispatched task
|
|
// for the given issue. Used by @mention trigger dedup.
|
|
func (q *Queries) HasPendingTaskForIssueAndAgent(ctx context.Context, arg HasPendingTaskForIssueAndAgentParams) (bool, error) {
|
|
row := q.db.QueryRow(ctx, hasPendingTaskForIssueAndAgent, arg.IssueID, arg.AgentID)
|
|
var has_pending bool
|
|
err := row.Scan(&has_pending)
|
|
return has_pending, err
|
|
}
|
|
|
|
// linkTaskToIssue sets issue_id to $2 on task $1, but only while the task's
// issue_id is still NULL.
const linkTaskToIssue = `-- name: LinkTaskToIssue :exec
UPDATE agent_task_queue
SET issue_id = $2
WHERE id = $1 AND issue_id IS NULL
`
|
|
|
|
type LinkTaskToIssueParams struct {
|
|
ID pgtype.UUID `json:"id"`
|
|
IssueID pgtype.UUID `json:"issue_id"`
|
|
}
|
|
|
|
// Attaches the issue a quick-create task produced back to the task row, once
|
|
// the agent has finished and the issue exists. Guarded by `issue_id IS NULL`
|
|
// so this never overwrites an issue id that was set at task creation (only
|
|
// quick-create tasks land here unset). Fixes the activity row staying on
|
|
// "Creating issue" forever after completion.
|
|
func (q *Queries) LinkTaskToIssue(ctx context.Context, arg LinkTaskToIssueParams) error {
|
|
_, err := q.db.Exec(ctx, linkTaskToIssue, arg.ID, arg.IssueID)
|
|
return err
|
|
}
|
|
|
|
// listActiveTasksByIssue selects every column of the dispatched or running
// tasks of issue $1, newest created_at first.
const listActiveTasksByIssue = `-- name: ListActiveTasksByIssue :many
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary FROM agent_task_queue
WHERE issue_id = $1 AND status IN ('dispatched', 'running')
ORDER BY created_at DESC
`
|
|
|
|
func (q *Queries) ListActiveTasksByIssue(ctx context.Context, issueID pgtype.UUID) ([]AgentTaskQueue, error) {
|
|
rows, err := q.db.Query(ctx, listActiveTasksByIssue, issueID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
items := []AgentTaskQueue{}
|
|
for rows.Next() {
|
|
var i AgentTaskQueue
|
|
if err := rows.Scan(
|
|
&i.ID,
|
|
&i.AgentID,
|
|
&i.IssueID,
|
|
&i.Status,
|
|
&i.Priority,
|
|
&i.DispatchedAt,
|
|
&i.StartedAt,
|
|
&i.CompletedAt,
|
|
&i.Result,
|
|
&i.Error,
|
|
&i.CreatedAt,
|
|
&i.Context,
|
|
&i.RuntimeID,
|
|
&i.SessionID,
|
|
&i.WorkDir,
|
|
&i.TriggerCommentID,
|
|
&i.ChatSessionID,
|
|
&i.AutopilotRunID,
|
|
&i.Attempt,
|
|
&i.MaxAttempts,
|
|
&i.ParentTaskID,
|
|
&i.FailureReason,
|
|
&i.LastHeartbeatAt,
|
|
&i.TriggerSummary,
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
items = append(items, i)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
// listAgentTasks selects every column of all tasks belonging to agent $1,
// newest created_at first.
const listAgentTasks = `-- name: ListAgentTasks :many
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary FROM agent_task_queue
WHERE agent_id = $1
ORDER BY created_at DESC
`
|
|
|
|
func (q *Queries) ListAgentTasks(ctx context.Context, agentID pgtype.UUID) ([]AgentTaskQueue, error) {
|
|
rows, err := q.db.Query(ctx, listAgentTasks, agentID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
items := []AgentTaskQueue{}
|
|
for rows.Next() {
|
|
var i AgentTaskQueue
|
|
if err := rows.Scan(
|
|
&i.ID,
|
|
&i.AgentID,
|
|
&i.IssueID,
|
|
&i.Status,
|
|
&i.Priority,
|
|
&i.DispatchedAt,
|
|
&i.StartedAt,
|
|
&i.CompletedAt,
|
|
&i.Result,
|
|
&i.Error,
|
|
&i.CreatedAt,
|
|
&i.Context,
|
|
&i.RuntimeID,
|
|
&i.SessionID,
|
|
&i.WorkDir,
|
|
&i.TriggerCommentID,
|
|
&i.ChatSessionID,
|
|
&i.AutopilotRunID,
|
|
&i.Attempt,
|
|
&i.MaxAttempts,
|
|
&i.ParentTaskID,
|
|
&i.FailureReason,
|
|
&i.LastHeartbeatAt,
|
|
&i.TriggerSummary,
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
items = append(items, i)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
// listAgents selects every column of the non-archived agents
// (archived_at IS NULL) of workspace $1, oldest created_at first.
const listAgents = `-- name: ListAgents :many
SELECT id, workspace_id, name, avatar_url, runtime_mode, runtime_config, visibility, status, max_concurrent_tasks, owner_id, created_at, updated_at, description, runtime_id, instructions, archived_at, archived_by, custom_env, custom_args, mcp_config, model FROM agent
WHERE workspace_id = $1 AND archived_at IS NULL
ORDER BY created_at ASC
`
|
|
|
|
func (q *Queries) ListAgents(ctx context.Context, workspaceID pgtype.UUID) ([]Agent, error) {
|
|
rows, err := q.db.Query(ctx, listAgents, workspaceID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
items := []Agent{}
|
|
for rows.Next() {
|
|
var i Agent
|
|
if err := rows.Scan(
|
|
&i.ID,
|
|
&i.WorkspaceID,
|
|
&i.Name,
|
|
&i.AvatarUrl,
|
|
&i.RuntimeMode,
|
|
&i.RuntimeConfig,
|
|
&i.Visibility,
|
|
&i.Status,
|
|
&i.MaxConcurrentTasks,
|
|
&i.OwnerID,
|
|
&i.CreatedAt,
|
|
&i.UpdatedAt,
|
|
&i.Description,
|
|
&i.RuntimeID,
|
|
&i.Instructions,
|
|
&i.ArchivedAt,
|
|
&i.ArchivedBy,
|
|
&i.CustomEnv,
|
|
&i.CustomArgs,
|
|
&i.McpConfig,
|
|
&i.Model,
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
items = append(items, i)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
// listAllAgents selects every column of all agents of workspace $1, with no
// archived_at filter, oldest created_at first.
const listAllAgents = `-- name: ListAllAgents :many
SELECT id, workspace_id, name, avatar_url, runtime_mode, runtime_config, visibility, status, max_concurrent_tasks, owner_id, created_at, updated_at, description, runtime_id, instructions, archived_at, archived_by, custom_env, custom_args, mcp_config, model FROM agent
WHERE workspace_id = $1
ORDER BY created_at ASC
`
|
|
|
|
func (q *Queries) ListAllAgents(ctx context.Context, workspaceID pgtype.UUID) ([]Agent, error) {
|
|
rows, err := q.db.Query(ctx, listAllAgents, workspaceID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
items := []Agent{}
|
|
for rows.Next() {
|
|
var i Agent
|
|
if err := rows.Scan(
|
|
&i.ID,
|
|
&i.WorkspaceID,
|
|
&i.Name,
|
|
&i.AvatarUrl,
|
|
&i.RuntimeMode,
|
|
&i.RuntimeConfig,
|
|
&i.Visibility,
|
|
&i.Status,
|
|
&i.MaxConcurrentTasks,
|
|
&i.OwnerID,
|
|
&i.CreatedAt,
|
|
&i.UpdatedAt,
|
|
&i.Description,
|
|
&i.RuntimeID,
|
|
&i.Instructions,
|
|
&i.ArchivedAt,
|
|
&i.ArchivedBy,
|
|
&i.CustomEnv,
|
|
&i.CustomArgs,
|
|
&i.McpConfig,
|
|
&i.Model,
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
items = append(items, i)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
// listPendingTasksByRuntime selects every column of the queued or dispatched
// tasks of runtime $1, highest priority first and oldest created_at first
// within a priority.
const listPendingTasksByRuntime = `-- name: ListPendingTasksByRuntime :many
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary FROM agent_task_queue
WHERE runtime_id = $1 AND status IN ('queued', 'dispatched')
ORDER BY priority DESC, created_at ASC
`
|
|
|
|
func (q *Queries) ListPendingTasksByRuntime(ctx context.Context, runtimeID pgtype.UUID) ([]AgentTaskQueue, error) {
|
|
rows, err := q.db.Query(ctx, listPendingTasksByRuntime, runtimeID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
items := []AgentTaskQueue{}
|
|
for rows.Next() {
|
|
var i AgentTaskQueue
|
|
if err := rows.Scan(
|
|
&i.ID,
|
|
&i.AgentID,
|
|
&i.IssueID,
|
|
&i.Status,
|
|
&i.Priority,
|
|
&i.DispatchedAt,
|
|
&i.StartedAt,
|
|
&i.CompletedAt,
|
|
&i.Result,
|
|
&i.Error,
|
|
&i.CreatedAt,
|
|
&i.Context,
|
|
&i.RuntimeID,
|
|
&i.SessionID,
|
|
&i.WorkDir,
|
|
&i.TriggerCommentID,
|
|
&i.ChatSessionID,
|
|
&i.AutopilotRunID,
|
|
&i.Attempt,
|
|
&i.MaxAttempts,
|
|
&i.ParentTaskID,
|
|
&i.FailureReason,
|
|
&i.LastHeartbeatAt,
|
|
&i.TriggerSummary,
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
items = append(items, i)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
// listTasksByIssue selects every column of all tasks of issue $1 regardless
// of status, newest created_at first.
const listTasksByIssue = `-- name: ListTasksByIssue :many
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary FROM agent_task_queue
WHERE issue_id = $1
ORDER BY created_at DESC
`
|
|
|
|
func (q *Queries) ListTasksByIssue(ctx context.Context, issueID pgtype.UUID) ([]AgentTaskQueue, error) {
|
|
rows, err := q.db.Query(ctx, listTasksByIssue, issueID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
items := []AgentTaskQueue{}
|
|
for rows.Next() {
|
|
var i AgentTaskQueue
|
|
if err := rows.Scan(
|
|
&i.ID,
|
|
&i.AgentID,
|
|
&i.IssueID,
|
|
&i.Status,
|
|
&i.Priority,
|
|
&i.DispatchedAt,
|
|
&i.StartedAt,
|
|
&i.CompletedAt,
|
|
&i.Result,
|
|
&i.Error,
|
|
&i.CreatedAt,
|
|
&i.Context,
|
|
&i.RuntimeID,
|
|
&i.SessionID,
|
|
&i.WorkDir,
|
|
&i.TriggerCommentID,
|
|
&i.ChatSessionID,
|
|
&i.AutopilotRunID,
|
|
&i.Attempt,
|
|
&i.MaxAttempts,
|
|
&i.ParentTaskID,
|
|
&i.FailureReason,
|
|
&i.LastHeartbeatAt,
|
|
&i.TriggerSummary,
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
items = append(items, i)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
// listWorkspaceAgentTaskSnapshot is a UNION ALL of two halves for workspace
// $1: every queued/dispatched/running task, plus for each agent the single
// completed/failed task with the latest completed_at (DISTINCT ON agent_id,
// NULL completed_at sorted last).
const listWorkspaceAgentTaskSnapshot = `-- name: ListWorkspaceAgentTaskSnapshot :many
SELECT atq.id, atq.agent_id, atq.issue_id, atq.status, atq.priority, atq.dispatched_at, atq.started_at, atq.completed_at, atq.result, atq.error, atq.created_at, atq.context, atq.runtime_id, atq.session_id, atq.work_dir, atq.trigger_comment_id, atq.chat_session_id, atq.autopilot_run_id, atq.attempt, atq.max_attempts, atq.parent_task_id, atq.failure_reason, atq.last_heartbeat_at, atq.trigger_summary FROM agent_task_queue atq
JOIN agent a ON a.id = atq.agent_id
WHERE a.workspace_id = $1
AND atq.status IN ('queued', 'dispatched', 'running')

UNION ALL

SELECT t.id, t.agent_id, t.issue_id, t.status, t.priority, t.dispatched_at, t.started_at, t.completed_at, t.result, t.error, t.created_at, t.context, t.runtime_id, t.session_id, t.work_dir, t.trigger_comment_id, t.chat_session_id, t.autopilot_run_id, t.attempt, t.max_attempts, t.parent_task_id, t.failure_reason, t.last_heartbeat_at, t.trigger_summary FROM (
SELECT DISTINCT ON (atq.agent_id) atq.id, atq.agent_id, atq.issue_id, atq.status, atq.priority, atq.dispatched_at, atq.started_at, atq.completed_at, atq.result, atq.error, atq.created_at, atq.context, atq.runtime_id, atq.session_id, atq.work_dir, atq.trigger_comment_id, atq.chat_session_id, atq.autopilot_run_id, atq.attempt, atq.max_attempts, atq.parent_task_id, atq.failure_reason, atq.last_heartbeat_at, atq.trigger_summary
FROM agent_task_queue atq
JOIN agent a ON a.id = atq.agent_id
WHERE a.workspace_id = $1
AND atq.status IN ('completed', 'failed')
ORDER BY atq.agent_id, atq.completed_at DESC NULLS LAST
) t
`
|
|
|
|
// Returns the tasks needed to derive each agent's current presence:
|
|
// - All active tasks (queued / dispatched / running) — for working signal + counts
|
|
// - Each agent's most recent OUTCOME task (completed / failed) — for sticky
|
|
// failed signal
|
|
//
|
|
// The front-end picks "active wins, else latest outcome" — see derive-presence.ts.
|
|
//
|
|
// Cancelled tasks are excluded from the outcome half on purpose: cancel is a
|
|
// procedural signal ("attempt aborted"), not an outcome. It tells us nothing
|
|
// about whether the agent works, so it must NOT be allowed to mask a prior
|
|
// failure. Concretely: if an agent fails and then the user cancels the queued
|
|
// retry (or the parent issue closes and cascades cancels), the failed signal
|
|
// has to stay red. Only a real success (completed) or a fresh attempt (active)
|
|
// clears it.
|
|
//
|
|
// No UI windows in SQL: stickiness is decided by "is the latest outcome a
|
|
// failure?", not a 2-minute clock. JOINs agent because agent_task_queue has
|
|
// no workspace_id column.
|
|
func (q *Queries) ListWorkspaceAgentTaskSnapshot(ctx context.Context, workspaceID pgtype.UUID) ([]AgentTaskQueue, error) {
|
|
rows, err := q.db.Query(ctx, listWorkspaceAgentTaskSnapshot, workspaceID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
items := []AgentTaskQueue{}
|
|
for rows.Next() {
|
|
var i AgentTaskQueue
|
|
if err := rows.Scan(
|
|
&i.ID,
|
|
&i.AgentID,
|
|
&i.IssueID,
|
|
&i.Status,
|
|
&i.Priority,
|
|
&i.DispatchedAt,
|
|
&i.StartedAt,
|
|
&i.CompletedAt,
|
|
&i.Result,
|
|
&i.Error,
|
|
&i.CreatedAt,
|
|
&i.Context,
|
|
&i.RuntimeID,
|
|
&i.SessionID,
|
|
&i.WorkDir,
|
|
&i.TriggerCommentID,
|
|
&i.ChatSessionID,
|
|
&i.AutopilotRunID,
|
|
&i.Attempt,
|
|
&i.MaxAttempts,
|
|
&i.ParentTaskID,
|
|
&i.FailureReason,
|
|
&i.LastHeartbeatAt,
|
|
&i.TriggerSummary,
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
items = append(items, i)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
// recoverOrphanedTasksForRuntime marks every dispatched or running task of
// runtime $1 as failed (completed_at = now(), fixed error text,
// failure_reason 'runtime_recovery') and returns the updated rows.
const recoverOrphanedTasksForRuntime = `-- name: RecoverOrphanedTasksForRuntime :many
UPDATE agent_task_queue
SET status = 'failed',
completed_at = now(),
error = 'daemon restarted while task was in flight',
failure_reason = 'runtime_recovery'
WHERE runtime_id = $1 AND status IN ('dispatched', 'running')
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary
`
|
|
|
|
// Called by the daemon at startup. Atomically fails any dispatched/running
|
|
// task that the prior incarnation of this runtime owned but did not
|
|
// finalize. Returns the failed rows so callers can hand them to the
|
|
// auto-retry path.
|
|
func (q *Queries) RecoverOrphanedTasksForRuntime(ctx context.Context, runtimeID pgtype.UUID) ([]AgentTaskQueue, error) {
|
|
rows, err := q.db.Query(ctx, recoverOrphanedTasksForRuntime, runtimeID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
items := []AgentTaskQueue{}
|
|
for rows.Next() {
|
|
var i AgentTaskQueue
|
|
if err := rows.Scan(
|
|
&i.ID,
|
|
&i.AgentID,
|
|
&i.IssueID,
|
|
&i.Status,
|
|
&i.Priority,
|
|
&i.DispatchedAt,
|
|
&i.StartedAt,
|
|
&i.CompletedAt,
|
|
&i.Result,
|
|
&i.Error,
|
|
&i.CreatedAt,
|
|
&i.Context,
|
|
&i.RuntimeID,
|
|
&i.SessionID,
|
|
&i.WorkDir,
|
|
&i.TriggerCommentID,
|
|
&i.ChatSessionID,
|
|
&i.AutopilotRunID,
|
|
&i.Attempt,
|
|
&i.MaxAttempts,
|
|
&i.ParentTaskID,
|
|
&i.FailureReason,
|
|
&i.LastHeartbeatAt,
|
|
&i.TriggerSummary,
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
items = append(items, i)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
// refreshAgentStatusFromTasks recomputes the status of agent $1: 'working'
// when the agent has any dispatched or running task, otherwise 'idle'. It
// also bumps updated_at and returns the updated agent row.
const refreshAgentStatusFromTasks = `-- name: RefreshAgentStatusFromTasks :one
UPDATE agent AS a
SET status = CASE WHEN EXISTS (
SELECT 1 FROM agent_task_queue q
WHERE q.agent_id = a.id AND q.status IN ('dispatched', 'running')
) THEN 'working' ELSE 'idle' END,
updated_at = now()
WHERE a.id = $1
RETURNING id, workspace_id, name, avatar_url, runtime_mode, runtime_config, visibility, status, max_concurrent_tasks, owner_id, created_at, updated_at, description, runtime_id, instructions, archived_at, archived_by, custom_env, custom_args, mcp_config, model
`
|
|
|
|
func (q *Queries) RefreshAgentStatusFromTasks(ctx context.Context, id pgtype.UUID) (Agent, error) {
|
|
row := q.db.QueryRow(ctx, refreshAgentStatusFromTasks, id)
|
|
var i Agent
|
|
err := row.Scan(
|
|
&i.ID,
|
|
&i.WorkspaceID,
|
|
&i.Name,
|
|
&i.AvatarUrl,
|
|
&i.RuntimeMode,
|
|
&i.RuntimeConfig,
|
|
&i.Visibility,
|
|
&i.Status,
|
|
&i.MaxConcurrentTasks,
|
|
&i.OwnerID,
|
|
&i.CreatedAt,
|
|
&i.UpdatedAt,
|
|
&i.Description,
|
|
&i.RuntimeID,
|
|
&i.Instructions,
|
|
&i.ArchivedAt,
|
|
&i.ArchivedBy,
|
|
&i.CustomEnv,
|
|
&i.CustomArgs,
|
|
&i.McpConfig,
|
|
&i.Model,
|
|
)
|
|
return i, err
|
|
}
|
|
|
|
// restoreAgent clears archived_at and archived_by on agent $1, bumps
// updated_at, and returns the updated row.
const restoreAgent = `-- name: RestoreAgent :one
UPDATE agent SET archived_at = NULL, archived_by = NULL, updated_at = now()
WHERE id = $1
RETURNING id, workspace_id, name, avatar_url, runtime_mode, runtime_config, visibility, status, max_concurrent_tasks, owner_id, created_at, updated_at, description, runtime_id, instructions, archived_at, archived_by, custom_env, custom_args, mcp_config, model
`
|
|
|
|
func (q *Queries) RestoreAgent(ctx context.Context, id pgtype.UUID) (Agent, error) {
|
|
row := q.db.QueryRow(ctx, restoreAgent, id)
|
|
var i Agent
|
|
err := row.Scan(
|
|
&i.ID,
|
|
&i.WorkspaceID,
|
|
&i.Name,
|
|
&i.AvatarUrl,
|
|
&i.RuntimeMode,
|
|
&i.RuntimeConfig,
|
|
&i.Visibility,
|
|
&i.Status,
|
|
&i.MaxConcurrentTasks,
|
|
&i.OwnerID,
|
|
&i.CreatedAt,
|
|
&i.UpdatedAt,
|
|
&i.Description,
|
|
&i.RuntimeID,
|
|
&i.Instructions,
|
|
&i.ArchivedAt,
|
|
&i.ArchivedBy,
|
|
&i.CustomEnv,
|
|
&i.CustomArgs,
|
|
&i.McpConfig,
|
|
&i.Model,
|
|
)
|
|
return i, err
|
|
}
|
|
|
|
// startAgentTask moves task $1 from 'dispatched' to 'running', stamps
// started_at with now(), and returns the updated row. Tasks in any other
// status are not matched.
const startAgentTask = `-- name: StartAgentTask :one
UPDATE agent_task_queue
SET status = 'running', started_at = now()
WHERE id = $1 AND status = 'dispatched'
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, chat_session_id, autopilot_run_id, attempt, max_attempts, parent_task_id, failure_reason, last_heartbeat_at, trigger_summary
`
|
|
|
|
func (q *Queries) StartAgentTask(ctx context.Context, id pgtype.UUID) (AgentTaskQueue, error) {
|
|
row := q.db.QueryRow(ctx, startAgentTask, id)
|
|
var i AgentTaskQueue
|
|
err := row.Scan(
|
|
&i.ID,
|
|
&i.AgentID,
|
|
&i.IssueID,
|
|
&i.Status,
|
|
&i.Priority,
|
|
&i.DispatchedAt,
|
|
&i.StartedAt,
|
|
&i.CompletedAt,
|
|
&i.Result,
|
|
&i.Error,
|
|
&i.CreatedAt,
|
|
&i.Context,
|
|
&i.RuntimeID,
|
|
&i.SessionID,
|
|
&i.WorkDir,
|
|
&i.TriggerCommentID,
|
|
&i.ChatSessionID,
|
|
&i.AutopilotRunID,
|
|
&i.Attempt,
|
|
&i.MaxAttempts,
|
|
&i.ParentTaskID,
|
|
&i.FailureReason,
|
|
&i.LastHeartbeatAt,
|
|
&i.TriggerSummary,
|
|
)
|
|
return i, err
|
|
}
|
|
|
|
// updateAgent patches agent $1. Each of $2..$15 is wrapped in COALESCE with
// the current column value, so a NULL parameter leaves that column
// unchanged. updated_at is always set to now(). The updated row is returned.
const updateAgent = `-- name: UpdateAgent :one
UPDATE agent SET
name = COALESCE($2, name),
description = COALESCE($3, description),
avatar_url = COALESCE($4, avatar_url),
runtime_config = COALESCE($5, runtime_config),
runtime_mode = COALESCE($6, runtime_mode),
runtime_id = COALESCE($7, runtime_id),
visibility = COALESCE($8, visibility),
status = COALESCE($9, status),
max_concurrent_tasks = COALESCE($10, max_concurrent_tasks),
instructions = COALESCE($11, instructions),
custom_env = COALESCE($12, custom_env),
custom_args = COALESCE($13, custom_args),
mcp_config = COALESCE($14, mcp_config),
model = COALESCE($15, model),
updated_at = now()
WHERE id = $1
RETURNING id, workspace_id, name, avatar_url, runtime_mode, runtime_config, visibility, status, max_concurrent_tasks, owner_id, created_at, updated_at, description, runtime_id, instructions, archived_at, archived_by, custom_env, custom_args, mcp_config, model
`
|
|
|
|
type UpdateAgentParams struct {
|
|
ID pgtype.UUID `json:"id"`
|
|
Name pgtype.Text `json:"name"`
|
|
Description pgtype.Text `json:"description"`
|
|
AvatarUrl pgtype.Text `json:"avatar_url"`
|
|
RuntimeConfig []byte `json:"runtime_config"`
|
|
RuntimeMode pgtype.Text `json:"runtime_mode"`
|
|
RuntimeID pgtype.UUID `json:"runtime_id"`
|
|
Visibility pgtype.Text `json:"visibility"`
|
|
Status pgtype.Text `json:"status"`
|
|
MaxConcurrentTasks pgtype.Int4 `json:"max_concurrent_tasks"`
|
|
Instructions pgtype.Text `json:"instructions"`
|
|
CustomEnv []byte `json:"custom_env"`
|
|
CustomArgs []byte `json:"custom_args"`
|
|
McpConfig []byte `json:"mcp_config"`
|
|
Model pgtype.Text `json:"model"`
|
|
}
|
|
|
|
func (q *Queries) UpdateAgent(ctx context.Context, arg UpdateAgentParams) (Agent, error) {
|
|
row := q.db.QueryRow(ctx, updateAgent,
|
|
arg.ID,
|
|
arg.Name,
|
|
arg.Description,
|
|
arg.AvatarUrl,
|
|
arg.RuntimeConfig,
|
|
arg.RuntimeMode,
|
|
arg.RuntimeID,
|
|
arg.Visibility,
|
|
arg.Status,
|
|
arg.MaxConcurrentTasks,
|
|
arg.Instructions,
|
|
arg.CustomEnv,
|
|
arg.CustomArgs,
|
|
arg.McpConfig,
|
|
arg.Model,
|
|
)
|
|
var i Agent
|
|
err := row.Scan(
|
|
&i.ID,
|
|
&i.WorkspaceID,
|
|
&i.Name,
|
|
&i.AvatarUrl,
|
|
&i.RuntimeMode,
|
|
&i.RuntimeConfig,
|
|
&i.Visibility,
|
|
&i.Status,
|
|
&i.MaxConcurrentTasks,
|
|
&i.OwnerID,
|
|
&i.CreatedAt,
|
|
&i.UpdatedAt,
|
|
&i.Description,
|
|
&i.RuntimeID,
|
|
&i.Instructions,
|
|
&i.ArchivedAt,
|
|
&i.ArchivedBy,
|
|
&i.CustomEnv,
|
|
&i.CustomArgs,
|
|
&i.McpConfig,
|
|
&i.Model,
|
|
)
|
|
return i, err
|
|
}
|
|
|
|
// updateAgentStatus sets status to $2 on agent $1, bumps updated_at, and
// returns the updated row.
const updateAgentStatus = `-- name: UpdateAgentStatus :one
UPDATE agent SET status = $2, updated_at = now()
WHERE id = $1
RETURNING id, workspace_id, name, avatar_url, runtime_mode, runtime_config, visibility, status, max_concurrent_tasks, owner_id, created_at, updated_at, description, runtime_id, instructions, archived_at, archived_by, custom_env, custom_args, mcp_config, model
`
|
|
|
|
type UpdateAgentStatusParams struct {
|
|
ID pgtype.UUID `json:"id"`
|
|
Status string `json:"status"`
|
|
}
|
|
|
|
func (q *Queries) UpdateAgentStatus(ctx context.Context, arg UpdateAgentStatusParams) (Agent, error) {
|
|
row := q.db.QueryRow(ctx, updateAgentStatus, arg.ID, arg.Status)
|
|
var i Agent
|
|
err := row.Scan(
|
|
&i.ID,
|
|
&i.WorkspaceID,
|
|
&i.Name,
|
|
&i.AvatarUrl,
|
|
&i.RuntimeMode,
|
|
&i.RuntimeConfig,
|
|
&i.Visibility,
|
|
&i.Status,
|
|
&i.MaxConcurrentTasks,
|
|
&i.OwnerID,
|
|
&i.CreatedAt,
|
|
&i.UpdatedAt,
|
|
&i.Description,
|
|
&i.RuntimeID,
|
|
&i.Instructions,
|
|
&i.ArchivedAt,
|
|
&i.ArchivedBy,
|
|
&i.CustomEnv,
|
|
&i.CustomArgs,
|
|
&i.McpConfig,
|
|
&i.Model,
|
|
)
|
|
return i, err
|
|
}
|
|
|
|
// updateAgentTaskSession updates task $1 only while it is dispatched or
// running: session_id and work_dir take $2 and $3 unless those are NULL (in
// which case the current value is kept), and last_heartbeat_at is set to
// now().
const updateAgentTaskSession = `-- name: UpdateAgentTaskSession :exec
UPDATE agent_task_queue
SET session_id = COALESCE($2, session_id),
work_dir = COALESCE($3, work_dir),
last_heartbeat_at = now()
WHERE id = $1 AND status IN ('dispatched', 'running')
`
|
|
|
|
type UpdateAgentTaskSessionParams struct {
|
|
ID pgtype.UUID `json:"id"`
|
|
SessionID pgtype.Text `json:"session_id"`
|
|
WorkDir pgtype.Text `json:"work_dir"`
|
|
}
|
|
|
|
// Pins the resume pointer mid-flight so a daemon crash leaves a usable
|
|
// session_id/work_dir on the task row. No-op if the task is no longer
|
|
// in dispatched/running.
|
|
func (q *Queries) UpdateAgentTaskSession(ctx context.Context, arg UpdateAgentTaskSessionParams) error {
|
|
_, err := q.db.Exec(ctx, updateAgentTaskSession, arg.ID, arg.SessionID, arg.WorkDir)
|
|
return err
|
|
}
|