mirror of
https://github.com/multica-ai/multica.git
synced 2026-07-05 13:29:44 +02:00
Compare commits
1 Commits
fix/sub-is
...
agent/eve/
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ee58915168 |
@@ -15,6 +15,7 @@ import {
|
||||
BookOpenText,
|
||||
SquarePen,
|
||||
CircleUser,
|
||||
Zap,
|
||||
} from "lucide-react";
|
||||
import { WorkspaceAvatar } from "@/features/workspace";
|
||||
import { useIssueDraftStore } from "@/features/issues/stores/draft-store";
|
||||
@@ -53,6 +54,7 @@ const primaryNav = [
|
||||
|
||||
const workspaceNav = [
|
||||
{ href: "/agents", label: "Agents", icon: Bot },
|
||||
{ href: "/agentflows", label: "Agentflows", icon: Zap },
|
||||
{ href: "/runtimes", label: "Runtimes", icon: Monitor },
|
||||
{ href: "/skills", label: "Skills", icon: BookOpenText },
|
||||
{ href: "/settings", label: "Settings", icon: Settings },
|
||||
|
||||
7
apps/web/app/(dashboard)/agentflows/page.tsx
Normal file
7
apps/web/app/(dashboard)/agentflows/page.tsx
Normal file
@@ -0,0 +1,7 @@
|
||||
"use client";
|
||||
|
||||
import { AgentflowsPage } from "@/features/agentflows";
|
||||
|
||||
export default function Page() {
|
||||
return <AgentflowsPage />;
|
||||
}
|
||||
453
apps/web/features/agentflows/components/agentflows-page.tsx
Normal file
453
apps/web/features/agentflows/components/agentflows-page.tsx
Normal file
@@ -0,0 +1,453 @@
|
||||
"use client";
|
||||
|
||||
import { useState, useEffect, useCallback } from "react";
|
||||
import { Plus, Play, Pause, Clock, Zap, Trash2, MoreHorizontal, Loader2 } from "lucide-react";
|
||||
import { Button } from "@/components/ui/button";
|
||||
import { Input } from "@/components/ui/input";
|
||||
import { Label } from "@/components/ui/label";
|
||||
import { Textarea } from "@/components/ui/textarea";
|
||||
import { Badge } from "@/components/ui/badge";
|
||||
import {
|
||||
Dialog,
|
||||
DialogContent,
|
||||
DialogHeader,
|
||||
DialogTitle,
|
||||
DialogFooter,
|
||||
} from "@/components/ui/dialog";
|
||||
import {
|
||||
DropdownMenu,
|
||||
DropdownMenuTrigger,
|
||||
DropdownMenuContent,
|
||||
DropdownMenuItem,
|
||||
} from "@/components/ui/dropdown-menu";
|
||||
import {
|
||||
Select,
|
||||
SelectContent,
|
||||
SelectItem,
|
||||
SelectTrigger,
|
||||
SelectValue,
|
||||
} from "@/components/ui/select";
|
||||
import { Tabs, TabsContent, TabsList, TabsTrigger } from "@/components/ui/tabs";
|
||||
import { toast } from "sonner";
|
||||
import { api } from "@/shared/api";
|
||||
import { useWorkspaceStore } from "@/features/workspace";
|
||||
import { useAgentflowStore } from "../store";
|
||||
import type {
|
||||
Agentflow,
|
||||
AgentflowTrigger,
|
||||
AgentflowRun,
|
||||
AgentflowConcurrencyPolicy,
|
||||
} from "@/shared/types";
|
||||
|
||||
function formatDate(dateStr: string | null): string {
|
||||
if (!dateStr) return "—";
|
||||
return new Date(dateStr).toLocaleString();
|
||||
}
|
||||
|
||||
function StatusBadge({ status }: { status: string }) {
|
||||
const variants: Record<string, string> = {
|
||||
active: "bg-green-100 text-green-800",
|
||||
paused: "bg-yellow-100 text-yellow-800",
|
||||
archived: "bg-gray-100 text-gray-600",
|
||||
};
|
||||
return (
|
||||
<span className={`inline-flex items-center rounded-full px-2 py-0.5 text-xs font-medium ${variants[status] ?? "bg-gray-100 text-gray-600"}`}>
|
||||
{status}
|
||||
</span>
|
||||
);
|
||||
}
|
||||
|
||||
function RunStatusBadge({ status }: { status: string }) {
|
||||
const variants: Record<string, string> = {
|
||||
received: "bg-blue-100 text-blue-800",
|
||||
executing: "bg-yellow-100 text-yellow-800",
|
||||
completed: "bg-green-100 text-green-800",
|
||||
failed: "bg-red-100 text-red-800",
|
||||
skipped: "bg-gray-100 text-gray-600",
|
||||
coalesced: "bg-gray-100 text-gray-600",
|
||||
};
|
||||
return (
|
||||
<span className={`inline-flex items-center rounded-full px-2 py-0.5 text-xs font-medium ${variants[status] ?? "bg-gray-100 text-gray-600"}`}>
|
||||
{status}
|
||||
</span>
|
||||
);
|
||||
}
|
||||
|
||||
// ── Create Dialog ──────────────────────────────────────────────
|
||||
|
||||
function CreateAgentflowDialog({
|
||||
open,
|
||||
onClose,
|
||||
}: {
|
||||
open: boolean;
|
||||
onClose: () => void;
|
||||
}) {
|
||||
const agents = useWorkspaceStore((s) => s.agents);
|
||||
const addAgentflow = useAgentflowStore((s) => s.addAgentflow);
|
||||
|
||||
const [title, setTitle] = useState("");
|
||||
const [description, setDescription] = useState("");
|
||||
const [agentId, setAgentId] = useState("");
|
||||
const [concurrencyPolicy, setConcurrencyPolicy] = useState<AgentflowConcurrencyPolicy>("skip_if_active");
|
||||
const [cronExpression, setCronExpression] = useState("");
|
||||
const [timezone, setTimezone] = useState(Intl.DateTimeFormat().resolvedOptions().timeZone);
|
||||
const [creating, setCreating] = useState(false);
|
||||
|
||||
const handleCreate = async () => {
|
||||
if (!title.trim() || !agentId) return;
|
||||
setCreating(true);
|
||||
try {
|
||||
const af = await api.createAgentflow({
|
||||
title: title.trim(),
|
||||
description: description.trim() || undefined,
|
||||
agent_id: agentId,
|
||||
concurrency_policy: concurrencyPolicy,
|
||||
});
|
||||
addAgentflow(af);
|
||||
|
||||
// Create schedule trigger if cron was provided
|
||||
if (cronExpression.trim()) {
|
||||
await api.createAgentflowTrigger(af.id, {
|
||||
kind: "schedule",
|
||||
cron_expression: cronExpression.trim(),
|
||||
timezone,
|
||||
});
|
||||
}
|
||||
|
||||
toast.success("Agentflow created");
|
||||
onClose();
|
||||
setTitle("");
|
||||
setDescription("");
|
||||
setAgentId("");
|
||||
setCronExpression("");
|
||||
} catch {
|
||||
toast.error("Failed to create agentflow");
|
||||
} finally {
|
||||
setCreating(false);
|
||||
}
|
||||
};
|
||||
|
||||
return (
|
||||
<Dialog open={open} onOpenChange={(v) => !v && onClose()}>
|
||||
<DialogContent className="sm:max-w-lg">
|
||||
<DialogHeader>
|
||||
<DialogTitle>Create Agentflow</DialogTitle>
|
||||
</DialogHeader>
|
||||
<div className="space-y-4 py-2">
|
||||
<div className="space-y-2">
|
||||
<Label>Title</Label>
|
||||
<Input
|
||||
placeholder="e.g. Daily SSL Check"
|
||||
value={title}
|
||||
onChange={(e) => setTitle(e.target.value)}
|
||||
/>
|
||||
</div>
|
||||
<div className="space-y-2">
|
||||
<Label>Agent</Label>
|
||||
<Select value={agentId} onValueChange={(v) => v && setAgentId(v)}>
|
||||
<SelectTrigger>
|
||||
<SelectValue placeholder="Select an agent" />
|
||||
</SelectTrigger>
|
||||
<SelectContent>
|
||||
{agents
|
||||
.filter((a) => !a.archived_at)
|
||||
.map((a) => (
|
||||
<SelectItem key={a.id} value={a.id}>
|
||||
{a.name}
|
||||
</SelectItem>
|
||||
))}
|
||||
</SelectContent>
|
||||
</Select>
|
||||
</div>
|
||||
<div className="space-y-2">
|
||||
<Label>Prompt / Instructions</Label>
|
||||
<Textarea
|
||||
placeholder="What should the agent do when this agentflow triggers?"
|
||||
rows={4}
|
||||
value={description}
|
||||
onChange={(e) => setDescription(e.target.value)}
|
||||
/>
|
||||
</div>
|
||||
<div className="space-y-2">
|
||||
<Label>Schedule (cron expression)</Label>
|
||||
<Input
|
||||
placeholder="e.g. 0 10 * * 1-5 (weekdays at 10am)"
|
||||
value={cronExpression}
|
||||
onChange={(e) => setCronExpression(e.target.value)}
|
||||
/>
|
||||
<p className="text-xs text-muted-foreground">
|
||||
Standard 5-field cron format. Leave empty to trigger manually only.
|
||||
</p>
|
||||
</div>
|
||||
<div className="space-y-2">
|
||||
<Label>Timezone</Label>
|
||||
<Input
|
||||
value={timezone}
|
||||
onChange={(e) => setTimezone(e.target.value)}
|
||||
/>
|
||||
</div>
|
||||
<div className="space-y-2">
|
||||
<Label>Concurrency Policy</Label>
|
||||
<Select value={concurrencyPolicy} onValueChange={(v) => setConcurrencyPolicy(v as AgentflowConcurrencyPolicy)}>
|
||||
<SelectTrigger>
|
||||
<SelectValue />
|
||||
</SelectTrigger>
|
||||
<SelectContent>
|
||||
<SelectItem value="skip_if_active">Skip if active</SelectItem>
|
||||
<SelectItem value="coalesce">Coalesce</SelectItem>
|
||||
<SelectItem value="always_run">Always run</SelectItem>
|
||||
</SelectContent>
|
||||
</Select>
|
||||
</div>
|
||||
</div>
|
||||
<DialogFooter>
|
||||
<Button variant="outline" onClick={onClose}>Cancel</Button>
|
||||
<Button onClick={handleCreate} disabled={creating || !title.trim() || !agentId}>
|
||||
{creating ? <Loader2 className="h-4 w-4 animate-spin mr-2" /> : null}
|
||||
Create
|
||||
</Button>
|
||||
</DialogFooter>
|
||||
</DialogContent>
|
||||
</Dialog>
|
||||
);
|
||||
}
|
||||
|
||||
// ── Detail Panel ──────────────────────────────────────────────
|
||||
|
||||
function AgentflowDetail({ agentflow }: { agentflow: Agentflow }) {
|
||||
const agents = useWorkspaceStore((s) => s.agents);
|
||||
const { triggers, runs, fetchTriggers, fetchRuns, updateAgentflow } = useAgentflowStore();
|
||||
|
||||
const afTriggers = triggers[agentflow.id] ?? [];
|
||||
const afRuns = runs[agentflow.id] ?? [];
|
||||
const agent = agents.find((a) => a.id === agentflow.agent_id);
|
||||
|
||||
useEffect(() => {
|
||||
fetchTriggers(agentflow.id);
|
||||
fetchRuns(agentflow.id);
|
||||
}, [agentflow.id, fetchTriggers, fetchRuns]);
|
||||
|
||||
const handleRun = async () => {
|
||||
try {
|
||||
await api.runAgentflow(agentflow.id);
|
||||
toast.success("Agentflow triggered");
|
||||
fetchRuns(agentflow.id);
|
||||
} catch {
|
||||
toast.error("Failed to trigger agentflow");
|
||||
}
|
||||
};
|
||||
|
||||
const handleToggleStatus = async () => {
|
||||
const newStatus = agentflow.status === "active" ? "paused" : "active";
|
||||
try {
|
||||
const updated = await api.updateAgentflow(agentflow.id, { status: newStatus });
|
||||
updateAgentflow(agentflow.id, updated);
|
||||
toast.success(`Agentflow ${newStatus}`);
|
||||
} catch {
|
||||
toast.error("Failed to update status");
|
||||
}
|
||||
};
|
||||
|
||||
return (
|
||||
<div className="flex flex-col h-full">
|
||||
<div className="flex items-center justify-between p-4 border-b">
|
||||
<div>
|
||||
<h2 className="text-lg font-semibold">{agentflow.title}</h2>
|
||||
<p className="text-sm text-muted-foreground">
|
||||
Agent: {agent?.name ?? "Unknown"} · <StatusBadge status={agentflow.status} />
|
||||
</p>
|
||||
</div>
|
||||
<div className="flex items-center gap-2">
|
||||
<Button variant="outline" size="sm" onClick={handleToggleStatus}>
|
||||
{agentflow.status === "active" ? (
|
||||
<><Pause className="h-4 w-4 mr-1" /> Pause</>
|
||||
) : (
|
||||
<><Play className="h-4 w-4 mr-1" /> Activate</>
|
||||
)}
|
||||
</Button>
|
||||
<Button size="sm" onClick={handleRun}>
|
||||
<Zap className="h-4 w-4 mr-1" /> Run Now
|
||||
</Button>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
<Tabs defaultValue="triggers" className="flex-1 overflow-hidden flex flex-col">
|
||||
<TabsList className="mx-4 mt-2">
|
||||
<TabsTrigger value="triggers">Triggers</TabsTrigger>
|
||||
<TabsTrigger value="runs">Runs</TabsTrigger>
|
||||
<TabsTrigger value="details">Details</TabsTrigger>
|
||||
</TabsList>
|
||||
|
||||
<TabsContent value="triggers" className="flex-1 overflow-auto p-4">
|
||||
{afTriggers.length === 0 ? (
|
||||
<p className="text-sm text-muted-foreground">No triggers configured. This agentflow can only be triggered manually.</p>
|
||||
) : (
|
||||
<div className="space-y-3">
|
||||
{afTriggers.map((t) => (
|
||||
<TriggerCard key={t.id} trigger={t} onRefresh={() => fetchTriggers(agentflow.id)} />
|
||||
))}
|
||||
</div>
|
||||
)}
|
||||
</TabsContent>
|
||||
|
||||
<TabsContent value="runs" className="flex-1 overflow-auto p-4">
|
||||
{afRuns.length === 0 ? (
|
||||
<p className="text-sm text-muted-foreground">No runs yet.</p>
|
||||
) : (
|
||||
<div className="space-y-2">
|
||||
{afRuns.map((r) => (
|
||||
<div key={r.id} className="flex items-center justify-between rounded-md border px-3 py-2 text-sm">
|
||||
<div className="flex items-center gap-2">
|
||||
<RunStatusBadge status={r.status} />
|
||||
<span className="text-muted-foreground">{r.source_kind}</span>
|
||||
</div>
|
||||
<div className="text-muted-foreground text-xs">
|
||||
{formatDate(r.created_at)}
|
||||
</div>
|
||||
</div>
|
||||
))}
|
||||
</div>
|
||||
)}
|
||||
</TabsContent>
|
||||
|
||||
<TabsContent value="details" className="flex-1 overflow-auto p-4">
|
||||
<div className="space-y-4">
|
||||
<div>
|
||||
<Label className="text-xs text-muted-foreground">Prompt / Instructions</Label>
|
||||
<p className="mt-1 text-sm whitespace-pre-wrap">{agentflow.description || "No description"}</p>
|
||||
</div>
|
||||
<div>
|
||||
<Label className="text-xs text-muted-foreground">Concurrency Policy</Label>
|
||||
<p className="mt-1 text-sm">{agentflow.concurrency_policy}</p>
|
||||
</div>
|
||||
<div>
|
||||
<Label className="text-xs text-muted-foreground">Created</Label>
|
||||
<p className="mt-1 text-sm">{formatDate(agentflow.created_at)}</p>
|
||||
</div>
|
||||
</div>
|
||||
</TabsContent>
|
||||
</Tabs>
|
||||
</div>
|
||||
);
|
||||
}
|
||||
|
||||
function TriggerCard({ trigger, onRefresh }: { trigger: AgentflowTrigger; onRefresh: () => void }) {
|
||||
const handleToggle = async () => {
|
||||
try {
|
||||
await api.updateAgentflowTrigger(trigger.id, { enabled: !trigger.enabled });
|
||||
onRefresh();
|
||||
} catch {
|
||||
toast.error("Failed to update trigger");
|
||||
}
|
||||
};
|
||||
|
||||
const handleDelete = async () => {
|
||||
try {
|
||||
await api.deleteAgentflowTrigger(trigger.id);
|
||||
toast.success("Trigger deleted");
|
||||
onRefresh();
|
||||
} catch {
|
||||
toast.error("Failed to delete trigger");
|
||||
}
|
||||
};
|
||||
|
||||
return (
|
||||
<div className="flex items-center justify-between rounded-md border px-3 py-2">
|
||||
<div className="flex items-center gap-3">
|
||||
<Clock className="h-4 w-4 text-muted-foreground" />
|
||||
<div>
|
||||
<p className="text-sm font-medium">{trigger.kind}: {trigger.cron_expression ?? "—"}</p>
|
||||
<p className="text-xs text-muted-foreground">
|
||||
{trigger.timezone ?? "UTC"} · Next: {formatDate(trigger.next_run_at)} · {trigger.enabled ? "Enabled" : "Disabled"}
|
||||
</p>
|
||||
</div>
|
||||
</div>
|
||||
<div className="flex items-center gap-1">
|
||||
<Button variant="ghost" size="sm" onClick={handleToggle}>
|
||||
{trigger.enabled ? "Disable" : "Enable"}
|
||||
</Button>
|
||||
<Button variant="ghost" size="sm" onClick={handleDelete}>
|
||||
<Trash2 className="h-4 w-4 text-destructive" />
|
||||
</Button>
|
||||
</div>
|
||||
</div>
|
||||
);
|
||||
}
|
||||
|
||||
// ── Main Page ──────────────────────────────────────────────────
|
||||
|
||||
export function AgentflowsPage() {
|
||||
const { agentflows, loading, activeAgentflowId, fetch, setActiveAgentflow } = useAgentflowStore();
|
||||
const agents = useWorkspaceStore((s) => s.agents);
|
||||
const [showCreate, setShowCreate] = useState(false);
|
||||
|
||||
useEffect(() => {
|
||||
fetch();
|
||||
}, [fetch]);
|
||||
|
||||
const activeAgentflow = agentflows.find((af) => af.id === activeAgentflowId) ?? null;
|
||||
|
||||
return (
|
||||
<div className="flex h-full">
|
||||
{/* List panel */}
|
||||
<div className="w-80 border-r flex flex-col">
|
||||
<div className="flex items-center justify-between p-4 border-b">
|
||||
<h1 className="text-lg font-semibold">Agentflows</h1>
|
||||
<Button size="sm" onClick={() => setShowCreate(true)}>
|
||||
<Plus className="h-4 w-4 mr-1" /> New
|
||||
</Button>
|
||||
</div>
|
||||
|
||||
<div className="flex-1 overflow-auto">
|
||||
{loading ? (
|
||||
<div className="flex items-center justify-center py-8">
|
||||
<Loader2 className="h-5 w-5 animate-spin text-muted-foreground" />
|
||||
</div>
|
||||
) : agentflows.length === 0 ? (
|
||||
<div className="flex flex-col items-center justify-center py-8 text-center px-4">
|
||||
<Zap className="h-8 w-8 text-muted-foreground mb-2" />
|
||||
<p className="text-sm text-muted-foreground">No agentflows yet</p>
|
||||
<p className="text-xs text-muted-foreground mt-1">Create one to schedule recurring agent tasks</p>
|
||||
</div>
|
||||
) : (
|
||||
<div className="divide-y">
|
||||
{agentflows.map((af) => {
|
||||
const agent = agents.find((a) => a.id === af.agent_id);
|
||||
return (
|
||||
<button
|
||||
key={af.id}
|
||||
onClick={() => setActiveAgentflow(af.id)}
|
||||
className={`w-full text-left px-4 py-3 hover:bg-accent/50 transition-colors ${
|
||||
activeAgentflowId === af.id ? "bg-accent" : ""
|
||||
}`}
|
||||
>
|
||||
<div className="flex items-center justify-between">
|
||||
<span className="text-sm font-medium truncate">{af.title}</span>
|
||||
<StatusBadge status={af.status} />
|
||||
</div>
|
||||
<p className="text-xs text-muted-foreground mt-0.5">
|
||||
{agent?.name ?? "Unknown agent"}
|
||||
</p>
|
||||
</button>
|
||||
);
|
||||
})}
|
||||
</div>
|
||||
)}
|
||||
</div>
|
||||
</div>
|
||||
|
||||
{/* Detail panel */}
|
||||
<div className="flex-1">
|
||||
{activeAgentflow ? (
|
||||
<AgentflowDetail agentflow={activeAgentflow} />
|
||||
) : (
|
||||
<div className="flex items-center justify-center h-full text-muted-foreground text-sm">
|
||||
Select an agentflow to view details
|
||||
</div>
|
||||
)}
|
||||
</div>
|
||||
|
||||
<CreateAgentflowDialog open={showCreate} onClose={() => setShowCreate(false)} />
|
||||
</div>
|
||||
);
|
||||
}
|
||||
2
apps/web/features/agentflows/index.ts
Normal file
2
apps/web/features/agentflows/index.ts
Normal file
@@ -0,0 +1,2 @@
|
||||
export { useAgentflowStore } from "./store";
|
||||
export { AgentflowsPage } from "./components/agentflows-page";
|
||||
79
apps/web/features/agentflows/store.ts
Normal file
79
apps/web/features/agentflows/store.ts
Normal file
@@ -0,0 +1,79 @@
|
||||
"use client";
|
||||
|
||||
import { create } from "zustand";
|
||||
import { api } from "@/shared/api";
|
||||
import type { Agentflow, AgentflowTrigger, AgentflowRun } from "@/shared/types";
|
||||
|
||||
interface AgentflowState {
|
||||
agentflows: Agentflow[];
|
||||
loading: boolean;
|
||||
activeAgentflowId: string | null;
|
||||
triggers: Record<string, AgentflowTrigger[]>;
|
||||
runs: Record<string, AgentflowRun[]>;
|
||||
|
||||
fetch: () => Promise<void>;
|
||||
setActiveAgentflow: (id: string | null) => void;
|
||||
addAgentflow: (af: Agentflow) => void;
|
||||
updateAgentflow: (id: string, updates: Partial<Agentflow>) => void;
|
||||
removeAgentflow: (id: string) => void;
|
||||
|
||||
fetchTriggers: (agentflowId: string) => Promise<void>;
|
||||
fetchRuns: (agentflowId: string) => Promise<void>;
|
||||
}
|
||||
|
||||
export const useAgentflowStore = create<AgentflowState>((set, get) => ({
|
||||
agentflows: [],
|
||||
loading: false,
|
||||
activeAgentflowId: null,
|
||||
triggers: {},
|
||||
runs: {},
|
||||
|
||||
fetch: async () => {
|
||||
set({ loading: true });
|
||||
try {
|
||||
const agentflows = await api.listAgentflows();
|
||||
set({ agentflows, loading: false });
|
||||
} catch {
|
||||
set({ loading: false });
|
||||
}
|
||||
},
|
||||
|
||||
setActiveAgentflow: (id) => set({ activeAgentflowId: id }),
|
||||
|
||||
addAgentflow: (af) =>
|
||||
set((state) => ({ agentflows: [af, ...state.agentflows] })),
|
||||
|
||||
updateAgentflow: (id, updates) =>
|
||||
set((state) => ({
|
||||
agentflows: state.agentflows.map((af) =>
|
||||
af.id === id ? { ...af, ...updates } : af
|
||||
),
|
||||
})),
|
||||
|
||||
removeAgentflow: (id) =>
|
||||
set((state) => ({
|
||||
agentflows: state.agentflows.filter((af) => af.id !== id),
|
||||
})),
|
||||
|
||||
fetchTriggers: async (agentflowId) => {
|
||||
try {
|
||||
const triggers = await api.listAgentflowTriggers(agentflowId);
|
||||
set((state) => ({
|
||||
triggers: { ...state.triggers, [agentflowId]: triggers },
|
||||
}));
|
||||
} catch {
|
||||
// ignore
|
||||
}
|
||||
},
|
||||
|
||||
fetchRuns: async (agentflowId) => {
|
||||
try {
|
||||
const runs = await api.listAgentflowRuns(agentflowId, { limit: 50 });
|
||||
set((state) => ({
|
||||
runs: { ...state.runs, [agentflowId]: runs },
|
||||
}));
|
||||
} catch {
|
||||
// ignore
|
||||
}
|
||||
},
|
||||
}));
|
||||
@@ -35,6 +35,13 @@ import type {
|
||||
TimelineEntry,
|
||||
TaskMessagePayload,
|
||||
Attachment,
|
||||
Agentflow,
|
||||
AgentflowTrigger,
|
||||
AgentflowRun,
|
||||
CreateAgentflowRequest,
|
||||
UpdateAgentflowRequest,
|
||||
CreateAgentflowTriggerRequest,
|
||||
UpdateAgentflowTriggerRequest,
|
||||
} from "@/shared/types";
|
||||
import { type Logger, noopLogger } from "@/shared/logger";
|
||||
|
||||
@@ -579,4 +586,69 @@ export class ApiClient {
|
||||
async deleteAttachment(id: string): Promise<void> {
|
||||
await this.fetch(`/api/attachments/${id}`, { method: "DELETE" });
|
||||
}
|
||||
|
||||
// ── Agentflows ──────────────────────────────────────────────────
|
||||
|
||||
async listAgentflows(): Promise<Agentflow[]> {
|
||||
return this.fetch("/api/agentflows");
|
||||
}
|
||||
|
||||
async getAgentflow(id: string): Promise<Agentflow> {
|
||||
return this.fetch(`/api/agentflows/${id}`);
|
||||
}
|
||||
|
||||
async createAgentflow(data: CreateAgentflowRequest): Promise<Agentflow> {
|
||||
return this.fetch("/api/agentflows", {
|
||||
method: "POST",
|
||||
headers: { "Content-Type": "application/json" },
|
||||
body: JSON.stringify(data),
|
||||
});
|
||||
}
|
||||
|
||||
async updateAgentflow(id: string, data: UpdateAgentflowRequest): Promise<Agentflow> {
|
||||
return this.fetch(`/api/agentflows/${id}`, {
|
||||
method: "PUT",
|
||||
headers: { "Content-Type": "application/json" },
|
||||
body: JSON.stringify(data),
|
||||
});
|
||||
}
|
||||
|
||||
async archiveAgentflow(id: string): Promise<Agentflow> {
|
||||
return this.fetch(`/api/agentflows/${id}/archive`, { method: "POST" });
|
||||
}
|
||||
|
||||
async listAgentflowTriggers(agentflowId: string): Promise<AgentflowTrigger[]> {
|
||||
return this.fetch(`/api/agentflows/${agentflowId}/triggers`);
|
||||
}
|
||||
|
||||
async createAgentflowTrigger(agentflowId: string, data: CreateAgentflowTriggerRequest): Promise<AgentflowTrigger> {
|
||||
return this.fetch(`/api/agentflows/${agentflowId}/triggers`, {
|
||||
method: "POST",
|
||||
headers: { "Content-Type": "application/json" },
|
||||
body: JSON.stringify(data),
|
||||
});
|
||||
}
|
||||
|
||||
async updateAgentflowTrigger(triggerId: string, data: UpdateAgentflowTriggerRequest): Promise<AgentflowTrigger> {
|
||||
return this.fetch(`/api/agentflow-triggers/${triggerId}`, {
|
||||
method: "PUT",
|
||||
headers: { "Content-Type": "application/json" },
|
||||
body: JSON.stringify(data),
|
||||
});
|
||||
}
|
||||
|
||||
async deleteAgentflowTrigger(triggerId: string): Promise<void> {
|
||||
await this.fetch(`/api/agentflow-triggers/${triggerId}`, { method: "DELETE" });
|
||||
}
|
||||
|
||||
async listAgentflowRuns(agentflowId: string, params?: { limit?: number; offset?: number }): Promise<AgentflowRun[]> {
|
||||
const search = new URLSearchParams();
|
||||
if (params?.limit) search.set("limit", String(params.limit));
|
||||
if (params?.offset) search.set("offset", String(params.offset));
|
||||
return this.fetch(`/api/agentflows/${agentflowId}/runs?${search}`);
|
||||
}
|
||||
|
||||
async runAgentflow(id: string): Promise<AgentflowRun> {
|
||||
return this.fetch(`/api/agentflows/${id}/run`, { method: "POST" });
|
||||
}
|
||||
}
|
||||
|
||||
79
apps/web/shared/types/agentflow.ts
Normal file
79
apps/web/shared/types/agentflow.ts
Normal file
@@ -0,0 +1,79 @@
|
||||
export type AgentflowStatus = "active" | "paused" | "archived";
|
||||
export type AgentflowConcurrencyPolicy = "skip_if_active" | "coalesce" | "always_run";
|
||||
export type AgentflowTriggerKind = "schedule" | "webhook" | "api";
|
||||
export type AgentflowRunStatus = "received" | "executing" | "completed" | "failed" | "skipped" | "coalesced";
|
||||
export type AgentflowRunSourceKind = "schedule" | "webhook" | "api" | "manual";
|
||||
|
||||
export interface Agentflow {
|
||||
id: string;
|
||||
workspace_id: string;
|
||||
title: string;
|
||||
description: string | null;
|
||||
agent_id: string;
|
||||
status: AgentflowStatus;
|
||||
concurrency_policy: AgentflowConcurrencyPolicy;
|
||||
variables: unknown[];
|
||||
created_by: string;
|
||||
created_at: string;
|
||||
updated_at: string;
|
||||
}
|
||||
|
||||
export interface AgentflowTrigger {
|
||||
id: string;
|
||||
agentflow_id: string;
|
||||
kind: AgentflowTriggerKind;
|
||||
enabled: boolean;
|
||||
cron_expression: string | null;
|
||||
timezone: string | null;
|
||||
next_run_at: string | null;
|
||||
public_id?: string | null;
|
||||
last_fired_at: string | null;
|
||||
created_at: string;
|
||||
}
|
||||
|
||||
export interface AgentflowRun {
|
||||
id: string;
|
||||
agentflow_id: string;
|
||||
trigger_id: string | null;
|
||||
source_kind: AgentflowRunSourceKind;
|
||||
status: AgentflowRunStatus;
|
||||
linked_issue_id: string | null;
|
||||
payload: unknown | null;
|
||||
agent_output: string | null;
|
||||
started_at: string | null;
|
||||
completed_at: string | null;
|
||||
created_at: string;
|
||||
}
|
||||
|
||||
export interface CreateAgentflowRequest {
|
||||
title: string;
|
||||
description?: string;
|
||||
agent_id: string;
|
||||
status?: AgentflowStatus;
|
||||
concurrency_policy?: AgentflowConcurrencyPolicy;
|
||||
variables?: unknown[];
|
||||
}
|
||||
|
||||
export interface UpdateAgentflowRequest {
|
||||
title?: string;
|
||||
description?: string;
|
||||
agent_id?: string;
|
||||
status?: AgentflowStatus;
|
||||
concurrency_policy?: AgentflowConcurrencyPolicy;
|
||||
variables?: unknown[];
|
||||
}
|
||||
|
||||
export interface CreateAgentflowTriggerRequest {
|
||||
kind: AgentflowTriggerKind;
|
||||
enabled?: boolean;
|
||||
cron_expression?: string;
|
||||
timezone?: string;
|
||||
next_run_at?: string;
|
||||
}
|
||||
|
||||
export interface UpdateAgentflowTriggerRequest {
|
||||
enabled?: boolean;
|
||||
cron_expression?: string;
|
||||
timezone?: string;
|
||||
next_run_at?: string;
|
||||
}
|
||||
@@ -32,3 +32,17 @@ export type { IssueSubscriber } from "./subscriber";
|
||||
export type * from "./events";
|
||||
export type * from "./api";
|
||||
export type { Attachment } from "./attachment";
|
||||
export type {
|
||||
Agentflow,
|
||||
AgentflowStatus,
|
||||
AgentflowConcurrencyPolicy,
|
||||
AgentflowTrigger,
|
||||
AgentflowTriggerKind,
|
||||
AgentflowRun,
|
||||
AgentflowRunStatus,
|
||||
AgentflowRunSourceKind,
|
||||
CreateAgentflowRequest,
|
||||
UpdateAgentflowRequest,
|
||||
CreateAgentflowTriggerRequest,
|
||||
UpdateAgentflowTriggerRequest,
|
||||
} from "./agentflow";
|
||||
|
||||
156
server/cmd/server/agentflow_scheduler.go
Normal file
156
server/cmd/server/agentflow_scheduler.go
Normal file
@@ -0,0 +1,156 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log/slog"
|
||||
"time"
|
||||
|
||||
"github.com/jackc/pgx/v5/pgtype"
|
||||
"github.com/robfig/cron/v3"
|
||||
|
||||
"github.com/multica-ai/multica/server/internal/events"
|
||||
"github.com/multica-ai/multica/server/internal/service"
|
||||
"github.com/multica-ai/multica/server/internal/util"
|
||||
db "github.com/multica-ai/multica/server/pkg/db/generated"
|
||||
)
|
||||
|
||||
const (
|
||||
// agentflowScheduleInterval is how often we check for due triggers.
|
||||
agentflowScheduleInterval = 30 * time.Second
|
||||
)
|
||||
|
||||
// runAgentflowScheduler periodically checks for schedule triggers that are due
|
||||
// and dispatches agentflow runs. Modeled after runRuntimeSweeper.
|
||||
func runAgentflowScheduler(ctx context.Context, queries *db.Queries, bus *events.Bus) {
|
||||
taskSvc := service.NewTaskService(queries, nil, bus)
|
||||
ticker := time.NewTicker(agentflowScheduleInterval)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
tickScheduledTriggers(ctx, queries, taskSvc)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// tickScheduledTriggers atomically claims all due schedule triggers and
|
||||
// dispatches agentflow runs for each.
|
||||
func tickScheduledTriggers(ctx context.Context, queries *db.Queries, taskSvc *service.TaskService) {
|
||||
dueTriggers, err := queries.ClaimDueScheduleTriggers(ctx)
|
||||
if err != nil {
|
||||
slog.Warn("agentflow scheduler: failed to claim due triggers", "error", err)
|
||||
return
|
||||
}
|
||||
if len(dueTriggers) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
slog.Info("agentflow scheduler: processing due triggers", "count", len(dueTriggers))
|
||||
|
||||
for _, t := range dueTriggers {
|
||||
dispatchScheduledTrigger(ctx, queries, taskSvc, t)
|
||||
}
|
||||
}
|
||||
|
||||
// dispatchScheduledTrigger creates an agentflow run, enqueues a task,
|
||||
// and recalculates the trigger's next_run_at.
|
||||
func dispatchScheduledTrigger(ctx context.Context, queries *db.Queries, taskSvc *service.TaskService, t db.ClaimDueScheduleTriggersRow) {
|
||||
afID := util.UUIDToString(t.AgentflowID)
|
||||
triggerID := util.UUIDToString(t.ID)
|
||||
|
||||
// Check concurrency policy
|
||||
if t.ConcurrencyPolicy == "skip_if_active" || t.ConcurrencyPolicy == "coalesce" {
|
||||
hasActive, err := queries.HasActiveAgentflowRun(ctx, t.AgentflowID)
|
||||
if err == nil && hasActive {
|
||||
status := "skipped"
|
||||
if t.ConcurrencyPolicy == "coalesce" {
|
||||
status = "coalesced"
|
||||
}
|
||||
slog.Info("agentflow scheduler: "+status+" due to concurrency policy",
|
||||
"agentflow_id", afID, "trigger_id", triggerID, "policy", t.ConcurrencyPolicy)
|
||||
|
||||
queries.CreateAgentflowRun(ctx, db.CreateAgentflowRunParams{
|
||||
AgentflowID: t.AgentflowID,
|
||||
TriggerID: t.ID,
|
||||
SourceKind: "schedule",
|
||||
Status: status,
|
||||
})
|
||||
// Still recalculate next_run_at
|
||||
recalcNextRun(ctx, queries, t)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// Create the run
|
||||
run, err := queries.CreateAgentflowRun(ctx, db.CreateAgentflowRunParams{
|
||||
AgentflowID: t.AgentflowID,
|
||||
TriggerID: t.ID,
|
||||
SourceKind: "schedule",
|
||||
Status: "received",
|
||||
})
|
||||
if err != nil {
|
||||
slog.Error("agentflow scheduler: failed to create run", "agentflow_id", afID, "error", err)
|
||||
recalcNextRun(ctx, queries, t)
|
||||
return
|
||||
}
|
||||
|
||||
// Build a minimal Agentflow struct for the task service
|
||||
af := db.Agentflow{
|
||||
ID: t.AgentflowID,
|
||||
WorkspaceID: t.WorkspaceID,
|
||||
AgentID: t.AgentID,
|
||||
Title: t.AgentflowTitle,
|
||||
ConcurrencyPolicy: t.ConcurrencyPolicy,
|
||||
}
|
||||
if t.AgentflowDescription.Valid {
|
||||
af.Description = t.AgentflowDescription
|
||||
}
|
||||
|
||||
if err := taskSvc.EnqueueTaskForAgentflow(ctx, af, run); err != nil {
|
||||
slog.Error("agentflow scheduler: failed to enqueue task",
|
||||
"agentflow_id", afID, "run_id", util.UUIDToString(run.ID), "error", err)
|
||||
queries.UpdateAgentflowRunStatus(ctx, db.UpdateAgentflowRunStatusParams{
|
||||
ID: run.ID,
|
||||
Status: "failed",
|
||||
})
|
||||
}
|
||||
|
||||
// Recalculate next_run_at
|
||||
recalcNextRun(ctx, queries, t)
|
||||
}
|
||||
|
||||
// recalcNextRun parses the cron expression and sets the next run time.
|
||||
func recalcNextRun(ctx context.Context, queries *db.Queries, t db.ClaimDueScheduleTriggersRow) {
|
||||
if !t.CronExpression.Valid || t.CronExpression.String == "" {
|
||||
return
|
||||
}
|
||||
|
||||
// Parse timezone
|
||||
loc := time.UTC
|
||||
if t.Timezone.Valid && t.Timezone.String != "" {
|
||||
if parsed, err := time.LoadLocation(t.Timezone.String); err == nil {
|
||||
loc = parsed
|
||||
}
|
||||
}
|
||||
|
||||
parser := cron.NewParser(cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow)
|
||||
schedule, err := parser.Parse(t.CronExpression.String)
|
||||
if err != nil {
|
||||
slog.Error("agentflow scheduler: invalid cron expression",
|
||||
"trigger_id", util.UUIDToString(t.ID),
|
||||
"cron", t.CronExpression.String,
|
||||
"error", err)
|
||||
return
|
||||
}
|
||||
|
||||
now := time.Now().In(loc)
|
||||
next := schedule.Next(now)
|
||||
|
||||
queries.SetTriggerNextRunAt(ctx, db.SetTriggerNextRunAtParams{
|
||||
ID: t.ID,
|
||||
NextRunAt: pgtype.Timestamptz{Time: next, Valid: true},
|
||||
})
|
||||
}
|
||||
@@ -69,6 +69,9 @@ func main() {
|
||||
sweepCtx, sweepCancel := context.WithCancel(context.Background())
|
||||
go runRuntimeSweeper(sweepCtx, queries, bus)
|
||||
|
||||
// Start agentflow scheduler to dispatch due scheduled triggers.
|
||||
go runAgentflowScheduler(sweepCtx, queries, bus)
|
||||
|
||||
// Graceful shutdown
|
||||
go func() {
|
||||
slog.Info("server starting", "port", port)
|
||||
|
||||
@@ -220,6 +220,25 @@ func NewRouter(pool *pgxpool.Pool, hub *realtime.Hub, bus *events.Bus) chi.Route
|
||||
})
|
||||
})
|
||||
|
||||
// Agentflows
|
||||
r.Route("/api/agentflows", func(r chi.Router) {
|
||||
r.Get("/", h.ListAgentflows)
|
||||
r.With(middleware.RequireWorkspaceRole(queries, "owner", "admin")).Post("/", h.CreateAgentflow)
|
||||
r.Route("/{id}", func(r chi.Router) {
|
||||
r.Get("/", h.GetAgentflow)
|
||||
r.Put("/", h.UpdateAgentflow)
|
||||
r.Post("/archive", h.ArchiveAgentflow)
|
||||
r.Get("/triggers", h.ListAgentflowTriggers)
|
||||
r.Post("/triggers", h.CreateAgentflowTrigger)
|
||||
r.Get("/runs", h.ListAgentflowRuns)
|
||||
r.Post("/run", h.RunAgentflow)
|
||||
})
|
||||
})
|
||||
r.Route("/api/agentflow-triggers/{triggerId}", func(r chi.Router) {
|
||||
r.Put("/", h.UpdateAgentflowTrigger)
|
||||
r.Delete("/", h.DeleteAgentflowTrigger)
|
||||
})
|
||||
|
||||
// Runtimes
|
||||
r.Route("/api/runtimes", func(r chi.Router) {
|
||||
r.Get("/", h.ListAgentRuntimes)
|
||||
|
||||
@@ -38,6 +38,7 @@ require (
|
||||
github.com/jackc/pgpassfile v1.0.0 // indirect
|
||||
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
|
||||
github.com/jackc/puddle/v2 v2.2.2 // indirect
|
||||
github.com/robfig/cron/v3 v3.0.1 // indirect
|
||||
github.com/spf13/pflag v1.0.9 // indirect
|
||||
golang.org/x/sync v0.20.0 // indirect
|
||||
golang.org/x/text v0.35.0 // indirect
|
||||
|
||||
@@ -66,6 +66,8 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
|
||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/resend/resend-go/v2 v2.28.0 h1:ttM1/VZR4fApBv3xI1TneSKi1pbfFsVrq7fXFlHKtj4=
|
||||
github.com/resend/resend-go/v2 v2.28.0/go.mod h1:3YCb8c8+pLiqhtRFXTyFwlLvfjQtluxOr9HEh2BwCkQ=
|
||||
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
|
||||
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
|
||||
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
|
||||
github.com/spf13/cobra v1.10.2 h1:DMTTonx5m65Ic0GOoRY2c16WCbHxOOw6xxezuLaBpcU=
|
||||
github.com/spf13/cobra v1.10.2/go.mod h1:7C1pvHqHw5A4vrJfjNwvOdzYu0Gml16OCs2GRiTUUS4=
|
||||
|
||||
@@ -728,7 +728,7 @@ func (d *Daemon) pollLoop(ctx context.Context) error {
|
||||
continue
|
||||
}
|
||||
if task != nil {
|
||||
d.logger.Info("task received", "task", shortID(task.ID), "issue", task.IssueID)
|
||||
d.logger.Info("task received", "task", shortID(task.ID), "issue", task.EffectiveIssueID())
|
||||
wg.Add(1)
|
||||
go func(t Task) {
|
||||
defer wg.Done()
|
||||
@@ -772,7 +772,7 @@ func (d *Daemon) handleTask(ctx context.Context, task Task) {
|
||||
if task.Agent != nil {
|
||||
agentName = task.Agent.Name
|
||||
}
|
||||
taskLog.Info("picked task", "issue", task.IssueID, "agent", agentName, "provider", provider)
|
||||
taskLog.Info("picked task", "issue", task.EffectiveIssueID(), "agent", agentName, "provider", provider, "agentflow", task.IsAgentflow())
|
||||
|
||||
if err := d.client.StartTask(ctx, task.ID); err != nil {
|
||||
taskLog.Error("start task failed", "error", err)
|
||||
@@ -872,13 +872,20 @@ func (d *Daemon) runTask(ctx context.Context, task Task, provider string, taskLo
|
||||
// Repos are passed as metadata only — the agent checks them out on demand
|
||||
// via `multica repo checkout <url>`.
|
||||
taskCtx := execenv.TaskContextForEnv{
|
||||
IssueID: task.IssueID,
|
||||
IssueID: task.EffectiveIssueID(),
|
||||
TriggerCommentID: task.TriggerCommentID,
|
||||
AgentName: agentName,
|
||||
AgentInstructions: instructions,
|
||||
AgentSkills: convertSkillsForEnv(skills),
|
||||
Repos: convertReposForEnv(task.Repos),
|
||||
}
|
||||
if task.IsAgentflow() {
|
||||
taskCtx.IsAgentflow = true
|
||||
taskCtx.AgentflowTitle = task.Agentflow.Title
|
||||
if task.Agentflow.Description != nil {
|
||||
taskCtx.AgentflowDescription = *task.Agentflow.Description
|
||||
}
|
||||
}
|
||||
|
||||
// Try to reuse the workdir from a previous task on the same (agent, issue) pair.
|
||||
var env *execenv.Environment
|
||||
|
||||
@@ -23,7 +23,7 @@ func TestBuildPromptContainsIssueID(t *testing.T) {
|
||||
|
||||
issueID := "a1b2c3d4-e5f6-7890-abcd-ef1234567890"
|
||||
prompt := BuildPrompt(Task{
|
||||
IssueID: issueID,
|
||||
IssueID: &issueID,
|
||||
Agent: &AgentData{
|
||||
Name: "Local Codex",
|
||||
Skills: []SkillData{
|
||||
@@ -53,8 +53,9 @@ func TestBuildPromptContainsIssueID(t *testing.T) {
|
||||
func TestBuildPromptNoIssueDetails(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
testID := "test-id"
|
||||
prompt := BuildPrompt(Task{
|
||||
IssueID: "test-id",
|
||||
IssueID: &testID,
|
||||
Agent: &AgentData{Name: "Test"},
|
||||
})
|
||||
|
||||
|
||||
@@ -114,18 +114,29 @@ func writeSkillFiles(skillsDir string, skills []SkillContextForEnv) error {
|
||||
func renderIssueContext(provider string, ctx TaskContextForEnv) string {
|
||||
var b strings.Builder
|
||||
|
||||
b.WriteString("# Task Assignment\n\n")
|
||||
fmt.Fprintf(&b, "**Issue ID:** %s\n\n", ctx.IssueID)
|
||||
|
||||
if ctx.TriggerCommentID != "" {
|
||||
b.WriteString("**Trigger:** Comment Reply\n")
|
||||
b.WriteString("**Triggering comment ID:** `" + ctx.TriggerCommentID + "`\n\n")
|
||||
if ctx.IsAgentflow {
|
||||
b.WriteString("# Agentflow Task\n\n")
|
||||
fmt.Fprintf(&b, "**Agentflow:** %s\n\n", ctx.AgentflowTitle)
|
||||
if ctx.AgentflowDescription != "" {
|
||||
b.WriteString("## Instructions\n\n")
|
||||
b.WriteString(ctx.AgentflowDescription)
|
||||
b.WriteString("\n\n")
|
||||
}
|
||||
b.WriteString("Execute the instructions above. If the work requires tracking, create an issue using `multica issue create`.\n\n")
|
||||
} else {
|
||||
b.WriteString("**Trigger:** New Assignment\n\n")
|
||||
}
|
||||
b.WriteString("# Task Assignment\n\n")
|
||||
fmt.Fprintf(&b, "**Issue ID:** %s\n\n", ctx.IssueID)
|
||||
|
||||
b.WriteString("## Quick Start\n\n")
|
||||
fmt.Fprintf(&b, "Run `multica issue get %s --output json` to fetch the full issue details.\n\n", ctx.IssueID)
|
||||
if ctx.TriggerCommentID != "" {
|
||||
b.WriteString("**Trigger:** Comment Reply\n")
|
||||
b.WriteString("**Triggering comment ID:** `" + ctx.TriggerCommentID + "`\n\n")
|
||||
} else {
|
||||
b.WriteString("**Trigger:** New Assignment\n\n")
|
||||
}
|
||||
|
||||
b.WriteString("## Quick Start\n\n")
|
||||
fmt.Fprintf(&b, "Run `multica issue get %s --output json` to fetch the full issue details.\n\n", ctx.IssueID)
|
||||
}
|
||||
|
||||
if len(ctx.AgentSkills) > 0 {
|
||||
b.WriteString("## Agent Skills\n\n")
|
||||
|
||||
@@ -34,6 +34,11 @@ type TaskContextForEnv struct {
|
||||
AgentInstructions string // agent identity/persona instructions, injected into CLAUDE.md
|
||||
AgentSkills []SkillContextForEnv
|
||||
Repos []RepoContextForEnv // workspace repos available for checkout
|
||||
|
||||
// Agentflow fields (set when task is triggered by an agentflow)
|
||||
IsAgentflow bool
|
||||
AgentflowTitle string
|
||||
AgentflowDescription string // prompt template
|
||||
}
|
||||
|
||||
// SkillContextForEnv represents a skill to be written into the execution environment.
|
||||
|
||||
@@ -78,7 +78,19 @@ func buildMetaSkillContent(provider string, ctx TaskContextForEnv) string {
|
||||
|
||||
b.WriteString("### Workflow\n\n")
|
||||
|
||||
if ctx.TriggerCommentID != "" {
|
||||
if ctx.IsAgentflow {
|
||||
// Agentflow-triggered: execute the prompt template
|
||||
fmt.Fprintf(&b, "**This task was triggered by Agentflow: %s**\n\n", ctx.AgentflowTitle)
|
||||
if ctx.AgentflowDescription != "" {
|
||||
b.WriteString("## Instructions\n\n")
|
||||
b.WriteString(ctx.AgentflowDescription)
|
||||
b.WriteString("\n\n")
|
||||
}
|
||||
b.WriteString("Execute the instructions above.\n")
|
||||
b.WriteString("- If the work requires tracking, create an issue using `multica issue create --title \"...\" --description \"...\"`\n")
|
||||
b.WriteString("- If the work involves code changes, use `multica repo checkout <url>` to check out a repository\n")
|
||||
b.WriteString("- Post results as appropriate (comments on related issues, new issues, etc.)\n\n")
|
||||
} else if ctx.TriggerCommentID != "" {
|
||||
// Comment-triggered: focus on reading and replying
|
||||
b.WriteString("**This task was triggered by a comment.** Your primary job is to respond.\n\n")
|
||||
fmt.Fprintf(&b, "1. Run `multica issue get %s --output json` to understand the issue context\n", ctx.IssueID)
|
||||
|
||||
@@ -11,7 +11,21 @@ import (
|
||||
func BuildPrompt(task Task) string {
|
||||
var b strings.Builder
|
||||
b.WriteString("You are running as a local coding agent for a Multica workspace.\n\n")
|
||||
fmt.Fprintf(&b, "Your assigned issue ID is: %s\n\n", task.IssueID)
|
||||
fmt.Fprintf(&b, "Start by running `multica issue get %s --output json` to understand your task, then complete it.\n", task.IssueID)
|
||||
|
||||
if task.IsAgentflow() {
|
||||
af := task.Agentflow
|
||||
fmt.Fprintf(&b, "This task was triggered by Agentflow: **%s**\n\n", af.Title)
|
||||
if af.Description != nil && *af.Description != "" {
|
||||
b.WriteString("## Instructions\n\n")
|
||||
b.WriteString(*af.Description)
|
||||
b.WriteString("\n\n")
|
||||
}
|
||||
b.WriteString("Execute the instructions above. If the work requires tracking, create an issue using `multica issue create`.\n")
|
||||
} else {
|
||||
issueID := task.EffectiveIssueID()
|
||||
fmt.Fprintf(&b, "Your assigned issue ID is: %s\n\n", issueID)
|
||||
fmt.Fprintf(&b, "Start by running `multica issue get %s --output json` to understand your task, then complete it.\n", issueID)
|
||||
}
|
||||
|
||||
return b.String()
|
||||
}
|
||||
|
||||
@@ -26,13 +26,37 @@ type Task struct {
|
||||
ID string `json:"id"`
|
||||
AgentID string `json:"agent_id"`
|
||||
RuntimeID string `json:"runtime_id"`
|
||||
IssueID string `json:"issue_id"`
|
||||
IssueID *string `json:"issue_id"` // nil for agentflow tasks
|
||||
WorkspaceID string `json:"workspace_id"`
|
||||
Agent *AgentData `json:"agent,omitempty"`
|
||||
Repos []RepoData `json:"repos,omitempty"`
|
||||
PriorSessionID string `json:"prior_session_id,omitempty"` // Claude session ID from a previous task on this issue
|
||||
PriorWorkDir string `json:"prior_work_dir,omitempty"` // work_dir from a previous task on this issue
|
||||
TriggerCommentID string `json:"trigger_comment_id,omitempty"` // comment that triggered this task
|
||||
PriorSessionID string `json:"prior_session_id,omitempty"`
|
||||
PriorWorkDir string `json:"prior_work_dir,omitempty"`
|
||||
TriggerCommentID string `json:"trigger_comment_id,omitempty"`
|
||||
AgentflowRunID *string `json:"agentflow_run_id,omitempty"`
|
||||
Agentflow *AgentflowData `json:"agentflow,omitempty"`
|
||||
}
|
||||
|
||||
// AgentflowData holds agentflow context for agentflow-triggered tasks.
|
||||
type AgentflowData struct {
|
||||
ID string `json:"id"`
|
||||
Title string `json:"title"`
|
||||
Description *string `json:"description"` // prompt template
|
||||
RunID string `json:"run_id"`
|
||||
SourceKind string `json:"source_kind"`
|
||||
}
|
||||
|
||||
// IsAgentflow returns true if this task was triggered by an agentflow.
|
||||
func (t Task) IsAgentflow() bool {
|
||||
return t.Agentflow != nil
|
||||
}
|
||||
|
||||
// EffectiveIssueID returns the issue ID or empty string if nil.
|
||||
func (t Task) EffectiveIssueID() string {
|
||||
if t.IssueID != nil {
|
||||
return *t.IssueID
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
// AgentData holds agent details returned by the claim endpoint.
|
||||
|
||||
@@ -96,7 +96,7 @@ type AgentTaskResponse struct {
|
||||
ID string `json:"id"`
|
||||
AgentID string `json:"agent_id"`
|
||||
RuntimeID string `json:"runtime_id"`
|
||||
IssueID string `json:"issue_id"`
|
||||
IssueID *string `json:"issue_id"`
|
||||
WorkspaceID string `json:"workspace_id"`
|
||||
Status string `json:"status"`
|
||||
Priority int32 `json:"priority"`
|
||||
@@ -111,6 +111,18 @@ type AgentTaskResponse struct {
|
||||
PriorSessionID string `json:"prior_session_id,omitempty"` // session ID from a previous task on same issue
|
||||
PriorWorkDir string `json:"prior_work_dir,omitempty"` // work_dir from a previous task on same issue
|
||||
TriggerCommentID *string `json:"trigger_comment_id,omitempty"` // comment that triggered this task
|
||||
AgentflowRunID *string `json:"agentflow_run_id,omitempty"` // agentflow run that triggered this task
|
||||
Agentflow *TaskAgentflowData `json:"agentflow,omitempty"` // agentflow context for agentflow-triggered tasks
|
||||
}
|
||||
|
||||
// TaskAgentflowData holds agentflow info included in claim responses so the
|
||||
// daemon can build the prompt for agentflow-triggered tasks.
|
||||
type TaskAgentflowData struct {
|
||||
ID string `json:"id"`
|
||||
Title string `json:"title"`
|
||||
Description *string `json:"description"` // prompt template
|
||||
RunID string `json:"run_id"`
|
||||
SourceKind string `json:"source_kind"`
|
||||
}
|
||||
|
||||
// TaskAgentData holds agent info included in claim responses so the daemon
|
||||
@@ -128,19 +140,20 @@ func taskToResponse(t db.AgentTaskQueue) AgentTaskResponse {
|
||||
json.Unmarshal(t.Result, &result)
|
||||
}
|
||||
return AgentTaskResponse{
|
||||
ID: uuidToString(t.ID),
|
||||
AgentID: uuidToString(t.AgentID),
|
||||
RuntimeID: uuidToString(t.RuntimeID),
|
||||
IssueID: uuidToString(t.IssueID),
|
||||
Status: t.Status,
|
||||
Priority: t.Priority,
|
||||
DispatchedAt: timestampToPtr(t.DispatchedAt),
|
||||
StartedAt: timestampToPtr(t.StartedAt),
|
||||
CompletedAt: timestampToPtr(t.CompletedAt),
|
||||
Result: result,
|
||||
ID: uuidToString(t.ID),
|
||||
AgentID: uuidToString(t.AgentID),
|
||||
RuntimeID: uuidToString(t.RuntimeID),
|
||||
IssueID: uuidToPtr(t.IssueID),
|
||||
Status: t.Status,
|
||||
Priority: t.Priority,
|
||||
DispatchedAt: timestampToPtr(t.DispatchedAt),
|
||||
StartedAt: timestampToPtr(t.StartedAt),
|
||||
CompletedAt: timestampToPtr(t.CompletedAt),
|
||||
Result: result,
|
||||
Error: textToPtr(t.Error),
|
||||
CreatedAt: timestampToString(t.CreatedAt),
|
||||
TriggerCommentID: uuidToPtr(t.TriggerCommentID),
|
||||
AgentflowRunID: uuidToPtr(t.AgentflowRunID),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
484
server/internal/handler/agentflow.go
Normal file
484
server/internal/handler/agentflow.go
Normal file
@@ -0,0 +1,484 @@
|
||||
package handler
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"strconv"
|
||||
|
||||
"github.com/go-chi/chi/v5"
|
||||
"github.com/jackc/pgx/v5/pgtype"
|
||||
db "github.com/multica-ai/multica/server/pkg/db/generated"
|
||||
)
|
||||
|
||||
// ── Response types ──────────────────────────────────────────────────────
|
||||
|
||||
type AgentflowResponse struct {
|
||||
ID string `json:"id"`
|
||||
WorkspaceID string `json:"workspace_id"`
|
||||
Title string `json:"title"`
|
||||
Description *string `json:"description"`
|
||||
AgentID string `json:"agent_id"`
|
||||
Status string `json:"status"`
|
||||
ConcurrencyPolicy string `json:"concurrency_policy"`
|
||||
Variables json.RawMessage `json:"variables"`
|
||||
CreatedBy string `json:"created_by"`
|
||||
CreatedAt string `json:"created_at"`
|
||||
UpdatedAt string `json:"updated_at"`
|
||||
}
|
||||
|
||||
type AgentflowTriggerResponse struct {
|
||||
ID string `json:"id"`
|
||||
AgentflowID string `json:"agentflow_id"`
|
||||
Kind string `json:"kind"`
|
||||
Enabled bool `json:"enabled"`
|
||||
CronExpression *string `json:"cron_expression"`
|
||||
Timezone *string `json:"timezone"`
|
||||
NextRunAt *string `json:"next_run_at"`
|
||||
PublicID *string `json:"public_id,omitempty"`
|
||||
LastFiredAt *string `json:"last_fired_at"`
|
||||
CreatedAt string `json:"created_at"`
|
||||
}
|
||||
|
||||
type AgentflowRunResponse struct {
|
||||
ID string `json:"id"`
|
||||
AgentflowID string `json:"agentflow_id"`
|
||||
TriggerID *string `json:"trigger_id"`
|
||||
SourceKind string `json:"source_kind"`
|
||||
Status string `json:"status"`
|
||||
LinkedIssueID *string `json:"linked_issue_id"`
|
||||
Payload json.RawMessage `json:"payload"`
|
||||
AgentOutput *string `json:"agent_output"`
|
||||
StartedAt *string `json:"started_at"`
|
||||
CompletedAt *string `json:"completed_at"`
|
||||
CreatedAt string `json:"created_at"`
|
||||
}
|
||||
|
||||
// ── Converters ──────────────────────────────────────────────────────────
|
||||
|
||||
func agentflowToResponse(a db.Agentflow) AgentflowResponse {
|
||||
vars := json.RawMessage(a.Variables)
|
||||
if len(vars) == 0 {
|
||||
vars = json.RawMessage("[]")
|
||||
}
|
||||
return AgentflowResponse{
|
||||
ID: uuidToString(a.ID),
|
||||
WorkspaceID: uuidToString(a.WorkspaceID),
|
||||
Title: a.Title,
|
||||
Description: textToPtr(a.Description),
|
||||
AgentID: uuidToString(a.AgentID),
|
||||
Status: a.Status,
|
||||
ConcurrencyPolicy: a.ConcurrencyPolicy,
|
||||
Variables: vars,
|
||||
CreatedBy: uuidToString(a.CreatedBy),
|
||||
CreatedAt: timestampToString(a.CreatedAt),
|
||||
UpdatedAt: timestampToString(a.UpdatedAt),
|
||||
}
|
||||
}
|
||||
|
||||
func triggerToResponse(t db.AgentflowTrigger) AgentflowTriggerResponse {
|
||||
return AgentflowTriggerResponse{
|
||||
ID: uuidToString(t.ID),
|
||||
AgentflowID: uuidToString(t.AgentflowID),
|
||||
Kind: t.Kind,
|
||||
Enabled: t.Enabled,
|
||||
CronExpression: textToPtr(t.CronExpression),
|
||||
Timezone: textToPtr(t.Timezone),
|
||||
NextRunAt: timestampToPtr(t.NextRunAt),
|
||||
PublicID: textToPtr(t.PublicID),
|
||||
LastFiredAt: timestampToPtr(t.LastFiredAt),
|
||||
CreatedAt: timestampToString(t.CreatedAt),
|
||||
}
|
||||
}
|
||||
|
||||
func runToResponse(r db.AgentflowRun) AgentflowRunResponse {
|
||||
payload := json.RawMessage(r.Payload)
|
||||
if len(payload) == 0 {
|
||||
payload = nil
|
||||
}
|
||||
return AgentflowRunResponse{
|
||||
ID: uuidToString(r.ID),
|
||||
AgentflowID: uuidToString(r.AgentflowID),
|
||||
TriggerID: uuidToPtr(r.TriggerID),
|
||||
SourceKind: r.SourceKind,
|
||||
Status: r.Status,
|
||||
LinkedIssueID: uuidToPtr(r.LinkedIssueID),
|
||||
Payload: payload,
|
||||
AgentOutput: textToPtr(r.AgentOutput),
|
||||
StartedAt: timestampToPtr(r.StartedAt),
|
||||
CompletedAt: timestampToPtr(r.CompletedAt),
|
||||
CreatedAt: timestampToString(r.CreatedAt),
|
||||
}
|
||||
}
|
||||
|
||||
// ── Agentflow CRUD ──────────────────────────────────────────────────────
|
||||
|
||||
type CreateAgentflowRequest struct {
|
||||
Title string `json:"title"`
|
||||
Description *string `json:"description"`
|
||||
AgentID string `json:"agent_id"`
|
||||
Status string `json:"status"`
|
||||
ConcurrencyPolicy string `json:"concurrency_policy"`
|
||||
Variables json.RawMessage `json:"variables"`
|
||||
}
|
||||
|
||||
func (h *Handler) CreateAgentflow(w http.ResponseWriter, r *http.Request) {
|
||||
var req CreateAgentflowRequest
|
||||
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
||||
writeError(w, http.StatusBadRequest, "invalid request body")
|
||||
return
|
||||
}
|
||||
if req.Title == "" {
|
||||
writeError(w, http.StatusBadRequest, "title is required")
|
||||
return
|
||||
}
|
||||
if req.AgentID == "" {
|
||||
writeError(w, http.StatusBadRequest, "agent_id is required")
|
||||
return
|
||||
}
|
||||
|
||||
userID, ok := requireUserID(w, r)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
workspaceID := resolveWorkspaceID(r)
|
||||
|
||||
status := req.Status
|
||||
if status == "" {
|
||||
status = "active"
|
||||
}
|
||||
policy := req.ConcurrencyPolicy
|
||||
if policy == "" {
|
||||
policy = "skip_if_active"
|
||||
}
|
||||
vars := req.Variables
|
||||
if len(vars) == 0 {
|
||||
vars = json.RawMessage("[]")
|
||||
}
|
||||
|
||||
af, err := h.Queries.CreateAgentflow(r.Context(), db.CreateAgentflowParams{
|
||||
WorkspaceID: parseUUID(workspaceID),
|
||||
Title: req.Title,
|
||||
Description: ptrToText(req.Description),
|
||||
AgentID: parseUUID(req.AgentID),
|
||||
Status: status,
|
||||
ConcurrencyPolicy: policy,
|
||||
Variables: vars,
|
||||
CreatedBy: parseUUID(userID),
|
||||
})
|
||||
if err != nil {
|
||||
writeError(w, http.StatusInternalServerError, "failed to create agentflow")
|
||||
return
|
||||
}
|
||||
|
||||
writeJSON(w, http.StatusCreated, agentflowToResponse(af))
|
||||
}
|
||||
|
||||
func (h *Handler) ListAgentflows(w http.ResponseWriter, r *http.Request) {
|
||||
workspaceID := resolveWorkspaceID(r)
|
||||
|
||||
flows, err := h.Queries.ListAgentflows(r.Context(), parseUUID(workspaceID))
|
||||
if err != nil {
|
||||
writeError(w, http.StatusInternalServerError, "failed to list agentflows")
|
||||
return
|
||||
}
|
||||
|
||||
resp := make([]AgentflowResponse, len(flows))
|
||||
for i, f := range flows {
|
||||
resp[i] = agentflowToResponse(f)
|
||||
}
|
||||
writeJSON(w, http.StatusOK, resp)
|
||||
}
|
||||
|
||||
func (h *Handler) GetAgentflow(w http.ResponseWriter, r *http.Request) {
|
||||
id := chi.URLParam(r, "id")
|
||||
workspaceID := resolveWorkspaceID(r)
|
||||
|
||||
af, err := h.Queries.GetAgentflowInWorkspace(r.Context(), db.GetAgentflowInWorkspaceParams{
|
||||
ID: parseUUID(id),
|
||||
WorkspaceID: parseUUID(workspaceID),
|
||||
})
|
||||
if err != nil {
|
||||
writeError(w, http.StatusNotFound, "agentflow not found")
|
||||
return
|
||||
}
|
||||
|
||||
writeJSON(w, http.StatusOK, agentflowToResponse(af))
|
||||
}
|
||||
|
||||
type UpdateAgentflowRequest struct {
|
||||
Title *string `json:"title"`
|
||||
Description *string `json:"description"`
|
||||
AgentID *string `json:"agent_id"`
|
||||
Status *string `json:"status"`
|
||||
ConcurrencyPolicy *string `json:"concurrency_policy"`
|
||||
Variables json.RawMessage `json:"variables"`
|
||||
}
|
||||
|
||||
func (h *Handler) UpdateAgentflow(w http.ResponseWriter, r *http.Request) {
|
||||
id := chi.URLParam(r, "id")
|
||||
|
||||
var req UpdateAgentflowRequest
|
||||
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
||||
writeError(w, http.StatusBadRequest, "invalid request body")
|
||||
return
|
||||
}
|
||||
|
||||
params := db.UpdateAgentflowParams{ID: parseUUID(id)}
|
||||
if req.Title != nil {
|
||||
params.Title = ptrToText(req.Title)
|
||||
}
|
||||
if req.Description != nil {
|
||||
params.Description = ptrToText(req.Description)
|
||||
}
|
||||
if req.AgentID != nil {
|
||||
params.AgentID = parseUUID(*req.AgentID)
|
||||
}
|
||||
if req.Status != nil {
|
||||
params.Status = ptrToText(req.Status)
|
||||
}
|
||||
if req.ConcurrencyPolicy != nil {
|
||||
params.ConcurrencyPolicy = ptrToText(req.ConcurrencyPolicy)
|
||||
}
|
||||
if len(req.Variables) > 0 {
|
||||
params.Variables = req.Variables
|
||||
}
|
||||
|
||||
af, err := h.Queries.UpdateAgentflow(r.Context(), params)
|
||||
if err != nil {
|
||||
writeError(w, http.StatusInternalServerError, "failed to update agentflow")
|
||||
return
|
||||
}
|
||||
|
||||
writeJSON(w, http.StatusOK, agentflowToResponse(af))
|
||||
}
|
||||
|
||||
func (h *Handler) ArchiveAgentflow(w http.ResponseWriter, r *http.Request) {
|
||||
id := chi.URLParam(r, "id")
|
||||
|
||||
af, err := h.Queries.ArchiveAgentflow(r.Context(), parseUUID(id))
|
||||
if err != nil {
|
||||
writeError(w, http.StatusInternalServerError, "failed to archive agentflow")
|
||||
return
|
||||
}
|
||||
|
||||
writeJSON(w, http.StatusOK, agentflowToResponse(af))
|
||||
}
|
||||
|
||||
// ── Triggers ────────────────────────────────────────────────────────────
|
||||
|
||||
type CreateTriggerRequest struct {
|
||||
Kind string `json:"kind"`
|
||||
Enabled *bool `json:"enabled"`
|
||||
CronExpression *string `json:"cron_expression"`
|
||||
Timezone *string `json:"timezone"`
|
||||
NextRunAt *string `json:"next_run_at"`
|
||||
}
|
||||
|
||||
func (h *Handler) CreateAgentflowTrigger(w http.ResponseWriter, r *http.Request) {
|
||||
agentflowID := chi.URLParam(r, "id")
|
||||
|
||||
var req CreateTriggerRequest
|
||||
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
||||
writeError(w, http.StatusBadRequest, "invalid request body")
|
||||
return
|
||||
}
|
||||
if req.Kind == "" {
|
||||
writeError(w, http.StatusBadRequest, "kind is required")
|
||||
return
|
||||
}
|
||||
|
||||
enabled := true
|
||||
if req.Enabled != nil {
|
||||
enabled = *req.Enabled
|
||||
}
|
||||
|
||||
var nextRunAt pgtype.Timestamptz
|
||||
if req.NextRunAt != nil {
|
||||
nextRunAt = parseTimestamp(*req.NextRunAt)
|
||||
}
|
||||
|
||||
t, err := h.Queries.CreateAgentflowTrigger(r.Context(), db.CreateAgentflowTriggerParams{
|
||||
AgentflowID: parseUUID(agentflowID),
|
||||
Kind: req.Kind,
|
||||
Enabled: enabled,
|
||||
CronExpression: ptrToText(req.CronExpression),
|
||||
Timezone: ptrToText(req.Timezone),
|
||||
NextRunAt: nextRunAt,
|
||||
})
|
||||
if err != nil {
|
||||
writeError(w, http.StatusInternalServerError, "failed to create trigger")
|
||||
return
|
||||
}
|
||||
|
||||
writeJSON(w, http.StatusCreated, triggerToResponse(t))
|
||||
}
|
||||
|
||||
func (h *Handler) ListAgentflowTriggers(w http.ResponseWriter, r *http.Request) {
|
||||
agentflowID := chi.URLParam(r, "id")
|
||||
|
||||
triggers, err := h.Queries.ListAgentflowTriggers(r.Context(), parseUUID(agentflowID))
|
||||
if err != nil {
|
||||
writeError(w, http.StatusInternalServerError, "failed to list triggers")
|
||||
return
|
||||
}
|
||||
|
||||
resp := make([]AgentflowTriggerResponse, len(triggers))
|
||||
for i, t := range triggers {
|
||||
resp[i] = triggerToResponse(t)
|
||||
}
|
||||
writeJSON(w, http.StatusOK, resp)
|
||||
}
|
||||
|
||||
type UpdateTriggerRequest struct {
|
||||
Enabled *bool `json:"enabled"`
|
||||
CronExpression *string `json:"cron_expression"`
|
||||
Timezone *string `json:"timezone"`
|
||||
NextRunAt *string `json:"next_run_at"`
|
||||
}
|
||||
|
||||
func (h *Handler) UpdateAgentflowTrigger(w http.ResponseWriter, r *http.Request) {
|
||||
triggerID := chi.URLParam(r, "triggerId")
|
||||
|
||||
var req UpdateTriggerRequest
|
||||
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
||||
writeError(w, http.StatusBadRequest, "invalid request body")
|
||||
return
|
||||
}
|
||||
|
||||
params := db.UpdateAgentflowTriggerParams{ID: parseUUID(triggerID)}
|
||||
if req.Enabled != nil {
|
||||
params.Enabled = pgtype.Bool{Bool: *req.Enabled, Valid: true}
|
||||
}
|
||||
if req.CronExpression != nil {
|
||||
params.CronExpression = ptrToText(req.CronExpression)
|
||||
}
|
||||
if req.Timezone != nil {
|
||||
params.Timezone = ptrToText(req.Timezone)
|
||||
}
|
||||
if req.NextRunAt != nil {
|
||||
params.NextRunAt = parseTimestamp(*req.NextRunAt)
|
||||
}
|
||||
|
||||
t, err := h.Queries.UpdateAgentflowTrigger(r.Context(), params)
|
||||
if err != nil {
|
||||
writeError(w, http.StatusInternalServerError, "failed to update trigger")
|
||||
return
|
||||
}
|
||||
|
||||
writeJSON(w, http.StatusOK, triggerToResponse(t))
|
||||
}
|
||||
|
||||
func (h *Handler) DeleteAgentflowTrigger(w http.ResponseWriter, r *http.Request) {
|
||||
triggerID := chi.URLParam(r, "triggerId")
|
||||
|
||||
err := h.Queries.DeleteAgentflowTrigger(r.Context(), parseUUID(triggerID))
|
||||
if err != nil {
|
||||
writeError(w, http.StatusInternalServerError, "failed to delete trigger")
|
||||
return
|
||||
}
|
||||
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
}
|
||||
|
||||
// ── Runs ────────────────────────────────────────────────────────────────
|
||||
|
||||
func (h *Handler) ListAgentflowRuns(w http.ResponseWriter, r *http.Request) {
|
||||
agentflowID := chi.URLParam(r, "id")
|
||||
|
||||
limit := int32(50)
|
||||
offset := int32(0)
|
||||
if l := r.URL.Query().Get("limit"); l != "" {
|
||||
if v, err := strconv.Atoi(l); err == nil {
|
||||
limit = int32(v)
|
||||
}
|
||||
}
|
||||
if o := r.URL.Query().Get("offset"); o != "" {
|
||||
if v, err := strconv.Atoi(o); err == nil {
|
||||
offset = int32(v)
|
||||
}
|
||||
}
|
||||
|
||||
runs, err := h.Queries.ListAgentflowRuns(r.Context(), db.ListAgentflowRunsParams{
|
||||
AgentflowID: parseUUID(agentflowID),
|
||||
Limit: limit,
|
||||
Offset: offset,
|
||||
})
|
||||
if err != nil {
|
||||
writeError(w, http.StatusInternalServerError, "failed to list runs")
|
||||
return
|
||||
}
|
||||
|
||||
resp := make([]AgentflowRunResponse, len(runs))
|
||||
for i, run := range runs {
|
||||
resp[i] = runToResponse(run)
|
||||
}
|
||||
writeJSON(w, http.StatusOK, resp)
|
||||
}
|
||||
|
||||
func (h *Handler) RunAgentflow(w http.ResponseWriter, r *http.Request) {
|
||||
agentflowID := chi.URLParam(r, "id")
|
||||
workspaceID := resolveWorkspaceID(r)
|
||||
|
||||
af, err := h.Queries.GetAgentflowInWorkspace(r.Context(), db.GetAgentflowInWorkspaceParams{
|
||||
ID: parseUUID(agentflowID),
|
||||
WorkspaceID: parseUUID(workspaceID),
|
||||
})
|
||||
if err != nil {
|
||||
writeError(w, http.StatusNotFound, "agentflow not found")
|
||||
return
|
||||
}
|
||||
|
||||
// Check concurrency policy
|
||||
if af.ConcurrencyPolicy == "skip_if_active" || af.ConcurrencyPolicy == "coalesce" {
|
||||
hasActive, err := h.Queries.HasActiveAgentflowRun(r.Context(), af.ID)
|
||||
if err == nil && hasActive {
|
||||
if af.ConcurrencyPolicy == "skip_if_active" {
|
||||
writeError(w, http.StatusConflict, "agentflow already has an active run")
|
||||
return
|
||||
}
|
||||
// coalesce: skip silently
|
||||
run, _ := h.Queries.CreateAgentflowRun(r.Context(), db.CreateAgentflowRunParams{
|
||||
AgentflowID: af.ID,
|
||||
SourceKind: "manual",
|
||||
Status: "coalesced",
|
||||
})
|
||||
writeJSON(w, http.StatusOK, runToResponse(run))
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// Create run
|
||||
run, err := h.Queries.CreateAgentflowRun(r.Context(), db.CreateAgentflowRunParams{
|
||||
AgentflowID: af.ID,
|
||||
SourceKind: "manual",
|
||||
Status: "received",
|
||||
})
|
||||
if err != nil {
|
||||
writeError(w, http.StatusInternalServerError, "failed to create run")
|
||||
return
|
||||
}
|
||||
|
||||
// Enqueue task for the agent
|
||||
err = h.TaskService.EnqueueTaskForAgentflow(r.Context(), af, run)
|
||||
if err != nil {
|
||||
// Mark run as failed
|
||||
h.Queries.UpdateAgentflowRunStatus(r.Context(), db.UpdateAgentflowRunStatusParams{
|
||||
ID: run.ID,
|
||||
Status: "failed",
|
||||
})
|
||||
writeError(w, http.StatusInternalServerError, "failed to enqueue agentflow task")
|
||||
return
|
||||
}
|
||||
|
||||
writeJSON(w, http.StatusCreated, runToResponse(run))
|
||||
}
|
||||
|
||||
// ── Helpers ─────────────────────────────────────────────────────────────
|
||||
|
||||
func parseTimestamp(s string) pgtype.Timestamptz {
|
||||
var ts pgtype.Timestamptz
|
||||
if err := ts.Scan(s); err != nil {
|
||||
return pgtype.Timestamptz{}
|
||||
}
|
||||
return ts
|
||||
}
|
||||
@@ -246,10 +246,34 @@ func (h *Handler) ClaimTaskByRuntime(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
}
|
||||
|
||||
// Include workspace ID and repos so the daemon can set up worktrees.
|
||||
if issue, err := h.Queries.GetIssue(r.Context(), task.IssueID); err == nil {
|
||||
resp.WorkspaceID = uuidToString(issue.WorkspaceID)
|
||||
if ws, err := h.Queries.GetWorkspace(r.Context(), issue.WorkspaceID); err == nil && ws.Repos != nil {
|
||||
// Resolve workspace ID and repos.
|
||||
var workspaceID pgtype.UUID
|
||||
if task.IssueID.Valid {
|
||||
// Standard issue-triggered task
|
||||
if issue, err := h.Queries.GetIssue(r.Context(), task.IssueID); err == nil {
|
||||
workspaceID = issue.WorkspaceID
|
||||
resp.WorkspaceID = uuidToString(issue.WorkspaceID)
|
||||
}
|
||||
} else if task.AgentflowRunID.Valid {
|
||||
// Agentflow-triggered task: resolve workspace via agentflow
|
||||
if run, err := h.Queries.GetAgentflowRun(r.Context(), task.AgentflowRunID); err == nil {
|
||||
if af, err := h.Queries.GetAgentflow(r.Context(), run.AgentflowID); err == nil {
|
||||
workspaceID = af.WorkspaceID
|
||||
resp.WorkspaceID = uuidToString(af.WorkspaceID)
|
||||
resp.Agentflow = &TaskAgentflowData{
|
||||
ID: uuidToString(af.ID),
|
||||
Title: af.Title,
|
||||
Description: textToPtr(af.Description),
|
||||
RunID: uuidToString(run.ID),
|
||||
SourceKind: run.SourceKind,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Include repos so the daemon can set up worktrees.
|
||||
if workspaceID.Valid {
|
||||
if ws, err := h.Queries.GetWorkspace(r.Context(), workspaceID); err == nil && ws.Repos != nil {
|
||||
var repos []RepoData
|
||||
if json.Unmarshal(ws.Repos, &repos) == nil && len(repos) > 0 {
|
||||
resp.Repos = repos
|
||||
@@ -259,13 +283,15 @@ func (h *Handler) ClaimTaskByRuntime(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
// Look up the prior session for this (agent, issue) pair so the daemon
|
||||
// can resume the Claude Code conversation context.
|
||||
if prior, err := h.Queries.GetLastTaskSession(r.Context(), db.GetLastTaskSessionParams{
|
||||
AgentID: task.AgentID,
|
||||
IssueID: task.IssueID,
|
||||
}); err == nil && prior.SessionID.Valid {
|
||||
resp.PriorSessionID = prior.SessionID.String
|
||||
if prior.WorkDir.Valid {
|
||||
resp.PriorWorkDir = prior.WorkDir.String
|
||||
if task.IssueID.Valid {
|
||||
if prior, err := h.Queries.GetLastTaskSession(r.Context(), db.GetLastTaskSessionParams{
|
||||
AgentID: task.AgentID,
|
||||
IssueID: task.IssueID,
|
||||
}); err == nil && prior.SessionID.Valid {
|
||||
resp.PriorSessionID = prior.SessionID.String
|
||||
if prior.WorkDir.Valid {
|
||||
resp.PriorWorkDir = prior.WorkDir.String
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -108,6 +108,41 @@ func (s *TaskService) EnqueueTaskForMention(ctx context.Context, issue db.Issue,
|
||||
return task, nil
|
||||
}
|
||||
|
||||
// EnqueueTaskForAgentflow creates a queued task for an agentflow run.
|
||||
// The task has no issue_id — the agent may or may not create one during execution.
|
||||
func (s *TaskService) EnqueueTaskForAgentflow(ctx context.Context, af db.Agentflow, run db.AgentflowRun) error {
|
||||
agent, err := s.Queries.GetAgent(ctx, af.AgentID)
|
||||
if err != nil {
|
||||
slog.Error("agentflow task enqueue failed: agent not found", "agentflow_id", util.UUIDToString(af.ID), "agent_id", util.UUIDToString(af.AgentID), "error", err)
|
||||
return fmt.Errorf("load agent: %w", err)
|
||||
}
|
||||
if agent.ArchivedAt.Valid {
|
||||
return fmt.Errorf("agent is archived")
|
||||
}
|
||||
if !agent.RuntimeID.Valid {
|
||||
return fmt.Errorf("agent has no runtime")
|
||||
}
|
||||
|
||||
task, err := s.Queries.CreateAgentflowTask(ctx, db.CreateAgentflowTaskParams{
|
||||
AgentID: af.AgentID,
|
||||
RuntimeID: agent.RuntimeID,
|
||||
Priority: 1, // default priority for agentflow tasks
|
||||
AgentflowRunID: run.ID,
|
||||
})
|
||||
if err != nil {
|
||||
slog.Error("agentflow task enqueue failed", "agentflow_id", util.UUIDToString(af.ID), "error", err)
|
||||
return fmt.Errorf("create task: %w", err)
|
||||
}
|
||||
|
||||
slog.Info("agentflow task enqueued",
|
||||
"task_id", util.UUIDToString(task.ID),
|
||||
"agentflow_id", util.UUIDToString(af.ID),
|
||||
"run_id", util.UUIDToString(run.ID),
|
||||
"agent_id", util.UUIDToString(af.AgentID),
|
||||
)
|
||||
return nil
|
||||
}
|
||||
|
||||
// CancelTasksForIssue cancels all active tasks for an issue.
|
||||
func (s *TaskService) CancelTasksForIssue(ctx context.Context, issueID pgtype.UUID) error {
|
||||
return s.Queries.CancelAgentTasksByIssue(ctx, issueID)
|
||||
@@ -402,12 +437,9 @@ func (s *TaskService) broadcastTaskDispatch(ctx context.Context, task db.AgentTa
|
||||
payload["task_id"] = util.UUIDToString(task.ID)
|
||||
payload["runtime_id"] = util.UUIDToString(task.RuntimeID)
|
||||
|
||||
workspaceID := ""
|
||||
if issue, err := s.Queries.GetIssue(ctx, task.IssueID); err == nil {
|
||||
workspaceID = util.UUIDToString(issue.WorkspaceID)
|
||||
}
|
||||
workspaceID := s.resolveTaskWorkspaceID(ctx, task)
|
||||
if workspaceID == "" {
|
||||
return // Issue deleted; skip broadcast to avoid global leak
|
||||
return
|
||||
}
|
||||
s.Bus.Publish(events.Event{
|
||||
Type: protocol.EventTaskDispatch,
|
||||
@@ -419,12 +451,9 @@ func (s *TaskService) broadcastTaskDispatch(ctx context.Context, task db.AgentTa
|
||||
}
|
||||
|
||||
func (s *TaskService) broadcastTaskEvent(ctx context.Context, eventType string, task db.AgentTaskQueue) {
|
||||
workspaceID := ""
|
||||
if issue, err := s.Queries.GetIssue(ctx, task.IssueID); err == nil {
|
||||
workspaceID = util.UUIDToString(issue.WorkspaceID)
|
||||
}
|
||||
workspaceID := s.resolveTaskWorkspaceID(ctx, task)
|
||||
if workspaceID == "" {
|
||||
return // Issue deleted; skip broadcast to avoid global leak
|
||||
return
|
||||
}
|
||||
s.Bus.Publish(events.Event{
|
||||
Type: eventType,
|
||||
@@ -432,14 +461,39 @@ func (s *TaskService) broadcastTaskEvent(ctx context.Context, eventType string,
|
||||
ActorType: "system",
|
||||
ActorID: "",
|
||||
Payload: map[string]any{
|
||||
"task_id": util.UUIDToString(task.ID),
|
||||
"agent_id": util.UUIDToString(task.AgentID),
|
||||
"issue_id": util.UUIDToString(task.IssueID),
|
||||
"status": task.Status,
|
||||
"task_id": util.UUIDToString(task.ID),
|
||||
"agent_id": util.UUIDToString(task.AgentID),
|
||||
"issue_id": util.UUIDToPtr(task.IssueID),
|
||||
"agentflow_run_id": util.UUIDToPtr(task.AgentflowRunID),
|
||||
"status": task.Status,
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
// resolveTaskWorkspaceID derives the workspace ID for a task, which may be
|
||||
// associated with an issue (normal task) or an agentflow run (agentflow task).
|
||||
func (s *TaskService) resolveTaskWorkspaceID(ctx context.Context, task db.AgentTaskQueue) string {
|
||||
// Try issue first (most common path)
|
||||
if task.IssueID.Valid {
|
||||
if issue, err := s.Queries.GetIssue(ctx, task.IssueID); err == nil {
|
||||
return util.UUIDToString(issue.WorkspaceID)
|
||||
}
|
||||
}
|
||||
// Fallback: look up via agentflow run → agentflow → workspace
|
||||
if task.AgentflowRunID.Valid {
|
||||
if run, err := s.Queries.GetAgentflowRun(ctx, task.AgentflowRunID); err == nil {
|
||||
if af, err := s.Queries.GetAgentflow(ctx, run.AgentflowID); err == nil {
|
||||
return util.UUIDToString(af.WorkspaceID)
|
||||
}
|
||||
}
|
||||
}
|
||||
// Last resort: look up via agent
|
||||
if agent, err := s.Queries.GetAgent(ctx, task.AgentID); err == nil {
|
||||
return util.UUIDToString(agent.WorkspaceID)
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
func (s *TaskService) broadcastIssueUpdated(issue db.Issue) {
|
||||
prefix := s.getIssuePrefix(issue.WorkspaceID)
|
||||
s.Bus.Publish(events.Event{
|
||||
|
||||
6
server/migrations/032_agentflow.down.sql
Normal file
6
server/migrations/032_agentflow.down.sql
Normal file
@@ -0,0 +1,6 @@
|
||||
ALTER TABLE agent_task_queue DROP COLUMN IF EXISTS agentflow_run_id;
|
||||
ALTER TABLE agent_task_queue ALTER COLUMN issue_id SET NOT NULL;
|
||||
|
||||
DROP TABLE IF EXISTS agentflow_run;
|
||||
DROP TABLE IF EXISTS agentflow_trigger;
|
||||
DROP TABLE IF EXISTS agentflow;
|
||||
74
server/migrations/032_agentflow.up.sql
Normal file
74
server/migrations/032_agentflow.up.sql
Normal file
@@ -0,0 +1,74 @@
|
||||
-- Agentflow: scheduled/triggered agent tasks that run independently of issues.
|
||||
|
||||
-- Agentflows define reusable agent tasks with prompt templates.
|
||||
CREATE TABLE agentflow (
|
||||
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
|
||||
workspace_id UUID NOT NULL REFERENCES workspace(id) ON DELETE CASCADE,
|
||||
title TEXT NOT NULL,
|
||||
description TEXT, -- prompt template, supports {{VARIABLE}} interpolation
|
||||
agent_id UUID NOT NULL REFERENCES agent(id) ON DELETE CASCADE,
|
||||
status TEXT NOT NULL DEFAULT 'active'
|
||||
CHECK (status IN ('active', 'paused', 'archived')),
|
||||
concurrency_policy TEXT NOT NULL DEFAULT 'skip_if_active'
|
||||
CHECK (concurrency_policy IN ('skip_if_active', 'coalesce', 'always_run')),
|
||||
variables JSONB NOT NULL DEFAULT '[]', -- variable definitions array
|
||||
created_by UUID NOT NULL REFERENCES "user"(id),
|
||||
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
|
||||
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
|
||||
);
|
||||
|
||||
CREATE INDEX idx_agentflow_workspace ON agentflow(workspace_id)
|
||||
WHERE status != 'archived';
|
||||
|
||||
-- Triggers define when an agentflow fires (schedule, webhook, api).
|
||||
CREATE TABLE agentflow_trigger (
|
||||
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
|
||||
agentflow_id UUID NOT NULL REFERENCES agentflow(id) ON DELETE CASCADE,
|
||||
kind TEXT NOT NULL CHECK (kind IN ('schedule', 'webhook', 'api')),
|
||||
enabled BOOLEAN NOT NULL DEFAULT true,
|
||||
|
||||
-- schedule fields
|
||||
cron_expression TEXT,
|
||||
timezone TEXT DEFAULT 'UTC',
|
||||
next_run_at TIMESTAMPTZ,
|
||||
|
||||
-- webhook fields (reserved for phase 2)
|
||||
public_id TEXT UNIQUE,
|
||||
secret_hash TEXT,
|
||||
signing_mode TEXT CHECK (signing_mode IN ('bearer', 'hmac_sha256')),
|
||||
|
||||
last_fired_at TIMESTAMPTZ,
|
||||
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
|
||||
);
|
||||
|
||||
-- Efficient lookup for the scheduler: find schedule triggers that are due.
|
||||
CREATE INDEX idx_agentflow_trigger_schedule_due
|
||||
ON agentflow_trigger(next_run_at)
|
||||
WHERE kind = 'schedule' AND enabled = true;
|
||||
|
||||
-- Runs track each execution of an agentflow.
|
||||
CREATE TABLE agentflow_run (
|
||||
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
|
||||
agentflow_id UUID NOT NULL REFERENCES agentflow(id) ON DELETE CASCADE,
|
||||
trigger_id UUID REFERENCES agentflow_trigger(id) ON DELETE SET NULL,
|
||||
source_kind TEXT NOT NULL CHECK (source_kind IN ('schedule', 'webhook', 'api', 'manual')),
|
||||
status TEXT NOT NULL DEFAULT 'received'
|
||||
CHECK (status IN ('received', 'executing', 'completed', 'failed', 'skipped', 'coalesced')),
|
||||
linked_issue_id UUID REFERENCES issue(id) ON DELETE SET NULL, -- nullable: agent decides if issue needed
|
||||
payload JSONB, -- variable values + trigger payload snapshot
|
||||
agent_output TEXT, -- agent's execution summary
|
||||
started_at TIMESTAMPTZ,
|
||||
completed_at TIMESTAMPTZ,
|
||||
idempotency_key TEXT,
|
||||
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
|
||||
);
|
||||
|
||||
CREATE INDEX idx_agentflow_run_agentflow ON agentflow_run(agentflow_id, created_at DESC);
|
||||
|
||||
-- Extend agent_task_queue to support agentflow-triggered tasks.
|
||||
-- issue_id becomes nullable (agentflow tasks may not have an issue).
|
||||
ALTER TABLE agent_task_queue ALTER COLUMN issue_id DROP NOT NULL;
|
||||
|
||||
-- Link tasks to agentflow runs.
|
||||
ALTER TABLE agent_task_queue ADD COLUMN agentflow_run_id UUID
|
||||
REFERENCES agentflow_run(id) ON DELETE SET NULL;
|
||||
@@ -53,7 +53,7 @@ const cancelAgentTask = `-- name: CancelAgentTask :one
|
||||
UPDATE agent_task_queue
|
||||
SET status = 'cancelled', completed_at = now()
|
||||
WHERE id = $1 AND status IN ('queued', 'dispatched', 'running')
|
||||
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id
|
||||
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, agentflow_run_id
|
||||
`
|
||||
|
||||
func (q *Queries) CancelAgentTask(ctx context.Context, id pgtype.UUID) (AgentTaskQueue, error) {
|
||||
@@ -76,6 +76,7 @@ func (q *Queries) CancelAgentTask(ctx context.Context, id pgtype.UUID) (AgentTas
|
||||
&i.SessionID,
|
||||
&i.WorkDir,
|
||||
&i.TriggerCommentID,
|
||||
&i.AgentflowRunID,
|
||||
)
|
||||
return i, err
|
||||
}
|
||||
@@ -117,7 +118,7 @@ WHERE id = (
|
||||
LIMIT 1
|
||||
FOR UPDATE SKIP LOCKED
|
||||
)
|
||||
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id
|
||||
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, agentflow_run_id
|
||||
`
|
||||
|
||||
// Claims the next queued task for an agent, enforcing per-issue serialization:
|
||||
@@ -144,6 +145,7 @@ func (q *Queries) ClaimAgentTask(ctx context.Context, agentID pgtype.UUID) (Agen
|
||||
&i.SessionID,
|
||||
&i.WorkDir,
|
||||
&i.TriggerCommentID,
|
||||
&i.AgentflowRunID,
|
||||
)
|
||||
return i, err
|
||||
}
|
||||
@@ -152,7 +154,7 @@ const completeAgentTask = `-- name: CompleteAgentTask :one
|
||||
UPDATE agent_task_queue
|
||||
SET status = 'completed', completed_at = now(), result = $2, session_id = $3, work_dir = $4
|
||||
WHERE id = $1 AND status = 'running'
|
||||
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id
|
||||
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, agentflow_run_id
|
||||
`
|
||||
|
||||
type CompleteAgentTaskParams struct {
|
||||
@@ -187,6 +189,7 @@ func (q *Queries) CompleteAgentTask(ctx context.Context, arg CompleteAgentTaskPa
|
||||
&i.SessionID,
|
||||
&i.WorkDir,
|
||||
&i.TriggerCommentID,
|
||||
&i.AgentflowRunID,
|
||||
)
|
||||
return i, err
|
||||
}
|
||||
@@ -272,7 +275,7 @@ func (q *Queries) CreateAgent(ctx context.Context, arg CreateAgentParams) (Agent
|
||||
const createAgentTask = `-- name: CreateAgentTask :one
|
||||
INSERT INTO agent_task_queue (agent_id, runtime_id, issue_id, status, priority, trigger_comment_id)
|
||||
VALUES ($1, $2, $3, 'queued', $4, $5)
|
||||
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id
|
||||
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, agentflow_run_id
|
||||
`
|
||||
|
||||
type CreateAgentTaskParams struct {
|
||||
@@ -309,6 +312,7 @@ func (q *Queries) CreateAgentTask(ctx context.Context, arg CreateAgentTaskParams
|
||||
&i.SessionID,
|
||||
&i.WorkDir,
|
||||
&i.TriggerCommentID,
|
||||
&i.AgentflowRunID,
|
||||
)
|
||||
return i, err
|
||||
}
|
||||
@@ -317,7 +321,7 @@ const failAgentTask = `-- name: FailAgentTask :one
|
||||
UPDATE agent_task_queue
|
||||
SET status = 'failed', completed_at = now(), error = $2
|
||||
WHERE id = $1 AND status IN ('dispatched', 'running')
|
||||
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id
|
||||
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, agentflow_run_id
|
||||
`
|
||||
|
||||
type FailAgentTaskParams struct {
|
||||
@@ -345,6 +349,7 @@ func (q *Queries) FailAgentTask(ctx context.Context, arg FailAgentTaskParams) (A
|
||||
&i.SessionID,
|
||||
&i.WorkDir,
|
||||
&i.TriggerCommentID,
|
||||
&i.AgentflowRunID,
|
||||
)
|
||||
return i, err
|
||||
}
|
||||
@@ -461,7 +466,7 @@ func (q *Queries) GetAgentInWorkspace(ctx context.Context, arg GetAgentInWorkspa
|
||||
}
|
||||
|
||||
const getAgentTask = `-- name: GetAgentTask :one
|
||||
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id FROM agent_task_queue
|
||||
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, agentflow_run_id FROM agent_task_queue
|
||||
WHERE id = $1
|
||||
`
|
||||
|
||||
@@ -485,6 +490,7 @@ func (q *Queries) GetAgentTask(ctx context.Context, id pgtype.UUID) (AgentTaskQu
|
||||
&i.SessionID,
|
||||
&i.WorkDir,
|
||||
&i.TriggerCommentID,
|
||||
&i.AgentflowRunID,
|
||||
)
|
||||
return i, err
|
||||
}
|
||||
@@ -564,7 +570,7 @@ func (q *Queries) HasPendingTaskForIssueAndAgent(ctx context.Context, arg HasPen
|
||||
}
|
||||
|
||||
const listActiveTasksByIssue = `-- name: ListActiveTasksByIssue :many
|
||||
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id FROM agent_task_queue
|
||||
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, agentflow_run_id FROM agent_task_queue
|
||||
WHERE issue_id = $1 AND status IN ('dispatched', 'running')
|
||||
ORDER BY created_at DESC
|
||||
`
|
||||
@@ -595,6 +601,7 @@ func (q *Queries) ListActiveTasksByIssue(ctx context.Context, issueID pgtype.UUI
|
||||
&i.SessionID,
|
||||
&i.WorkDir,
|
||||
&i.TriggerCommentID,
|
||||
&i.AgentflowRunID,
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -607,7 +614,7 @@ func (q *Queries) ListActiveTasksByIssue(ctx context.Context, issueID pgtype.UUI
|
||||
}
|
||||
|
||||
const listAgentTasks = `-- name: ListAgentTasks :many
|
||||
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id FROM agent_task_queue
|
||||
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, agentflow_run_id FROM agent_task_queue
|
||||
WHERE agent_id = $1
|
||||
ORDER BY created_at DESC
|
||||
`
|
||||
@@ -638,6 +645,7 @@ func (q *Queries) ListAgentTasks(ctx context.Context, agentID pgtype.UUID) ([]Ag
|
||||
&i.SessionID,
|
||||
&i.WorkDir,
|
||||
&i.TriggerCommentID,
|
||||
&i.AgentflowRunID,
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -742,7 +750,7 @@ func (q *Queries) ListAllAgents(ctx context.Context, workspaceID pgtype.UUID) ([
|
||||
}
|
||||
|
||||
const listPendingTasksByRuntime = `-- name: ListPendingTasksByRuntime :many
|
||||
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id FROM agent_task_queue
|
||||
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, agentflow_run_id FROM agent_task_queue
|
||||
WHERE runtime_id = $1 AND status IN ('queued', 'dispatched')
|
||||
ORDER BY priority DESC, created_at ASC
|
||||
`
|
||||
@@ -773,6 +781,7 @@ func (q *Queries) ListPendingTasksByRuntime(ctx context.Context, runtimeID pgtyp
|
||||
&i.SessionID,
|
||||
&i.WorkDir,
|
||||
&i.TriggerCommentID,
|
||||
&i.AgentflowRunID,
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -785,7 +794,7 @@ func (q *Queries) ListPendingTasksByRuntime(ctx context.Context, runtimeID pgtyp
|
||||
}
|
||||
|
||||
const listTasksByIssue = `-- name: ListTasksByIssue :many
|
||||
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id FROM agent_task_queue
|
||||
SELECT id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, agentflow_run_id FROM agent_task_queue
|
||||
WHERE issue_id = $1
|
||||
ORDER BY created_at DESC
|
||||
`
|
||||
@@ -816,6 +825,7 @@ func (q *Queries) ListTasksByIssue(ctx context.Context, issueID pgtype.UUID) ([]
|
||||
&i.SessionID,
|
||||
&i.WorkDir,
|
||||
&i.TriggerCommentID,
|
||||
&i.AgentflowRunID,
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -864,7 +874,7 @@ const startAgentTask = `-- name: StartAgentTask :one
|
||||
UPDATE agent_task_queue
|
||||
SET status = 'running', started_at = now()
|
||||
WHERE id = $1 AND status = 'dispatched'
|
||||
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id
|
||||
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, agentflow_run_id
|
||||
`
|
||||
|
||||
func (q *Queries) StartAgentTask(ctx context.Context, id pgtype.UUID) (AgentTaskQueue, error) {
|
||||
@@ -887,6 +897,7 @@ func (q *Queries) StartAgentTask(ctx context.Context, id pgtype.UUID) (AgentTask
|
||||
&i.SessionID,
|
||||
&i.WorkDir,
|
||||
&i.TriggerCommentID,
|
||||
&i.AgentflowRunID,
|
||||
)
|
||||
return i, err
|
||||
}
|
||||
|
||||
764
server/pkg/db/generated/agentflow.sql.go
Normal file
764
server/pkg/db/generated/agentflow.sql.go
Normal file
@@ -0,0 +1,764 @@
|
||||
// Code generated by sqlc. DO NOT EDIT.
|
||||
// versions:
|
||||
// sqlc v1.30.0
|
||||
// source: agentflow.sql
|
||||
|
||||
package db
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/jackc/pgx/v5/pgtype"
|
||||
)
|
||||
|
||||
const archiveAgentflow = `-- name: ArchiveAgentflow :one
|
||||
UPDATE agentflow SET status = 'archived', updated_at = now()
|
||||
WHERE id = $1
|
||||
RETURNING id, workspace_id, title, description, agent_id, status, concurrency_policy, variables, created_by, created_at, updated_at
|
||||
`
|
||||
|
||||
func (q *Queries) ArchiveAgentflow(ctx context.Context, id pgtype.UUID) (Agentflow, error) {
|
||||
row := q.db.QueryRow(ctx, archiveAgentflow, id)
|
||||
var i Agentflow
|
||||
err := row.Scan(
|
||||
&i.ID,
|
||||
&i.WorkspaceID,
|
||||
&i.Title,
|
||||
&i.Description,
|
||||
&i.AgentID,
|
||||
&i.Status,
|
||||
&i.ConcurrencyPolicy,
|
||||
&i.Variables,
|
||||
&i.CreatedBy,
|
||||
&i.CreatedAt,
|
||||
&i.UpdatedAt,
|
||||
)
|
||||
return i, err
|
||||
}
|
||||
|
||||
const claimDueScheduleTriggers = `-- name: ClaimDueScheduleTriggers :many
|
||||
UPDATE agentflow_trigger t SET
|
||||
next_run_at = NULL, -- will be recalculated by caller
|
||||
last_fired_at = now()
|
||||
FROM agentflow a
|
||||
WHERE t.agentflow_id = a.id
|
||||
AND t.kind = 'schedule'
|
||||
AND t.enabled = true
|
||||
AND a.status = 'active'
|
||||
AND t.next_run_at IS NOT NULL
|
||||
AND t.next_run_at <= now()
|
||||
RETURNING t.id, t.agentflow_id, t.kind, t.enabled, t.cron_expression, t.timezone, t.next_run_at, t.public_id, t.secret_hash, t.signing_mode, t.last_fired_at, t.created_at, a.workspace_id, a.agent_id, a.title AS agentflow_title, a.description AS agentflow_description, a.concurrency_policy
|
||||
`
|
||||
|
||||
type ClaimDueScheduleTriggersRow struct {
|
||||
ID pgtype.UUID `json:"id"`
|
||||
AgentflowID pgtype.UUID `json:"agentflow_id"`
|
||||
Kind string `json:"kind"`
|
||||
Enabled bool `json:"enabled"`
|
||||
CronExpression pgtype.Text `json:"cron_expression"`
|
||||
Timezone pgtype.Text `json:"timezone"`
|
||||
NextRunAt pgtype.Timestamptz `json:"next_run_at"`
|
||||
PublicID pgtype.Text `json:"public_id"`
|
||||
SecretHash pgtype.Text `json:"secret_hash"`
|
||||
SigningMode pgtype.Text `json:"signing_mode"`
|
||||
LastFiredAt pgtype.Timestamptz `json:"last_fired_at"`
|
||||
CreatedAt pgtype.Timestamptz `json:"created_at"`
|
||||
WorkspaceID pgtype.UUID `json:"workspace_id"`
|
||||
AgentID pgtype.UUID `json:"agent_id"`
|
||||
AgentflowTitle string `json:"agentflow_title"`
|
||||
AgentflowDescription pgtype.Text `json:"agentflow_description"`
|
||||
ConcurrencyPolicy string `json:"concurrency_policy"`
|
||||
}
|
||||
|
||||
// Atomically claims all schedule triggers that are due.
|
||||
// Uses CAS (compare-and-swap) on next_run_at to prevent double-firing
|
||||
// across multiple server instances.
|
||||
func (q *Queries) ClaimDueScheduleTriggers(ctx context.Context) ([]ClaimDueScheduleTriggersRow, error) {
|
||||
rows, err := q.db.Query(ctx, claimDueScheduleTriggers)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
items := []ClaimDueScheduleTriggersRow{}
|
||||
for rows.Next() {
|
||||
var i ClaimDueScheduleTriggersRow
|
||||
if err := rows.Scan(
|
||||
&i.ID,
|
||||
&i.AgentflowID,
|
||||
&i.Kind,
|
||||
&i.Enabled,
|
||||
&i.CronExpression,
|
||||
&i.Timezone,
|
||||
&i.NextRunAt,
|
||||
&i.PublicID,
|
||||
&i.SecretHash,
|
||||
&i.SigningMode,
|
||||
&i.LastFiredAt,
|
||||
&i.CreatedAt,
|
||||
&i.WorkspaceID,
|
||||
&i.AgentID,
|
||||
&i.AgentflowTitle,
|
||||
&i.AgentflowDescription,
|
||||
&i.ConcurrencyPolicy,
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
items = append(items, i)
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return items, nil
|
||||
}
|
||||
|
||||
const completeAgentflowRun = `-- name: CompleteAgentflowRun :one
|
||||
UPDATE agentflow_run SET
|
||||
status = $2,
|
||||
agent_output = $3,
|
||||
linked_issue_id = $4,
|
||||
completed_at = now()
|
||||
WHERE id = $1
|
||||
RETURNING id, agentflow_id, trigger_id, source_kind, status, linked_issue_id, payload, agent_output, started_at, completed_at, idempotency_key, created_at
|
||||
`
|
||||
|
||||
type CompleteAgentflowRunParams struct {
|
||||
ID pgtype.UUID `json:"id"`
|
||||
Status string `json:"status"`
|
||||
AgentOutput pgtype.Text `json:"agent_output"`
|
||||
LinkedIssueID pgtype.UUID `json:"linked_issue_id"`
|
||||
}
|
||||
|
||||
func (q *Queries) CompleteAgentflowRun(ctx context.Context, arg CompleteAgentflowRunParams) (AgentflowRun, error) {
|
||||
row := q.db.QueryRow(ctx, completeAgentflowRun,
|
||||
arg.ID,
|
||||
arg.Status,
|
||||
arg.AgentOutput,
|
||||
arg.LinkedIssueID,
|
||||
)
|
||||
var i AgentflowRun
|
||||
err := row.Scan(
|
||||
&i.ID,
|
||||
&i.AgentflowID,
|
||||
&i.TriggerID,
|
||||
&i.SourceKind,
|
||||
&i.Status,
|
||||
&i.LinkedIssueID,
|
||||
&i.Payload,
|
||||
&i.AgentOutput,
|
||||
&i.StartedAt,
|
||||
&i.CompletedAt,
|
||||
&i.IdempotencyKey,
|
||||
&i.CreatedAt,
|
||||
)
|
||||
return i, err
|
||||
}
|
||||
|
||||
const createAgentflow = `-- name: CreateAgentflow :one
|
||||
INSERT INTO agentflow (
|
||||
workspace_id, title, description, agent_id, status,
|
||||
concurrency_policy, variables, created_by
|
||||
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
|
||||
RETURNING id, workspace_id, title, description, agent_id, status, concurrency_policy, variables, created_by, created_at, updated_at
|
||||
`
|
||||
|
||||
type CreateAgentflowParams struct {
|
||||
WorkspaceID pgtype.UUID `json:"workspace_id"`
|
||||
Title string `json:"title"`
|
||||
Description pgtype.Text `json:"description"`
|
||||
AgentID pgtype.UUID `json:"agent_id"`
|
||||
Status string `json:"status"`
|
||||
ConcurrencyPolicy string `json:"concurrency_policy"`
|
||||
Variables []byte `json:"variables"`
|
||||
CreatedBy pgtype.UUID `json:"created_by"`
|
||||
}
|
||||
|
||||
func (q *Queries) CreateAgentflow(ctx context.Context, arg CreateAgentflowParams) (Agentflow, error) {
|
||||
row := q.db.QueryRow(ctx, createAgentflow,
|
||||
arg.WorkspaceID,
|
||||
arg.Title,
|
||||
arg.Description,
|
||||
arg.AgentID,
|
||||
arg.Status,
|
||||
arg.ConcurrencyPolicy,
|
||||
arg.Variables,
|
||||
arg.CreatedBy,
|
||||
)
|
||||
var i Agentflow
|
||||
err := row.Scan(
|
||||
&i.ID,
|
||||
&i.WorkspaceID,
|
||||
&i.Title,
|
||||
&i.Description,
|
||||
&i.AgentID,
|
||||
&i.Status,
|
||||
&i.ConcurrencyPolicy,
|
||||
&i.Variables,
|
||||
&i.CreatedBy,
|
||||
&i.CreatedAt,
|
||||
&i.UpdatedAt,
|
||||
)
|
||||
return i, err
|
||||
}
|
||||
|
||||
const createAgentflowRun = `-- name: CreateAgentflowRun :one
|
||||
INSERT INTO agentflow_run (
|
||||
agentflow_id, trigger_id, source_kind, status, payload, idempotency_key
|
||||
) VALUES ($1, $2, $3, $4, $5, $6)
|
||||
RETURNING id, agentflow_id, trigger_id, source_kind, status, linked_issue_id, payload, agent_output, started_at, completed_at, idempotency_key, created_at
|
||||
`
|
||||
|
||||
type CreateAgentflowRunParams struct {
|
||||
AgentflowID pgtype.UUID `json:"agentflow_id"`
|
||||
TriggerID pgtype.UUID `json:"trigger_id"`
|
||||
SourceKind string `json:"source_kind"`
|
||||
Status string `json:"status"`
|
||||
Payload []byte `json:"payload"`
|
||||
IdempotencyKey pgtype.Text `json:"idempotency_key"`
|
||||
}
|
||||
|
||||
func (q *Queries) CreateAgentflowRun(ctx context.Context, arg CreateAgentflowRunParams) (AgentflowRun, error) {
|
||||
row := q.db.QueryRow(ctx, createAgentflowRun,
|
||||
arg.AgentflowID,
|
||||
arg.TriggerID,
|
||||
arg.SourceKind,
|
||||
arg.Status,
|
||||
arg.Payload,
|
||||
arg.IdempotencyKey,
|
||||
)
|
||||
var i AgentflowRun
|
||||
err := row.Scan(
|
||||
&i.ID,
|
||||
&i.AgentflowID,
|
||||
&i.TriggerID,
|
||||
&i.SourceKind,
|
||||
&i.Status,
|
||||
&i.LinkedIssueID,
|
||||
&i.Payload,
|
||||
&i.AgentOutput,
|
||||
&i.StartedAt,
|
||||
&i.CompletedAt,
|
||||
&i.IdempotencyKey,
|
||||
&i.CreatedAt,
|
||||
)
|
||||
return i, err
|
||||
}
|
||||
|
||||
const createAgentflowTask = `-- name: CreateAgentflowTask :one
|
||||
INSERT INTO agent_task_queue (agent_id, runtime_id, issue_id, status, priority, agentflow_run_id)
|
||||
VALUES ($1, $2, NULL, 'queued', $3, $4)
|
||||
RETURNING id, agent_id, issue_id, status, priority, dispatched_at, started_at, completed_at, result, error, created_at, context, runtime_id, session_id, work_dir, trigger_comment_id, agentflow_run_id
|
||||
`
|
||||
|
||||
type CreateAgentflowTaskParams struct {
|
||||
AgentID pgtype.UUID `json:"agent_id"`
|
||||
RuntimeID pgtype.UUID `json:"runtime_id"`
|
||||
Priority int32 `json:"priority"`
|
||||
AgentflowRunID pgtype.UUID `json:"agentflow_run_id"`
|
||||
}
|
||||
|
||||
// Creates a task in the queue for an agentflow run (issue_id is NULL).
|
||||
func (q *Queries) CreateAgentflowTask(ctx context.Context, arg CreateAgentflowTaskParams) (AgentTaskQueue, error) {
|
||||
row := q.db.QueryRow(ctx, createAgentflowTask,
|
||||
arg.AgentID,
|
||||
arg.RuntimeID,
|
||||
arg.Priority,
|
||||
arg.AgentflowRunID,
|
||||
)
|
||||
var i AgentTaskQueue
|
||||
err := row.Scan(
|
||||
&i.ID,
|
||||
&i.AgentID,
|
||||
&i.IssueID,
|
||||
&i.Status,
|
||||
&i.Priority,
|
||||
&i.DispatchedAt,
|
||||
&i.StartedAt,
|
||||
&i.CompletedAt,
|
||||
&i.Result,
|
||||
&i.Error,
|
||||
&i.CreatedAt,
|
||||
&i.Context,
|
||||
&i.RuntimeID,
|
||||
&i.SessionID,
|
||||
&i.WorkDir,
|
||||
&i.TriggerCommentID,
|
||||
&i.AgentflowRunID,
|
||||
)
|
||||
return i, err
|
||||
}
|
||||
|
||||
const createAgentflowTrigger = `-- name: CreateAgentflowTrigger :one
|
||||
INSERT INTO agentflow_trigger (
|
||||
agentflow_id, kind, enabled,
|
||||
cron_expression, timezone, next_run_at,
|
||||
public_id, secret_hash, signing_mode
|
||||
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
|
||||
RETURNING id, agentflow_id, kind, enabled, cron_expression, timezone, next_run_at, public_id, secret_hash, signing_mode, last_fired_at, created_at
|
||||
`
|
||||
|
||||
type CreateAgentflowTriggerParams struct {
|
||||
AgentflowID pgtype.UUID `json:"agentflow_id"`
|
||||
Kind string `json:"kind"`
|
||||
Enabled bool `json:"enabled"`
|
||||
CronExpression pgtype.Text `json:"cron_expression"`
|
||||
Timezone pgtype.Text `json:"timezone"`
|
||||
NextRunAt pgtype.Timestamptz `json:"next_run_at"`
|
||||
PublicID pgtype.Text `json:"public_id"`
|
||||
SecretHash pgtype.Text `json:"secret_hash"`
|
||||
SigningMode pgtype.Text `json:"signing_mode"`
|
||||
}
|
||||
|
||||
func (q *Queries) CreateAgentflowTrigger(ctx context.Context, arg CreateAgentflowTriggerParams) (AgentflowTrigger, error) {
|
||||
row := q.db.QueryRow(ctx, createAgentflowTrigger,
|
||||
arg.AgentflowID,
|
||||
arg.Kind,
|
||||
arg.Enabled,
|
||||
arg.CronExpression,
|
||||
arg.Timezone,
|
||||
arg.NextRunAt,
|
||||
arg.PublicID,
|
||||
arg.SecretHash,
|
||||
arg.SigningMode,
|
||||
)
|
||||
var i AgentflowTrigger
|
||||
err := row.Scan(
|
||||
&i.ID,
|
||||
&i.AgentflowID,
|
||||
&i.Kind,
|
||||
&i.Enabled,
|
||||
&i.CronExpression,
|
||||
&i.Timezone,
|
||||
&i.NextRunAt,
|
||||
&i.PublicID,
|
||||
&i.SecretHash,
|
||||
&i.SigningMode,
|
||||
&i.LastFiredAt,
|
||||
&i.CreatedAt,
|
||||
)
|
||||
return i, err
|
||||
}
|
||||
|
||||
const deleteAgentflowTrigger = `-- name: DeleteAgentflowTrigger :exec
|
||||
DELETE FROM agentflow_trigger WHERE id = $1
|
||||
`
|
||||
|
||||
func (q *Queries) DeleteAgentflowTrigger(ctx context.Context, id pgtype.UUID) error {
|
||||
_, err := q.db.Exec(ctx, deleteAgentflowTrigger, id)
|
||||
return err
|
||||
}
|
||||
|
||||
const getAgentflow = `-- name: GetAgentflow :one
|
||||
SELECT id, workspace_id, title, description, agent_id, status, concurrency_policy, variables, created_by, created_at, updated_at FROM agentflow WHERE id = $1
|
||||
`
|
||||
|
||||
func (q *Queries) GetAgentflow(ctx context.Context, id pgtype.UUID) (Agentflow, error) {
|
||||
row := q.db.QueryRow(ctx, getAgentflow, id)
|
||||
var i Agentflow
|
||||
err := row.Scan(
|
||||
&i.ID,
|
||||
&i.WorkspaceID,
|
||||
&i.Title,
|
||||
&i.Description,
|
||||
&i.AgentID,
|
||||
&i.Status,
|
||||
&i.ConcurrencyPolicy,
|
||||
&i.Variables,
|
||||
&i.CreatedBy,
|
||||
&i.CreatedAt,
|
||||
&i.UpdatedAt,
|
||||
)
|
||||
return i, err
|
||||
}
|
||||
|
||||
const getAgentflowInWorkspace = `-- name: GetAgentflowInWorkspace :one
|
||||
SELECT id, workspace_id, title, description, agent_id, status, concurrency_policy, variables, created_by, created_at, updated_at FROM agentflow WHERE id = $1 AND workspace_id = $2
|
||||
`
|
||||
|
||||
type GetAgentflowInWorkspaceParams struct {
|
||||
ID pgtype.UUID `json:"id"`
|
||||
WorkspaceID pgtype.UUID `json:"workspace_id"`
|
||||
}
|
||||
|
||||
func (q *Queries) GetAgentflowInWorkspace(ctx context.Context, arg GetAgentflowInWorkspaceParams) (Agentflow, error) {
|
||||
row := q.db.QueryRow(ctx, getAgentflowInWorkspace, arg.ID, arg.WorkspaceID)
|
||||
var i Agentflow
|
||||
err := row.Scan(
|
||||
&i.ID,
|
||||
&i.WorkspaceID,
|
||||
&i.Title,
|
||||
&i.Description,
|
||||
&i.AgentID,
|
||||
&i.Status,
|
||||
&i.ConcurrencyPolicy,
|
||||
&i.Variables,
|
||||
&i.CreatedBy,
|
||||
&i.CreatedAt,
|
||||
&i.UpdatedAt,
|
||||
)
|
||||
return i, err
|
||||
}
|
||||
|
||||
const getAgentflowRun = `-- name: GetAgentflowRun :one
|
||||
SELECT id, agentflow_id, trigger_id, source_kind, status, linked_issue_id, payload, agent_output, started_at, completed_at, idempotency_key, created_at FROM agentflow_run WHERE id = $1
|
||||
`
|
||||
|
||||
func (q *Queries) GetAgentflowRun(ctx context.Context, id pgtype.UUID) (AgentflowRun, error) {
|
||||
row := q.db.QueryRow(ctx, getAgentflowRun, id)
|
||||
var i AgentflowRun
|
||||
err := row.Scan(
|
||||
&i.ID,
|
||||
&i.AgentflowID,
|
||||
&i.TriggerID,
|
||||
&i.SourceKind,
|
||||
&i.Status,
|
||||
&i.LinkedIssueID,
|
||||
&i.Payload,
|
||||
&i.AgentOutput,
|
||||
&i.StartedAt,
|
||||
&i.CompletedAt,
|
||||
&i.IdempotencyKey,
|
||||
&i.CreatedAt,
|
||||
)
|
||||
return i, err
|
||||
}
|
||||
|
||||
const getAgentflowTrigger = `-- name: GetAgentflowTrigger :one
|
||||
SELECT id, agentflow_id, kind, enabled, cron_expression, timezone, next_run_at, public_id, secret_hash, signing_mode, last_fired_at, created_at FROM agentflow_trigger WHERE id = $1
|
||||
`
|
||||
|
||||
func (q *Queries) GetAgentflowTrigger(ctx context.Context, id pgtype.UUID) (AgentflowTrigger, error) {
|
||||
row := q.db.QueryRow(ctx, getAgentflowTrigger, id)
|
||||
var i AgentflowTrigger
|
||||
err := row.Scan(
|
||||
&i.ID,
|
||||
&i.AgentflowID,
|
||||
&i.Kind,
|
||||
&i.Enabled,
|
||||
&i.CronExpression,
|
||||
&i.Timezone,
|
||||
&i.NextRunAt,
|
||||
&i.PublicID,
|
||||
&i.SecretHash,
|
||||
&i.SigningMode,
|
||||
&i.LastFiredAt,
|
||||
&i.CreatedAt,
|
||||
)
|
||||
return i, err
|
||||
}
|
||||
|
||||
const hasActiveAgentflowRun = `-- name: HasActiveAgentflowRun :one
|
||||
SELECT count(*) > 0 AS has_active FROM agentflow_run
|
||||
WHERE agentflow_id = $1 AND status IN ('received', 'executing')
|
||||
`
|
||||
|
||||
// Check if agentflow has a run currently executing (for concurrency policy).
|
||||
func (q *Queries) HasActiveAgentflowRun(ctx context.Context, agentflowID pgtype.UUID) (bool, error) {
|
||||
row := q.db.QueryRow(ctx, hasActiveAgentflowRun, agentflowID)
|
||||
var has_active bool
|
||||
err := row.Scan(&has_active)
|
||||
return has_active, err
|
||||
}
|
||||
|
||||
const listAgentflowRuns = `-- name: ListAgentflowRuns :many
|
||||
SELECT id, agentflow_id, trigger_id, source_kind, status, linked_issue_id, payload, agent_output, started_at, completed_at, idempotency_key, created_at FROM agentflow_run
|
||||
WHERE agentflow_id = $1
|
||||
ORDER BY created_at DESC
|
||||
LIMIT $2 OFFSET $3
|
||||
`
|
||||
|
||||
type ListAgentflowRunsParams struct {
|
||||
AgentflowID pgtype.UUID `json:"agentflow_id"`
|
||||
Limit int32 `json:"limit"`
|
||||
Offset int32 `json:"offset"`
|
||||
}
|
||||
|
||||
func (q *Queries) ListAgentflowRuns(ctx context.Context, arg ListAgentflowRunsParams) ([]AgentflowRun, error) {
|
||||
rows, err := q.db.Query(ctx, listAgentflowRuns, arg.AgentflowID, arg.Limit, arg.Offset)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
items := []AgentflowRun{}
|
||||
for rows.Next() {
|
||||
var i AgentflowRun
|
||||
if err := rows.Scan(
|
||||
&i.ID,
|
||||
&i.AgentflowID,
|
||||
&i.TriggerID,
|
||||
&i.SourceKind,
|
||||
&i.Status,
|
||||
&i.LinkedIssueID,
|
||||
&i.Payload,
|
||||
&i.AgentOutput,
|
||||
&i.StartedAt,
|
||||
&i.CompletedAt,
|
||||
&i.IdempotencyKey,
|
||||
&i.CreatedAt,
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
items = append(items, i)
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return items, nil
|
||||
}
|
||||
|
||||
const listAgentflowTriggers = `-- name: ListAgentflowTriggers :many
|
||||
SELECT id, agentflow_id, kind, enabled, cron_expression, timezone, next_run_at, public_id, secret_hash, signing_mode, last_fired_at, created_at FROM agentflow_trigger
|
||||
WHERE agentflow_id = $1
|
||||
ORDER BY created_at ASC
|
||||
`
|
||||
|
||||
func (q *Queries) ListAgentflowTriggers(ctx context.Context, agentflowID pgtype.UUID) ([]AgentflowTrigger, error) {
|
||||
rows, err := q.db.Query(ctx, listAgentflowTriggers, agentflowID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
items := []AgentflowTrigger{}
|
||||
for rows.Next() {
|
||||
var i AgentflowTrigger
|
||||
if err := rows.Scan(
|
||||
&i.ID,
|
||||
&i.AgentflowID,
|
||||
&i.Kind,
|
||||
&i.Enabled,
|
||||
&i.CronExpression,
|
||||
&i.Timezone,
|
||||
&i.NextRunAt,
|
||||
&i.PublicID,
|
||||
&i.SecretHash,
|
||||
&i.SigningMode,
|
||||
&i.LastFiredAt,
|
||||
&i.CreatedAt,
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
items = append(items, i)
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return items, nil
|
||||
}
|
||||
|
||||
const listAgentflows = `-- name: ListAgentflows :many
|
||||
SELECT id, workspace_id, title, description, agent_id, status, concurrency_policy, variables, created_by, created_at, updated_at FROM agentflow
|
||||
WHERE workspace_id = $1 AND status != 'archived'
|
||||
ORDER BY created_at DESC
|
||||
`
|
||||
|
||||
func (q *Queries) ListAgentflows(ctx context.Context, workspaceID pgtype.UUID) ([]Agentflow, error) {
|
||||
rows, err := q.db.Query(ctx, listAgentflows, workspaceID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
items := []Agentflow{}
|
||||
for rows.Next() {
|
||||
var i Agentflow
|
||||
if err := rows.Scan(
|
||||
&i.ID,
|
||||
&i.WorkspaceID,
|
||||
&i.Title,
|
||||
&i.Description,
|
||||
&i.AgentID,
|
||||
&i.Status,
|
||||
&i.ConcurrencyPolicy,
|
||||
&i.Variables,
|
||||
&i.CreatedBy,
|
||||
&i.CreatedAt,
|
||||
&i.UpdatedAt,
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
items = append(items, i)
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return items, nil
|
||||
}
|
||||
|
||||
const listAllAgentflows = `-- name: ListAllAgentflows :many
|
||||
SELECT id, workspace_id, title, description, agent_id, status, concurrency_policy, variables, created_by, created_at, updated_at FROM agentflow
|
||||
WHERE workspace_id = $1
|
||||
ORDER BY created_at DESC
|
||||
`
|
||||
|
||||
func (q *Queries) ListAllAgentflows(ctx context.Context, workspaceID pgtype.UUID) ([]Agentflow, error) {
|
||||
rows, err := q.db.Query(ctx, listAllAgentflows, workspaceID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
items := []Agentflow{}
|
||||
for rows.Next() {
|
||||
var i Agentflow
|
||||
if err := rows.Scan(
|
||||
&i.ID,
|
||||
&i.WorkspaceID,
|
||||
&i.Title,
|
||||
&i.Description,
|
||||
&i.AgentID,
|
||||
&i.Status,
|
||||
&i.ConcurrencyPolicy,
|
||||
&i.Variables,
|
||||
&i.CreatedBy,
|
||||
&i.CreatedAt,
|
||||
&i.UpdatedAt,
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
items = append(items, i)
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return items, nil
|
||||
}
|
||||
|
||||
const setTriggerNextRunAt = `-- name: SetTriggerNextRunAt :exec
|
||||
UPDATE agentflow_trigger SET next_run_at = $2
|
||||
WHERE id = $1
|
||||
`
|
||||
|
||||
type SetTriggerNextRunAtParams struct {
|
||||
ID pgtype.UUID `json:"id"`
|
||||
NextRunAt pgtype.Timestamptz `json:"next_run_at"`
|
||||
}
|
||||
|
||||
func (q *Queries) SetTriggerNextRunAt(ctx context.Context, arg SetTriggerNextRunAtParams) error {
|
||||
_, err := q.db.Exec(ctx, setTriggerNextRunAt, arg.ID, arg.NextRunAt)
|
||||
return err
|
||||
}
|
||||
|
||||
const updateAgentflow = `-- name: UpdateAgentflow :one
|
||||
UPDATE agentflow SET
|
||||
title = COALESCE($2, title),
|
||||
description = COALESCE($3, description),
|
||||
agent_id = COALESCE($4, agent_id),
|
||||
status = COALESCE($5, status),
|
||||
concurrency_policy = COALESCE($6, concurrency_policy),
|
||||
variables = COALESCE($7, variables),
|
||||
updated_at = now()
|
||||
WHERE id = $1
|
||||
RETURNING id, workspace_id, title, description, agent_id, status, concurrency_policy, variables, created_by, created_at, updated_at
|
||||
`
|
||||
|
||||
type UpdateAgentflowParams struct {
|
||||
ID pgtype.UUID `json:"id"`
|
||||
Title pgtype.Text `json:"title"`
|
||||
Description pgtype.Text `json:"description"`
|
||||
AgentID pgtype.UUID `json:"agent_id"`
|
||||
Status pgtype.Text `json:"status"`
|
||||
ConcurrencyPolicy pgtype.Text `json:"concurrency_policy"`
|
||||
Variables []byte `json:"variables"`
|
||||
}
|
||||
|
||||
func (q *Queries) UpdateAgentflow(ctx context.Context, arg UpdateAgentflowParams) (Agentflow, error) {
|
||||
row := q.db.QueryRow(ctx, updateAgentflow,
|
||||
arg.ID,
|
||||
arg.Title,
|
||||
arg.Description,
|
||||
arg.AgentID,
|
||||
arg.Status,
|
||||
arg.ConcurrencyPolicy,
|
||||
arg.Variables,
|
||||
)
|
||||
var i Agentflow
|
||||
err := row.Scan(
|
||||
&i.ID,
|
||||
&i.WorkspaceID,
|
||||
&i.Title,
|
||||
&i.Description,
|
||||
&i.AgentID,
|
||||
&i.Status,
|
||||
&i.ConcurrencyPolicy,
|
||||
&i.Variables,
|
||||
&i.CreatedBy,
|
||||
&i.CreatedAt,
|
||||
&i.UpdatedAt,
|
||||
)
|
||||
return i, err
|
||||
}
|
||||
|
||||
const updateAgentflowRunStatus = `-- name: UpdateAgentflowRunStatus :one
|
||||
UPDATE agentflow_run SET
|
||||
status = $2,
|
||||
started_at = CASE WHEN $2 = 'executing' THEN now() ELSE started_at END,
|
||||
completed_at = CASE WHEN $2 IN ('completed', 'failed', 'skipped', 'coalesced') THEN now() ELSE completed_at END
|
||||
WHERE id = $1
|
||||
RETURNING id, agentflow_id, trigger_id, source_kind, status, linked_issue_id, payload, agent_output, started_at, completed_at, idempotency_key, created_at
|
||||
`
|
||||
|
||||
type UpdateAgentflowRunStatusParams struct {
|
||||
ID pgtype.UUID `json:"id"`
|
||||
Status string `json:"status"`
|
||||
}
|
||||
|
||||
func (q *Queries) UpdateAgentflowRunStatus(ctx context.Context, arg UpdateAgentflowRunStatusParams) (AgentflowRun, error) {
|
||||
row := q.db.QueryRow(ctx, updateAgentflowRunStatus, arg.ID, arg.Status)
|
||||
var i AgentflowRun
|
||||
err := row.Scan(
|
||||
&i.ID,
|
||||
&i.AgentflowID,
|
||||
&i.TriggerID,
|
||||
&i.SourceKind,
|
||||
&i.Status,
|
||||
&i.LinkedIssueID,
|
||||
&i.Payload,
|
||||
&i.AgentOutput,
|
||||
&i.StartedAt,
|
||||
&i.CompletedAt,
|
||||
&i.IdempotencyKey,
|
||||
&i.CreatedAt,
|
||||
)
|
||||
return i, err
|
||||
}
|
||||
|
||||
const updateAgentflowTrigger = `-- name: UpdateAgentflowTrigger :one
|
||||
UPDATE agentflow_trigger SET
|
||||
enabled = COALESCE($2, enabled),
|
||||
cron_expression = COALESCE($3, cron_expression),
|
||||
timezone = COALESCE($4, timezone),
|
||||
next_run_at = COALESCE($5, next_run_at)
|
||||
WHERE id = $1
|
||||
RETURNING id, agentflow_id, kind, enabled, cron_expression, timezone, next_run_at, public_id, secret_hash, signing_mode, last_fired_at, created_at
|
||||
`
|
||||
|
||||
type UpdateAgentflowTriggerParams struct {
|
||||
ID pgtype.UUID `json:"id"`
|
||||
Enabled pgtype.Bool `json:"enabled"`
|
||||
CronExpression pgtype.Text `json:"cron_expression"`
|
||||
Timezone pgtype.Text `json:"timezone"`
|
||||
NextRunAt pgtype.Timestamptz `json:"next_run_at"`
|
||||
}
|
||||
|
||||
func (q *Queries) UpdateAgentflowTrigger(ctx context.Context, arg UpdateAgentflowTriggerParams) (AgentflowTrigger, error) {
|
||||
row := q.db.QueryRow(ctx, updateAgentflowTrigger,
|
||||
arg.ID,
|
||||
arg.Enabled,
|
||||
arg.CronExpression,
|
||||
arg.Timezone,
|
||||
arg.NextRunAt,
|
||||
)
|
||||
var i AgentflowTrigger
|
||||
err := row.Scan(
|
||||
&i.ID,
|
||||
&i.AgentflowID,
|
||||
&i.Kind,
|
||||
&i.Enabled,
|
||||
&i.CronExpression,
|
||||
&i.Timezone,
|
||||
&i.NextRunAt,
|
||||
&i.PublicID,
|
||||
&i.SecretHash,
|
||||
&i.SigningMode,
|
||||
&i.LastFiredAt,
|
||||
&i.CreatedAt,
|
||||
)
|
||||
return i, err
|
||||
}
|
||||
@@ -79,6 +79,51 @@ type AgentTaskQueue struct {
|
||||
SessionID pgtype.Text `json:"session_id"`
|
||||
WorkDir pgtype.Text `json:"work_dir"`
|
||||
TriggerCommentID pgtype.UUID `json:"trigger_comment_id"`
|
||||
AgentflowRunID pgtype.UUID `json:"agentflow_run_id"`
|
||||
}
|
||||
|
||||
type Agentflow struct {
|
||||
ID pgtype.UUID `json:"id"`
|
||||
WorkspaceID pgtype.UUID `json:"workspace_id"`
|
||||
Title string `json:"title"`
|
||||
Description pgtype.Text `json:"description"`
|
||||
AgentID pgtype.UUID `json:"agent_id"`
|
||||
Status string `json:"status"`
|
||||
ConcurrencyPolicy string `json:"concurrency_policy"`
|
||||
Variables []byte `json:"variables"`
|
||||
CreatedBy pgtype.UUID `json:"created_by"`
|
||||
CreatedAt pgtype.Timestamptz `json:"created_at"`
|
||||
UpdatedAt pgtype.Timestamptz `json:"updated_at"`
|
||||
}
|
||||
|
||||
type AgentflowRun struct {
|
||||
ID pgtype.UUID `json:"id"`
|
||||
AgentflowID pgtype.UUID `json:"agentflow_id"`
|
||||
TriggerID pgtype.UUID `json:"trigger_id"`
|
||||
SourceKind string `json:"source_kind"`
|
||||
Status string `json:"status"`
|
||||
LinkedIssueID pgtype.UUID `json:"linked_issue_id"`
|
||||
Payload []byte `json:"payload"`
|
||||
AgentOutput pgtype.Text `json:"agent_output"`
|
||||
StartedAt pgtype.Timestamptz `json:"started_at"`
|
||||
CompletedAt pgtype.Timestamptz `json:"completed_at"`
|
||||
IdempotencyKey pgtype.Text `json:"idempotency_key"`
|
||||
CreatedAt pgtype.Timestamptz `json:"created_at"`
|
||||
}
|
||||
|
||||
type AgentflowTrigger struct {
|
||||
ID pgtype.UUID `json:"id"`
|
||||
AgentflowID pgtype.UUID `json:"agentflow_id"`
|
||||
Kind string `json:"kind"`
|
||||
Enabled bool `json:"enabled"`
|
||||
CronExpression pgtype.Text `json:"cron_expression"`
|
||||
Timezone pgtype.Text `json:"timezone"`
|
||||
NextRunAt pgtype.Timestamptz `json:"next_run_at"`
|
||||
PublicID pgtype.Text `json:"public_id"`
|
||||
SecretHash pgtype.Text `json:"secret_hash"`
|
||||
SigningMode pgtype.Text `json:"signing_mode"`
|
||||
LastFiredAt pgtype.Timestamptz `json:"last_fired_at"`
|
||||
CreatedAt pgtype.Timestamptz `json:"created_at"`
|
||||
}
|
||||
|
||||
type Attachment struct {
|
||||
|
||||
130
server/pkg/db/queries/agentflow.sql
Normal file
130
server/pkg/db/queries/agentflow.sql
Normal file
@@ -0,0 +1,130 @@
|
||||
-- name: CreateAgentflow :one
|
||||
INSERT INTO agentflow (
|
||||
workspace_id, title, description, agent_id, status,
|
||||
concurrency_policy, variables, created_by
|
||||
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
|
||||
RETURNING *;
|
||||
|
||||
-- name: GetAgentflow :one
|
||||
SELECT * FROM agentflow WHERE id = $1;
|
||||
|
||||
-- name: GetAgentflowInWorkspace :one
|
||||
SELECT * FROM agentflow WHERE id = $1 AND workspace_id = $2;
|
||||
|
||||
-- name: ListAgentflows :many
|
||||
SELECT * FROM agentflow
|
||||
WHERE workspace_id = $1 AND status != 'archived'
|
||||
ORDER BY created_at DESC;
|
||||
|
||||
-- name: ListAllAgentflows :many
|
||||
SELECT * FROM agentflow
|
||||
WHERE workspace_id = $1
|
||||
ORDER BY created_at DESC;
|
||||
|
||||
-- name: UpdateAgentflow :one
|
||||
UPDATE agentflow SET
|
||||
title = COALESCE(sqlc.narg('title'), title),
|
||||
description = COALESCE(sqlc.narg('description'), description),
|
||||
agent_id = COALESCE(sqlc.narg('agent_id'), agent_id),
|
||||
status = COALESCE(sqlc.narg('status'), status),
|
||||
concurrency_policy = COALESCE(sqlc.narg('concurrency_policy'), concurrency_policy),
|
||||
variables = COALESCE(sqlc.narg('variables'), variables),
|
||||
updated_at = now()
|
||||
WHERE id = $1
|
||||
RETURNING *;
|
||||
|
||||
-- name: ArchiveAgentflow :one
|
||||
UPDATE agentflow SET status = 'archived', updated_at = now()
|
||||
WHERE id = $1
|
||||
RETURNING *;
|
||||
|
||||
-- name: CreateAgentflowTrigger :one
|
||||
INSERT INTO agentflow_trigger (
|
||||
agentflow_id, kind, enabled,
|
||||
cron_expression, timezone, next_run_at,
|
||||
public_id, secret_hash, signing_mode
|
||||
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
|
||||
RETURNING *;
|
||||
|
||||
-- name: GetAgentflowTrigger :one
|
||||
SELECT * FROM agentflow_trigger WHERE id = $1;
|
||||
|
||||
-- name: ListAgentflowTriggers :many
|
||||
SELECT * FROM agentflow_trigger
|
||||
WHERE agentflow_id = $1
|
||||
ORDER BY created_at ASC;
|
||||
|
||||
-- name: UpdateAgentflowTrigger :one
|
||||
UPDATE agentflow_trigger SET
|
||||
enabled = COALESCE(sqlc.narg('enabled'), enabled),
|
||||
cron_expression = COALESCE(sqlc.narg('cron_expression'), cron_expression),
|
||||
timezone = COALESCE(sqlc.narg('timezone'), timezone),
|
||||
next_run_at = COALESCE(sqlc.narg('next_run_at'), next_run_at)
|
||||
WHERE id = $1
|
||||
RETURNING *;
|
||||
|
||||
-- name: DeleteAgentflowTrigger :exec
|
||||
DELETE FROM agentflow_trigger WHERE id = $1;
|
||||
|
||||
-- name: ClaimDueScheduleTriggers :many
|
||||
-- Atomically claims all schedule triggers that are due.
|
||||
-- Uses CAS (compare-and-swap) on next_run_at to prevent double-firing
|
||||
-- across multiple server instances.
|
||||
UPDATE agentflow_trigger t SET
|
||||
next_run_at = NULL, -- will be recalculated by caller
|
||||
last_fired_at = now()
|
||||
FROM agentflow a
|
||||
WHERE t.agentflow_id = a.id
|
||||
AND t.kind = 'schedule'
|
||||
AND t.enabled = true
|
||||
AND a.status = 'active'
|
||||
AND t.next_run_at IS NOT NULL
|
||||
AND t.next_run_at <= now()
|
||||
RETURNING t.*, a.workspace_id, a.agent_id, a.title AS agentflow_title, a.description AS agentflow_description, a.concurrency_policy;
|
||||
|
||||
-- name: SetTriggerNextRunAt :exec
|
||||
UPDATE agentflow_trigger SET next_run_at = $2
|
||||
WHERE id = $1;
|
||||
|
||||
-- name: CreateAgentflowRun :one
|
||||
INSERT INTO agentflow_run (
|
||||
agentflow_id, trigger_id, source_kind, status, payload, idempotency_key
|
||||
) VALUES ($1, $2, $3, $4, $5, $6)
|
||||
RETURNING *;
|
||||
|
||||
-- name: GetAgentflowRun :one
|
||||
SELECT * FROM agentflow_run WHERE id = $1;
|
||||
|
||||
-- name: ListAgentflowRuns :many
|
||||
SELECT * FROM agentflow_run
|
||||
WHERE agentflow_id = $1
|
||||
ORDER BY created_at DESC
|
||||
LIMIT $2 OFFSET $3;
|
||||
|
||||
-- name: UpdateAgentflowRunStatus :one
|
||||
UPDATE agentflow_run SET
|
||||
status = $2,
|
||||
started_at = CASE WHEN $2 = 'executing' THEN now() ELSE started_at END,
|
||||
completed_at = CASE WHEN $2 IN ('completed', 'failed', 'skipped', 'coalesced') THEN now() ELSE completed_at END
|
||||
WHERE id = $1
|
||||
RETURNING *;
|
||||
|
||||
-- name: CompleteAgentflowRun :one
|
||||
UPDATE agentflow_run SET
|
||||
status = $2,
|
||||
agent_output = $3,
|
||||
linked_issue_id = sqlc.narg('linked_issue_id'),
|
||||
completed_at = now()
|
||||
WHERE id = $1
|
||||
RETURNING *;
|
||||
|
||||
-- name: HasActiveAgentflowRun :one
|
||||
-- Check if agentflow has a run currently executing (for concurrency policy).
|
||||
SELECT count(*) > 0 AS has_active FROM agentflow_run
|
||||
WHERE agentflow_id = $1 AND status IN ('received', 'executing');
|
||||
|
||||
-- name: CreateAgentflowTask :one
|
||||
-- Creates a task in the queue for an agentflow run (issue_id is NULL).
|
||||
INSERT INTO agent_task_queue (agent_id, runtime_id, issue_id, status, priority, agentflow_run_id)
|
||||
VALUES ($1, $2, NULL, 'queued', $3, $4)
|
||||
RETURNING *;
|
||||
Reference in New Issue
Block a user