Compare commits

...

1 Commits

Author SHA1 Message Date
Devv
ee58915168 feat(agentflow): add Agentflow MVP — scheduled agent tasks
Implements the Agentflow feature end-to-end:

**Database** (migration 032):
- `agentflow` table (title, description/prompt, agent, status, concurrency policy)
- `agentflow_trigger` table (schedule/webhook/api with cron + timezone)
- `agentflow_run` table (execution history with status tracking)
- Extends `agent_task_queue` with nullable `issue_id` and `agentflow_run_id`

**Backend**:
- CRUD API for agentflows, triggers, and runs
- Server-side scheduler goroutine (30s polling, CAS-safe trigger claiming)
- Cron expression parsing via robfig/cron/v3 with timezone support
- Concurrency policies: skip_if_active, coalesce, always_run
- Manual trigger endpoint (POST /agentflows/:id/run)
- Task service extended with EnqueueTaskForAgentflow()
- Broadcast methods handle nullable issue_id for agentflow tasks

**Daemon**:
- Task type extended with agentflow context (title, description, run_id)
- BuildPrompt generates agentflow-specific prompts
- Execution environment writes agentflow context to CLAUDE.md
- Claim response includes agentflow data for workspace/repo resolution

**Frontend**:
- Sidebar entry (Agentflows with Zap icon)
- List + detail page with triggers and run history tabs
- Create dialog with agent selection, cron, timezone, concurrency policy
- Pause/activate toggle and manual run button
- Zustand store for agentflow state management
- TypeScript types and API client methods

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-07 03:04:47 -07:00
30 changed files with 2639 additions and 69 deletions

View File

@@ -15,6 +15,7 @@ import {
BookOpenText,
SquarePen,
CircleUser,
Zap,
} from "lucide-react";
import { WorkspaceAvatar } from "@/features/workspace";
import { useIssueDraftStore } from "@/features/issues/stores/draft-store";
@@ -53,6 +54,7 @@ const primaryNav = [
const workspaceNav = [
{ href: "/agents", label: "Agents", icon: Bot },
{ href: "/agentflows", label: "Agentflows", icon: Zap },
{ href: "/runtimes", label: "Runtimes", icon: Monitor },
{ href: "/skills", label: "Skills", icon: BookOpenText },
{ href: "/settings", label: "Settings", icon: Settings },

View File

@@ -0,0 +1,7 @@
"use client";
import { AgentflowsPage } from "@/features/agentflows";
export default function Page() {
return <AgentflowsPage />;
}

View File

@@ -0,0 +1,453 @@
"use client";
import { useState, useEffect, useCallback } from "react";
import { Plus, Play, Pause, Clock, Zap, Trash2, MoreHorizontal, Loader2 } from "lucide-react";
import { Button } from "@/components/ui/button";
import { Input } from "@/components/ui/input";
import { Label } from "@/components/ui/label";
import { Textarea } from "@/components/ui/textarea";
import { Badge } from "@/components/ui/badge";
import {
Dialog,
DialogContent,
DialogHeader,
DialogTitle,
DialogFooter,
} from "@/components/ui/dialog";
import {
DropdownMenu,
DropdownMenuTrigger,
DropdownMenuContent,
DropdownMenuItem,
} from "@/components/ui/dropdown-menu";
import {
Select,
SelectContent,
SelectItem,
SelectTrigger,
SelectValue,
} from "@/components/ui/select";
import { Tabs, TabsContent, TabsList, TabsTrigger } from "@/components/ui/tabs";
import { toast } from "sonner";
import { api } from "@/shared/api";
import { useWorkspaceStore } from "@/features/workspace";
import { useAgentflowStore } from "../store";
import type {
Agentflow,
AgentflowTrigger,
AgentflowRun,
AgentflowConcurrencyPolicy,
} from "@/shared/types";
function formatDate(dateStr: string | null): string {
if (!dateStr) return "—";
return new Date(dateStr).toLocaleString();
}
function StatusBadge({ status }: { status: string }) {
const variants: Record<string, string> = {
active: "bg-green-100 text-green-800",
paused: "bg-yellow-100 text-yellow-800",
archived: "bg-gray-100 text-gray-600",
};
return (
<span className={`inline-flex items-center rounded-full px-2 py-0.5 text-xs font-medium ${variants[status] ?? "bg-gray-100 text-gray-600"}`}>
{status}
</span>
);
}
function RunStatusBadge({ status }: { status: string }) {
const variants: Record<string, string> = {
received: "bg-blue-100 text-blue-800",
executing: "bg-yellow-100 text-yellow-800",
completed: "bg-green-100 text-green-800",
failed: "bg-red-100 text-red-800",
skipped: "bg-gray-100 text-gray-600",
coalesced: "bg-gray-100 text-gray-600",
};
return (
<span className={`inline-flex items-center rounded-full px-2 py-0.5 text-xs font-medium ${variants[status] ?? "bg-gray-100 text-gray-600"}`}>
{status}
</span>
);
}
// ── Create Dialog ──────────────────────────────────────────────
function CreateAgentflowDialog({
open,
onClose,
}: {
open: boolean;
onClose: () => void;
}) {
const agents = useWorkspaceStore((s) => s.agents);
const addAgentflow = useAgentflowStore((s) => s.addAgentflow);
const [title, setTitle] = useState("");
const [description, setDescription] = useState("");
const [agentId, setAgentId] = useState("");
const [concurrencyPolicy, setConcurrencyPolicy] = useState<AgentflowConcurrencyPolicy>("skip_if_active");
const [cronExpression, setCronExpression] = useState("");
const [timezone, setTimezone] = useState(Intl.DateTimeFormat().resolvedOptions().timeZone);
const [creating, setCreating] = useState(false);
const handleCreate = async () => {
if (!title.trim() || !agentId) return;
setCreating(true);
try {
const af = await api.createAgentflow({
title: title.trim(),
description: description.trim() || undefined,
agent_id: agentId,
concurrency_policy: concurrencyPolicy,
});
addAgentflow(af);
// Create schedule trigger if cron was provided
if (cronExpression.trim()) {
await api.createAgentflowTrigger(af.id, {
kind: "schedule",
cron_expression: cronExpression.trim(),
timezone,
});
}
toast.success("Agentflow created");
onClose();
setTitle("");
setDescription("");
setAgentId("");
setCronExpression("");
} catch {
toast.error("Failed to create agentflow");
} finally {
setCreating(false);
}
};
return (
<Dialog open={open} onOpenChange={(v) => !v && onClose()}>
<DialogContent className="sm:max-w-lg">
<DialogHeader>
<DialogTitle>Create Agentflow</DialogTitle>
</DialogHeader>
<div className="space-y-4 py-2">
<div className="space-y-2">
<Label>Title</Label>
<Input
placeholder="e.g. Daily SSL Check"
value={title}
onChange={(e) => setTitle(e.target.value)}
/>
</div>
<div className="space-y-2">
<Label>Agent</Label>
<Select value={agentId} onValueChange={(v) => v && setAgentId(v)}>
<SelectTrigger>
<SelectValue placeholder="Select an agent" />
</SelectTrigger>
<SelectContent>
{agents
.filter((a) => !a.archived_at)
.map((a) => (
<SelectItem key={a.id} value={a.id}>
{a.name}
</SelectItem>
))}
</SelectContent>
</Select>
</div>
<div className="space-y-2">
<Label>Prompt / Instructions</Label>
<Textarea
placeholder="What should the agent do when this agentflow triggers?"
rows={4}
value={description}
onChange={(e) => setDescription(e.target.value)}
/>
</div>
<div className="space-y-2">
<Label>Schedule (cron expression)</Label>
<Input
placeholder="e.g. 0 10 * * 1-5 (weekdays at 10am)"
value={cronExpression}
onChange={(e) => setCronExpression(e.target.value)}
/>
<p className="text-xs text-muted-foreground">
Standard 5-field cron format. Leave empty to trigger manually only.
</p>
</div>
<div className="space-y-2">
<Label>Timezone</Label>
<Input
value={timezone}
onChange={(e) => setTimezone(e.target.value)}
/>
</div>
<div className="space-y-2">
<Label>Concurrency Policy</Label>
<Select value={concurrencyPolicy} onValueChange={(v) => setConcurrencyPolicy(v as AgentflowConcurrencyPolicy)}>
<SelectTrigger>
<SelectValue />
</SelectTrigger>
<SelectContent>
<SelectItem value="skip_if_active">Skip if active</SelectItem>
<SelectItem value="coalesce">Coalesce</SelectItem>
<SelectItem value="always_run">Always run</SelectItem>
</SelectContent>
</Select>
</div>
</div>
<DialogFooter>
<Button variant="outline" onClick={onClose}>Cancel</Button>
<Button onClick={handleCreate} disabled={creating || !title.trim() || !agentId}>
{creating ? <Loader2 className="h-4 w-4 animate-spin mr-2" /> : null}
Create
</Button>
</DialogFooter>
</DialogContent>
</Dialog>
);
}
// ── Detail Panel ──────────────────────────────────────────────
function AgentflowDetail({ agentflow }: { agentflow: Agentflow }) {
const agents = useWorkspaceStore((s) => s.agents);
const { triggers, runs, fetchTriggers, fetchRuns, updateAgentflow } = useAgentflowStore();
const afTriggers = triggers[agentflow.id] ?? [];
const afRuns = runs[agentflow.id] ?? [];
const agent = agents.find((a) => a.id === agentflow.agent_id);
useEffect(() => {
fetchTriggers(agentflow.id);
fetchRuns(agentflow.id);
}, [agentflow.id, fetchTriggers, fetchRuns]);
const handleRun = async () => {
try {
await api.runAgentflow(agentflow.id);
toast.success("Agentflow triggered");
fetchRuns(agentflow.id);
} catch {
toast.error("Failed to trigger agentflow");
}
};
const handleToggleStatus = async () => {
const newStatus = agentflow.status === "active" ? "paused" : "active";
try {
const updated = await api.updateAgentflow(agentflow.id, { status: newStatus });
updateAgentflow(agentflow.id, updated);
toast.success(`Agentflow ${newStatus}`);
} catch {
toast.error("Failed to update status");
}
};
return (
<div className="flex flex-col h-full">
<div className="flex items-center justify-between p-4 border-b">
<div>
<h2 className="text-lg font-semibold">{agentflow.title}</h2>
<p className="text-sm text-muted-foreground">
Agent: {agent?.name ?? "Unknown"} · <StatusBadge status={agentflow.status} />
</p>
</div>
<div className="flex items-center gap-2">
<Button variant="outline" size="sm" onClick={handleToggleStatus}>
{agentflow.status === "active" ? (
<><Pause className="h-4 w-4 mr-1" /> Pause</>
) : (
<><Play className="h-4 w-4 mr-1" /> Activate</>
)}
</Button>
<Button size="sm" onClick={handleRun}>
<Zap className="h-4 w-4 mr-1" /> Run Now
</Button>
</div>
</div>
<Tabs defaultValue="triggers" className="flex-1 overflow-hidden flex flex-col">
<TabsList className="mx-4 mt-2">
<TabsTrigger value="triggers">Triggers</TabsTrigger>
<TabsTrigger value="runs">Runs</TabsTrigger>
<TabsTrigger value="details">Details</TabsTrigger>
</TabsList>
<TabsContent value="triggers" className="flex-1 overflow-auto p-4">
{afTriggers.length === 0 ? (
<p className="text-sm text-muted-foreground">No triggers configured. This agentflow can only be triggered manually.</p>
) : (
<div className="space-y-3">
{afTriggers.map((t) => (
<TriggerCard key={t.id} trigger={t} onRefresh={() => fetchTriggers(agentflow.id)} />
))}
</div>
)}
</TabsContent>
<TabsContent value="runs" className="flex-1 overflow-auto p-4">
{afRuns.length === 0 ? (
<p className="text-sm text-muted-foreground">No runs yet.</p>
) : (
<div className="space-y-2">
{afRuns.map((r) => (
<div key={r.id} className="flex items-center justify-between rounded-md border px-3 py-2 text-sm">
<div className="flex items-center gap-2">
<RunStatusBadge status={r.status} />
<span className="text-muted-foreground">{r.source_kind}</span>
</div>
<div className="text-muted-foreground text-xs">
{formatDate(r.created_at)}
</div>
</div>
))}
</div>
)}
</TabsContent>
<TabsContent value="details" className="flex-1 overflow-auto p-4">
<div className="space-y-4">
<div>
<Label className="text-xs text-muted-foreground">Prompt / Instructions</Label>
<p className="mt-1 text-sm whitespace-pre-wrap">{agentflow.description || "No description"}</p>
</div>
<div>
<Label className="text-xs text-muted-foreground">Concurrency Policy</Label>
<p className="mt-1 text-sm">{agentflow.concurrency_policy}</p>
</div>
<div>
<Label className="text-xs text-muted-foreground">Created</Label>
<p className="mt-1 text-sm">{formatDate(agentflow.created_at)}</p>
</div>
</div>
</TabsContent>
</Tabs>
</div>
);
}
function TriggerCard({ trigger, onRefresh }: { trigger: AgentflowTrigger; onRefresh: () => void }) {
const handleToggle = async () => {
try {
await api.updateAgentflowTrigger(trigger.id, { enabled: !trigger.enabled });
onRefresh();
} catch {
toast.error("Failed to update trigger");
}
};
const handleDelete = async () => {
try {
await api.deleteAgentflowTrigger(trigger.id);
toast.success("Trigger deleted");
onRefresh();
} catch {
toast.error("Failed to delete trigger");
}
};
return (
<div className="flex items-center justify-between rounded-md border px-3 py-2">
<div className="flex items-center gap-3">
<Clock className="h-4 w-4 text-muted-foreground" />
<div>
<p className="text-sm font-medium">{trigger.kind}: {trigger.cron_expression ?? "—"}</p>
<p className="text-xs text-muted-foreground">
{trigger.timezone ?? "UTC"} · Next: {formatDate(trigger.next_run_at)} · {trigger.enabled ? "Enabled" : "Disabled"}
</p>
</div>
</div>
<div className="flex items-center gap-1">
<Button variant="ghost" size="sm" onClick={handleToggle}>
{trigger.enabled ? "Disable" : "Enable"}
</Button>
<Button variant="ghost" size="sm" onClick={handleDelete}>
<Trash2 className="h-4 w-4 text-destructive" />
</Button>
</div>
</div>
);
}
// ── Main Page ──────────────────────────────────────────────────
export function AgentflowsPage() {
const { agentflows, loading, activeAgentflowId, fetch, setActiveAgentflow } = useAgentflowStore();
const agents = useWorkspaceStore((s) => s.agents);
const [showCreate, setShowCreate] = useState(false);
useEffect(() => {
fetch();
}, [fetch]);
const activeAgentflow = agentflows.find((af) => af.id === activeAgentflowId) ?? null;
return (
<div className="flex h-full">
{/* List panel */}
<div className="w-80 border-r flex flex-col">
<div className="flex items-center justify-between p-4 border-b">
<h1 className="text-lg font-semibold">Agentflows</h1>
<Button size="sm" onClick={() => setShowCreate(true)}>
<Plus className="h-4 w-4 mr-1" /> New
</Button>
</div>
<div className="flex-1 overflow-auto">
{loading ? (
<div className="flex items-center justify-center py-8">
<Loader2 className="h-5 w-5 animate-spin text-muted-foreground" />
</div>
) : agentflows.length === 0 ? (
<div className="flex flex-col items-center justify-center py-8 text-center px-4">
<Zap className="h-8 w-8 text-muted-foreground mb-2" />
<p className="text-sm text-muted-foreground">No agentflows yet</p>
<p className="text-xs text-muted-foreground mt-1">Create one to schedule recurring agent tasks</p>
</div>
) : (
<div className="divide-y">
{agentflows.map((af) => {
const agent = agents.find((a) => a.id === af.agent_id);
return (
<button
key={af.id}
onClick={() => setActiveAgentflow(af.id)}
className={`w-full text-left px-4 py-3 hover:bg-accent/50 transition-colors ${
activeAgentflowId === af.id ? "bg-accent" : ""
}`}
>
<div className="flex items-center justify-between">
<span className="text-sm font-medium truncate">{af.title}</span>
<StatusBadge status={af.status} />
</div>
<p className="text-xs text-muted-foreground mt-0.5">
{agent?.name ?? "Unknown agent"}
</p>
</button>
);
})}
</div>
)}
</div>
</div>
{/* Detail panel */}
<div className="flex-1">
{activeAgentflow ? (
<AgentflowDetail agentflow={activeAgentflow} />
) : (
<div className="flex items-center justify-center h-full text-muted-foreground text-sm">
Select an agentflow to view details
</div>
)}
</div>
<CreateAgentflowDialog open={showCreate} onClose={() => setShowCreate(false)} />
</div>
);
}

View File

@@ -0,0 +1,2 @@
export { useAgentflowStore } from "./store";
export { AgentflowsPage } from "./components/agentflows-page";

View File

@@ -0,0 +1,79 @@
"use client";
import { create } from "zustand";
import { api } from "@/shared/api";
import type { Agentflow, AgentflowTrigger, AgentflowRun } from "@/shared/types";
interface AgentflowState {
agentflows: Agentflow[];
loading: boolean;
activeAgentflowId: string | null;
triggers: Record<string, AgentflowTrigger[]>;
runs: Record<string, AgentflowRun[]>;
fetch: () => Promise<void>;
setActiveAgentflow: (id: string | null) => void;
addAgentflow: (af: Agentflow) => void;
updateAgentflow: (id: string, updates: Partial<Agentflow>) => void;
removeAgentflow: (id: string) => void;
fetchTriggers: (agentflowId: string) => Promise<void>;
fetchRuns: (agentflowId: string) => Promise<void>;
}
export const useAgentflowStore = create<AgentflowState>((set, get) => ({
agentflows: [],
loading: false,
activeAgentflowId: null,
triggers: {},
runs: {},
fetch: async () => {
set({ loading: true });
try {
const agentflows = await api.listAgentflows();
set({ agentflows, loading: false });
} catch {
set({ loading: false });
}
},
setActiveAgentflow: (id) => set({ activeAgentflowId: id }),
addAgentflow: (af) =>
set((state) => ({ agentflows: [af, ...state.agentflows] })),
updateAgentflow: (id, updates) =>
set((state) => ({
agentflows: state.agentflows.map((af) =>
af.id === id ? { ...af, ...updates } : af
),
})),
removeAgentflow: (id) =>
set((state) => ({
agentflows: state.agentflows.filter((af) => af.id !== id),
})),
fetchTriggers: async (agentflowId) => {
try {
const triggers = await api.listAgentflowTriggers(agentflowId);
set((state) => ({
triggers: { ...state.triggers, [agentflowId]: triggers },
}));
} catch {
// ignore
}
},
fetchRuns: async (agentflowId) => {
try {
const runs = await api.listAgentflowRuns(agentflowId, { limit: 50 });
set((state) => ({
runs: { ...state.runs, [agentflowId]: runs },
}));
} catch {
// ignore
}
},
}));

View File

@@ -35,6 +35,13 @@ import type {
TimelineEntry,
TaskMessagePayload,
Attachment,
Agentflow,
AgentflowTrigger,
AgentflowRun,
CreateAgentflowRequest,
UpdateAgentflowRequest,
CreateAgentflowTriggerRequest,
UpdateAgentflowTriggerRequest,
} from "@/shared/types";
import { type Logger, noopLogger } from "@/shared/logger";
@@ -579,4 +586,69 @@ export class ApiClient {
async deleteAttachment(id: string): Promise<void> {
await this.fetch(`/api/attachments/${id}`, { method: "DELETE" });
}
// ── Agentflows ──────────────────────────────────────────────────
async listAgentflows(): Promise<Agentflow[]> {
return this.fetch("/api/agentflows");
}
async getAgentflow(id: string): Promise<Agentflow> {
return this.fetch(`/api/agentflows/${id}`);
}
async createAgentflow(data: CreateAgentflowRequest): Promise<Agentflow> {
return this.fetch("/api/agentflows", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify(data),
});
}
async updateAgentflow(id: string, data: UpdateAgentflowRequest): Promise<Agentflow> {
return this.fetch(`/api/agentflows/${id}`, {
method: "PUT",
headers: { "Content-Type": "application/json" },
body: JSON.stringify(data),
});
}
async archiveAgentflow(id: string): Promise<Agentflow> {
return this.fetch(`/api/agentflows/${id}/archive`, { method: "POST" });
}
async listAgentflowTriggers(agentflowId: string): Promise<AgentflowTrigger[]> {
return this.fetch(`/api/agentflows/${agentflowId}/triggers`);
}
async createAgentflowTrigger(agentflowId: string, data: CreateAgentflowTriggerRequest): Promise<AgentflowTrigger> {
return this.fetch(`/api/agentflows/${agentflowId}/triggers`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify(data),
});
}
async updateAgentflowTrigger(triggerId: string, data: UpdateAgentflowTriggerRequest): Promise<AgentflowTrigger> {
return this.fetch(`/api/agentflow-triggers/${triggerId}`, {
method: "PUT",
headers: { "Content-Type": "application/json" },
body: JSON.stringify(data),
});
}
async deleteAgentflowTrigger(triggerId: string): Promise<void> {
await this.fetch(`/api/agentflow-triggers/${triggerId}`, { method: "DELETE" });
}
async listAgentflowRuns(agentflowId: string, params?: { limit?: number; offset?: number }): Promise<AgentflowRun[]> {
const search = new URLSearchParams();
if (params?.limit) search.set("limit", String(params.limit));
if (params?.offset) search.set("offset", String(params.offset));
return this.fetch(`/api/agentflows/${agentflowId}/runs?${search}`);
}
async runAgentflow(id: string): Promise<AgentflowRun> {
return this.fetch(`/api/agentflows/${id}/run`, { method: "POST" });
}
}

