mirror of
https://github.com/multica-ai/multica.git
synced 2026-07-05 13:29:44 +02:00
Implements the Agentflow feature end-to-end: **Database** (migration 032): - `agentflow` table (title, description/prompt, agent, status, concurrency policy) - `agentflow_trigger` table (schedule/webhook/api with cron + timezone) - `agentflow_run` table (execution history with status tracking) - Extends `agent_task_queue` with nullable `issue_id` and `agentflow_run_id` **Backend**: - CRUD API for agentflows, triggers, and runs - Server-side scheduler goroutine (30s polling, CAS-safe trigger claiming) - Cron expression parsing via robfig/cron/v3 with timezone support - Concurrency policies: skip_if_active, coalesce, always_run - Manual trigger endpoint (POST /agentflows/:id/run) - Task service extended with EnqueueTaskForAgentflow() - Broadcast methods handle nullable issue_id for agentflow tasks **Daemon**: - Task type extended with agentflow context (title, description, run_id) - BuildPrompt generates agentflow-specific prompts - Execution environment writes agentflow context to CLAUDE.md - Claim response includes agentflow data for workspace/repo resolution **Frontend**: - Sidebar entry (Agentflows with Zap icon) - List + detail page with triggers and run history tabs - Create dialog with agent selection, cron, timezone, concurrency policy - Pause/activate toggle and manual run button - Zustand store for agentflow state management - TypeScript types and API client methods Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
765 lines
21 KiB
Go
765 lines
21 KiB
Go
// Code generated by sqlc. DO NOT EDIT.
|
|
// versions:
|
|
// sqlc v1.30.0
|
|
// source: agentflow.sql
|
|
|
|
package db
|
|
|
|
import (
|
|
"context"
|
|
|
|
"github.com/jackc/pgx/v5/pgtype"
|
|
)
|
|
|
|
const archiveAgentflow = `-- name: ArchiveAgentflow :one
|
|
UPDATE agentflow SET status = 'archived', updated_at = now()
|
|
WHERE id = $1
|
|
RETURNING id, workspace_id, title, description, agent_id, status, concurrency_policy, variables, created_by, created_at, updated_at
|
|
`
|
|
|
|
func (q *Queries) ArchiveAgentflow(ctx context.Context, id pgtype.UUID) (Agentflow, error) {
|
|
row := q.db.QueryRow(ctx, archiveAgentflow, id)
|
|
var i Agentflow
|
|
err := row.Scan(
|
|
&i.ID,
|
|
&i.WorkspaceID,
|
|
&i.Title,
|
|
&i.Description,
|
|
&i.AgentID,
|
|
&i.Status,
|
|
&i.ConcurrencyPolicy,
|
|
&i.Variables,
|
|
&i.CreatedBy,
|
|
&i.CreatedAt,
|
|
&i.UpdatedAt,
|
|
)
|
|
return i, err
|
|
}
|
|
|
|
const claimDueScheduleTriggers = `-- name: ClaimDueScheduleTriggers :many
|
|
UPDATE agentflow_trigger t SET
|
|
next_run_at = NULL, -- will be recalculated by caller
|
|
last_fired_at = now()
|
|
FROM agentflow a
|
|
WHERE t.agentflow_id = a.id
|
|
AND t.kind = 'schedule'
|
|
AND t.enabled = true
|
|
AND a.status = 'active'
|
|
AND t.next_run_at IS NOT NULL
|
|
AND t.next_run_at <= now()
|
|
RETURNING t.id, t.agentflow_id, t.kind, t.enabled, t.cron_expression, t.timezone, t.next_run_at, t.public_id, t.secret_hash, t.signing_mode, t.last_fired_at, t.created_at, a.workspace_id, a.agent_id, a.title AS agentflow_title, a.description AS agentflow_description, a.concurrency_policy
|
|
`
|
|
|
|
type ClaimDueScheduleTriggersRow struct {
|
|
ID pgtype.UUID `json:"id"`
|
|
AgentflowID pgtype.UUID `json:"agentflow_id"`
|
|
Kind string `json:"kind"`
|
|
Enabled bool `json:"enabled"`
|
|
CronExpression pgtype.Text `json:"cron_expression"`
|
|
Timezone pgtype.Text `json:"timezone"`
|
|
NextRunAt pgtype.Timestamptz `json:"next_run_at"`
|
|
PublicID pgtype.Text `json:"public_id"`
|
|
SecretHash pgtype.Text `json:"secret_hash"`
|
|
SigningMode pgtype.Text `json:"signing_mode"`
|
|
LastFiredAt pgtype.Timestamptz `json:"last_fired_at"`
|
|
CreatedAt pgtype.Timestamptz `json:"created_at"`
|
|
WorkspaceID pgtype.UUID `json:"workspace_id"`
|
|
AgentID pgtype.UUID `json:"agent_id"`
|
|
AgentflowTitle string `json:"agentflow_title"`
|
|
AgentflowDescription pgtype.Text `json:"agentflow_description"`
|
|
ConcurrencyPolicy string `json:"concurrency_policy"`
|
|
}
|
|
|
|
// Atomically claims all schedule triggers that are due.
|
|
// Uses CAS (compare-and-swap) on next_run_at to prevent double-firing
|
|
// across multiple server instances.
|
|
func (q *Queries) ClaimDueScheduleTriggers(ctx context.Context) ([]ClaimDueScheduleTriggersRow, error) {
|
|
rows, err := q.db.Query(ctx, claimDueScheduleTriggers)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
items := []ClaimDueScheduleTriggersRow{}
|
|
for rows.Next() {
|
|
var i ClaimDueScheduleTriggersRow
|
|
if err := rows.Scan(
|
|
&i.ID,
|
|
&i.AgentflowID,
|
|
&i.Kind,
|
|
&i.Enabled,
|
|
&i.CronExpression,
|
|
&i.Timezone,
|
|
&i.NextRunAt,
|
|
&i.PublicID,
|
|
&i.SecretHash,
|
|
&i.SigningMode,
|
|
&i.LastFiredAt,
|
|
&i.CreatedAt,
|
|
&i.WorkspaceID,
|
|
&i.AgentID,
|
|
&i.AgentflowTitle,
|
|
&i.AgentflowDescription,
|
|
&i.ConcurrencyPolicy,
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
items = append(items, i)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
const completeAgentflowRun = `-- name: CompleteAgentflowRun :one
|
|
UPDATE agentflow_run SET
|
|
status = $2,
|
|
agent_output = $3,
|
|
linked_issue_id = $4,
|
|
completed_at = now()
|
|
WHERE id = $1
|
|
RETURNING id, agentflow_id, trigger_id, source_kind, status, linked_issue_id, payload, agent_output, started_at, completed_at, idempotency_key, created_at
|
|
`
|
|
|
|
type CompleteAgentflowRunParams struct {
|
|
ID pgtype.UUID `json:"id"`
|
|
Status string `json:"status"`
|
|
AgentOutput pgtype.Text `json:"agent_output"`
|
|
LinkedIssueID pgtype.UUID `json:"linked_issue_id"`
|
|
}
|
|
|
|
func (q *Queries) CompleteAgentflowRun(ctx context.Context, arg CompleteAgentflowRunParams) (AgentflowRun, error) {
|
|
row := q.db.QueryRow(ctx, completeAgentflowRun,
|
|
arg.ID,
|
|
arg.Status,
|
|
arg.AgentOutput,
|
|
arg.LinkedIssueID,
|
|
)
|
|
var i AgentflowRun
|
|
err := row.Scan(
|
|
&i.ID,
|
|
&i.AgentflowID,
|
|
&i.TriggerID,
|
|
&i.SourceKind,
|
|
&i.Status,
|
|
&i.LinkedIssueID,
|
|
&i.Payload,
|
|
&i.AgentOutput,
|
|
&i.StartedAt,
|
|
&i.CompletedAt,
|
|
&i.IdempotencyKey,
|
|
&i.CreatedAt,
|
|
)
|
|
return i, err
|
|
}
|
|
|
|
const createAgentflow = `-- name: CreateAgentflow :one
|
|
INSERT INTO agentflow (
|
|
workspace_id, title, description, agent_id, status,
|
|
concurrency_policy, variables, created_by
|
|
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
|
|
RETURNING id, workspace_id, title, description, agent_id, status, concurrency_policy, variables, created_by, created_at, updated_at
|
|
`
|
|
|
|
type CreateAgentflowParams struct {
|
|
WorkspaceID pgtype.UUID `json:"workspace_id"`
|
|
Title string `json:"title"`
|
|
Description pgtype.Text `json:"description"`
|
|
AgentID pgtype.UUID `json:"agent_id"`
|
|
Status string `json:"status"`
|
|
ConcurrencyPolicy string `json:"concurrency_policy"`
|
|
Variables []byte `json:"variables"`
|
|
CreatedBy pgtype.UUID `json:"created_by"`
|
|
}
|
|
|
|
func (q *Queries) CreateAgentflow(ctx context.Context, arg CreateAgentflowParams) (Agentflow, error) {
|
|
row := q.db.QueryRow(ctx, createAgentflow,
|
|
arg.WorkspaceID,
|
|
arg.Title,
|
|
arg.Description,
|
|
arg.AgentID,
|
|
arg.Status,
|
|
arg.ConcurrencyPolicy,
|
|
arg.Variables,
|
|
arg.CreatedBy,
|
|
)
|
|
var i Agentflow
|
|
err := row.Scan(
|
|
&i.ID,
|
|
&i.WorkspaceID,
|
|
&i.Title,
|
|
&i.Description,
|
|
&i.AgentID,
|
|
&i.Status,
|
|
&i.ConcurrencyPolicy,
|
|
&i.Variables,
|
|
&i.CreatedBy,
|
|
&i.CreatedAt,
|
|
&i.UpdatedAt,
|
|
)
|
|
return i, err
|
|
}
|
|
|
|
const createAgentflowRun = `-- name: CreateAgentflowRun :one
|
|
INSERT INTO agentflow_run (
|
|
agentflow_id, trigger_id, source_kind, status, payload, idempotency_key
|
|
) VALUES ($1, $2, $3, $4, $5, $6)
|
|
RETURNING id, agentflow_id, trigger_id, source_kind, status, linked_issue_id, payload, agent_output, started_at, completed_at, idempotency_key, created_at
|
|
`
|
|
|
|
type CreateAgentflowRunParams struct {
|
|
AgentflowID pgtype.UUID `json:"agentflow_id"`
|
|
TriggerID pgtype.UUID `json:"trigger_id"`
|
|
SourceKind string `json:"source_kind"`
|
|
Status string `json:"status"`
|
|
Payload []byte `json:"payload"`
|
|
IdempotencyKey pgtype.Text `json:"idempotency_key"`
|
|
}
|
|
|
|
func (q *Queries) CreateAgentflowRun(ctx context.Context, arg CreateAgentflowRunParams) (AgentflowRun, error) {
|
|
row := q.db.QueryRow(ctx, createAgentflowRun,
|
|
arg.AgentflowID,
|
|
arg.TriggerID,
|
|
arg.SourceKind,
|
|
arg.Status,
|
|
arg.Payload,
|
|
arg.IdempotencyKey,
|
|
)
|
|
var i AgentflowRun
|
|
err := row.Scan(
|
|
&i.ID,
|
|
&i.AgentflowID,
|
|
&i.TriggerID,
|
|
&i.SourceKind,
|
|
&i.Status,
|
|
&i.LinkedIssueID,
|
|
&i.Payload,
|
|
&i.AgentOutput,
|
|
&i.StartedAt,
|
|
&i.CompletedAt,
|
|
&i.IdempotencyKey,
|
|
&i.CreatedAt,
|
|
)
|
|
return i, err
|
|
}
|
|
|
|
const createAgentflowTask = `-- name: CreateAgentflowTask :one
|
|
INSERT INTO agent_task_queue (agent_id, runtime_id, issue_id, status, priority, agentflow_run_id)
|
|
VALUES ($1, $2, NULL, 'queued', $3, $4)
|
|
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, agentflow_run_id
|
|
`
|
|
|
|
type CreateAgentflowTaskParams struct {
|
|
AgentID pgtype.UUID `json:"agent_id"`
|
|
RuntimeID pgtype.UUID `json:"runtime_id"`
|
|
Priority int32 `json:"priority"`
|
|
AgentflowRunID pgtype.UUID `json:"agentflow_run_id"`
|
|
}
|
|
|
|
// Creates a task in the queue for an agentflow run (issue_id is NULL).
|
|
func (q *Queries) CreateAgentflowTask(ctx context.Context, arg CreateAgentflowTaskParams) (AgentTaskQueue, error) {
|
|
row := q.db.QueryRow(ctx, createAgentflowTask,
|
|
arg.AgentID,
|
|
arg.RuntimeID,
|
|
arg.Priority,
|
|
arg.AgentflowRunID,
|
|
)
|
|
var i AgentTaskQueue
|
|
err := row.Scan(
|
|
&i.ID,
|
|
&i.AgentID,
|
|
&i.IssueID,
|
|
&i.Status,
|
|
&i.Priority,
|
|
&i.DispatchedAt,
|
|
&i.StartedAt,
|
|
&i.CompletedAt,
|
|
&i.Result,
|
|
&i.Error,
|
|
&i.CreatedAt,
|
|
&i.Context,
|
|
&i.RuntimeID,
|
|
&i.SessionID,
|
|
&i.WorkDir,
|
|
&i.TriggerCommentID,
|
|
&i.AgentflowRunID,
|
|
)
|
|
return i, err
|
|
}
|
|
|
|
const createAgentflowTrigger = `-- name: CreateAgentflowTrigger :one
|
|
INSERT INTO agentflow_trigger (
|
|
agentflow_id, kind, enabled,
|
|
cron_expression, timezone, next_run_at,
|
|
public_id, secret_hash, signing_mode
|
|
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
|
|
RETURNING id, agentflow_id, kind, enabled, cron_expression, timezone, next_run_at, public_id, secret_hash, signing_mode, last_fired_at, created_at
|
|
`
|
|
|
|
type CreateAgentflowTriggerParams struct {
|
|
AgentflowID pgtype.UUID `json:"agentflow_id"`
|
|
Kind string `json:"kind"`
|
|
Enabled bool `json:"enabled"`
|
|
CronExpression pgtype.Text `json:"cron_expression"`
|
|
Timezone pgtype.Text `json:"timezone"`
|
|
NextRunAt pgtype.Timestamptz `json:"next_run_at"`
|
|
PublicID pgtype.Text `json:"public_id"`
|
|
SecretHash pgtype.Text `json:"secret_hash"`
|
|
SigningMode pgtype.Text `json:"signing_mode"`
|
|
}
|
|
|
|
func (q *Queries) CreateAgentflowTrigger(ctx context.Context, arg CreateAgentflowTriggerParams) (AgentflowTrigger, error) {
|
|
row := q.db.QueryRow(ctx, createAgentflowTrigger,
|
|
arg.AgentflowID,
|
|
arg.Kind,
|
|
arg.Enabled,
|
|
arg.CronExpression,
|
|
arg.Timezone,
|
|
arg.NextRunAt,
|
|
arg.PublicID,
|
|
arg.SecretHash,
|
|
arg.SigningMode,
|
|
)
|
|
var i AgentflowTrigger
|
|
err := row.Scan(
|
|
&i.ID,
|
|
&i.AgentflowID,
|
|
&i.Kind,
|
|
&i.Enabled,
|
|
&i.CronExpression,
|
|
&i.Timezone,
|
|
&i.NextRunAt,
|
|
&i.PublicID,
|
|
&i.SecretHash,
|
|
&i.SigningMode,
|
|
&i.LastFiredAt,
|
|
&i.CreatedAt,
|
|
)
|
|
return i, err
|
|
}
|
|
|
|
const deleteAgentflowTrigger = `-- name: DeleteAgentflowTrigger :exec
|
|
DELETE FROM agentflow_trigger WHERE id = $1
|
|
`
|
|
|
|
func (q *Queries) DeleteAgentflowTrigger(ctx context.Context, id pgtype.UUID) error {
|
|
_, err := q.db.Exec(ctx, deleteAgentflowTrigger, id)
|
|
return err
|
|
}
|
|
|
|
const getAgentflow = `-- name: GetAgentflow :one
|
|
SELECT id, workspace_id, title, description, agent_id, status, concurrency_policy, variables, created_by, created_at, updated_at FROM agentflow WHERE id = $1
|
|
`
|
|
|
|
func (q *Queries) GetAgentflow(ctx context.Context, id pgtype.UUID) (Agentflow, error) {
|
|
row := q.db.QueryRow(ctx, getAgentflow, id)
|
|
var i Agentflow
|
|
err := row.Scan(
|
|
&i.ID,
|
|
&i.WorkspaceID,
|
|
&i.Title,
|
|
&i.Description,
|
|
&i.AgentID,
|
|
&i.Status,
|
|
&i.ConcurrencyPolicy,
|
|
&i.Variables,
|
|
&i.CreatedBy,
|
|
&i.CreatedAt,
|
|
&i.UpdatedAt,
|
|
)
|
|
return i, err
|
|
}
|
|
|
|
const getAgentflowInWorkspace = `-- name: GetAgentflowInWorkspace :one
|
|
SELECT id, workspace_id, title, description, agent_id, status, concurrency_policy, variables, created_by, created_at, updated_at FROM agentflow WHERE id = $1 AND workspace_id = $2
|
|
`
|
|
|
|
type GetAgentflowInWorkspaceParams struct {
|
|
ID pgtype.UUID `json:"id"`
|
|
WorkspaceID pgtype.UUID `json:"workspace_id"`
|
|
}
|
|
|
|
func (q *Queries) GetAgentflowInWorkspace(ctx context.Context, arg GetAgentflowInWorkspaceParams) (Agentflow, error) {
|
|
row := q.db.QueryRow(ctx, getAgentflowInWorkspace, arg.ID, arg.WorkspaceID)
|
|
var i Agentflow
|
|
err := row.Scan(
|
|
&i.ID,
|
|
&i.WorkspaceID,
|
|
&i.Title,
|
|
&i.Description,
|
|
&i.AgentID,
|
|
&i.Status,
|
|
&i.ConcurrencyPolicy,
|
|
&i.Variables,
|
|
&i.CreatedBy,
|
|
&i.CreatedAt,
|
|
&i.UpdatedAt,
|
|
)
|
|
return i, err
|
|
}
|
|
|
|
const getAgentflowRun = `-- name: GetAgentflowRun :one
|
|
SELECT id, agentflow_id, trigger_id, source_kind, status, linked_issue_id, payload, agent_output, started_at, completed_at, idempotency_key, created_at FROM agentflow_run WHERE id = $1
|
|
`
|
|
|
|
func (q *Queries) GetAgentflowRun(ctx context.Context, id pgtype.UUID) (AgentflowRun, error) {
|
|
row := q.db.QueryRow(ctx, getAgentflowRun, id)
|
|
var i AgentflowRun
|
|
err := row.Scan(
|
|
&i.ID,
|
|
&i.AgentflowID,
|
|
&i.TriggerID,
|
|
&i.SourceKind,
|
|
&i.Status,
|
|
&i.LinkedIssueID,
|
|
&i.Payload,
|
|
&i.AgentOutput,
|
|
&i.StartedAt,
|
|
&i.CompletedAt,
|
|
&i.IdempotencyKey,
|
|
&i.CreatedAt,
|
|
)
|
|
return i, err
|
|
}
|
|
|
|
const getAgentflowTrigger = `-- name: GetAgentflowTrigger :one
|
|
SELECT id, agentflow_id, kind, enabled, cron_expression, timezone, next_run_at, public_id, secret_hash, signing_mode, last_fired_at, created_at FROM agentflow_trigger WHERE id = $1
|
|
`
|
|
|
|
func (q *Queries) GetAgentflowTrigger(ctx context.Context, id pgtype.UUID) (AgentflowTrigger, error) {
|
|
row := q.db.QueryRow(ctx, getAgentflowTrigger, id)
|
|
var i AgentflowTrigger
|
|
err := row.Scan(
|
|
&i.ID,
|
|
&i.AgentflowID,
|
|
&i.Kind,
|
|
&i.Enabled,
|
|
&i.CronExpression,
|
|
&i.Timezone,
|
|
&i.NextRunAt,
|
|
&i.PublicID,
|
|
&i.SecretHash,
|
|
&i.SigningMode,
|
|
&i.LastFiredAt,
|
|
&i.CreatedAt,
|
|
)
|
|
return i, err
|
|
}
|
|
|
|
const hasActiveAgentflowRun = `-- name: HasActiveAgentflowRun :one
|
|
SELECT count(*) > 0 AS has_active FROM agentflow_run
|
|
WHERE agentflow_id = $1 AND status IN ('received', 'executing')
|
|
`
|
|
|
|
// Check if agentflow has a run currently executing (for concurrency policy).
|
|
func (q *Queries) HasActiveAgentflowRun(ctx context.Context, agentflowID pgtype.UUID) (bool, error) {
|
|
row := q.db.QueryRow(ctx, hasActiveAgentflowRun, agentflowID)
|
|
var has_active bool
|
|
err := row.Scan(&has_active)
|
|
return has_active, err
|
|
}
|
|
|
|
const listAgentflowRuns = `-- name: ListAgentflowRuns :many
|
|
SELECT id, agentflow_id, trigger_id, source_kind, status, linked_issue_id, payload, agent_output, started_at, completed_at, idempotency_key, created_at FROM agentflow_run
|
|
WHERE agentflow_id = $1
|
|
ORDER BY created_at DESC
|
|
LIMIT $2 OFFSET $3
|
|
`
|
|
|
|
type ListAgentflowRunsParams struct {
|
|
AgentflowID pgtype.UUID `json:"agentflow_id"`
|
|
Limit int32 `json:"limit"`
|
|
Offset int32 `json:"offset"`
|
|
}
|
|
|
|
func (q *Queries) ListAgentflowRuns(ctx context.Context, arg ListAgentflowRunsParams) ([]AgentflowRun, error) {
|
|
rows, err := q.db.Query(ctx, listAgentflowRuns, arg.AgentflowID, arg.Limit, arg.Offset)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
items := []AgentflowRun{}
|
|
for rows.Next() {
|
|
var i AgentflowRun
|
|
if err := rows.Scan(
|
|
&i.ID,
|
|
&i.AgentflowID,
|
|
&i.TriggerID,
|
|
&i.SourceKind,
|
|
&i.Status,
|
|
&i.LinkedIssueID,
|
|
&i.Payload,
|
|
&i.AgentOutput,
|
|
&i.StartedAt,
|
|
&i.CompletedAt,
|
|
&i.IdempotencyKey,
|
|
&i.CreatedAt,
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
items = append(items, i)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
const listAgentflowTriggers = `-- name: ListAgentflowTriggers :many
|
|
SELECT id, agentflow_id, kind, enabled, cron_expression, timezone, next_run_at, public_id, secret_hash, signing_mode, last_fired_at, created_at FROM agentflow_trigger
|
|
WHERE agentflow_id = $1
|
|
ORDER BY created_at ASC
|
|
`
|
|
|
|
func (q *Queries) ListAgentflowTriggers(ctx context.Context, agentflowID pgtype.UUID) ([]AgentflowTrigger, error) {
|
|
rows, err := q.db.Query(ctx, listAgentflowTriggers, agentflowID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
items := []AgentflowTrigger{}
|
|
for rows.Next() {
|
|
var i AgentflowTrigger
|
|
if err := rows.Scan(
|
|
&i.ID,
|
|
&i.AgentflowID,
|
|
&i.Kind,
|
|
&i.Enabled,
|
|
&i.CronExpression,
|
|
&i.Timezone,
|
|
&i.NextRunAt,
|
|
&i.PublicID,
|
|
&i.SecretHash,
|
|
&i.SigningMode,
|
|
&i.LastFiredAt,
|
|
&i.CreatedAt,
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
items = append(items, i)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
const listAgentflows = `-- name: ListAgentflows :many
|
|
SELECT id, workspace_id, title, description, agent_id, status, concurrency_policy, variables, created_by, created_at, updated_at FROM agentflow
|
|
WHERE workspace_id = $1 AND status != 'archived'
|
|
ORDER BY created_at DESC
|
|
`
|
|
|
|
func (q *Queries) ListAgentflows(ctx context.Context, workspaceID pgtype.UUID) ([]Agentflow, error) {
|
|
rows, err := q.db.Query(ctx, listAgentflows, workspaceID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
items := []Agentflow{}
|
|
for rows.Next() {
|
|
var i Agentflow
|
|
if err := rows.Scan(
|
|
&i.ID,
|
|
&i.WorkspaceID,
|
|
&i.Title,
|
|
&i.Description,
|
|
&i.AgentID,
|
|
&i.Status,
|
|
&i.ConcurrencyPolicy,
|
|
&i.Variables,
|
|
&i.CreatedBy,
|
|
&i.CreatedAt,
|
|
&i.UpdatedAt,
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
items = append(items, i)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
const listAllAgentflows = `-- name: ListAllAgentflows :many
|
|
SELECT id, workspace_id, title, description, agent_id, status, concurrency_policy, variables, created_by, created_at, updated_at FROM agentflow
|
|
WHERE workspace_id = $1
|
|
ORDER BY created_at DESC
|
|
`
|
|
|
|
func (q *Queries) ListAllAgentflows(ctx context.Context, workspaceID pgtype.UUID) ([]Agentflow, error) {
|
|
rows, err := q.db.Query(ctx, listAllAgentflows, workspaceID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
items := []Agentflow{}
|
|
for rows.Next() {
|
|
var i Agentflow
|
|
if err := rows.Scan(
|
|
&i.ID,
|
|
&i.WorkspaceID,
|
|
&i.Title,
|
|
&i.Description,
|
|
&i.AgentID,
|
|
&i.Status,
|
|
&i.ConcurrencyPolicy,
|
|
&i.Variables,
|
|
&i.CreatedBy,
|
|
&i.CreatedAt,
|
|
&i.UpdatedAt,
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
items = append(items, i)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
const setTriggerNextRunAt = `-- name: SetTriggerNextRunAt :exec
|
|
UPDATE agentflow_trigger SET next_run_at = $2
|
|
WHERE id = $1
|
|
`
|
|
|
|
type SetTriggerNextRunAtParams struct {
|
|
ID pgtype.UUID `json:"id"`
|
|
NextRunAt pgtype.Timestamptz `json:"next_run_at"`
|
|
}
|
|
|
|
func (q *Queries) SetTriggerNextRunAt(ctx context.Context, arg SetTriggerNextRunAtParams) error {
|
|
_, err := q.db.Exec(ctx, setTriggerNextRunAt, arg.ID, arg.NextRunAt)
|
|
return err
|
|
}
|
|
|
|
const updateAgentflow = `-- name: UpdateAgentflow :one
|
|
UPDATE agentflow SET
|
|
title = COALESCE($2, title),
|
|
description = COALESCE($3, description),
|
|
agent_id = COALESCE($4, agent_id),
|
|
status = COALESCE($5, status),
|
|
concurrency_policy = COALESCE($6, concurrency_policy),
|
|
variables = COALESCE($7, variables),
|
|
updated_at = now()
|
|
WHERE id = $1
|
|
RETURNING id, workspace_id, title, description, agent_id, status, concurrency_policy, variables, created_by, created_at, updated_at
|
|
`
|
|
|
|
type UpdateAgentflowParams struct {
|
|
ID pgtype.UUID `json:"id"`
|
|
Title pgtype.Text `json:"title"`
|
|
Description pgtype.Text `json:"description"`
|
|
AgentID pgtype.UUID `json:"agent_id"`
|
|
Status pgtype.Text `json:"status"`
|
|
ConcurrencyPolicy pgtype.Text `json:"concurrency_policy"`
|
|
Variables []byte `json:"variables"`
|
|
}
|
|
|
|
func (q *Queries) UpdateAgentflow(ctx context.Context, arg UpdateAgentflowParams) (Agentflow, error) {
|
|
row := q.db.QueryRow(ctx, updateAgentflow,
|
|
arg.ID,
|
|
arg.Title,
|
|
arg.Description,
|
|
arg.AgentID,
|
|
arg.Status,
|
|
arg.ConcurrencyPolicy,
|
|
arg.Variables,
|
|
)
|
|
var i Agentflow
|
|
err := row.Scan(
|
|
&i.ID,
|
|
&i.WorkspaceID,
|
|
&i.Title,
|
|
&i.Description,
|
|
&i.AgentID,
|
|
&i.Status,
|
|
&i.ConcurrencyPolicy,
|
|
&i.Variables,
|
|
&i.CreatedBy,
|
|
&i.CreatedAt,
|
|
&i.UpdatedAt,
|
|
)
|
|
return i, err
|
|
}
|
|
|
|
const updateAgentflowRunStatus = `-- name: UpdateAgentflowRunStatus :one
|
|
UPDATE agentflow_run SET
|
|
status = $2,
|
|
started_at = CASE WHEN $2 = 'executing' THEN now() ELSE started_at END,
|
|
completed_at = CASE WHEN $2 IN ('completed', 'failed', 'skipped', 'coalesced') THEN now() ELSE completed_at END
|
|
WHERE id = $1
|
|
RETURNING id, agentflow_id, trigger_id, source_kind, status, linked_issue_id, payload, agent_output, started_at, completed_at, idempotency_key, created_at
|
|
`
|
|
|
|
type UpdateAgentflowRunStatusParams struct {
|
|
ID pgtype.UUID `json:"id"`
|
|
Status string `json:"status"`
|
|
}
|
|
|
|
func (q *Queries) UpdateAgentflowRunStatus(ctx context.Context, arg UpdateAgentflowRunStatusParams) (AgentflowRun, error) {
|
|
row := q.db.QueryRow(ctx, updateAgentflowRunStatus, arg.ID, arg.Status)
|
|
var i AgentflowRun
|
|
err := row.Scan(
|
|
&i.ID,
|
|
&i.AgentflowID,
|
|
&i.TriggerID,
|
|
&i.SourceKind,
|
|
&i.Status,
|
|
&i.LinkedIssueID,
|
|
&i.Payload,
|
|
&i.AgentOutput,
|
|
&i.StartedAt,
|
|
&i.CompletedAt,
|
|
&i.IdempotencyKey,
|
|
&i.CreatedAt,
|
|
)
|
|
return i, err
|
|
}
|
|
|
|
const updateAgentflowTrigger = `-- name: UpdateAgentflowTrigger :one
|
|
UPDATE agentflow_trigger SET
|
|
enabled = COALESCE($2, enabled),
|
|
cron_expression = COALESCE($3, cron_expression),
|
|
timezone = COALESCE($4, timezone),
|
|
next_run_at = COALESCE($5, next_run_at)
|
|
WHERE id = $1
|
|
RETURNING id, agentflow_id, kind, enabled, cron_expression, timezone, next_run_at, public_id, secret_hash, signing_mode, last_fired_at, created_at
|
|
`
|
|
|
|
type UpdateAgentflowTriggerParams struct {
|
|
ID pgtype.UUID `json:"id"`
|
|
Enabled pgtype.Bool `json:"enabled"`
|
|
CronExpression pgtype.Text `json:"cron_expression"`
|
|
Timezone pgtype.Text `json:"timezone"`
|
|
NextRunAt pgtype.Timestamptz `json:"next_run_at"`
|
|
}
|
|
|
|
func (q *Queries) UpdateAgentflowTrigger(ctx context.Context, arg UpdateAgentflowTriggerParams) (AgentflowTrigger, error) {
|
|
row := q.db.QueryRow(ctx, updateAgentflowTrigger,
|
|
arg.ID,
|
|
arg.Enabled,
|
|
arg.CronExpression,
|
|
arg.Timezone,
|
|
arg.NextRunAt,
|
|
)
|
|
var i AgentflowTrigger
|
|
err := row.Scan(
|
|
&i.ID,
|
|
&i.AgentflowID,
|
|
&i.Kind,
|
|
&i.Enabled,
|
|
&i.CronExpression,
|
|
&i.Timezone,
|
|
&i.NextRunAt,
|
|
&i.PublicID,
|
|
&i.SecretHash,
|
|
&i.SigningMode,
|
|
&i.LastFiredAt,
|
|
&i.CreatedAt,
|
|
)
|
|
return i, err
|
|
}
|