View File

@@ -0,0 +1,79 @@
export type AgentflowStatus = "active" | "paused" | "archived";
export type AgentflowConcurrencyPolicy = "skip_if_active" | "coalesce" | "always_run";
export type AgentflowTriggerKind = "schedule" | "webhook" | "api";
export type AgentflowRunStatus = "received" | "executing" | "completed" | "failed" | "skipped" | "coalesced";
export type AgentflowRunSourceKind = "schedule" | "webhook" | "api" | "manual";
export interface Agentflow {
id: string;
workspace_id: string;
title: string;
description: string | null;
agent_id: string;
status: AgentflowStatus;
concurrency_policy: AgentflowConcurrencyPolicy;
variables: unknown[];
created_by: string;
created_at: string;
updated_at: string;
}
export interface AgentflowTrigger {
id: string;
agentflow_id: string;
kind: AgentflowTriggerKind;
enabled: boolean;
cron_expression: string | null;
timezone: string | null;
next_run_at: string | null;
public_id?: string | null;
last_fired_at: string | null;
created_at: string;
}
export interface AgentflowRun {
id: string;
agentflow_id: string;
trigger_id: string | null;
source_kind: AgentflowRunSourceKind;
status: AgentflowRunStatus;
linked_issue_id: string | null;
payload: unknown | null;
agent_output: string | null;
started_at: string | null;
completed_at: string | null;
created_at: string;
}
export interface CreateAgentflowRequest {
title: string;
description?: string;
agent_id: string;
status?: AgentflowStatus;
concurrency_policy?: AgentflowConcurrencyPolicy;
variables?: unknown[];
}
export interface UpdateAgentflowRequest {
title?: string;
description?: string;
agent_id?: string;
status?: AgentflowStatus;
concurrency_policy?: AgentflowConcurrencyPolicy;
variables?: unknown[];
}
export interface CreateAgentflowTriggerRequest {
kind: AgentflowTriggerKind;
enabled?: boolean;
cron_expression?: string;
timezone?: string;
next_run_at?: string;
}
export interface UpdateAgentflowTriggerRequest {
enabled?: boolean;
cron_expression?: string;
timezone?: string;
next_run_at?: string;
}

View File

@@ -32,3 +32,17 @@ export type { IssueSubscriber } from "./subscriber";
export type * from "./events";
export type * from "./api";
export type { Attachment } from "./attachment";
export type {
Agentflow,
AgentflowStatus,
AgentflowConcurrencyPolicy,
AgentflowTrigger,
AgentflowTriggerKind,
AgentflowRun,
AgentflowRunStatus,
AgentflowRunSourceKind,
CreateAgentflowRequest,
UpdateAgentflowRequest,
CreateAgentflowTriggerRequest,
UpdateAgentflowTriggerRequest,
} from "./agentflow";

View File

@@ -0,0 +1,156 @@
package main
import (
"context"
"log/slog"
"time"
"github.com/jackc/pgx/v5/pgtype"
"github.com/robfig/cron/v3"
"github.com/multica-ai/multica/server/internal/events"
"github.com/multica-ai/multica/server/internal/service"
"github.com/multica-ai/multica/server/internal/util"
db "github.com/multica-ai/multica/server/pkg/db/generated"
)
const (
// agentflowScheduleInterval is how often we check for due triggers.
agentflowScheduleInterval = 30 * time.Second
)
// runAgentflowScheduler periodically checks for schedule triggers that are due
// and dispatches agentflow runs. Modeled after runRuntimeSweeper.
func runAgentflowScheduler(ctx context.Context, queries *db.Queries, bus *events.Bus) {
taskSvc := service.NewTaskService(queries, nil, bus)
ticker := time.NewTicker(agentflowScheduleInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
tickScheduledTriggers(ctx, queries, taskSvc)
}
}
}
// tickScheduledTriggers atomically claims all due schedule triggers and
// dispatches agentflow runs for each.
func tickScheduledTriggers(ctx context.Context, queries *db.Queries, taskSvc *service.TaskService) {
dueTriggers, err := queries.ClaimDueScheduleTriggers(ctx)
if err != nil {
slog.Warn("agentflow scheduler: failed to claim due triggers", "error", err)
return
}
if len(dueTriggers) == 0 {
return
}
slog.Info("agentflow scheduler: processing due triggers", "count", len(dueTriggers))
for _, t := range dueTriggers {
dispatchScheduledTrigger(ctx, queries, taskSvc, t)
}
}
// dispatchScheduledTrigger creates an agentflow run, enqueues a task,
// and recalculates the trigger's next_run_at.
func dispatchScheduledTrigger(ctx context.Context, queries *db.Queries, taskSvc *service.TaskService, t db.ClaimDueScheduleTriggersRow) {
afID := util.UUIDToString(t.AgentflowID)
triggerID := util.UUIDToString(t.ID)
// Check concurrency policy
if t.ConcurrencyPolicy == "skip_if_active" || t.ConcurrencyPolicy == "coalesce" {
hasActive, err := queries.HasActiveAgentflowRun(ctx, t.AgentflowID)
if err == nil && hasActive {
status := "skipped"
if t.ConcurrencyPolicy == "coalesce" {
status = "coalesced"
}
slog.Info("agentflow scheduler: "+status+" due to concurrency policy",
"agentflow_id", afID, "trigger_id", triggerID, "policy", t.ConcurrencyPolicy)
queries.CreateAgentflowRun(ctx, db.CreateAgentflowRunParams{
AgentflowID: t.AgentflowID,
TriggerID: t.ID,
SourceKind: "schedule",
Status: status,
})
// Still recalculate next_run_at
recalcNextRun(ctx, queries, t)
return
}
}
// Create the run
run, err := queries.CreateAgentflowRun(ctx, db.CreateAgentflowRunParams{
AgentflowID: t.AgentflowID,
TriggerID: t.ID,
SourceKind: "schedule",
Status: "received",
})
if err != nil {
slog.Error("agentflow scheduler: failed to create run", "agentflow_id", afID, "error", err)
recalcNextRun(ctx, queries, t)
return
}
// Build a minimal Agentflow struct for the task service
af := db.Agentflow{
ID: t.AgentflowID,
WorkspaceID: t.WorkspaceID,
AgentID: t.AgentID,
Title: t.AgentflowTitle,
ConcurrencyPolicy: t.ConcurrencyPolicy,
}
if t.AgentflowDescription.Valid {
af.Description = t.AgentflowDescription
}
if err := taskSvc.EnqueueTaskForAgentflow(ctx, af, run); err != nil {
slog.Error("agentflow scheduler: failed to enqueue task",
"agentflow_id", afID, "run_id", util.UUIDToString(run.ID), "error", err)
queries.UpdateAgentflowRunStatus(ctx, db.UpdateAgentflowRunStatusParams{
ID: run.ID,
Status: "failed",
})
}
// Recalculate next_run_at
recalcNextRun(ctx, queries, t)
}
// recalcNextRun parses the cron expression and sets the next run time.
func recalcNextRun(ctx context.Context, queries *db.Queries, t db.ClaimDueScheduleTriggersRow) {
if !t.CronExpression.Valid || t.CronExpression.String == "" {
return
}
// Parse timezone
loc := time.UTC
if t.Timezone.Valid && t.Timezone.String != "" {
if parsed, err := time.LoadLocation(t.Timezone.String); err == nil {
loc = parsed
}
}
parser := cron.NewParser(cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow)
schedule, err := parser.Parse(t.CronExpression.String)
if err != nil {
slog.Error("agentflow scheduler: invalid cron expression",
"trigger_id", util.UUIDToString(t.ID),
"cron", t.CronExpression.String,
"error", err)
return
}
now := time.Now().In(loc)
next := schedule.Next(now)
queries.SetTriggerNextRunAt(ctx, db.SetTriggerNextRunAtParams{
ID: t.ID,
NextRunAt: pgtype.Timestamptz{Time: next, Valid: true},
})
}

View File

@@ -69,6 +69,9 @@ func main() {
sweepCtx, sweepCancel := context.WithCancel(context.Background())
go runRuntimeSweeper(sweepCtx, queries, bus)
// Start agentflow scheduler to dispatch due scheduled triggers.
go runAgentflowScheduler(sweepCtx, queries, bus)
// Graceful shutdown
go func() {
slog.Info("server starting", "port", port)

View File

@@ -220,6 +220,25 @@ func NewRouter(pool *pgxpool.Pool, hub *realtime.Hub, bus *events.Bus) chi.Route
})
})
// Agentflows
r.Route("/api/agentflows", func(r chi.Router) {
r.Get("/", h.ListAgentflows)
r.With(middleware.RequireWorkspaceRole(queries, "owner", "admin")).Post("/", h.CreateAgentflow)
r.Route("/{id}", func(r chi.Router) {
r.Get("/", h.GetAgentflow)
r.Put("/", h.UpdateAgentflow)
r.Post("/archive", h.ArchiveAgentflow)
r.Get("/triggers", h.ListAgentflowTriggers)
r.Post("/triggers", h.CreateAgentflowTrigger)
r.Get("/runs", h.ListAgentflowRuns)
r.Post("/run", h.RunAgentflow)
})
})
r.Route("/api/agentflow-triggers/{triggerId}", func(r chi.Router) {
r.Put("/", h.UpdateAgentflowTrigger)
r.Delete("/", h.DeleteAgentflowTrigger)
})
// Runtimes
r.Route("/api/runtimes", func(r chi.Router) {
r.Get("/", h.ListAgentRuntimes)

View File

@@ -38,6 +38,7 @@ require (
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
github.com/jackc/puddle/v2 v2.2.2 // indirect
github.com/robfig/cron/v3 v3.0.1 // indirect
github.com/spf13/pflag v1.0.9 // indirect
golang.org/x/sync v0.20.0 // indirect
golang.org/x/text v0.35.0 // indirect

View File

@@ -66,6 +66,8 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/resend/resend-go/v2 v2.28.0 h1:ttM1/VZR4fApBv3xI1TneSKi1pbfFsVrq7fXFlHKtj4=
github.com/resend/resend-go/v2 v2.28.0/go.mod h1:3YCb8c8+pLiqhtRFXTyFwlLvfjQtluxOr9HEh2BwCkQ=
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/spf13/cobra v1.10.2 h1:DMTTonx5m65Ic0GOoRY2c16WCbHxOOw6xxezuLaBpcU=
github.com/spf13/cobra v1.10.2/go.mod h1:7C1pvHqHw5A4vrJfjNwvOdzYu0Gml16OCs2GRiTUUS4=

View File

@@ -728,7 +728,7 @@ func (d *Daemon) pollLoop(ctx context.Context) error {
continue
}
if task != nil {
d.logger.Info("task received", "task", shortID(task.ID), "issue", task.IssueID)
d.logger.Info("task received", "task", shortID(task.ID), "issue", task.EffectiveIssueID())
wg.Add(1)
go func(t Task) {
defer wg.Done()
@@ -772,7 +772,7 @@ func (d *Daemon) handleTask(ctx context.Context, task Task) {
if task.Agent != nil {
agentName = task.Agent.Name
}
taskLog.Info("picked task", "issue", task.IssueID, "agent", agentName, "provider", provider)
taskLog.Info("picked task", "issue", task.EffectiveIssueID(), "agent", agentName, "provider", provider, "agentflow", task.IsAgentflow())
if err := d.client.StartTask(ctx, task.ID); err != nil {
taskLog.Error("start task failed", "error", err)
@@ -872,13 +872,20 @@ func (d *Daemon) runTask(ctx context.Context, task Task, provider string, taskLo
// Repos are passed as metadata only — the agent checks them out on demand
// via `multica repo checkout <url>`.
taskCtx := execenv.TaskContextForEnv{
IssueID: task.IssueID,
IssueID: task.EffectiveIssueID(),
TriggerCommentID: task.TriggerCommentID,
AgentName: agentName,
AgentInstructions: instructions,
AgentSkills: convertSkillsForEnv(skills),
Repos: convertReposForEnv(task.Repos),
}
if task.IsAgentflow() {
taskCtx.IsAgentflow = true
taskCtx.AgentflowTitle = task.Agentflow.Title
if task.Agentflow.Description != nil {
taskCtx.AgentflowDescription = *task.Agentflow.Description
}
}
// Try to reuse the workdir from a previous task on the same (agent, issue) pair.
var env *execenv.Environment

View File

@@ -23,7 +23,7 @@ func TestBuildPromptContainsIssueID(t *testing.T) {
issueID := "a1b2c3d4-e5f6-7890-abcd-ef1234567890"
prompt := BuildPrompt(Task{
IssueID: issueID,
IssueID: &issueID,
Agent: &AgentData{
Name: "Local Codex",
Skills: []SkillData{
@@ -53,8 +53,9 @@ func TestBuildPromptContainsIssueID(t *testing.T) {
func TestBuildPromptNoIssueDetails(t *testing.T) {
t.Parallel()
testID := "test-id"
prompt := BuildPrompt(Task{
IssueID: "test-id",
IssueID: &testID,
Agent: &AgentData{Name: "Test"},
})

View File

@@ -114,18 +114,29 @@ func writeSkillFiles(skillsDir string, skills []SkillContextForEnv) error {
func renderIssueContext(provider string, ctx TaskContextForEnv) string {
var b strings.Builder
b.WriteString("# Task Assignment\n\n")
fmt.Fprintf(&b, "**Issue ID:** %s\n\n", ctx.IssueID)
if ctx.TriggerCommentID != "" {
b.WriteString("**Trigger:** Comment Reply\n")
b.WriteString("**Triggering comment ID:** `" + ctx.TriggerCommentID + "`\n\n")
if ctx.IsAgentflow {
b.WriteString("# Agentflow Task\n\n")
fmt.Fprintf(&b, "**Agentflow:** %s\n\n", ctx.AgentflowTitle)
if ctx.AgentflowDescription != "" {
b.WriteString("## Instructions\n\n")
b.WriteString(ctx.AgentflowDescription)
b.WriteString("\n\n")
}
b.WriteString("Execute the instructions above. If the work requires tracking, create an issue using `multica issue create`.\n\n")
} else {
b.WriteString("**Trigger:** New Assignment\n\n")
}
b.WriteString("# Task Assignment\n\n")
fmt.Fprintf(&b, "**Issue ID:** %s\n\n", ctx.IssueID)
b.WriteString("## Quick Start\n\n")
fmt.Fprintf(&b, "Run `multica issue get %s --output json` to fetch the full issue details.\n\n", ctx.IssueID)
if ctx.TriggerCommentID != "" {
b.WriteString("**Trigger:** Comment Reply\n")
b.WriteString("**Triggering comment ID:** `" + ctx.TriggerCommentID + "`\n\n")
} else {
b.WriteString("**Trigger:** New Assignment\n\n")
}
b.WriteString("## Quick Start\n\n")
fmt.Fprintf(&b, "Run `multica issue get %s --output json` to fetch the full issue details.\n\n", ctx.IssueID)
}
if len(ctx.AgentSkills) > 0 {
b.WriteString("## Agent Skills\n\n")

View File

@@ -34,6 +34,11 @@ type TaskContextForEnv struct {
AgentInstructions string // agent identity/persona instructions, injected into CLAUDE.md
AgentSkills []SkillContextForEnv
Repos []RepoContextForEnv // workspace repos available for checkout
// Agentflow fields (set when task is triggered by an agentflow)
IsAgentflow bool
AgentflowTitle string
AgentflowDescription string // prompt template
}
// SkillContextForEnv represents a skill to be written into the execution environment.

View File

@@ -78,7 +78,19 @@ func buildMetaSkillContent(provider string, ctx TaskContextForEnv) string {
b.WriteString("### Workflow\n\n")
if ctx.TriggerCommentID != "" {
if ctx.IsAgentflow {
// Agentflow-triggered: execute the prompt template
fmt.Fprintf(&b, "**This task was triggered by Agentflow: %s**\n\n", ctx.AgentflowTitle)
if ctx.AgentflowDescription != "" {
b.WriteString("## Instructions\n\n")
b.WriteString(ctx.AgentflowDescription)
b.WriteString("\n\n")
}
b.WriteString("Execute the instructions above.\n")
b.WriteString("- If the work requires tracking, create an issue using `multica issue create --title \"...\" --description \"...\"`\n")
b.WriteString("- If the work involves code changes, use `multica repo checkout <url>` to check out a repository\n")
b.WriteString("- Post results as appropriate (comments on related issues, new issues, etc.)\n\n")
} else if ctx.TriggerCommentID != "" {
// Comment-triggered: focus on reading and replying
b.WriteString("**This task was triggered by a comment.** Your primary job is to respond.\n\n")
fmt.Fprintf(&b, "1. Run `multica issue get %s --output json` to understand the issue context\n", ctx.IssueID)

View File

@@ -11,7 +11,21 @@ import (
func BuildPrompt(task Task) string {
var b strings.Builder
b.WriteString("You are running as a local coding agent for a Multica workspace.\n\n")
fmt.Fprintf(&b, "Your assigned issue ID is: %s\n\n", task.IssueID)
fmt.Fprintf(&b, "Start by running `multica issue get %s --output json` to understand your task, then complete it.\n", task.IssueID)
if task.IsAgentflow() {
af := task.Agentflow
fmt.Fprintf(&b, "This task was triggered by Agentflow: **%s**\n\n", af.Title)
if af.Description != nil && *af.Description != "" {
b.WriteString("## Instructions\n\n")
b.WriteString(*af.Description)
b.WriteString("\n\n")
}
b.WriteString("Execute the instructions above. If the work requires tracking, create an issue using `multica issue create`.\n")
} else {
issueID := task.EffectiveIssueID()
fmt.Fprintf(&b, "Your assigned issue ID is: %s\n\n", issueID)
fmt.Fprintf(&b, "Start by running `multica issue get %s --output json` to understand your task, then complete it.\n", issueID)
}
return b.String()
}

View File

@@ -26,13 +26,37 @@ type Task struct {
ID string `json:"id"`
AgentID string `json:"agent_id"`
RuntimeID string `json:"runtime_id"`
IssueID string `json:"issue_id"`
IssueID *string `json:"issue_id"` // nil for agentflow tasks
WorkspaceID string `json:"workspace_id"`
Agent *AgentData `json:"agent,omitempty"`
Repos []RepoData `json:"repos,omitempty"`
PriorSessionID string `json:"prior_session_id,omitempty"` // Claude session ID from a previous task on this issue
PriorWorkDir string `json:"prior_work_dir,omitempty"` // work_dir from a previous task on this issue
TriggerCommentID string `json:"trigger_comment_id,omitempty"` // comment that triggered this task
PriorSessionID string `json:"prior_session_id,omitempty"`
PriorWorkDir string `json:"prior_work_dir,omitempty"`
TriggerCommentID string `json:"trigger_comment_id,omitempty"`
AgentflowRunID *string `json:"agentflow_run_id,omitempty"`
Agentflow *AgentflowData `json:"agentflow,omitempty"`
}
// AgentflowData holds agentflow context for agentflow-triggered tasks.
type AgentflowData struct {
ID string `json:"id"`
Title string `json:"title"`
Description *string `json:"description"` // prompt template
RunID string `json:"run_id"`
SourceKind string `json:"source_kind"`
}
// IsAgentflow returns true if this task was triggered by an agentflow.
func (t Task) IsAgentflow() bool {
return t.Agentflow != nil
}
// EffectiveIssueID returns the issue ID or empty string if nil.
func (t Task) EffectiveIssueID() string {
if t.IssueID != nil {
return *t.IssueID
}
return ""
}
// AgentData holds agent details returned by the claim endpoint.

View File

@@ -96,7 +96,7 @@ type AgentTaskResponse struct {
ID string `json:"id"`
AgentID string `json:"agent_id"`
RuntimeID string `json:"runtime_id"`
IssueID string `json:"issue_id"`
IssueID *string `json:"issue_id"`
WorkspaceID string `json:"workspace_id"`
Status string `json:"status"`
Priority int32 `json:"priority"`
@@ -111,6 +111,18 @@ type AgentTaskResponse struct {
PriorSessionID string `json:"prior_session_id,omitempty"` // session ID from a previous task on same issue
PriorWorkDir string `json:"prior_work_dir,omitempty"` // work_dir from a previous task on same issue
TriggerCommentID *string `json:"trigger_comment_id,omitempty"` // comment that triggered this task
AgentflowRunID *string `json:"agentflow_run_id,omitempty"` // agentflow run that triggered this task
Agentflow *TaskAgentflowData `json:"agentflow,omitempty"` // agentflow context for agentflow-triggered tasks
}
// TaskAgentflowData holds agentflow info included in claim responses so the
// daemon can build the prompt for agentflow-triggered tasks.
type TaskAgentflowData struct {
ID string `json:"id"`
Title string `json:"title"`
Description *string `json:"description"` // prompt template
RunID string `json:"run_id"`
SourceKind string `json:"source_kind"`
}
// TaskAgentData holds agent info included in claim responses so the daemon
@@ -128,19 +140,20 @@ func taskToResponse(t db.AgentTaskQueue) AgentTaskResponse {
json.Unmarshal(t.Result, &result)
}
return AgentTaskResponse{
ID: uuidToString(t.ID),
AgentID: uuidToString(t.AgentID),
RuntimeID: uuidToString(t.RuntimeID),
IssueID: uuidToString(t.IssueID),
Status: t.Status,
Priority: t.Priority,
DispatchedAt: timestampToPtr(t.DispatchedAt),
StartedAt: timestampToPtr(t.StartedAt),
CompletedAt: timestampToPtr(t.CompletedAt),
Result: result,
ID: uuidToString(t.ID),
AgentID: uuidToString(t.AgentID),
RuntimeID: uuidToString(t.RuntimeID),
IssueID: uuidToPtr(t.IssueID),
Status: t.Status,
Priority: t.Priority,
DispatchedAt: timestampToPtr(t.DispatchedAt),
StartedAt: timestampToPtr(t.StartedAt),
CompletedAt: timestampToPtr(t.CompletedAt),
Result: result,
Error: textToPtr(t.Error),
CreatedAt: timestampToString(t.CreatedAt),
TriggerCommentID: uuidToPtr(t.TriggerCommentID),
AgentflowRunID: uuidToPtr(t.AgentflowRunID),
}
}

View File

@@ -0,0 +1,484 @@
package handler
import (
"encoding/json"
"net/http"
"strconv"
"github.com/go-chi/chi/v5"
"github.com/jackc/pgx/v5/pgtype"
db "github.com/multica-ai/multica/server/pkg/db/generated"
)
// ── Response types ──────────────────────────────────────────────────────
type AgentflowResponse struct {
ID string `json:"id"`
WorkspaceID string `json:"workspace_id"`
Title string `json:"title"`
Description *string `json:"description"`
AgentID string `json:"agent_id"`
Status string `json:"status"`
ConcurrencyPolicy string `json:"concurrency_policy"`
Variables json.RawMessage `json:"variables"`
CreatedBy string `json:"created_by"`
CreatedAt string `json:"created_at"`
UpdatedAt string `json:"updated_at"`
}
type AgentflowTriggerResponse struct {
ID string `json:"id"`
AgentflowID string `json:"agentflow_id"`
Kind string `json:"kind"`
Enabled bool `json:"enabled"`
CronExpression *string `json:"cron_expression"`
Timezone *string `json:"timezone"`
NextRunAt *string `json:"next_run_at"`
PublicID *string `json:"public_id,omitempty"`
LastFiredAt *string `json:"last_fired_at"`
CreatedAt string `json:"created_at"`
}
type AgentflowRunResponse struct {
ID string `json:"id"`
AgentflowID string `json:"agentflow_id"`
TriggerID *string `json:"trigger_id"`
SourceKind string `json:"source_kind"`
Status string `json:"status"`
LinkedIssueID *string `json:"linked_issue_id"`
Payload json.RawMessage `json:"payload"`
AgentOutput *string `json:"agent_output"`
StartedAt *string `json:"started_at"`
CompletedAt *string `json:"completed_at"`
CreatedAt string `json:"created_at"`
}
// ── Converters ──────────────────────────────────────────────────────────
func agentflowToResponse(a db.Agentflow) AgentflowResponse {
vars := json.RawMessage(a.Variables)
if len(vars) == 0 {
vars = json.RawMessage("[]")
}
return AgentflowResponse{
ID: uuidToString(a.ID),
WorkspaceID: uuidToString(a.WorkspaceID),
Title: a.Title,
Description: textToPtr(a.Description),
AgentID: uuidToString(a.AgentID),
Status: a.Status,
ConcurrencyPolicy: a.ConcurrencyPolicy,
Variables: vars,
CreatedBy: uuidToString(a.CreatedBy),
CreatedAt: timestampToString(a.CreatedAt),
UpdatedAt: timestampToString(a.UpdatedAt),
}
}
func triggerToResponse(t db.AgentflowTrigger) AgentflowTriggerResponse {
return AgentflowTriggerResponse{
ID: uuidToString(t.ID),
AgentflowID: uuidToString(t.AgentflowID),
Kind: t.Kind,
Enabled: t.Enabled,
CronExpression: textToPtr(t.CronExpression),
Timezone: textToPtr(t.Timezone),
NextRunAt: timestampToPtr(t.NextRunAt),
PublicID: textToPtr(t.PublicID),
LastFiredAt: timestampToPtr(t.LastFiredAt),
CreatedAt: timestampToString(t.CreatedAt),
}
}
func runToResponse(r db.AgentflowRun) AgentflowRunResponse {
payload := json.RawMessage(r.Payload)
if len(payload) == 0 {
payload = nil
}
return AgentflowRunResponse{
ID: uuidToString(r.ID),
AgentflowID: uuidToString(r.AgentflowID),
TriggerID: uuidToPtr(r.TriggerID),
SourceKind: r.SourceKind,
Status: r.Status,
LinkedIssueID: uuidToPtr(r.LinkedIssueID),
Payload: payload,
AgentOutput: textToPtr(r.AgentOutput),
StartedAt: timestampToPtr(r.StartedAt),
CompletedAt: timestampToPtr(r.CompletedAt),
CreatedAt: timestampToString(r.CreatedAt),
}
}
// ── Agentflow CRUD ──────────────────────────────────────────────────────
type CreateAgentflowRequest struct {
Title string `json:"title"`
Description *string `json:"description"`
AgentID string `json:"agent_id"`
Status string `json:"status"`
ConcurrencyPolicy string `json:"concurrency_policy"`
Variables json.RawMessage `json:"variables"`
}
func (h *Handler) CreateAgentflow(w http.ResponseWriter, r *http.Request) {
var req CreateAgentflowRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
writeError(w, http.StatusBadRequest, "invalid request body")
return
}
if req.Title == "" {
writeError(w, http.StatusBadRequest, "title is required")
return
}
if req.AgentID == "" {
writeError(w, http.StatusBadRequest, "agent_id is required")
return
}
userID, ok := requireUserID(w, r)
if !ok {
return
}
workspaceID := resolveWorkspaceID(r)
status := req.Status
if status == "" {
status = "active"
}
policy := req.ConcurrencyPolicy
if policy == "" {
policy = "skip_if_active"
}
vars := req.Variables
if len(vars) == 0 {
vars = json.RawMessage("[]")
}
af, err := h.Queries.CreateAgentflow(r.Context(), db.CreateAgentflowParams{
WorkspaceID: parseUUID(workspaceID),
Title: req.Title,
Description: ptrToText(req.Description),
AgentID: parseUUID(req.AgentID),
Status: status,
ConcurrencyPolicy: policy,
Variables: vars,
CreatedBy: parseUUID(userID),
})
if err != nil {
writeError(w, http.StatusInternalServerError, "failed to create agentflow")
return
}
writeJSON(w, http.StatusCreated, agentflowToResponse(af))
}
func (h *Handler) ListAgentflows(w http.ResponseWriter, r *http.Request) {
workspaceID := resolveWorkspaceID(r)
flows, err := h.Queries.ListAgentflows(r.Context(), parseUUID(workspaceID))
if err != nil {
writeError(w, http.StatusInternalServerError, "failed to list agentflows")
return
}
resp := make([]AgentflowResponse, len(flows))
for i, f := range flows {
resp[i] = agentflowToResponse(f)
}
writeJSON(w, http.StatusOK, resp)
}
func (h *Handler) GetAgentflow(w http.ResponseWriter, r *http.Request) {
id := chi.URLParam(r, "id")
workspaceID := resolveWorkspaceID(r)
af, err := h.Queries.GetAgentflowInWorkspace(r.Context(), db.GetAgentflowInWorkspaceParams{
ID: parseUUID(id),
WorkspaceID: parseUUID(workspaceID),
})
if err != nil {
writeError(w, http.StatusNotFound, "agentflow not found")
return
}
writeJSON(w, http.StatusOK, agentflowToResponse(af))
}
type UpdateAgentflowRequest struct {
Title *string `json:"title"`
Description *string `json:"description"`
AgentID *string `json:"agent_id"`
Status *string `json:"status"`
ConcurrencyPolicy *string `json:"concurrency_policy"`
Variables json.RawMessage `json:"variables"`
}
func (h *Handler) UpdateAgentflow(w http.ResponseWriter, r *http.Request) {
id := chi.URLParam(r, "id")
var req UpdateAgentflowRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
writeError(w, http.StatusBadRequest, "invalid request body")
return
}
params := db.UpdateAgentflowParams{ID: parseUUID(id)}
if req.Title != nil {
params.Title = ptrToText(req.Title)
}
if req.Description != nil {
params.Description = ptrToText(req.Description)
}
if req.AgentID != nil {
params.AgentID = parseUUID(*req.AgentID)
}
if req.Status != nil {
params.Status = ptrToText(req.Status)
}
if req.ConcurrencyPolicy != nil {
params.ConcurrencyPolicy = ptrToText(req.ConcurrencyPolicy)
}
if len(req.Variables) > 0 {
params.Variables = req.Variables
}
af, err := h.Queries.UpdateAgentflow(r.Context(), params)
if err != nil {
writeError(w, http.StatusInternalServerError, "failed to update agentflow")
return
}
writeJSON(w, http.StatusOK, agentflowToResponse(af))
}
func (h *Handler) ArchiveAgentflow(w http.ResponseWriter, r *http.Request) {
id := chi.URLParam(r, "id")
af, err := h.Queries.ArchiveAgentflow(r.Context(), parseUUID(id))
if err != nil {
writeError(w, http.StatusInternalServerError, "failed to archive agentflow")
return
}
writeJSON(w, http.StatusOK, agentflowToResponse(af))
}
// ── Triggers ────────────────────────────────────────────────────────────
type CreateTriggerRequest struct {
Kind string `json:"kind"`
Enabled *bool `json:"enabled"`
CronExpression *string `json:"cron_expression"`
Timezone *string `json:"timezone"`
NextRunAt *string `json:"next_run_at"`
}
func (h *Handler) CreateAgentflowTrigger(w http.ResponseWriter, r *http.Request) {
agentflowID := chi.URLParam(r, "id")
var req CreateTriggerRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
writeError(w, http.StatusBadRequest, "invalid request body")
return
}
if req.Kind == "" {
writeError(w, http.StatusBadRequest, "kind is required")
return
}
enabled := true
if req.Enabled != nil {
enabled = *req.Enabled
}
var nextRunAt pgtype.Timestamptz
if req.NextRunAt != nil {
nextRunAt = parseTimestamp(*req.NextRunAt)
}
t, err := h.Queries.CreateAgentflowTrigger(r.Context(), db.CreateAgentflowTriggerParams{
AgentflowID: parseUUID(agentflowID),
Kind: req.Kind,
Enabled: enabled,
CronExpression: ptrToText(req.CronExpression),
Timezone: ptrToText(req.Timezone),
NextRunAt: nextRunAt,
})
if err != nil {
writeError(w, http.StatusInternalServerError, "failed to create trigger")
return
}
writeJSON(w, http.StatusCreated, triggerToResponse(t))
}
func (h *Handler) ListAgentflowTriggers(w http.ResponseWriter, r *http.Request) {
agentflowID := chi.URLParam(r, "id")
triggers, err := h.Queries.ListAgentflowTriggers(r.Context(), parseUUID(agentflowID))
if err != nil {
writeError(w, http.StatusInternalServerError, "failed to list triggers")
return
}
resp := make([]AgentflowTriggerResponse, len(triggers))
for i, t := range triggers {
resp[i] = triggerToResponse(t)
}
writeJSON(w, http.StatusOK, resp)
}
type UpdateTriggerRequest struct {
Enabled *bool `json:"enabled"`
CronExpression *string `json:"cron_expression"`
Timezone *string `json:"timezone"`
NextRunAt *string `json:"next_run_at"`
}
func (h *Handler) UpdateAgentflowTrigger(w http.ResponseWriter, r *http.Request) {
triggerID := chi.URLParam(r, "triggerId")
var req UpdateTriggerRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
writeError(w, http.StatusBadRequest, "invalid request body")
return
}
params := db.UpdateAgentflowTriggerParams{ID: parseUUID(triggerID)}
if req.Enabled != nil {
params.Enabled = pgtype.Bool{Bool: *req.Enabled, Valid: true}
}
if req.CronExpression != nil {
params.CronExpression = ptrToText(req.CronExpression)
}
if req.Timezone != nil {
params.Timezone = ptrToText(req.Timezone)
}
if req.NextRunAt != nil {
params.NextRunAt = parseTimestamp(*req.NextRunAt)
}
t, err := h.Queries.UpdateAgentflowTrigger(r.Context(), params)
if err != nil {
writeError(w, http.StatusInternalServerError, "failed to update trigger")
return
}
writeJSON(w, http.StatusOK, triggerToResponse(t))
}
func (h *Handler) DeleteAgentflowTrigger(w http.ResponseWriter, r *http.Request) {
triggerID := chi.URLParam(r, "triggerId")
err := h.Queries.DeleteAgentflowTrigger(r.Context(), parseUUID(triggerID))
if err != nil {
writeError(w, http.StatusInternalServerError, "failed to delete trigger")
return
}
w.WriteHeader(http.StatusNoContent)
}
// ── Runs ────────────────────────────────────────────────────────────────
func (h *Handler) ListAgentflowRuns(w http.ResponseWriter, r *http.Request) {
agentflowID := chi.URLParam(r, "id")
limit := int32(50)
offset := int32(0)
if l := r.URL.Query().Get("limit"); l != "" {
if v, err := strconv.Atoi(l); err == nil {
limit = int32(v)
}
}
if o := r.URL.Query().Get("offset"); o != "" {
if v, err := strconv.Atoi(o); err == nil {
offset = int32(v)
}
}
runs, err := h.Queries.ListAgentflowRuns(r.Context(), db.ListAgentflowRunsParams{
AgentflowID: parseUUID(agentflowID),
Limit: limit,
Offset: offset,
})
if err != nil {
writeError(w, http.StatusInternalServerError, "failed to list runs")
return
}
resp := make([]AgentflowRunResponse, len(runs))
for i, run := range runs {
resp[i] = runToResponse(run)
}
writeJSON(w, http.StatusOK, resp)
}
func (h *Handler) RunAgentflow(w http.ResponseWriter, r *http.Request) {
agentflowID := chi.URLParam(r, "id")
workspaceID := resolveWorkspaceID(r)
af, err := h.Queries.GetAgentflowInWorkspace(r.Context(), db.GetAgentflowInWorkspaceParams{
ID: parseUUID(agentflowID),
WorkspaceID: parseUUID(workspaceID),
})
if err != nil {
writeError(w, http.StatusNotFound, "agentflow not found")
return
}
// Check concurrency policy
if af.ConcurrencyPolicy == "skip_if_active" || af.ConcurrencyPolicy == "coalesce" {
hasActive, err := h.Queries.HasActiveAgentflowRun(r.Context(), af.ID)
if err == nil && hasActive {
if af.ConcurrencyPolicy == "skip_if_active" {
writeError(w, http.StatusConflict, "agentflow already has an active run")
return
}
// coalesce: skip silently
run, _ := h.Queries.CreateAgentflowRun(r.Context(), db.CreateAgentflowRunParams{
AgentflowID: af.ID,
SourceKind: "manual",
Status: "coalesced",
})
writeJSON(w, http.StatusOK, runToResponse(run))
return
}
}
// Create run
run, err := h.Queries.CreateAgentflowRun(r.Context(), db.CreateAgentflowRunParams{
AgentflowID: af.ID,
SourceKind: "manual",
Status: "received",
})
if err != nil {
writeError(w, http.StatusInternalServerError, "failed to create run")
return
}
// Enqueue task for the agent
err = h.TaskService.EnqueueTaskForAgentflow(r.Context(), af, run)
if err != nil {
// Mark run as failed
h.Queries.UpdateAgentflowRunStatus(r.Context(), db.UpdateAgentflowRunStatusParams{
ID: run.ID,
Status: "failed",
})
writeError(w, http.StatusInternalServerError, "failed to enqueue agentflow task")
return
}
writeJSON(w, http.StatusCreated, runToResponse(run))
}
// ── Helpers ─────────────────────────────────────────────────────────────
func parseTimestamp(s string) pgtype.Timestamptz {
var ts pgtype.Timestamptz
if err := ts.Scan(s); err != nil {
return pgtype.Timestamptz{}
}
return ts
}

View File

@@ -246,10 +246,34 @@ func (h *Handler) ClaimTaskByRuntime(w http.ResponseWriter, r *http.Request) {
}
}
// Include workspace ID and repos so the daemon can set up worktrees.
if issue, err := h.Queries.GetIssue(r.Context(), task.IssueID); err == nil {
resp.WorkspaceID = uuidToString(issue.WorkspaceID)
if ws, err := h.Queries.GetWorkspace(r.Context(), issue.WorkspaceID); err == nil && ws.Repos != nil {
// Resolve workspace ID and repos.
var workspaceID pgtype.UUID
if task.IssueID.Valid {
// Standard issue-triggered task
if issue, err := h.Queries.GetIssue(r.Context(), task.IssueID); err == nil {
workspaceID = issue.WorkspaceID
resp.WorkspaceID = uuidToString(issue.WorkspaceID)
}
} else if task.AgentflowRunID.Valid {
// Agentflow-triggered task: resolve workspace via agentflow
if run, err := h.Queries.GetAgentflowRun(r.Context(), task.AgentflowRunID); err == nil {
if af, err := h.Queries.GetAgentflow(r.Context(), run.AgentflowID); err == nil {
workspaceID = af.WorkspaceID
resp.WorkspaceID = uuidToString(af.WorkspaceID)
resp.Agentflow = &TaskAgentflowData{
ID: uuidToString(af.ID),
Title: af.Title,
Description: textToPtr(af.Description),
RunID: uuidToString(run.ID),
SourceKind: run.SourceKind,
}
}
}
}
// Include repos so the daemon can set up worktrees.
if workspaceID.Valid {
if ws, err := h.Queries.GetWorkspace(r.Context(), workspaceID); err == nil && ws.Repos != nil {
var repos []RepoData
if json.Unmarshal(ws.Repos, &repos) == nil && len(repos) > 0 {
resp.Repos = repos
@@ -259,13 +283,15 @@ func (h *Handler) ClaimTaskByRuntime(w http.ResponseWriter, r *http.Request) {
// Look up the prior session for this (agent, issue) pair so the daemon
// can resume the Claude Code conversation context.
if prior, err := h.Queries.GetLastTaskSession(r.Context(), db.GetLastTaskSessionParams{
AgentID: task.AgentID,
IssueID: task.IssueID,
}); err == nil && prior.SessionID.Valid {
resp.PriorSessionID = prior.SessionID.String
if prior.WorkDir.Valid {
resp.PriorWorkDir = prior.WorkDir.String
if task.IssueID.Valid {
if prior, err := h.Queries.GetLastTaskSession(r.Context(), db.GetLastTaskSessionParams{
AgentID: task.AgentID,
IssueID: task.IssueID,
}); err == nil && prior.SessionID.Valid {
resp.PriorSessionID = prior.SessionID.String
if prior.WorkDir.Valid {
resp.PriorWorkDir = prior.WorkDir.String
}
}
}

View File

@@ -108,6 +108,41 @@ func (s *TaskService) EnqueueTaskForMention(ctx context.Context, issue db.Issue,
return task, nil
}
// EnqueueTaskForAgentflow creates a queued task for an agentflow run.
// The task has no issue_id — the agent may or may not create one during execution.
func (s *TaskService) EnqueueTaskForAgentflow(ctx context.Context, af db.Agentflow, run db.AgentflowRun) error {
agent, err := s.Queries.GetAgent(ctx, af.AgentID)
if err != nil {
slog.Error("agentflow task enqueue failed: agent not found", "agentflow_id", util.UUIDToString(af.ID), "agent_id", util.UUIDToString(af.AgentID), "error", err)
return fmt.Errorf("load agent: %w", err)
}
if agent.ArchivedAt.Valid {
return fmt.Errorf("agent is archived")
}
if !agent.RuntimeID.Valid {
return fmt.Errorf("agent has no runtime")
}
task, err := s.Queries.CreateAgentflowTask(ctx, db.CreateAgentflowTaskParams{
AgentID: af.AgentID,
RuntimeID: agent.RuntimeID,
Priority: 1, // default priority for agentflow tasks
AgentflowRunID: run.ID,
})
if err != nil {
slog.Error("agentflow task enqueue failed", "agentflow_id", util.UUIDToString(af.ID), "error", err)
return fmt.Errorf("create task: %w", err)
}
slog.Info("agentflow task enqueued",
"task_id", util.UUIDToString(task.ID),
"agentflow_id", util.UUIDToString(af.ID),
"run_id", util.UUIDToString(run.ID),
"agent_id", util.UUIDToString(af.AgentID),
)
return nil
}
// CancelTasksForIssue cancels all active tasks for an issue.
func (s *TaskService) CancelTasksForIssue(ctx context.Context, issueID pgtype.UUID) error {
return s.Queries.CancelAgentTasksByIssue(ctx, issueID)
@@ -402,12 +437,9 @@ func (s *TaskService) broadcastTaskDispatch(ctx context.Context, task db.AgentTa
payload["task_id"] = util.UUIDToString(task.ID)
payload["runtime_id"] = util.UUIDToString(task.RuntimeID)
workspaceID := ""
if issue, err := s.Queries.GetIssue(ctx, task.IssueID); err == nil {
workspaceID = util.UUIDToString(issue.WorkspaceID)
}
workspaceID := s.resolveTaskWorkspaceID(ctx, task)
if workspaceID == "" {
return // Issue deleted; skip broadcast to avoid global leak
return
}
s.Bus.Publish(events.Event{
Type: protocol.EventTaskDispatch,
@@ -419,12 +451,9 @@ func (s *TaskService) broadcastTaskDispatch(ctx context.Context, task db.AgentTa
}
func (s *TaskService) broadcastTaskEvent(ctx context.Context, eventType string, task db.AgentTaskQueue) {
workspaceID := ""
if issue, err := s.Queries.GetIssue(ctx, task.IssueID); err == nil {
workspaceID = util.UUIDToString(issue.WorkspaceID)
}
workspaceID := s.resolveTaskWorkspaceID(ctx, task)
if workspaceID == "" {
return // Issue deleted; skip broadcast to avoid global leak
return
}
s.Bus.Publish(events.Event{
Type: eventType,
@@ -432,14 +461,39 @@ func (s *TaskService) broadcastTaskEvent(ctx context.Context, eventType string,
ActorType: "system",
ActorID: "",
Payload: map[string]any{
"task_id": util.UUIDToString(task.ID),
"agent_id": util.UUIDToString(task.AgentID),
"issue_id": util.UUIDToString(task.IssueID),
"status": task.Status,
"task_id": util.UUIDToString(task.ID),
"agent_id": util.UUIDToString(task.AgentID),
"issue_id": util.UUIDToPtr(task.IssueID),
"agentflow_run_id": util.UUIDToPtr(task.AgentflowRunID),
"status": task.Status,
},
})
}
// resolveTaskWorkspaceID derives the workspace ID for a task, which may be
// associated with an issue (normal task) or an agentflow run (agentflow task).
func (s *TaskService) resolveTaskWorkspaceID(ctx context.Context, task db.AgentTaskQueue) string {
// Try issue first (most common path)
if task.IssueID.Valid {
if issue, err := s.Queries.GetIssue(ctx, task.IssueID); err == nil {
return util.UUIDToString(issue.WorkspaceID)
}
}
// Fallback: look up via agentflow run → agentflow → workspace
if task.AgentflowRunID.Valid {
if run, err := s.Queries.GetAgentflowRun(ctx, task.AgentflowRunID); err == nil {
if af, err := s.Queries.GetAgentflow(ctx, run.AgentflowID); err == nil {
return util.UUIDToString(af.WorkspaceID)
}
}
}
// Last resort: look up via agent
if agent, err := s.Queries.GetAgent(ctx, task.AgentID); err == nil {
return util.UUIDToString(agent.WorkspaceID)
}
return ""
}
func (s *TaskService) broadcastIssueUpdated(issue db.Issue) {
prefix := s.getIssuePrefix(issue.WorkspaceID)
s.Bus.Publish(events.Event{

View File

@@ -0,0 +1,6 @@
ALTER TABLE agent_task_queue DROP COLUMN IF EXISTS agentflow_run_id;
ALTER TABLE agent_task_queue ALTER COLUMN issue_id SET NOT NULL;
DROP TABLE IF EXISTS agentflow_run;
DROP TABLE IF EXISTS agentflow_trigger;
DROP TABLE IF EXISTS agentflow;

View File

@@ -0,0 +1,74 @@
-- Agentflow: scheduled/triggered agent tasks that run independently of issues.
-- Agentflows define reusable agent tasks with prompt templates.
CREATE TABLE agentflow (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
workspace_id UUID NOT NULL REFERENCES workspace(id) ON DELETE CASCADE,
title TEXT NOT NULL,
description TEXT, -- prompt template, supports {{VARIABLE}} interpolation
agent_id UUID NOT NULL REFERENCES agent(id) ON DELETE CASCADE,
status TEXT NOT NULL DEFAULT 'active'
CHECK (status IN ('active', 'paused', 'archived')),
concurrency_policy TEXT NOT NULL DEFAULT 'skip_if_active'
CHECK (concurrency_policy IN ('skip_if_active', 'coalesce', 'always_run')),
variables JSONB NOT NULL DEFAULT '[]', -- variable definitions array
created_by UUID NOT NULL REFERENCES "user"(id),
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
CREATE INDEX idx_agentflow_workspace ON agentflow(workspace_id)
WHERE status != 'archived';
-- Triggers define when an agentflow fires (schedule, webhook, api).
CREATE TABLE agentflow_trigger (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
agentflow_id UUID NOT NULL REFERENCES agentflow(id) ON DELETE CASCADE,
kind TEXT NOT NULL CHECK (kind IN ('schedule', 'webhook', 'api')),
enabled BOOLEAN NOT NULL DEFAULT true,
-- schedule fields
cron_expression TEXT,
timezone TEXT DEFAULT 'UTC',
next_run_at TIMESTAMPTZ,
-- webhook fields (reserved for phase 2)
public_id TEXT UNIQUE,
secret_hash TEXT,
signing_mode TEXT CHECK (signing_mode IN ('bearer', 'hmac_sha256')),
last_fired_at TIMESTAMPTZ,
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
-- Efficient lookup for the scheduler: find schedule triggers that are due.
CREATE INDEX idx_agentflow_trigger_schedule_due
ON agentflow_trigger(next_run_at)
WHERE kind = 'schedule' AND enabled = true;
-- Runs track each execution of an agentflow.
CREATE TABLE agentflow_run (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
agentflow_id UUID NOT NULL REFERENCES agentflow(id) ON DELETE CASCADE,
trigger_id UUID REFERENCES agentflow_trigger(id) ON DELETE SET NULL,
source_kind TEXT NOT NULL CHECK (source_kind IN ('schedule', 'webhook', 'api', 'manual')),
status TEXT NOT NULL DEFAULT 'received'
CHECK (status IN ('received', 'executing', 'completed', 'failed', 'skipped', 'coalesced')),
linked_issue_id UUID REFERENCES issue(id) ON DELETE SET NULL, -- nullable: agent decides if issue needed
payload JSONB, -- variable values + trigger payload snapshot
agent_output TEXT, -- agent's execution summary
started_at TIMESTAMPTZ,
completed_at TIMESTAMPTZ,
idempotency_key TEXT,
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
CREATE INDEX idx_agentflow_run_agentflow ON agentflow_run(agentflow_id, created_at DESC);
-- Extend agent_task_queue to support agentflow-triggered tasks.
-- issue_id becomes nullable (agentflow tasks may not have an issue).
ALTER TABLE agent_task_queue ALTER COLUMN issue_id DROP NOT NULL;
-- Link tasks to agentflow runs.
ALTER TABLE agent_task_queue ADD COLUMN agentflow_run_id UUID
REFERENCES agentflow_run(id) ON DELETE SET NULL;

View File

@@ -53,7 +53,7 @@ const cancelAgentTask = `-- name: CancelAgentTask :one
UPDATE agent_task_queue
SET status = 'cancelled', completed_at = now()
WHERE id = $1 AND status IN ('queued', 'dispatched', 'running')
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, agentflow_run_id
`
func (q *Queries) CancelAgentTask(ctx context.Context, id pgtype.UUID) (AgentTaskQueue, error) {
@@ -76,6 +76,7 @@ func (q *Queries) CancelAgentTask(ctx context.Context, id pgtype.UUID) (AgentTas
&i.SessionID,
&i.WorkDir,
&i.TriggerCommentID,
&i.AgentflowRunID,
)
return i, err
}
@@ -117,7 +118,7 @@ WHERE id = (
LIMIT 1
FOR UPDATE SKIP LOCKED
)
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, agentflow_run_id
`
// Claims the next queued task for an agent, enforcing per-issue serialization:
@@ -144,6 +145,7 @@ func (q *Queries) ClaimAgentTask(ctx context.Context, agentID pgtype.UUID) (Agen
&i.SessionID,
&i.WorkDir,
&i.TriggerCommentID,
&i.AgentflowRunID,
)
return i, err
}
@@ -152,7 +154,7 @@ const completeAgentTask = `-- name: CompleteAgentTask :one
UPDATE agent_task_queue
SET status = 'completed', completed_at = now(), result = $2, session_id = $3, work_dir = $4
WHERE id = $1 AND status = 'running'
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, agentflow_run_id
`
type CompleteAgentTaskParams struct {
@@ -187,6 +189,7 @@ func (q *Queries) CompleteAgentTask(ctx context.Context, arg CompleteAgentTaskPa
&i.SessionID,
&i.WorkDir,
&i.TriggerCommentID,
&i.AgentflowRunID,
)
return i, err
}
@@ -272,7 +275,7 @@ func (q *Queries) CreateAgent(ctx context.Context, arg CreateAgentParams) (Agent
const createAgentTask = `-- name: CreateAgentTask :one
INSERT INTO agent_task_queue (agent_id, runtime_id, issue_id, status, priority, trigger_comment_id)
VALUES ($1, $2, $3, 'queued', $4, $5)
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, agentflow_run_id
`
type CreateAgentTaskParams struct {
@@ -309,6 +312,7 @@ func (q *Queries) CreateAgentTask(ctx context.Context, arg CreateAgentTaskParams
&i.SessionID,
&i.WorkDir,
&i.TriggerCommentID,
&i.AgentflowRunID,
)
return i, err
}
@@ -317,7 +321,7 @@ const failAgentTask = `-- name: FailAgentTask :one
UPDATE agent_task_queue
SET status = 'failed', completed_at = now(), error = $2
WHERE id = $1 AND status IN ('dispatched', 'running')
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, agentflow_run_id
`
type FailAgentTaskParams struct {
@@ -345,6 +349,7 @@ func (q *Queries) FailAgentTask(ctx context.Context, arg FailAgentTaskParams) (A
&i.SessionID,
&i.WorkDir,
&i.TriggerCommentID,
&i.AgentflowRunID,
)
return i, err
}
@@ -461,7 +466,7 @@ func (q *Queries) GetAgentInWorkspace(ctx context.Context, arg GetAgentInWorkspa
}
const getAgentTask = `-- name: GetAgentTask :one
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id FROM agent_task_queue
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, agentflow_run_id FROM agent_task_queue
WHERE id = $1
`
@@ -485,6 +490,7 @@ func (q *Queries) GetAgentTask(ctx context.Context, id pgtype.UUID) (AgentTaskQu
&i.SessionID,
&i.WorkDir,
&i.TriggerCommentID,
&i.AgentflowRunID,
)
return i, err
}
@@ -564,7 +570,7 @@ func (q *Queries) HasPendingTaskForIssueAndAgent(ctx context.Context, arg HasPen
}
const listActiveTasksByIssue = `-- name: ListActiveTasksByIssue :many
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id FROM agent_task_queue
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, agentflow_run_id FROM agent_task_queue
WHERE issue_id = $1 AND status IN ('dispatched', 'running')
ORDER BY created_at DESC
`
@@ -595,6 +601,7 @@ func (q *Queries) ListActiveTasksByIssue(ctx context.Context, issueID pgtype.UUI
&i.SessionID,
&i.WorkDir,
&i.TriggerCommentID,
&i.AgentflowRunID,
); err != nil {
return nil, err
}
@@ -607,7 +614,7 @@ func (q *Queries) ListActiveTasksByIssue(ctx context.Context, issueID pgtype.UUI
}
const listAgentTasks = `-- name: ListAgentTasks :many
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id FROM agent_task_queue
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, agentflow_run_id FROM agent_task_queue
WHERE agent_id = $1
ORDER BY created_at DESC
`
@@ -638,6 +645,7 @@ func (q *Queries) ListAgentTasks(ctx context.Context, agentID pgtype.UUID) ([]Ag
&i.SessionID,
&i.WorkDir,
&i.TriggerCommentID,
&i.AgentflowRunID,
); err != nil {
return nil, err
}
@@ -742,7 +750,7 @@ func (q *Queries) ListAllAgents(ctx context.Context, workspaceID pgtype.UUID) ([
}
const listPendingTasksByRuntime = `-- name: ListPendingTasksByRuntime :many
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id FROM agent_task_queue
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, agentflow_run_id FROM agent_task_queue
WHERE runtime_id = $1 AND status IN ('queued', 'dispatched')
ORDER BY priority DESC, created_at ASC
`
@@ -773,6 +781,7 @@ func (q *Queries) ListPendingTasksByRuntime(ctx context.Context, runtimeID pgtyp
&i.SessionID,
&i.WorkDir,
&i.TriggerCommentID,
&i.AgentflowRunID,
); err != nil {
return nil, err
}
@@ -785,7 +794,7 @@ func (q *Queries) ListPendingTasksByRuntime(ctx context.Context, runtimeID pgtyp
}
const listTasksByIssue = `-- name: ListTasksByIssue :many
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id FROM agent_task_queue
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, agentflow_run_id FROM agent_task_queue
WHERE issue_id = $1
ORDER BY created_at DESC
`
@@ -816,6 +825,7 @@ func (q *Queries) ListTasksByIssue(ctx context.Context, issueID pgtype.UUID) ([]
&i.SessionID,
&i.WorkDir,
&i.TriggerCommentID,
&i.AgentflowRunID,
); err != nil {
return nil, err
}
@@ -864,7 +874,7 @@ const startAgentTask = `-- name: StartAgentTask :one
UPDATE agent_task_queue
SET status = 'running', started_at = now()
WHERE id = $1 AND status = 'dispatched'
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, agentflow_run_id
`
func (q *Queries) StartAgentTask(ctx context.Context, id pgtype.UUID) (AgentTaskQueue, error) {
@@ -887,6 +897,7 @@ func (q *Queries) StartAgentTask(ctx context.Context, id pgtype.UUID) (AgentTask
&i.SessionID,
&i.WorkDir,
&i.TriggerCommentID,
&i.AgentflowRunID,
)
return i, err
}

View File

@@ -0,0 +1,764 @@
// Code generated by sqlc. DO NOT EDIT.
// versions:
// sqlc v1.30.0
// source: agentflow.sql
package db
import (
"context"
"github.com/jackc/pgx/v5/pgtype"
)
const archiveAgentflow = `-- name: ArchiveAgentflow :one
UPDATE agentflow SET status = 'archived', updated_at = now()
WHERE id = $1
RETURNING id, workspace_id, title, description, agent_id, status, concurrency_policy, variables, created_by, created_at, updated_at
`
func (q *Queries) ArchiveAgentflow(ctx context.Context, id pgtype.UUID) (Agentflow, error) {
row := q.db.QueryRow(ctx, archiveAgentflow, id)
var i Agentflow
err := row.Scan(
&i.ID,
&i.WorkspaceID,
&i.Title,
&i.Description,
&i.AgentID,
&i.Status,
&i.ConcurrencyPolicy,
&i.Variables,
&i.CreatedBy,
&i.CreatedAt,
&i.UpdatedAt,
)
return i, err
}
const claimDueScheduleTriggers = `-- name: ClaimDueScheduleTriggers :many
UPDATE agentflow_trigger t SET
next_run_at = NULL, -- will be recalculated by caller
last_fired_at = now()
FROM agentflow a
WHERE t.agentflow_id = a.id
AND t.kind = 'schedule'
AND t.enabled = true
AND a.status = 'active'
AND t.next_run_at IS NOT NULL
AND t.next_run_at <= now()
RETURNING t.id, t.agentflow_id, t.kind, t.enabled, t.cron_expression, t.timezone, t.next_run_at, t.public_id, t.secret_hash, t.signing_mode, t.last_fired_at, t.created_at, a.workspace_id, a.agent_id, a.title AS agentflow_title, a.description AS agentflow_description, a.concurrency_policy
`
type ClaimDueScheduleTriggersRow struct {
ID pgtype.UUID `json:"id"`
AgentflowID pgtype.UUID `json:"agentflow_id"`
Kind string `json:"kind"`
Enabled bool `json:"enabled"`
CronExpression pgtype.Text `json:"cron_expression"`
Timezone pgtype.Text `json:"timezone"`
NextRunAt pgtype.Timestamptz `json:"next_run_at"`
PublicID pgtype.Text `json:"public_id"`
SecretHash pgtype.Text `json:"secret_hash"`
SigningMode pgtype.Text `json:"signing_mode"`
LastFiredAt pgtype.Timestamptz `json:"last_fired_at"`
CreatedAt pgtype.Timestamptz `json:"created_at"`
WorkspaceID pgtype.UUID `json:"workspace_id"`
AgentID pgtype.UUID `json:"agent_id"`
AgentflowTitle string `json:"agentflow_title"`
AgentflowDescription pgtype.Text `json:"agentflow_description"`
ConcurrencyPolicy string `json:"concurrency_policy"`
}
// Atomically claims all schedule triggers that are due.
// Uses CAS (compare-and-swap) on next_run_at to prevent double-firing
// across multiple server instances.
func (q *Queries) ClaimDueScheduleTriggers(ctx context.Context) ([]ClaimDueScheduleTriggersRow, error) {
rows, err := q.db.Query(ctx, claimDueScheduleTriggers)
if err != nil {
return nil, err
}
defer rows.Close()
items := []ClaimDueScheduleTriggersRow{}
for rows.Next() {
var i ClaimDueScheduleTriggersRow
if err := rows.Scan(
&i.ID,
&i.AgentflowID,
&i.Kind,
&i.Enabled,
&i.CronExpression,
&i.Timezone,
&i.NextRunAt,
&i.PublicID,
&i.SecretHash,
&i.SigningMode,
&i.LastFiredAt,
&i.CreatedAt,
&i.WorkspaceID,
&i.AgentID,
&i.AgentflowTitle,
&i.AgentflowDescription,
&i.ConcurrencyPolicy,
); err != nil {
return nil, err
}
items = append(items, i)
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const completeAgentflowRun = `-- name: CompleteAgentflowRun :one
UPDATE agentflow_run SET
status = $2,
agent_output = $3,
linked_issue_id = $4,
completed_at = now()
WHERE id = $1
RETURNING id, agentflow_id, trigger_id, source_kind, status, linked_issue_id, payload, agent_output, started_at, completed_at, idempotency_key, created_at
`
type CompleteAgentflowRunParams struct {
ID pgtype.UUID `json:"id"`
Status string `json:"status"`
AgentOutput pgtype.Text `json:"agent_output"`
LinkedIssueID pgtype.UUID `json:"linked_issue_id"`
}
func (q *Queries) CompleteAgentflowRun(ctx context.Context, arg CompleteAgentflowRunParams) (AgentflowRun, error) {
row := q.db.QueryRow(ctx, completeAgentflowRun,
arg.ID,
arg.Status,
arg.AgentOutput,
arg.LinkedIssueID,
)
var i AgentflowRun
err := row.Scan(
&i.ID,
&i.AgentflowID,
&i.TriggerID,
&i.SourceKind,
&i.Status,
&i.LinkedIssueID,
&i.Payload,
&i.AgentOutput,
&i.StartedAt,
&i.CompletedAt,
&i.IdempotencyKey,
&i.CreatedAt,
)
return i, err
}
const createAgentflow = `-- name: CreateAgentflow :one
INSERT INTO agentflow (
workspace_id, title, description, agent_id, status,
concurrency_policy, variables, created_by
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
RETURNING id, workspace_id, title, description, agent_id, status, concurrency_policy, variables, created_by, created_at, updated_at
`
type CreateAgentflowParams struct {
WorkspaceID pgtype.UUID `json:"workspace_id"`
Title string `json:"title"`
Description pgtype.Text `json:"description"`
AgentID pgtype.UUID `json:"agent_id"`
Status string `json:"status"`
ConcurrencyPolicy string `json:"concurrency_policy"`
Variables []byte `json:"variables"`
CreatedBy pgtype.UUID `json:"created_by"`
}
func (q *Queries) CreateAgentflow(ctx context.Context, arg CreateAgentflowParams) (Agentflow, error) {
row := q.db.QueryRow(ctx, createAgentflow,
arg.WorkspaceID,
arg.Title,
arg.Description,
arg.AgentID,
arg.Status,
arg.ConcurrencyPolicy,
arg.Variables,
arg.CreatedBy,
)
var i Agentflow
err := row.Scan(
&i.ID,
&i.WorkspaceID,
&i.Title,
&i.Description,
&i.AgentID,
&i.Status,
&i.ConcurrencyPolicy,
&i.Variables,
&i.CreatedBy,
&i.CreatedAt,
&i.UpdatedAt,
)
return i, err
}
const createAgentflowRun = `-- name: CreateAgentflowRun :one
INSERT INTO agentflow_run (
agentflow_id, trigger_id, source_kind, status, payload, idempotency_key
) VALUES ($1, $2, $3, $4, $5, $6)
RETURNING id, agentflow_id, trigger_id, source_kind, status, linked_issue_id, payload, agent_output, started_at, completed_at, idempotency_key, created_at
`
type CreateAgentflowRunParams struct {
AgentflowID pgtype.UUID `json:"agentflow_id"`
TriggerID pgtype.UUID `json:"trigger_id"`
SourceKind string `json:"source_kind"`
Status string `json:"status"`
Payload []byte `json:"payload"`
IdempotencyKey pgtype.Text `json:"idempotency_key"`
}
func (q *Queries) CreateAgentflowRun(ctx context.Context, arg CreateAgentflowRunParams) (AgentflowRun, error) {
row := q.db.QueryRow(ctx, createAgentflowRun,
arg.AgentflowID,
arg.TriggerID,
arg.SourceKind,
arg.Status,
arg.Payload,
arg.IdempotencyKey,
)
var i AgentflowRun
err := row.Scan(
&i.ID,
&i.AgentflowID,
&i.TriggerID,
&i.SourceKind,
&i.Status,
&i.LinkedIssueID,
&i.Payload,
&i.AgentOutput,
&i.StartedAt,
&i.CompletedAt,
&i.IdempotencyKey,
&i.CreatedAt,
)
return i, err
}
const createAgentflowTask = `-- name: CreateAgentflowTask :one
INSERT INTO agent_task_queue (agent_id, runtime_id, issue_id, status, priority, agentflow_run_id)
VALUES ($1, $2, NULL, 'queued', $3, $4)
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, agentflow_run_id
`
type CreateAgentflowTaskParams struct {
AgentID pgtype.UUID `json:"agent_id"`
RuntimeID pgtype.UUID `json:"runtime_id"`
Priority int32 `json:"priority"`
AgentflowRunID pgtype.UUID `json:"agentflow_run_id"`
}
// Creates a task in the queue for an agentflow run (issue_id is NULL).
func (q *Queries) CreateAgentflowTask(ctx context.Context, arg CreateAgentflowTaskParams) (AgentTaskQueue, error) {
row := q.db.QueryRow(ctx, createAgentflowTask,
arg.AgentID,
arg.RuntimeID,
arg.Priority,
arg.AgentflowRunID,
)
var i AgentTaskQueue
err := row.Scan(
&i.ID,
&i.AgentID,
&i.IssueID,
&i.Status,
&i.Priority,
&i.DispatchedAt,
&i.StartedAt,
&i.CompletedAt,
&i.Result,
&i.Error,
&i.CreatedAt,
&i.Context,
&i.RuntimeID,
&i.SessionID,
&i.WorkDir,
&i.TriggerCommentID,
&i.AgentflowRunID,
)
return i, err
}
const createAgentflowTrigger = `-- name: CreateAgentflowTrigger :one
INSERT INTO agentflow_trigger (
agentflow_id, kind, enabled,
cron_expression, timezone, next_run_at,
public_id, secret_hash, signing_mode
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
RETURNING id, agentflow_id, kind, enabled, cron_expression, timezone, next_run_at, public_id, secret_hash, signing_mode, last_fired_at, created_at
`
type CreateAgentflowTriggerParams struct {
AgentflowID pgtype.UUID `json:"agentflow_id"`
Kind string `json:"kind"`
Enabled bool `json:"enabled"`
CronExpression pgtype.Text `json:"cron_expression"`
Timezone pgtype.Text `json:"timezone"`
NextRunAt pgtype.Timestamptz `json:"next_run_at"`
PublicID pgtype.Text `json:"public_id"`
SecretHash pgtype.Text `json:"secret_hash"`
SigningMode pgtype.Text `json:"signing_mode"`
}
func (q *Queries) CreateAgentflowTrigger(ctx context.Context, arg CreateAgentflowTriggerParams) (AgentflowTrigger, error) {
row := q.db.QueryRow(ctx, createAgentflowTrigger,
arg.AgentflowID,
arg.Kind,
arg.Enabled,
arg.CronExpression,
arg.Timezone,
arg.NextRunAt,
arg.PublicID,
arg.SecretHash,
arg.SigningMode,
)
var i AgentflowTrigger
err := row.Scan(
&i.ID,
&i.AgentflowID,
&i.Kind,
&i.Enabled,
&i.CronExpression,
&i.Timezone,
&i.NextRunAt,
&i.PublicID,
&i.SecretHash,
&i.SigningMode,
&i.LastFiredAt,
&i.CreatedAt,
)
return i, err
}
const deleteAgentflowTrigger = `-- name: DeleteAgentflowTrigger :exec
DELETE FROM agentflow_trigger WHERE id = $1
`
func (q *Queries) DeleteAgentflowTrigger(ctx context.Context, id pgtype.UUID) error {
_, err := q.db.Exec(ctx, deleteAgentflowTrigger, id)
return err
}
const getAgentflow = `-- name: GetAgentflow :one
SELECT id, workspace_id, title, description, agent_id, status, concurrency_policy, variables, created_by, created_at, updated_at FROM agentflow WHERE id = $1
`
func (q *Queries) GetAgentflow(ctx context.Context, id pgtype.UUID) (Agentflow, error) {
row := q.db.QueryRow(ctx, getAgentflow, id)
var i Agentflow
err := row.Scan(
&i.ID,
&i.WorkspaceID,
&i.Title,
&i.Description,
&i.AgentID,
&i.Status,
&i.ConcurrencyPolicy,
&i.Variables,
&i.CreatedBy,
&i.CreatedAt,
&i.UpdatedAt,
)
return i, err
}
const getAgentflowInWorkspace = `-- name: GetAgentflowInWorkspace :one
SELECT id, workspace_id, title, description, agent_id, status, concurrency_policy, variables, created_by, created_at, updated_at FROM agentflow WHERE id = $1 AND workspace_id = $2
`
type GetAgentflowInWorkspaceParams struct {
ID pgtype.UUID `json:"id"`
WorkspaceID pgtype.UUID `json:"workspace_id"`
}
func (q *Queries) GetAgentflowInWorkspace(ctx context.Context, arg GetAgentflowInWorkspaceParams) (Agentflow, error) {
row := q.db.QueryRow(ctx, getAgentflowInWorkspace, arg.ID, arg.WorkspaceID)
var i Agentflow
err := row.Scan(
&i.ID,
&i.WorkspaceID,
&i.Title,
&i.Description,
&i.AgentID,
&i.Status,
&i.ConcurrencyPolicy,
&i.Variables,
&i.CreatedBy,
&i.CreatedAt,
&i.UpdatedAt,
)
return i, err
}
const getAgentflowRun = `-- name: GetAgentflowRun :one
SELECT id, agentflow_id, trigger_id, source_kind, status, linked_issue_id, payload, agent_output, started_at, completed_at, idempotency_key, created_at FROM agentflow_run WHERE id = $1
`
func (q *Queries) GetAgentflowRun(ctx context.Context, id pgtype.UUID) (AgentflowRun, error) {
row := q.db.QueryRow(ctx, getAgentflowRun, id)
var i AgentflowRun
err := row.Scan(
&i.ID,
&i.AgentflowID,
&i.TriggerID,
&i.SourceKind,
&i.Status,
&i.LinkedIssueID,
&i.Payload,
&i.AgentOutput,
&i.StartedAt,
&i.CompletedAt,
&i.IdempotencyKey,
&i.CreatedAt,
)
return i, err
}
const getAgentflowTrigger = `-- name: GetAgentflowTrigger :one
SELECT id, agentflow_id, kind, enabled, cron_expression, timezone, next_run_at, public_id, secret_hash, signing_mode, last_fired_at, created_at FROM agentflow_trigger WHERE id = $1
`
func (q *Queries) GetAgentflowTrigger(ctx context.Context, id pgtype.UUID) (AgentflowTrigger, error) {
row := q.db.QueryRow(ctx, getAgentflowTrigger, id)
var i AgentflowTrigger
err := row.Scan(
&i.ID,
&i.AgentflowID,
&i.Kind,
&i.Enabled,
&i.CronExpression,
&i.Timezone,
&i.NextRunAt,
&i.PublicID,
&i.SecretHash,
&i.SigningMode,
&i.LastFiredAt,
&i.CreatedAt,
)
return i, err
}
const hasActiveAgentflowRun = `-- name: HasActiveAgentflowRun :one
SELECT count(*) > 0 AS has_active FROM agentflow_run
WHERE agentflow_id = $1 AND status IN ('received', 'executing')
`
// Check if agentflow has a run currently executing (for concurrency policy).
func (q *Queries) HasActiveAgentflowRun(ctx context.Context, agentflowID pgtype.UUID) (bool, error) {
row := q.db.QueryRow(ctx, hasActiveAgentflowRun, agentflowID)
var has_active bool
err := row.Scan(&has_active)
return has_active, err
}
const listAgentflowRuns = `-- name: ListAgentflowRuns :many
SELECT id, agentflow_id, trigger_id, source_kind, status, linked_issue_id, payload, agent_output, started_at, completed_at, idempotency_key, created_at FROM agentflow_run
WHERE agentflow_id = $1
ORDER BY created_at DESC
LIMIT $2 OFFSET $3
`
type ListAgentflowRunsParams struct {
AgentflowID pgtype.UUID `json:"agentflow_id"`
Limit int32 `json:"limit"`
Offset int32 `json:"offset"`
}
func (q *Queries) ListAgentflowRuns(ctx context.Context, arg ListAgentflowRunsParams) ([]AgentflowRun, error) {
rows, err := q.db.Query(ctx, listAgentflowRuns, arg.AgentflowID, arg.Limit, arg.Offset)
if err != nil {
return nil, err
}
defer rows.Close()
items := []AgentflowRun{}
for rows.Next() {
var i AgentflowRun
if err := rows.Scan(
&i.ID,
&i.AgentflowID,
&i.TriggerID,
&i.SourceKind,
&i.Status,
&i.LinkedIssueID,
&i.Payload,
&i.AgentOutput,
&i.StartedAt,
&i.CompletedAt,
&i.IdempotencyKey,
&i.CreatedAt,
); err != nil {
return nil, err
}
items = append(items, i)
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const listAgentflowTriggers = `-- name: ListAgentflowTriggers :many
SELECT id, agentflow_id, kind, enabled, cron_expression, timezone, next_run_at, public_id, secret_hash, signing_mode, last_fired_at, created_at FROM agentflow_trigger
WHERE agentflow_id = $1
ORDER BY created_at ASC
`
func (q *Queries) ListAgentflowTriggers(ctx context.Context, agentflowID pgtype.UUID) ([]AgentflowTrigger, error) {
rows, err := q.db.Query(ctx, listAgentflowTriggers, agentflowID)
if err != nil {
return nil, err
}
defer rows.Close()
items := []AgentflowTrigger{}
for rows.Next() {
var i AgentflowTrigger
if err := rows.Scan(
&i.ID,
&i.AgentflowID,
&i.Kind,
&i.Enabled,
&i.CronExpression,
&i.Timezone,
&i.NextRunAt,
&i.PublicID,
&i.SecretHash,
&i.SigningMode,
&i.LastFiredAt,
&i.CreatedAt,
); err != nil {
return nil, err
}
items = append(items, i)
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const listAgentflows = `-- name: ListAgentflows :many
SELECT id, workspace_id, title, description, agent_id, status, concurrency_policy, variables, created_by, created_at, updated_at FROM agentflow
WHERE workspace_id = $1 AND status != 'archived'
ORDER BY created_at DESC
`
func (q *Queries) ListAgentflows(ctx context.Context, workspaceID pgtype.UUID) ([]Agentflow, error) {
rows, err := q.db.Query(ctx, listAgentflows, workspaceID)
if err != nil {
return nil, err
}
defer rows.Close()
items := []Agentflow{}
for rows.Next() {
var i Agentflow
if err := rows.Scan(
&i.ID,
&i.WorkspaceID,
&i.Title,
&i.Description,
&i.AgentID,
&i.Status,
&i.ConcurrencyPolicy,
&i.Variables,
&i.CreatedBy,
&i.CreatedAt,
&i.UpdatedAt,
); err != nil {
return nil, err
}
items = append(items, i)
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const listAllAgentflows = `-- name: ListAllAgentflows :many
SELECT id, workspace_id, title, description, agent_id, status, concurrency_policy, variables, created_by, created_at, updated_at FROM agentflow
WHERE workspace_id = $1
ORDER BY created_at DESC
`
func (q *Queries) ListAllAgentflows(ctx context.Context, workspaceID pgtype.UUID) ([]Agentflow, error) {
rows, err := q.db.Query(ctx, listAllAgentflows, workspaceID)
if err != nil {
return nil, err
}
defer rows.Close()
items := []Agentflow{}
for rows.Next() {
var i Agentflow
if err := rows.Scan(
&i.ID,
&i.WorkspaceID,
&i.Title,
&i.Description,
&i.AgentID,
&i.Status,
&i.ConcurrencyPolicy,
&i.Variables,
&i.CreatedBy,
&i.CreatedAt,
&i.UpdatedAt,
); err != nil {
return nil, err
}
items = append(items, i)
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const setTriggerNextRunAt = `-- name: SetTriggerNextRunAt :exec
UPDATE agentflow_trigger SET next_run_at = $2
WHERE id = $1
`
type SetTriggerNextRunAtParams struct {
ID pgtype.UUID `json:"id"`
NextRunAt pgtype.Timestamptz `json:"next_run_at"`
}
func (q *Queries) SetTriggerNextRunAt(ctx context.Context, arg SetTriggerNextRunAtParams) error {
_, err := q.db.Exec(ctx, setTriggerNextRunAt, arg.ID, arg.NextRunAt)
return err
}
const updateAgentflow = `-- name: UpdateAgentflow :one
UPDATE agentflow SET
title = COALESCE($2, title),
description = COALESCE($3, description),
agent_id = COALESCE($4, agent_id),
status = COALESCE($5, status),
concurrency_policy = COALESCE($6, concurrency_policy),
variables = COALESCE($7, variables),
updated_at = now()
WHERE id = $1
RETURNING id, workspace_id, title, description, agent_id, status, concurrency_policy, variables, created_by, created_at, updated_at
`
type UpdateAgentflowParams struct {
ID pgtype.UUID `json:"id"`
Title pgtype.Text `json:"title"`
Description pgtype.Text `json:"description"`
AgentID pgtype.UUID `json:"agent_id"`
Status pgtype.Text `json:"status"`
ConcurrencyPolicy pgtype.Text `json:"concurrency_policy"`
Variables []byte `json:"variables"`
}
func (q *Queries) UpdateAgentflow(ctx context.Context, arg UpdateAgentflowParams) (Agentflow, error) {
row := q.db.QueryRow(ctx, updateAgentflow,
arg.ID,
arg.Title,
arg.Description,
arg.AgentID,
arg.Status,
arg.ConcurrencyPolicy,
arg.Variables,
)
var i Agentflow
err := row.Scan(
&i.ID,
&i.WorkspaceID,
&i.Title,
&i.Description,
&i.AgentID,
&i.Status,
&i.ConcurrencyPolicy,
&i.Variables,
&i.CreatedBy,
&i.CreatedAt,
&i.UpdatedAt,
)
return i, err
}
const updateAgentflowRunStatus = `-- name: UpdateAgentflowRunStatus :one
UPDATE agentflow_run SET
status = $2,
started_at = CASE WHEN $2 = 'executing' THEN now() ELSE started_at END,
completed_at = CASE WHEN $2 IN ('completed', 'failed', 'skipped', 'coalesced') THEN now() ELSE completed_at END
WHERE id = $1
RETURNING id, agentflow_id, trigger_id, source_kind, status, linked_issue_id, payload, agent_output, started_at, completed_at, idempotency_key, created_at
`
type UpdateAgentflowRunStatusParams struct {
ID pgtype.UUID `json:"id"`
Status string `json:"status"`
}
func (q *Queries) UpdateAgentflowRunStatus(ctx context.Context, arg UpdateAgentflowRunStatusParams) (AgentflowRun, error) {
row := q.db.QueryRow(ctx, updateAgentflowRunStatus, arg.ID, arg.Status)
var i AgentflowRun
err := row.Scan(
&i.ID,
&i.AgentflowID,
&i.TriggerID,
&i.SourceKind,
&i.Status,
&i.LinkedIssueID,
&i.Payload,
&i.AgentOutput,
&i.StartedAt,
&i.CompletedAt,
&i.IdempotencyKey,
&i.CreatedAt,
)
return i, err
}
const updateAgentflowTrigger = `-- name: UpdateAgentflowTrigger :one
UPDATE agentflow_trigger SET
enabled = COALESCE($2, enabled),
cron_expression = COALESCE($3, cron_expression),
timezone = COALESCE($4, timezone),
next_run_at = COALESCE($5, next_run_at)
WHERE id = $1
RETURNING id, agentflow_id, kind, enabled, cron_expression, timezone, next_run_at, public_id, secret_hash, signing_mode, last_fired_at, created_at
`
type UpdateAgentflowTriggerParams struct {
ID pgtype.UUID `json:"id"`
Enabled pgtype.Bool `json:"enabled"`
CronExpression pgtype.Text `json:"cron_expression"`
Timezone pgtype.Text `json:"timezone"`
NextRunAt pgtype.Timestamptz `json:"next_run_at"`
}
func (q *Queries) UpdateAgentflowTrigger(ctx context.Context, arg UpdateAgentflowTriggerParams) (AgentflowTrigger, error) {
row := q.db.QueryRow(ctx, updateAgentflowTrigger,
arg.ID,
arg.Enabled,
arg.CronExpression,
arg.Timezone,
arg.NextRunAt,
)
var i AgentflowTrigger
err := row.Scan(
&i.ID,
&i.AgentflowID,
&i.Kind,
&i.Enabled,
&i.CronExpression,
&i.Timezone,
&i.NextRunAt,
&i.PublicID,
&i.SecretHash,
&i.SigningMode,
&i.LastFiredAt,
&i.CreatedAt,
)
return i, err
}

View File

@@ -79,6 +79,51 @@ type AgentTaskQueue struct {
SessionID pgtype.Text `json:"session_id"`
WorkDir pgtype.Text `json:"work_dir"`
TriggerCommentID pgtype.UUID `json:"trigger_comment_id"`
AgentflowRunID pgtype.UUID `json:"agentflow_run_id"`
}
type Agentflow struct {
ID pgtype.UUID `json:"id"`
WorkspaceID pgtype.UUID `json:"workspace_id"`
Title string `json:"title"`
Description pgtype.Text `json:"description"`
AgentID pgtype.UUID `json:"agent_id"`
Status string `json:"status"`
ConcurrencyPolicy string `json:"concurrency_policy"`
Variables []byte `json:"variables"`
CreatedBy pgtype.UUID `json:"created_by"`
CreatedAt pgtype.Timestamptz `json:"created_at"`
UpdatedAt pgtype.Timestamptz `json:"updated_at"`
}
type AgentflowRun struct {
ID pgtype.UUID `json:"id"`
AgentflowID pgtype.UUID `json:"agentflow_id"`
TriggerID pgtype.UUID `json:"trigger_id"`
SourceKind string `json:"source_kind"`
Status string `json:"status"`
LinkedIssueID pgtype.UUID `json:"linked_issue_id"`
Payload []byte `json:"payload"`
AgentOutput pgtype.Text `json:"agent_output"`
StartedAt pgtype.Timestamptz `json:"started_at"`
CompletedAt pgtype.Timestamptz `json:"completed_at"`
IdempotencyKey pgtype.Text `json:"idempotency_key"`
CreatedAt pgtype.Timestamptz `json:"created_at"`
}
type AgentflowTrigger struct {
ID pgtype.UUID `json:"id"`
AgentflowID pgtype.UUID `json:"agentflow_id"`
Kind string `json:"kind"`
Enabled bool `json:"enabled"`
CronExpression pgtype.Text `json:"cron_expression"`
Timezone pgtype.Text `json:"timezone"`
NextRunAt pgtype.Timestamptz `json:"next_run_at"`
PublicID pgtype.Text `json:"public_id"`
SecretHash pgtype.Text `json:"secret_hash"`
SigningMode pgtype.Text `json:"signing_mode"`
LastFiredAt pgtype.Timestamptz `json:"last_fired_at"`
CreatedAt pgtype.Timestamptz `json:"created_at"`
}
type Attachment struct {

View File

@@ -0,0 +1,130 @@
-- name: CreateAgentflow :one
INSERT INTO agentflow (
workspace_id, title, description, agent_id, status,
concurrency_policy, variables, created_by
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
RETURNING *;
-- name: GetAgentflow :one
SELECT * FROM agentflow WHERE id = $1;
-- name: GetAgentflowInWorkspace :one
SELECT * FROM agentflow WHERE id = $1 AND workspace_id = $2;
-- name: ListAgentflows :many
SELECT * FROM agentflow
WHERE workspace_id = $1 AND status != 'archived'
ORDER BY created_at DESC;
-- name: ListAllAgentflows :many
SELECT * FROM agentflow
WHERE workspace_id = $1
ORDER BY created_at DESC;
-- name: UpdateAgentflow :one
UPDATE agentflow SET
title = COALESCE(sqlc.narg('title'), title),
description = COALESCE(sqlc.narg('description'), description),
agent_id = COALESCE(sqlc.narg('agent_id'), agent_id),
status = COALESCE(sqlc.narg('status'), status),
concurrency_policy = COALESCE(sqlc.narg('concurrency_policy'), concurrency_policy),
variables = COALESCE(sqlc.narg('variables'), variables),
updated_at = now()
WHERE id = $1
RETURNING *;
-- name: ArchiveAgentflow :one
UPDATE agentflow SET status = 'archived', updated_at = now()
WHERE id = $1
RETURNING *;
-- name: CreateAgentflowTrigger :one
INSERT INTO agentflow_trigger (
agentflow_id, kind, enabled,
cron_expression, timezone, next_run_at,
public_id, secret_hash, signing_mode
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
RETURNING *;
-- name: GetAgentflowTrigger :one
SELECT * FROM agentflow_trigger WHERE id = $1;
-- name: ListAgentflowTriggers :many
SELECT * FROM agentflow_trigger
WHERE agentflow_id = $1
ORDER BY created_at ASC;
-- name: UpdateAgentflowTrigger :one
UPDATE agentflow_trigger SET
enabled = COALESCE(sqlc.narg('enabled'), enabled),
cron_expression = COALESCE(sqlc.narg('cron_expression'), cron_expression),
timezone = COALESCE(sqlc.narg('timezone'), timezone),
next_run_at = COALESCE(sqlc.narg('next_run_at'), next_run_at)
WHERE id = $1
RETURNING *;
-- name: DeleteAgentflowTrigger :exec
DELETE FROM agentflow_trigger WHERE id = $1;
-- name: ClaimDueScheduleTriggers :many
-- Atomically claims all schedule triggers that are due.
-- Uses CAS (compare-and-swap) on next_run_at to prevent double-firing
-- across multiple server instances.
UPDATE agentflow_trigger t SET
next_run_at = NULL, -- will be recalculated by caller
last_fired_at = now()
FROM agentflow a
WHERE t.agentflow_id = a.id
AND t.kind = 'schedule'
AND t.enabled = true
AND a.status = 'active'
AND t.next_run_at IS NOT NULL
AND t.next_run_at <= now()
RETURNING t.*, a.workspace_id, a.agent_id, a.title AS agentflow_title, a.description AS agentflow_description, a.concurrency_policy;
-- name: SetTriggerNextRunAt :exec
UPDATE agentflow_trigger SET next_run_at = $2
WHERE id = $1;
-- name: CreateAgentflowRun :one
INSERT INTO agentflow_run (
agentflow_id, trigger_id, source_kind, status, payload, idempotency_key
) VALUES ($1, $2, $3, $4, $5, $6)
RETURNING *;
-- name: GetAgentflowRun :one
SELECT * FROM agentflow_run WHERE id = $1;
-- name: ListAgentflowRuns :many
SELECT * FROM agentflow_run
WHERE agentflow_id = $1
ORDER BY created_at DESC
LIMIT $2 OFFSET $3;
-- name: UpdateAgentflowRunStatus :one
UPDATE agentflow_run SET
status = $2,
started_at = CASE WHEN $2 = 'executing' THEN now() ELSE started_at END,
completed_at = CASE WHEN $2 IN ('completed', 'failed', 'skipped', 'coalesced') THEN now() ELSE completed_at END
WHERE id = $1
RETURNING *;
-- name: CompleteAgentflowRun :one
UPDATE agentflow_run SET
status = $2,
agent_output = $3,
linked_issue_id = sqlc.narg('linked_issue_id'),
completed_at = now()
WHERE id = $1
RETURNING *;
-- name: HasActiveAgentflowRun :one
-- Check if agentflow has a run currently executing (for concurrency policy).
SELECT count(*) > 0 AS has_active FROM agentflow_run
WHERE agentflow_id = $1 AND status IN ('received', 'executing');
-- name: CreateAgentflowTask :one
-- Creates a task in the queue for an agentflow run (issue_id is NULL).
INSERT INTO agent_task_queue (agent_id, runtime_id, issue_id, status, priority, agentflow_run_id)
VALUES ($1, $2, NULL, 'queued', $3, $4)
RETURNING *;