Compare commits

...

5 Commits

Author SHA1 Message Date
Lambda
007c98af03 fix autopilot subscriber template transaction
Co-authored-by: multica-agent <github@multica.ai>
2026-06-17 15:26:46 +08:00
Lambda
803067316b fix(autopilot): align subscriber PR with current main
Co-authored-by: multica-agent <github@multica.ai>
2026-06-17 03:21:31 +08:00
Lambda
e0c3ed0695 fix(autopilot): notify template subscribers on issue creation (MUL-2533)
The autopilot create-issue path fans out template subscribers into
issue_subscriber inside the same tx as the issue insert, but the
issue:created notification listener only matches handler.IssueResponse
payloads and only direct-notifies the assignee + @mentions. The autopilot
publishes a map[string]any payload, so the listener falls through and the
template subscribers never receive an inbox item for the creation event —
breaking OQ3 ("reason='autopilot' subscribers receive all subscription
events, consistent with reason='manual'").

Fix it where the divergence lives: in dispatchCreateIssue, right after
EventIssueCreated fires, write an inbox_item (type='issue_subscribed',
severity='info') for each member subscriber and publish EventInboxNew so
the recipient's inbox WS feed updates in real time. The write is after
the tx commit so an inbox hiccup can't roll back the issue; failures are
logged, not propagated. The manual path is unchanged — manual subscribers
don't exist at creation time, so there is nothing to notify there.

Adds a new InboxItemType 'issue_subscribed' (en/zh labels) and two
covering tests in autopilot_subscriber_test.go: one asserts the inbox
row lands for a template subscriber on dispatch, the other asserts the
no-subscriber autopilot stays silent.

Co-authored-by: multica-agent <github@multica.ai>
2026-06-17 03:12:06 +08:00
Lambda
a023cc51e4 feat(autopilot): default subscriber template (MUL-2533) — frontend
New SubscriberMultiSelect picker (members-only search + chips) wired
into the create / edit AutopilotDialog. The detail page renders the
saved template as read-only chips; edits flow through the dialog.

TS types expose the new `subscribers` field on Autopilot, plus an
AutopilotSubscriberInput shape for the create/update wire payloads.

Co-authored-by: multica-agent <github@multica.ai>
2026-06-17 03:12:06 +08:00
Lambda
21e11ee511 feat(autopilot): default subscriber template (MUL-2533) — server
Add per-autopilot member subscriber template that fans out to every
issue the autopilot spawns. New autopilot_subscriber table; extend
issue_subscriber.reason with 'autopilot' so the dispatch-time fanout
is distinguishable from manual subscriptions.

API: POST/PATCH /api/autopilots accept a `subscribers` array (member
user_type only for the first version); PATCH semantics are full-replace.
GET returns subscribers on the detail endpoint; the list endpoint omits
them to avoid an N+1.

Dispatch: dispatchCreateIssue lists the template inside the same tx as
the issue insert and writes the rows with reason='autopilot' before
EventIssueCreated fires, so notification listeners see the full
subscriber set on the first event.

Co-authored-by: multica-agent <github@multica.ai>
2026-06-17 03:11:22 +08:00
25 changed files with 1296 additions and 14 deletions

View File

@@ -47,6 +47,7 @@ const PRIORITY_LABEL: Record<IssuePriority, string> = {
// Mirrors useTypeLabels in packages/views/inbox/components/inbox-detail-label.tsx
const TYPE_LABEL: Record<InboxItemType, string> = {
issue_assigned: "Assigned",
issue_subscribed: "Subscribed",
unassigned: "Unassigned",
assignee_changed: "Reassigned",
status_changed: "Status changed",

View File

@@ -39,11 +39,22 @@ export function useUpdateAutopilot() {
qc.cancelQueries({ queryKey: autopilotKeys.list(wsId) });
const prevList = qc.getQueryData<ListAutopilotsResponse>(autopilotKeys.list(wsId));
const prevDetail = qc.getQueryData<GetAutopilotResponse>(autopilotKeys.detail(wsId, id));
// Request shape (AutopilotSubscriberInput) lacks `created_at`, so it's
// not assignable to the response shape. onSettled invalidates the
// detail query and refetches the authoritative server payload.
const { subscribers: _omitSubs, ...optimistic } = data;
qc.setQueryData<ListAutopilotsResponse>(autopilotKeys.list(wsId), (old) =>
old ? { ...old, autopilots: old.autopilots.map((a) => (a.id === id ? { ...a, ...data } : a)) } : old,
old
? {
...old,
autopilots: old.autopilots.map((a) =>
a.id === id ? { ...a, ...optimistic } : a,
),
}
: old,
);
qc.setQueryData<GetAutopilotResponse>(autopilotKeys.detail(wsId, id), (old) =>
old ? { ...old, autopilot: { ...old.autopilot, ...data } } : old,
old ? { ...old, autopilot: { ...old.autopilot, ...optimistic } } : old,
);
return { prevList, prevDetail, id };
},

View File

@@ -46,6 +46,9 @@ export interface Autopilot {
trigger_kinds?: string[];
next_run_at?: string | null;
last_run_status?: string | null;
// List endpoint returns []; only the detail endpoint populates this.
// Treat undefined as empty on older servers.
subscribers?: AutopilotSubscriber[];
}
export interface WebhookEventFilter {
@@ -53,6 +56,12 @@ export interface WebhookEventFilter {
actions?: string[];
}
export interface AutopilotSubscriber {
user_type: "member";
user_id: string;
created_at: string;
}
export interface AutopilotTrigger {
id: string;
autopilot_id: string;
@@ -95,6 +104,11 @@ export interface AutopilotRun {
created_at: string;
}
export interface AutopilotSubscriberInput {
user_type: "member";
user_id: string;
}
export interface CreateAutopilotRequest {
title: string;
description?: string;
@@ -105,6 +119,7 @@ export interface CreateAutopilotRequest {
assignee_id: string;
execution_mode: AutopilotExecutionMode;
issue_title_template?: string;
subscribers?: AutopilotSubscriberInput[];
}
export interface UpdateAutopilotRequest {
@@ -118,6 +133,9 @@ export interface UpdateAutopilotRequest {
status?: AutopilotStatus;
execution_mode?: AutopilotExecutionMode;
issue_title_template?: string | null;
// When present, fully replaces the autopilot's subscriber template;
// omit to leave it untouched.
subscribers?: AutopilotSubscriberInput[];
}
export interface CreateAutopilotTriggerRequest {

View File

@@ -4,6 +4,7 @@ export type InboxSeverity = "action_required" | "attention" | "info";
export type InboxItemType =
| "issue_assigned"
| "issue_subscribed"
| "unassigned"
| "assignee_changed"
| "status_changed"

View File

@@ -118,6 +118,8 @@ export type {
AutopilotStatus,
AutopilotExecutionMode,
AutopilotAssigneeType,
AutopilotSubscriber,
AutopilotSubscriberInput,
AutopilotTrigger,
AutopilotTriggerKind,
AutopilotRun,

View File

@@ -52,7 +52,12 @@ import {
toCronExpression,
} from "./trigger-config";
import type { TriggerConfig } from "./trigger-config";
import type { AutopilotExecutionMode, AutopilotRun, AutopilotTrigger } from "@multica/core/types";
import type {
AutopilotExecutionMode,
AutopilotRun,
AutopilotSubscriber,
AutopilotTrigger,
} from "@multica/core/types";
import type { AgentTask } from "@multica/core/types/agent";
import { ReadonlyContent } from "../../editor";
import { TranscriptButton } from "../../common/task-transcript";
@@ -573,6 +578,40 @@ function AddTriggerDialog({
);
}
// Read-only chip row; edits flow through AutopilotDialog → SubscriberMultiSelect
// so the detail page never holds in-flight selection state.
function SubscriberChips({
subscribers,
}: {
subscribers: AutopilotSubscriber[] | undefined;
}) {
const { t } = useT("autopilots");
const { getActorName } = useActorName();
const members = (subscribers ?? []).filter((s) => s.user_type === "member");
if (members.length === 0) {
return (
<div className="mt-1 text-sm text-muted-foreground">
{t(($) => $.detail.field_subscribers_none)}
</div>
);
}
return (
<div className="mt-1 flex flex-wrap gap-1.5">
{members.map((s) => (
<span
key={`${s.user_type}:${s.user_id}`}
className="inline-flex items-center gap-1 rounded-full border bg-background px-2 py-0.5 text-xs"
>
<ActorAvatar actorType="member" actorId={s.user_id} size={14} />
<span className="max-w-[14rem] truncate">
{getActorName("member", s.user_id)}
</span>
</span>
))}
</div>
);
}
export function AutopilotDetailPage({ autopilotId }: { autopilotId: string }) {
const { t } = useT("autopilots");
const wsId = useWorkspaceId();
@@ -802,6 +841,16 @@ export function AutopilotDetailPage({ autopilotId }: { autopilotId: string }) {
</div>
</div>
)}
{autopilot.execution_mode === "create_issue" && (
<div className="col-span-2">
<label className="text-xs text-muted-foreground">
{t(($) => $.detail.field_subscribers)}
</label>
<SubscriberChips
subscribers={autopilot.subscribers}
/>
</div>
)}
{autopilot.description && (
<div className="col-span-2">
<label className="text-xs text-muted-foreground">{t(($) => $.detail.field_prompt)}</label>
@@ -900,6 +949,10 @@ export function AutopilotDetailPage({ autopilotId }: { autopilotId: string }) {
assignee_type: autopilot.assignee_type,
assignee_id: autopilot.assignee_id,
execution_mode: autopilot.execution_mode as AutopilotExecutionMode,
subscriber_user_ids:
autopilot.subscribers
?.filter((s) => s.user_type === "member")
.map((s) => s.user_id) ?? [],
}}
triggers={triggers}
/>

View File

@@ -59,6 +59,7 @@ import { ActorAvatar } from "../../common/actor-avatar";
import { ProjectPicker } from "../../projects/components/project-picker";
import { ProjectIcon } from "../../projects/components/project-icon";
import { AgentPicker, type AssigneeSelection } from "./pickers/agent-picker";
import { SubscriberMultiSelect } from "./subscriber-multi-select";
import {
getDefaultTriggerConfig,
getLocalTimezone,
@@ -83,6 +84,7 @@ export interface AutopilotInitial {
assignee_type: AutopilotAssigneeType;
assignee_id: string;
execution_mode: AutopilotExecutionMode;
subscriber_user_ids?: string[];
}
export type AutopilotDialogProps =
@@ -285,6 +287,9 @@ export function AutopilotDialog(props: AutopilotDialogProps) {
const [executionMode, setExecutionMode] = useState<AutopilotExecutionMode>(
initial.execution_mode ?? "create_issue",
);
const [subscriberUserIds, setSubscriberUserIds] = useState<string[]>(
initial.subscriber_user_ids ?? [],
);
const initialCfg: TriggerConfig = (() => {
if (isCreate) {
@@ -379,6 +384,10 @@ export function AutopilotDialog(props: AutopilotDialogProps) {
assignee_type: assigneeType,
assignee_id: assigneeId,
execution_mode: executionMode,
subscribers: subscriberUserIds.map((user_id) => ({
user_type: "member" as const,
user_id,
})),
});
let triggerOk = true;
let triggerErrMessage: string | null = null;
@@ -428,6 +437,10 @@ export function AutopilotDialog(props: AutopilotDialogProps) {
assignee_type: assigneeType,
assignee_id: assigneeId,
execution_mode: executionMode,
subscribers: subscriberUserIds.map((user_id) => ({
user_type: "member" as const,
user_id,
})),
});
let triggerOk = true;
let triggerErrMessage: string | null = null;
@@ -645,6 +658,13 @@ export function AutopilotDialog(props: AutopilotDialogProps) {
/>
)}
{executionMode === "create_issue" && (
<SubscribersSection
selectedUserIds={subscriberUserIds}
onChange={setSubscriberUserIds}
/>
)}
{isCreate && (
<TriggerKindSection kind={triggerKind} onChange={setTriggerKind} />
)}
@@ -870,6 +890,28 @@ function ProjectSection({
);
}
function SubscribersSection({
selectedUserIds,
onChange,
}: {
selectedUserIds: string[];
onChange: (next: string[]) => void;
}) {
const { t } = useT("autopilots");
return (
<div>
<SectionLabel>{t(($) => $.dialog.section_subscribers)}</SectionLabel>
<p className="mb-2 text-[11px] text-muted-foreground">
{t(($) => $.dialog.subscribers_hint)}
</p>
<SubscriberMultiSelect
selectedIds={selectedUserIds}
onChange={onChange}
/>
</div>
);
}
function ScheduleSection({
config,
onChange,

View File

@@ -0,0 +1,125 @@
"use client";
import { useMemo, useState } from "react";
import { Plus, X } from "lucide-react";
import { useQuery } from "@tanstack/react-query";
import { useWorkspaceId } from "@multica/core/hooks";
import { memberListOptions } from "@multica/core/workspace/queries";
import { cn } from "@multica/ui/lib/utils";
import { ActorAvatar } from "../../common/actor-avatar";
import {
PropertyPicker,
PickerItem,
PickerEmpty,
} from "../../issues/components/pickers/property-picker";
import { matchesPinyin } from "../../editor/extensions/pinyin-match";
import { useT } from "../../i18n";
// Fully controlled — parent owns the selection state and ships it to the
// create/update mutation. Members-only on purpose (per RFC, MUL-2533).
export function SubscriberMultiSelect({
selectedIds,
onChange,
}: {
/** User IDs of the currently-selected member subscribers. */
selectedIds: ReadonlyArray<string>;
/** Called with the new full list whenever the selection changes. */
onChange: (next: string[]) => void;
}) {
const { t } = useT("autopilots");
const wsId = useWorkspaceId();
const { data: members = [] } = useQuery(memberListOptions(wsId));
const [open, setOpen] = useState(false);
const [filter, setFilter] = useState("");
const selectedSet = useMemo(() => new Set(selectedIds), [selectedIds]);
const query = filter.trim().toLowerCase();
const filteredMembers = useMemo(
() =>
members.filter(
(m) =>
query === "" ||
m.name.toLowerCase().includes(query) ||
matchesPinyin(m.name, query),
),
[members, query],
);
const selectedMembers = useMemo(
() => members.filter((m) => selectedSet.has(m.user_id)),
[members, selectedSet],
);
const toggle = (userId: string) => {
if (selectedSet.has(userId)) {
onChange(selectedIds.filter((id) => id !== userId));
} else {
onChange([...selectedIds, userId]);
}
};
const remove = (userId: string) => {
onChange(selectedIds.filter((id) => id !== userId));
};
return (
<div className="flex flex-wrap items-center gap-1.5">
{selectedMembers.map((m) => (
<span
key={m.user_id}
className="inline-flex items-center gap-1 rounded-full border bg-background px-2 py-0.5 text-xs"
>
<ActorAvatar actorType="member" actorId={m.user_id} size={14} />
<span className="max-w-[10rem] truncate">{m.name}</span>
<button
type="button"
onClick={() => remove(m.user_id)}
className="text-muted-foreground hover:text-foreground transition-colors cursor-pointer"
aria-label={t(($) => $.dialog.subscribers_remove_tooltip)}
>
<X className="size-3" />
</button>
</span>
))}
<PropertyPicker
open={open}
onOpenChange={(v) => {
setOpen(v);
if (!v) setFilter("");
}}
width="w-64"
align="start"
searchable
searchPlaceholder={t(($) => $.dialog.subscribers_search_placeholder)}
onSearchChange={setFilter}
trigger={
<span
className={cn(
"inline-flex items-center gap-1 rounded-full border border-dashed px-2 py-0.5 text-xs text-muted-foreground",
"hover:border-primary/40 hover:text-foreground transition-colors cursor-pointer",
)}
>
<Plus className="size-3" />
{t(($) => $.dialog.subscribers_add)}
</span>
}
>
{filteredMembers.length === 0 ? (
<PickerEmpty />
) : (
filteredMembers.map((m) => (
<PickerItem
key={m.user_id}
selected={selectedSet.has(m.user_id)}
onClick={() => toggle(m.user_id)}
>
<ActorAvatar actorType="member" actorId={m.user_id} size={18} />
<span className="truncate">{m.name}</span>
</PickerItem>
))
)}
</PropertyPicker>
</div>
);
}

View File

@@ -15,6 +15,7 @@ export function useTypeLabels(): Record<InboxItemType, string> {
const { t } = useT("inbox");
return {
issue_assigned: t(($) => $.types.issue_assigned),
issue_subscribed: t(($) => $.types.issue_subscribed),
unassigned: t(($) => $.types.unassigned),
assignee_changed: t(($) => $.types.assignee_changed),
status_changed: t(($) => $.types.status_changed),

View File

@@ -85,6 +85,8 @@
"no_project": "No project",
"project_unavailable": "Project unavailable",
"field_prompt": "Prompt",
"field_subscribers": "Subscribers",
"field_subscribers_none": "None",
"add_trigger": "Add trigger",
"no_triggers": "No triggers configured. Add a schedule to run automatically.",
"no_runs": "No runs yet. Click \"Run now\" to trigger manually.",
@@ -255,6 +257,13 @@
"section_project": "Project",
"no_project": "No project",
"section_output_mode": "Output mode",
"section_subscribers": "Subscribers",
"subscribers_hint": "Auto-subscribed to every issue this autopilot creates",
"subscribers_empty": "No subscribers — add a teammate to notify them on every run",
"subscribers_add": "Add subscriber",
"subscribers_search_placeholder": "Search members by name…",
"subscribers_no_results": "No matching members",
"subscribers_remove_tooltip": "Remove",
"section_schedule": "Schedule",
"section_trigger_kind": "Trigger",
"trigger_kind_schedule": "Schedule",

View File

@@ -29,6 +29,7 @@
},
"types": {
"issue_assigned": "Assigned",
"issue_subscribed": "Subscribed",
"unassigned": "Unassigned",
"assignee_changed": "Assignee changed",
"status_changed": "Status changed",

View File

@@ -85,6 +85,8 @@
"no_project": "プロジェクトなし",
"project_unavailable": "プロジェクトを利用できません",
"field_prompt": "プロンプト",
"field_subscribers": "購読者",
"field_subscribers_none": "なし",
"add_trigger": "トリガーを追加",
"no_triggers": "設定されたトリガーがありません。自動で実行するスケジュールを追加してください。",
"no_runs": "まだ実行履歴がありません。手動で実行するには「今すぐ実行」をクリックしてください。",
@@ -254,6 +256,13 @@
"select_assignee": "エージェントまたはスクワッドを選択",
"section_project": "プロジェクト",
"no_project": "プロジェクトなし",
"section_subscribers": "購読者",
"subscribers_hint": "このオートパイロットが作成するすべてのイシューを自動で購読します",
"subscribers_empty": "購読者はいません。チームメイトを追加すると、毎回通知されます",
"subscribers_add": "購読者を追加",
"subscribers_search_placeholder": "名前でメンバーを検索…",
"subscribers_no_results": "一致するメンバーはいません",
"subscribers_remove_tooltip": "削除",
"section_output_mode": "出力モード",
"section_schedule": "スケジュール",
"section_trigger_kind": "トリガー",

View File

@@ -29,6 +29,7 @@
},
"types": {
"issue_assigned": "割り当て済み",
"issue_subscribed": "購読済み",
"unassigned": "割り当て解除",
"assignee_changed": "担当者を変更",
"status_changed": "ステータスを変更",

View File

@@ -85,6 +85,8 @@
"no_project": "프로젝트 없음",
"project_unavailable": "프로젝트를 사용할 수 없습니다",
"field_prompt": "프롬프트",
"field_subscribers": "구독자",
"field_subscribers_none": "없음",
"add_trigger": "트리거 추가",
"no_triggers": "설정된 트리거가 없습니다. 자동으로 실행할 일정을 추가하세요.",
"no_runs": "아직 실행 기록이 없습니다. 수동으로 실행하려면 \"지금 실행\"을 클릭하세요.",
@@ -254,6 +256,13 @@
"select_assignee": "에이전트 또는 스쿼드 선택",
"section_project": "프로젝트",
"no_project": "프로젝트 없음",
"section_subscribers": "구독자",
"subscribers_hint": "이 오토파일럿이 만드는 모든 이슈를 자동으로 구독합니다",
"subscribers_empty": "구독자가 없습니다. 팀원을 추가하면 매번 알림을 받습니다",
"subscribers_add": "구독자 추가",
"subscribers_search_placeholder": "이름으로 멤버 검색…",
"subscribers_no_results": "일치하는 멤버가 없습니다",
"subscribers_remove_tooltip": "제거",
"section_output_mode": "출력 모드",
"section_schedule": "일정",
"section_trigger_kind": "트리거",

View File

@@ -29,6 +29,7 @@
},
"types": {
"issue_assigned": "할당됨",
"issue_subscribed": "구독됨",
"unassigned": "할당 해제됨",
"assignee_changed": "담당자 변경됨",
"status_changed": "상태 변경됨",

View File

@@ -85,6 +85,8 @@
"no_project": "不关联项目",
"project_unavailable": "项目不可用",
"field_prompt": "Prompt",
"field_subscribers": "订阅者",
"field_subscribers_none": "无",
"add_trigger": "添加触发器",
"no_triggers": "未配置触发器。添加一个时间表让它自动运行。",
"no_runs": "暂无运行记录。点击\"立即运行\"手动触发。",
@@ -255,6 +257,13 @@
"section_project": "关联项目",
"no_project": "不关联项目",
"section_output_mode": "输出模式",
"section_subscribers": "订阅者",
"subscribers_hint": "每次跑出来的 issue 默认订阅",
"subscribers_empty": "暂无订阅者——添加成员后,每次触发都会通知到他",
"subscribers_add": "添加订阅者",
"subscribers_search_placeholder": "按名称搜索成员……",
"subscribers_no_results": "没有匹配的成员",
"subscribers_remove_tooltip": "移除",
"section_schedule": "时间表",
"section_trigger_kind": "触发方式",
"trigger_kind_schedule": "时间表",

View File

@@ -29,6 +29,7 @@
},
"types": {
"issue_assigned": "已分配",
"issue_subscribed": "已订阅",
"unassigned": "已取消分配",
"assignee_changed": "分配人已更改",
"status_changed": "状态已更改",

View File

@@ -52,6 +52,18 @@ type AutopilotResponse struct {
TriggerKinds []string `json:"trigger_kinds,omitempty"`
NextRunAt *string `json:"next_run_at,omitempty"`
LastRunStatus *string `json:"last_run_status,omitempty"`
// Always non-nil (empty slice when no subscribers configured) so
// frontend optional-chain rules can treat the field as authoritative.
Subscribers []AutopilotSubscriberEntry `json:"subscribers"`
}
// user_type is restricted to "member" at the DB layer; the field is kept on
// the wire so a future expansion to agents/squads is additive, not breaking.
type AutopilotSubscriberEntry struct {
UserType string `json:"user_type"`
UserID string `json:"user_id"`
CreatedAt string `json:"created_at"`
}
type AutopilotTriggerResponse struct {
@@ -113,7 +125,7 @@ type AutopilotRunResponse struct {
// ── Converters ──────────────────────────────────────────────────────────────
func autopilotToResponse(a db.Autopilot) AutopilotResponse {
func autopilotToResponse(a db.Autopilot, subscribers []db.AutopilotSubscriber) AutopilotResponse {
assigneeType := a.AssigneeType
if assigneeType == "" {
// Older rows pre-MUL-2429 may surface as "" against an out-of-date
@@ -121,6 +133,14 @@ func autopilotToResponse(a db.Autopilot) AutopilotResponse {
// non-null.
assigneeType = "agent"
}
subResp := make([]AutopilotSubscriberEntry, len(subscribers))
for i, s := range subscribers {
subResp[i] = AutopilotSubscriberEntry{
UserType: s.UserType,
UserID: uuidToString(s.UserID),
CreatedAt: timestampToString(s.CreatedAt),
}
}
return AutopilotResponse{
ID: uuidToString(a.ID),
WorkspaceID: uuidToString(a.WorkspaceID),
@@ -137,6 +157,7 @@ func autopilotToResponse(a db.Autopilot) AutopilotResponse {
LastRunAt: timestampToPtr(a.LastRunAt),
CreatedAt: timestampToString(a.CreatedAt),
UpdatedAt: timestampToString(a.UpdatedAt),
Subscribers: subResp,
}
}
@@ -249,10 +270,11 @@ type CreateAutopilotRequest struct {
ProjectID *string `json:"project_id"`
// AssigneeType is optional and defaults to "agent" — preserves backward
// compatibility with desktop clients shipped before MUL-2429.
AssigneeType *string `json:"assignee_type"`
AssigneeID string `json:"assignee_id"`
ExecutionMode string `json:"execution_mode"`
IssueTitleTemplate *string `json:"issue_title_template"`
AssigneeType *string `json:"assignee_type"`
AssigneeID string `json:"assignee_id"`
ExecutionMode string `json:"execution_mode"`
IssueTitleTemplate *string `json:"issue_title_template"`
Subscribers []SubscriberInput `json:"subscribers"`
}
type UpdateAutopilotRequest struct {
@@ -264,6 +286,13 @@ type UpdateAutopilotRequest struct {
Status *string `json:"status"`
ExecutionMode *string `json:"execution_mode"`
IssueTitleTemplate *string `json:"issue_title_template"`
// Wholesale replacement when present; omit to leave subscribers untouched.
Subscribers []SubscriberInput `json:"subscribers"`
}
type SubscriberInput struct {
UserType string `json:"user_type"`
UserID string `json:"user_id"`
}
type CreateAutopilotTriggerRequest struct {
@@ -334,7 +363,9 @@ func (h *Handler) ListAutopilots(w http.ResponseWriter, r *http.Request) {
resp := make([]AutopilotResponse, len(autopilots))
for i, row := range autopilots {
r := autopilotToResponse(row.Autopilot)
// Omit subscribers to avoid an N+1; GET /api/autopilots/{id} is
// the source of truth for the populated template.
r := autopilotToResponse(row.Autopilot, nil)
r.TriggerKinds = row.TriggerKinds
if row.NextRunAt.Valid {
r.NextRunAt = timestampToPtr(row.NextRunAt)
@@ -357,7 +388,12 @@ func (h *Handler) GetAutopilot(w http.ResponseWriter, r *http.Request) {
return
}
resp := autopilotToResponse(autopilot)
subs, err := h.Queries.ListAutopilotSubscribers(r.Context(), autopilot.ID)
if err != nil {
// Don't 500 the detail fetch over template metadata.
subs = nil
}
resp := autopilotToResponse(autopilot, subs)
// Include triggers.
triggers, err := h.Queries.ListAutopilotTriggers(r.Context(), autopilot.ID)
@@ -456,7 +492,21 @@ func (h *Handler) CreateAutopilot(w http.ResponseWriter, r *http.Request) {
return
}
autopilot, err := h.Queries.CreateAutopilot(r.Context(), db.CreateAutopilotParams{
// Validate before insert so a bad payload doesn't half-create the row.
subscriberUUIDs, ok := h.validateAutopilotSubscribers(w, r, req.Subscribers, workspaceID)
if !ok {
return
}
tx, err := h.TxStarter.Begin(r.Context())
if err != nil {
writeError(w, http.StatusInternalServerError, "failed to create autopilot")
return
}
defer tx.Rollback(r.Context())
qtx := h.Queries.WithTx(tx)
autopilot, err := qtx.CreateAutopilot(r.Context(), db.CreateAutopilotParams{
WorkspaceID: wsUUID,
Title: req.Title,
AssigneeType: assigneeType,
@@ -474,7 +524,26 @@ func (h *Handler) CreateAutopilot(w http.ResponseWriter, r *http.Request) {
return
}
resp := autopilotToResponse(autopilot)
for _, uid := range subscriberUUIDs {
if err := qtx.AddAutopilotSubscriber(r.Context(), db.AddAutopilotSubscriberParams{
AutopilotID: autopilot.ID,
UserType: "member",
UserID: uid,
}); err != nil {
writeError(w, http.StatusInternalServerError, "failed to add autopilot subscriber")
return
}
}
if err := tx.Commit(r.Context()); err != nil {
writeError(w, http.StatusInternalServerError, "failed to create autopilot")
return
}
subs, err := h.Queries.ListAutopilotSubscribers(r.Context(), autopilot.ID)
if err != nil {
subs = nil
}
resp := autopilotToResponse(autopilot, subs)
h.publish(protocol.EventAutopilotCreated, workspaceID, "member", userID, map[string]any{"autopilot": resp})
obsmetrics.RecordEvent(h.Analytics, h.Metrics, analytics.AutopilotCreated(
userID,
@@ -486,6 +555,46 @@ func (h *Handler) CreateAutopilot(w http.ResponseWriter, r *http.Request) {
writeJSON(w, http.StatusCreated, resp)
}
// Writes an HTTP error and returns ok=false on the first invalid entry.
// Returns (nil, true) when raw is empty — caller distinguishes "leave alone"
// from "replace with empty" via the raw-fields map, not this return.
func (h *Handler) validateAutopilotSubscribers(
w http.ResponseWriter,
r *http.Request,
raw []SubscriberInput,
workspaceID string,
) ([]pgtype.UUID, bool) {
if len(raw) == 0 {
return nil, true
}
out := make([]pgtype.UUID, 0, len(raw))
seen := make(map[string]bool, len(raw))
for i, entry := range raw {
if entry.UserType != "member" {
writeError(w, http.StatusBadRequest, fmt.Sprintf("subscribers[%d].user_type must be 'member'", i))
return nil, false
}
if entry.UserID == "" {
writeError(w, http.StatusBadRequest, fmt.Sprintf("subscribers[%d].user_id is required", i))
return nil, false
}
uid, ok := parseUUIDOrBadRequest(w, entry.UserID, fmt.Sprintf("subscribers[%d].user_id", i))
if !ok {
return nil, false
}
if seen[entry.UserID] {
continue
}
seen[entry.UserID] = true
if !h.isWorkspaceEntity(r.Context(), entry.UserType, entry.UserID, workspaceID) {
writeError(w, http.StatusBadRequest, fmt.Sprintf("subscribers[%d] is not a member of this workspace", i))
return nil, false
}
out = append(out, uid)
}
return out, true
}
func (h *Handler) UpdateAutopilot(w http.ResponseWriter, r *http.Request) {
id := chi.URLParam(r, "id")
workspaceID := h.resolveWorkspaceID(r)
@@ -594,13 +703,62 @@ func (h *Handler) UpdateAutopilot(w http.ResponseWriter, r *http.Request) {
}
}
autopilot, err := h.Queries.UpdateAutopilot(r.Context(), params)
// Subscribers are validated up-front (before any write) so a bad payload
// doesn't leave the autopilot row updated but the template stale.
var (
subscriberUUIDs []pgtype.UUID
replaceSubscribers bool
)
if _, sent := rawFields["subscribers"]; sent {
replaceSubscribers = true
validated, vok := h.validateAutopilotSubscribers(w, r, req.Subscribers, workspaceID)
if !vok {
return
}
subscriberUUIDs = validated
}
tx, err := h.TxStarter.Begin(r.Context())
if err != nil {
writeError(w, http.StatusInternalServerError, "failed to update autopilot")
return
}
defer tx.Rollback(r.Context())
qtx := h.Queries.WithTx(tx)
autopilot, err := qtx.UpdateAutopilot(r.Context(), params)
if err != nil {
writeError(w, http.StatusInternalServerError, "failed to update autopilot")
return
}
resp := autopilotToResponse(autopilot)
if replaceSubscribers {
if err := qtx.DeleteAutopilotSubscribersForAutopilot(r.Context(), autopilot.ID); err != nil {
writeError(w, http.StatusInternalServerError, "failed to update subscribers")
return
}
for _, uid := range subscriberUUIDs {
if err := qtx.AddAutopilotSubscriber(r.Context(), db.AddAutopilotSubscriberParams{
AutopilotID: autopilot.ID,
UserType: "member",
UserID: uid,
}); err != nil {
writeError(w, http.StatusInternalServerError, "failed to add autopilot subscriber")
return
}
}
}
if err := tx.Commit(r.Context()); err != nil {
writeError(w, http.StatusInternalServerError, "failed to update autopilot")
return
}
subs, err := h.Queries.ListAutopilotSubscribers(r.Context(), autopilot.ID)
if err != nil {
subs = nil
}
resp := autopilotToResponse(autopilot, subs)
h.publish(protocol.EventAutopilotUpdated, workspaceID, "member", userID, map[string]any{"autopilot": resp})
writeJSON(w, http.StatusOK, resp)
}

View File

@@ -0,0 +1,598 @@
package handler
import (
"context"
"encoding/json"
"fmt"
"net/http"
"net/http/httptest"
"testing"
"time"
"github.com/jackc/pgx/v5/pgtype"
db "github.com/multica-ai/multica/server/pkg/db/generated"
)
func installAutopilotSubscriberInsertFailure(t *testing.T) {
t.Helper()
ctx := context.Background()
suffix := time.Now().UnixNano()
functionName := fmt.Sprintf("autopilot_subscriber_fail_fn_%d", suffix)
triggerName := fmt.Sprintf("autopilot_subscriber_fail_%d", suffix)
t.Cleanup(func() {
testPool.Exec(ctx, fmt.Sprintf(`DROP TRIGGER IF EXISTS %s ON autopilot_subscriber`, triggerName))
testPool.Exec(ctx, fmt.Sprintf(`DROP FUNCTION IF EXISTS %s()`, functionName))
})
if _, err := testPool.Exec(ctx, fmt.Sprintf(`
CREATE FUNCTION %s() RETURNS trigger LANGUAGE plpgsql AS $$
BEGIN
RAISE EXCEPTION 'forced autopilot subscriber insert failure';
END;
$$;
`, functionName)); err != nil {
t.Fatalf("install failure function: %v", err)
}
if _, err := testPool.Exec(ctx, fmt.Sprintf(`
CREATE TRIGGER %s
BEFORE INSERT ON autopilot_subscriber
FOR EACH ROW EXECUTE FUNCTION %s();
`, triggerName, functionName)); err != nil {
t.Fatalf("install failure trigger: %v", err)
}
}
// TestCreateAutopilotPersistsMemberSubscribers covers the happy path:
// supplying a non-empty `subscribers` array on POST /api/autopilots stores
// the rows and the response echoes them back. This is the create half of the
// MUL-2533 RFC ("autopilot default subscriber template").
func TestCreateAutopilotPersistsMemberSubscribers(t *testing.T) {
ctx := context.Background()
var autopilotID string
defer func() {
if autopilotID != "" {
testPool.Exec(ctx, `DELETE FROM autopilot WHERE id = $1`, autopilotID)
}
}()
var agentID string
if err := testPool.QueryRow(ctx, `SELECT id FROM agent WHERE workspace_id = $1 LIMIT 1`, testWorkspaceID).Scan(&agentID); err != nil {
t.Fatalf("load test agent: %v", err)
}
w := httptest.NewRecorder()
req := newRequest("POST", "/api/autopilots?workspace_id="+testWorkspaceID, map[string]any{
"title": "Subscriber template autopilot",
"assignee_id": agentID,
"execution_mode": "create_issue",
"subscribers": []map[string]any{
{"user_type": "member", "user_id": testUserID},
},
})
testHandler.CreateAutopilot(w, req)
if w.Code != http.StatusCreated {
t.Fatalf("CreateAutopilot: expected 201, got %d: %s", w.Code, w.Body.String())
}
var resp AutopilotResponse
if err := json.NewDecoder(w.Body).Decode(&resp); err != nil {
t.Fatalf("decode autopilot: %v", err)
}
autopilotID = resp.ID
if len(resp.Subscribers) != 1 {
t.Fatalf("subscribers in response = %d, want 1", len(resp.Subscribers))
}
if resp.Subscribers[0].UserType != "member" || resp.Subscribers[0].UserID != testUserID {
t.Fatalf("subscribers[0] = %+v, want member/%s", resp.Subscribers[0], testUserID)
}
// Confirm the row landed in the DB. Belt-and-braces: the response could
// in principle be assembled from the request without writing.
var count int
if err := testPool.QueryRow(ctx, `
SELECT count(*) FROM autopilot_subscriber WHERE autopilot_id = $1
`, autopilotID).Scan(&count); err != nil {
t.Fatalf("count subscribers: %v", err)
}
if count != 1 {
t.Fatalf("autopilot_subscriber rows = %d, want 1", count)
}
}
// TestCreateAutopilotRejectsNonMemberSubscriberType locks in the first-version
// constraint: only user_type='member' is accepted on the API. The DB CHECK
// would also reject anything else; the 400 here exists so the client gets a
// clear message instead of a 500 with a constraint-name leak.
func TestCreateAutopilotRejectsNonMemberSubscriberType(t *testing.T) {
var agentID string
if err := testPool.QueryRow(context.Background(), `SELECT id FROM agent WHERE workspace_id = $1 LIMIT 1`, testWorkspaceID).Scan(&agentID); err != nil {
t.Fatalf("load test agent: %v", err)
}
w := httptest.NewRecorder()
req := newRequest("POST", "/api/autopilots?workspace_id="+testWorkspaceID, map[string]any{
"title": "Bad subscriber type",
"assignee_id": agentID,
"execution_mode": "create_issue",
"subscribers": []map[string]any{
{"user_type": "agent", "user_id": agentID},
},
})
testHandler.CreateAutopilot(w, req)
if w.Code != http.StatusBadRequest {
t.Fatalf("CreateAutopilot: expected 400 for non-member subscriber, got %d: %s", w.Code, w.Body.String())
}
}
// TestCreateAutopilotRejectsForeignSubscriber covers the boundary check:
// supplying a UUID that does not belong to this workspace must 400, not
// silently leak inside the autopilot row.
func TestCreateAutopilotRejectsForeignSubscriber(t *testing.T) {
var agentID string
if err := testPool.QueryRow(context.Background(), `SELECT id FROM agent WHERE workspace_id = $1 LIMIT 1`, testWorkspaceID).Scan(&agentID); err != nil {
t.Fatalf("load test agent: %v", err)
}
w := httptest.NewRecorder()
req := newRequest("POST", "/api/autopilots?workspace_id="+testWorkspaceID, map[string]any{
"title": "Foreign subscriber",
"assignee_id": agentID,
"execution_mode": "create_issue",
"subscribers": []map[string]any{
{"user_type": "member", "user_id": "00000000-0000-0000-0000-000000000000"},
},
})
testHandler.CreateAutopilot(w, req)
if w.Code != http.StatusBadRequest {
t.Fatalf("CreateAutopilot: expected 400 for foreign member subscriber, got %d: %s", w.Code, w.Body.String())
}
}
func TestCreateAutopilotRollsBackWhenSubscriberInsertFails(t *testing.T) {
ctx := context.Background()
title := fmt.Sprintf("Subscriber rollback create %d", time.Now().UnixNano())
var agentID string
if err := testPool.QueryRow(ctx, `SELECT id FROM agent WHERE workspace_id = $1 LIMIT 1`, testWorkspaceID).Scan(&agentID); err != nil {
t.Fatalf("load test agent: %v", err)
}
installAutopilotSubscriberInsertFailure(t)
w := httptest.NewRecorder()
req := newRequest("POST", "/api/autopilots?workspace_id="+testWorkspaceID, map[string]any{
"title": title,
"assignee_id": agentID,
"execution_mode": "create_issue",
"subscribers": []map[string]any{
{"user_type": "member", "user_id": testUserID},
},
})
testHandler.CreateAutopilot(w, req)
if w.Code != http.StatusInternalServerError {
t.Fatalf("CreateAutopilot: expected 500 for forced subscriber insert failure, got %d: %s", w.Code, w.Body.String())
}
var count int
if err := testPool.QueryRow(ctx, `
SELECT count(*) FROM autopilot
WHERE workspace_id = $1 AND title = $2
`, testWorkspaceID, title).Scan(&count); err != nil {
t.Fatalf("count rolled-back autopilots: %v", err)
}
if count != 0 {
t.Fatalf("autopilot rows after failed subscriber insert = %d, want 0", count)
}
}
// TestUpdateAutopilotFullReplaceSubscribers covers the PATCH semantics from
// the RFC: sending `subscribers` wipes whatever was there and re-inserts the
// new set. Omitting the field would leave the previous template untouched;
// that branch is exercised separately by TestUpdateAutopilotPreservesSubscribersWhenOmitted.
func TestUpdateAutopilotFullReplaceSubscribers(t *testing.T) {
ctx := context.Background()
var autopilotID string
defer func() {
if autopilotID != "" {
testPool.Exec(ctx, `DELETE FROM autopilot WHERE id = $1`, autopilotID)
}
}()
var agentID string
if err := testPool.QueryRow(ctx, `SELECT id FROM agent WHERE workspace_id = $1 LIMIT 1`, testWorkspaceID).Scan(&agentID); err != nil {
t.Fatalf("load test agent: %v", err)
}
w := httptest.NewRecorder()
req := newRequest("POST", "/api/autopilots?workspace_id="+testWorkspaceID, map[string]any{
"title": "Replace subscribers autopilot",
"assignee_id": agentID,
"execution_mode": "create_issue",
"subscribers": []map[string]any{
{"user_type": "member", "user_id": testUserID},
},
})
testHandler.CreateAutopilot(w, req)
if w.Code != http.StatusCreated {
t.Fatalf("CreateAutopilot: expected 201, got %d: %s", w.Code, w.Body.String())
}
var created AutopilotResponse
if err := json.NewDecoder(w.Body).Decode(&created); err != nil {
t.Fatalf("decode created: %v", err)
}
autopilotID = created.ID
// PATCH with an empty array → expect zero subscribers afterward.
w = httptest.NewRecorder()
req = newRequest("PATCH", "/api/autopilots/"+autopilotID+"?workspace_id="+testWorkspaceID, map[string]any{
"subscribers": []map[string]any{},
})
req = withURLParam(req, "id", autopilotID)
testHandler.UpdateAutopilot(w, req)
if w.Code != http.StatusOK {
t.Fatalf("UpdateAutopilot: expected 200, got %d: %s", w.Code, w.Body.String())
}
var updated AutopilotResponse
if err := json.NewDecoder(w.Body).Decode(&updated); err != nil {
t.Fatalf("decode updated: %v", err)
}
if len(updated.Subscribers) != 0 {
t.Fatalf("subscribers after empty replace = %d, want 0", len(updated.Subscribers))
}
var count int
if err := testPool.QueryRow(ctx, `SELECT count(*) FROM autopilot_subscriber WHERE autopilot_id = $1`, autopilotID).Scan(&count); err != nil {
t.Fatalf("count after replace: %v", err)
}
if count != 0 {
t.Fatalf("DB rows after empty replace = %d, want 0", count)
}
}
func TestUpdateAutopilotRollsBackWhenSubscriberInsertFails(t *testing.T) {
ctx := context.Background()
originalTitle := fmt.Sprintf("Subscriber rollback update %d", time.Now().UnixNano())
updatedTitle := originalTitle + " changed"
var autopilotID string
defer func() {
if autopilotID != "" {
testPool.Exec(ctx, `DELETE FROM autopilot WHERE id = $1`, autopilotID)
}
}()
var agentID string
if err := testPool.QueryRow(ctx, `SELECT id FROM agent WHERE workspace_id = $1 LIMIT 1`, testWorkspaceID).Scan(&agentID); err != nil {
t.Fatalf("load test agent: %v", err)
}
w := httptest.NewRecorder()
req := newRequest("POST", "/api/autopilots?workspace_id="+testWorkspaceID, map[string]any{
"title": originalTitle,
"assignee_id": agentID,
"execution_mode": "create_issue",
"subscribers": []map[string]any{
{"user_type": "member", "user_id": testUserID},
},
})
testHandler.CreateAutopilot(w, req)
if w.Code != http.StatusCreated {
t.Fatalf("CreateAutopilot: expected 201, got %d: %s", w.Code, w.Body.String())
}
var created AutopilotResponse
if err := json.NewDecoder(w.Body).Decode(&created); err != nil {
t.Fatalf("decode created: %v", err)
}
autopilotID = created.ID
installAutopilotSubscriberInsertFailure(t)
w = httptest.NewRecorder()
req = newRequest("PATCH", "/api/autopilots/"+autopilotID+"?workspace_id="+testWorkspaceID, map[string]any{
"title": updatedTitle,
"subscribers": []map[string]any{
{"user_type": "member", "user_id": testUserID},
},
})
req = withURLParam(req, "id", autopilotID)
testHandler.UpdateAutopilot(w, req)
if w.Code != http.StatusInternalServerError {
t.Fatalf("UpdateAutopilot: expected 500 for forced subscriber insert failure, got %d: %s", w.Code, w.Body.String())
}
var gotTitle string
if err := testPool.QueryRow(ctx, `SELECT title FROM autopilot WHERE id = $1`, autopilotID).Scan(&gotTitle); err != nil {
t.Fatalf("load autopilot title after rollback: %v", err)
}
if gotTitle != originalTitle {
t.Fatalf("autopilot title after failed subscriber replace = %q, want %q", gotTitle, originalTitle)
}
var count int
if err := testPool.QueryRow(ctx, `SELECT count(*) FROM autopilot_subscriber WHERE autopilot_id = $1`, autopilotID).Scan(&count); err != nil {
t.Fatalf("count subscribers after rollback: %v", err)
}
if count != 1 {
t.Fatalf("subscriber rows after failed replace = %d, want 1", count)
}
}
// TestUpdateAutopilotPreservesSubscribersWhenOmitted asserts the
// "omit the field to leave it alone" contract — a previously-set template
// must NOT be wiped just because the client sent a partial PATCH.
func TestUpdateAutopilotPreservesSubscribersWhenOmitted(t *testing.T) {
ctx := context.Background()
var autopilotID string
defer func() {
if autopilotID != "" {
testPool.Exec(ctx, `DELETE FROM autopilot WHERE id = $1`, autopilotID)
}
}()
var agentID string
if err := testPool.QueryRow(ctx, `SELECT id FROM agent WHERE workspace_id = $1 LIMIT 1`, testWorkspaceID).Scan(&agentID); err != nil {
t.Fatalf("load test agent: %v", err)
}
w := httptest.NewRecorder()
req := newRequest("POST", "/api/autopilots?workspace_id="+testWorkspaceID, map[string]any{
"title": "Preserve subscribers autopilot",
"assignee_id": agentID,
"execution_mode": "create_issue",
"subscribers": []map[string]any{
{"user_type": "member", "user_id": testUserID},
},
})
testHandler.CreateAutopilot(w, req)
if w.Code != http.StatusCreated {
t.Fatalf("CreateAutopilot: expected 201, got %d: %s", w.Code, w.Body.String())
}
var created AutopilotResponse
if err := json.NewDecoder(w.Body).Decode(&created); err != nil {
t.Fatalf("decode created: %v", err)
}
autopilotID = created.ID
// PATCH a different field, leave subscribers out → row count unchanged.
w = httptest.NewRecorder()
req = newRequest("PATCH", "/api/autopilots/"+autopilotID+"?workspace_id="+testWorkspaceID, map[string]any{
"title": "Preserve subscribers autopilot (renamed)",
})
req = withURLParam(req, "id", autopilotID)
testHandler.UpdateAutopilot(w, req)
if w.Code != http.StatusOK {
t.Fatalf("UpdateAutopilot: expected 200, got %d: %s", w.Code, w.Body.String())
}
var count int
if err := testPool.QueryRow(ctx, `SELECT count(*) FROM autopilot_subscriber WHERE autopilot_id = $1`, autopilotID).Scan(&count); err != nil {
t.Fatalf("count after omitted PATCH: %v", err)
}
if count != 1 {
t.Fatalf("DB rows after omitted PATCH = %d, want 1 (subscribers must not have been touched)", count)
}
}
// TestAutopilotDispatchFansOutSubscribersToIssue is the integration check
// for the dispatch path: an autopilot with a default subscriber list must
// auto-subscribe each entry to the issue it spawns, with reason='autopilot'.
// Belt-and-braces: also confirms that the creator-of-the-issue (the assignee
// agent — see TestAutopilotCreatedIssueCreatorIsAssigneeAgent) gets a row
// with reason='creator', and the two reasons don't fight (PK is one row per
// (issue, user_type, user_id), so the first one wins on conflict).
func TestAutopilotDispatchFansOutSubscribersToIssue(t *testing.T) {
ctx := context.Background()
title := fmt.Sprintf("Autopilot subscriber fanout %d", time.Now().UnixNano())
var autopilotID, issueID string
defer func() {
if issueID != "" {
testPool.Exec(ctx, `DELETE FROM issue WHERE id = $1`, issueID)
}
if autopilotID != "" {
testPool.Exec(ctx, `DELETE FROM autopilot WHERE id = $1`, autopilotID)
}
}()
var agentID string
if err := testPool.QueryRow(ctx, `SELECT id FROM agent WHERE workspace_id = $1 LIMIT 1`, testWorkspaceID).Scan(&agentID); err != nil {
t.Fatalf("load test agent: %v", err)
}
w := httptest.NewRecorder()
req := newRequest("POST", "/api/autopilots?workspace_id="+testWorkspaceID, map[string]any{
"title": "Subscriber fanout autopilot",
"assignee_id": agentID,
"execution_mode": "create_issue",
"issue_title_template": title,
"subscribers": []map[string]any{
{"user_type": "member", "user_id": testUserID},
},
})
testHandler.CreateAutopilot(w, req)
if w.Code != http.StatusCreated {
t.Fatalf("CreateAutopilot: expected 201, got %d: %s", w.Code, w.Body.String())
}
var autopilot AutopilotResponse
if err := json.NewDecoder(w.Body).Decode(&autopilot); err != nil {
t.Fatalf("decode autopilot: %v", err)
}
autopilotID = autopilot.ID
queries := db.New(testPool)
ap, err := queries.GetAutopilot(ctx, parseUUID(autopilotID))
if err != nil {
t.Fatalf("GetAutopilot: %v", err)
}
run, err := testHandler.AutopilotService.DispatchAutopilot(ctx, ap, pgtype.UUID{}, "manual", nil)
if err != nil {
t.Fatalf("DispatchAutopilot: %v", err)
}
if run == nil || !run.IssueID.Valid {
t.Fatalf("dispatch run = %+v, want linked issue", run)
}
issueID = uuidToString(run.IssueID)
var subscriberReason string
if err := testPool.QueryRow(ctx, `
SELECT reason
FROM issue_subscriber
WHERE issue_id = $1 AND user_type = 'member' AND user_id = $2
`, issueID, testUserID).Scan(&subscriberReason); err != nil {
t.Fatalf("query autopilot-fanned subscriber: %v", err)
}
if subscriberReason != "autopilot" {
t.Fatalf("subscriber reason = %q, want %q", subscriberReason, "autopilot")
}
}
// TestAutopilotDispatchNotifiesSubscribersOnCreate locks in the OQ3 promise
// from the RFC ("reason='autopilot' 与 reason='manual' 一致,订阅事件全收"):
// when an autopilot creates an issue, each template subscriber must land in
// the recipient's inbox with type='issue_subscribed' pointing at the new
// issue. Without this, subscribers would only see comment/status updates
// after the fact and miss the creation event itself — flagged in PR #3060
// review by the Emacs agent.
func TestAutopilotDispatchNotifiesSubscribersOnCreate(t *testing.T) {
ctx := context.Background()
title := fmt.Sprintf("Autopilot subscriber inbox %d", time.Now().UnixNano())
var autopilotID, issueID string
defer func() {
if issueID != "" {
testPool.Exec(ctx, `DELETE FROM inbox_item WHERE issue_id = $1`, issueID)
testPool.Exec(ctx, `DELETE FROM issue WHERE id = $1`, issueID)
}
if autopilotID != "" {
testPool.Exec(ctx, `DELETE FROM autopilot WHERE id = $1`, autopilotID)
}
}()
var agentID string
if err := testPool.QueryRow(ctx, `SELECT id FROM agent WHERE workspace_id = $1 LIMIT 1`, testWorkspaceID).Scan(&agentID); err != nil {
t.Fatalf("load test agent: %v", err)
}
w := httptest.NewRecorder()
req := newRequest("POST", "/api/autopilots?workspace_id="+testWorkspaceID, map[string]any{
"title": "Subscriber inbox autopilot",
"assignee_id": agentID,
"execution_mode": "create_issue",
"issue_title_template": title,
"subscribers": []map[string]any{
{"user_type": "member", "user_id": testUserID},
},
})
testHandler.CreateAutopilot(w, req)
if w.Code != http.StatusCreated {
t.Fatalf("CreateAutopilot: expected 201, got %d: %s", w.Code, w.Body.String())
}
var autopilot AutopilotResponse
if err := json.NewDecoder(w.Body).Decode(&autopilot); err != nil {
t.Fatalf("decode autopilot: %v", err)
}
autopilotID = autopilot.ID
queries := db.New(testPool)
ap, err := queries.GetAutopilot(ctx, parseUUID(autopilotID))
if err != nil {
t.Fatalf("GetAutopilot: %v", err)
}
run, err := testHandler.AutopilotService.DispatchAutopilot(ctx, ap, pgtype.UUID{}, "manual", nil)
if err != nil {
t.Fatalf("DispatchAutopilot: %v", err)
}
if run == nil || !run.IssueID.Valid {
t.Fatalf("dispatch run = %+v, want linked issue", run)
}
issueID = uuidToString(run.IssueID)
var inboxCount int
var inboxType, inboxTitle string
if err := testPool.QueryRow(ctx, `
SELECT count(*) FROM inbox_item
WHERE issue_id = $1 AND recipient_id = $2 AND type = 'issue_subscribed'
`, issueID, testUserID).Scan(&inboxCount); err != nil {
t.Fatalf("count inbox rows: %v", err)
}
if inboxCount != 1 {
t.Fatalf("inbox_item rows for subscriber = %d, want 1", inboxCount)
}
if err := testPool.QueryRow(ctx, `
SELECT type, title FROM inbox_item
WHERE issue_id = $1 AND recipient_id = $2 AND type = 'issue_subscribed'
`, issueID, testUserID).Scan(&inboxType, &inboxTitle); err != nil {
t.Fatalf("load inbox row: %v", err)
}
if inboxType != "issue_subscribed" {
t.Fatalf("inbox type = %q, want issue_subscribed", inboxType)
}
if inboxTitle != title {
t.Fatalf("inbox title = %q, want %q (issue title)", inboxTitle, title)
}
}
// TestAutopilotDispatchSkipsInboxWhenNoSubscribers asserts the no-op path:
// an autopilot with an empty subscriber template must NOT create any inbox
// rows on dispatch — otherwise we'd be paging the workspace on every quiet
// autopilot run. The corresponding issue_subscriber rows are also expected
// to be absent (other-reason rows like creator/assignee are filtered out by
// the WHERE type = 'issue_subscribed' clause).
func TestAutopilotDispatchSkipsInboxWhenNoSubscribers(t *testing.T) {
ctx := context.Background()
title := fmt.Sprintf("Autopilot no-subscriber inbox %d", time.Now().UnixNano())
var autopilotID, issueID string
defer func() {
if issueID != "" {
testPool.Exec(ctx, `DELETE FROM inbox_item WHERE issue_id = $1`, issueID)
testPool.Exec(ctx, `DELETE FROM issue WHERE id = $1`, issueID)
}
if autopilotID != "" {
testPool.Exec(ctx, `DELETE FROM autopilot WHERE id = $1`, autopilotID)
}
}()
var agentID string
if err := testPool.QueryRow(ctx, `SELECT id FROM agent WHERE workspace_id = $1 LIMIT 1`, testWorkspaceID).Scan(&agentID); err != nil {
t.Fatalf("load test agent: %v", err)
}
w := httptest.NewRecorder()
req := newRequest("POST", "/api/autopilots?workspace_id="+testWorkspaceID, map[string]any{
"title": "No-subscriber autopilot",
"assignee_id": agentID,
"execution_mode": "create_issue",
"issue_title_template": title,
})
testHandler.CreateAutopilot(w, req)
if w.Code != http.StatusCreated {
t.Fatalf("CreateAutopilot: expected 201, got %d: %s", w.Code, w.Body.String())
}
var autopilot AutopilotResponse
if err := json.NewDecoder(w.Body).Decode(&autopilot); err != nil {
t.Fatalf("decode autopilot: %v", err)
}
autopilotID = autopilot.ID
queries := db.New(testPool)
ap, err := queries.GetAutopilot(ctx, parseUUID(autopilotID))
if err != nil {
t.Fatalf("GetAutopilot: %v", err)
}
run, err := testHandler.AutopilotService.DispatchAutopilot(ctx, ap, pgtype.UUID{}, "manual", nil)
if err != nil {
t.Fatalf("DispatchAutopilot: %v", err)
}
if run == nil || !run.IssueID.Valid {
t.Fatalf("dispatch run = %+v, want linked issue", run)
}
issueID = uuidToString(run.IssueID)
var inboxCount int
if err := testPool.QueryRow(ctx, `
SELECT count(*) FROM inbox_item
WHERE issue_id = $1 AND type = 'issue_subscribed'
`, issueID).Scan(&inboxCount); err != nil {
t.Fatalf("count inbox rows: %v", err)
}
if inboxCount != 0 {
t.Fatalf("issue_subscribed inbox rows = %d, want 0 (no subscribers)", inboxCount)
}
}

View File

@@ -197,6 +197,25 @@ func (s *AutopilotService) dispatchCreateIssue(ctx context.Context, ap db.Autopi
return fmt.Errorf("create issue: %w", err)
}
// Fan out the default subscriber template inside the same tx as the
// issue insert, before EventIssueCreated fires — so notification
// listeners see the full subscriber set on the first event instead of
// racing the listener that would otherwise hydrate the template.
templateSubs, err := qtx.ListAutopilotSubscribers(ctx, ap.ID)
if err != nil {
return fmt.Errorf("list autopilot subscribers: %w", err)
}
for _, sub := range templateSubs {
if err := qtx.AddIssueSubscriber(ctx, db.AddIssueSubscriberParams{
IssueID: issue.ID,
UserType: sub.UserType,
UserID: sub.UserID,
Reason: "autopilot",
}); err != nil {
return fmt.Errorf("add autopilot subscriber to issue: %w", err)
}
}
if err := tx.Commit(ctx); err != nil {
return fmt.Errorf("commit tx: %w", err)
}
@@ -227,6 +246,17 @@ func (s *AutopilotService) dispatchCreateIssue(ctx context.Context, ap db.Autopi
})
s.captureIssueCreatedFromAutopilot(ap, run, issue, leader.ID)
// The issue:created notification listener only handles handler.IssueResponse
// payloads and only direct-notifies the assignee + @mentions; subscribers
// don't get an inbox at creation time on the manual path because there are
// none yet. The autopilot path is different: the template subscribers were
// fanned out into issue_subscriber inside the tx above, so they exist at the
// moment of creation and OQ3 says they should receive the same subscription
// events as reason='manual'. Issue creation is one such event — so write
// the inbox rows directly here. Done after commit so a failure here doesn't
// roll back the issue itself.
s.notifyAutopilotSubscribersOnCreate(ctx, ap, issue, leader.ID, templateSubs)
// Enqueue agent task via the existing flow. Squad-assigned autopilots
// route to the resolved leader as the executing agent (Path A from
// MUL-2429); agent-assigned autopilots go through the standard issue
@@ -257,6 +287,85 @@ func (s *AutopilotService) dispatchCreateIssue(ctx context.Context, ap db.Autopi
return nil
}
// notifyAutopilotSubscribersOnCreate writes an inbox_item for each template
// subscriber of an autopilot-created issue and broadcasts an inbox:new event
// so the recipient's inbox updates in real time. Mirrors the inbox payload
// shape from notification_listeners.go so the WS consumer sees the same fields
// the listener-driven path produces. Failures are logged, not propagated:
// the issue and its subscriber rows are already committed, and an inbox-write
// hiccup must not bubble up as a dispatch failure.
func (s *AutopilotService) notifyAutopilotSubscribersOnCreate(
ctx context.Context,
ap db.Autopilot,
issue db.Issue,
leaderID pgtype.UUID,
subscribers []db.AutopilotSubscriber,
) {
if len(subscribers) == 0 {
return
}
details, _ := json.Marshal(map[string]string{
"autopilot_id": util.UUIDToString(ap.ID),
"reason": "autopilot",
})
for _, sub := range subscribers {
// Autopilot subscribers are restricted to user_type='member' at the
// handler boundary; defend in case that constraint is ever relaxed
// (agents don't have inbox).
if sub.UserType != "member" {
continue
}
item, err := s.Queries.CreateInboxItem(ctx, db.CreateInboxItemParams{
WorkspaceID: ap.WorkspaceID,
RecipientType: "member",
RecipientID: sub.UserID,
Type: "issue_subscribed",
Severity: "info",
IssueID: issue.ID,
Title: issue.Title,
Body: pgtype.Text{},
ActorType: pgtype.Text{String: "agent", Valid: true},
ActorID: leaderID,
Details: details,
})
if err != nil {
slog.Error("autopilot subscriber inbox write failed",
"autopilot_id", util.UUIDToString(ap.ID),
"issue_id", util.UUIDToString(issue.ID),
"recipient_id", util.UUIDToString(sub.UserID),
"error", err,
)
continue
}
s.Bus.Publish(events.Event{
Type: protocol.EventInboxNew,
WorkspaceID: util.UUIDToString(ap.WorkspaceID),
ActorType: "agent",
ActorID: util.UUIDToString(leaderID),
Payload: map[string]any{
"item": map[string]any{
"id": util.UUIDToString(item.ID),
"workspace_id": util.UUIDToString(item.WorkspaceID),
"recipient_type": item.RecipientType,
"recipient_id": util.UUIDToString(item.RecipientID),
"type": item.Type,
"severity": item.Severity,
"issue_id": util.UUIDToPtr(item.IssueID),
"issue_status": issue.Status,
"title": item.Title,
"body": util.TextToPtr(item.Body),
"read": item.Read,
"archived": item.Archived,
"created_at": util.TimestampToString(item.CreatedAt),
"actor_type": util.TextToPtr(item.ActorType),
"actor_id": util.UUIDToPtr(item.ActorID),
"details": json.RawMessage(item.Details),
},
},
})
}
}
// errDispatchSkipped wraps a readiness failure encountered after the
// admission gate has already passed. dispatchRunOnly returns this when a
// resolved leader has gone offline / been archived between admission and

View File

@@ -0,0 +1,11 @@
-- Rows still carrying reason='autopilot' would violate the restored CHECK
-- constraint, so drop them first. Operators wanting an audit trail should
-- backfill reason='manual' before rolling this back.
DELETE FROM issue_subscriber WHERE reason = 'autopilot';
ALTER TABLE issue_subscriber DROP CONSTRAINT issue_subscriber_reason_check;
ALTER TABLE issue_subscriber ADD CONSTRAINT issue_subscriber_reason_check
CHECK (reason IN ('creator', 'assignee', 'commenter', 'mentioned', 'manual'));
DROP INDEX IF EXISTS idx_autopilot_subscriber_user;
DROP TABLE IF EXISTS autopilot_subscriber;

View File

@@ -0,0 +1,20 @@
-- Template list of workspace members auto-subscribed to every issue spawned
-- by the autopilot. Members-only for now (broaden the CHECK to expand).
-- No FK on user_id — workspace membership is enforced at the API boundary
-- (isWorkspaceEntity in the handler), matching issue_subscriber.
CREATE TABLE autopilot_subscriber (
autopilot_id UUID NOT NULL REFERENCES autopilot(id) ON DELETE CASCADE,
user_type TEXT NOT NULL CHECK (user_type IN ('member')),
user_id UUID NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
PRIMARY KEY (autopilot_id, user_type, user_id)
);
-- Reverse-lookup index for "which autopilots auto-subscribe this member?";
-- the PK can't answer that since autopilot_id is its leading column.
CREATE INDEX idx_autopilot_subscriber_user
ON autopilot_subscriber (user_type, user_id);
ALTER TABLE issue_subscriber DROP CONSTRAINT issue_subscriber_reason_check;
ALTER TABLE issue_subscriber ADD CONSTRAINT issue_subscriber_reason_check
CHECK (reason IN ('creator', 'assignee', 'commenter', 'mentioned', 'manual', 'autopilot'));

View File

@@ -11,6 +11,23 @@ import (
"github.com/jackc/pgx/v5/pgtype"
)
const addAutopilotSubscriber = `-- name: AddAutopilotSubscriber :exec
INSERT INTO autopilot_subscriber (autopilot_id, user_type, user_id)
VALUES ($1, $2, $3)
ON CONFLICT (autopilot_id, user_type, user_id) DO NOTHING
`
type AddAutopilotSubscriberParams struct {
AutopilotID pgtype.UUID `json:"autopilot_id"`
UserType string `json:"user_type"`
UserID pgtype.UUID `json:"user_id"`
}
func (q *Queries) AddAutopilotSubscriber(ctx context.Context, arg AddAutopilotSubscriberParams) error {
_, err := q.db.Exec(ctx, addAutopilotSubscriber, arg.AutopilotID, arg.UserType, arg.UserID)
return err
}
const advanceTriggerNextRun = `-- name: AdvanceTriggerNextRun :exec
UPDATE autopilot_trigger
SET next_run_at = $2,
@@ -347,6 +364,17 @@ func (q *Queries) DeleteAutopilot(ctx context.Context, id pgtype.UUID) error {
return err
}
const deleteAutopilotSubscribersForAutopilot = `-- name: DeleteAutopilotSubscribersForAutopilot :exec
DELETE FROM autopilot_subscriber
WHERE autopilot_id = $1
`
// Paired with a re-insert loop to implement full-replace PATCH semantics.
func (q *Queries) DeleteAutopilotSubscribersForAutopilot(ctx context.Context, autopilotID pgtype.UUID) error {
_, err := q.db.Exec(ctx, deleteAutopilotSubscribersForAutopilot, autopilotID)
return err
}
const deleteAutopilotTrigger = `-- name: DeleteAutopilotTrigger :exec
DELETE FROM autopilot_trigger WHERE id = $1
`
@@ -622,6 +650,42 @@ func (q *Queries) ListAutopilotRuns(ctx context.Context, arg ListAutopilotRunsPa
return items, nil
}
const listAutopilotSubscribers = `-- name: ListAutopilotSubscribers :many
SELECT autopilot_id, user_type, user_id, created_at FROM autopilot_subscriber
WHERE autopilot_id = $1
ORDER BY created_at ASC, user_id ASC
`
// =====================
// Autopilot Subscribers
// =====================
// ORDER BY created_at keeps chip rendering stable across refreshes.
func (q *Queries) ListAutopilotSubscribers(ctx context.Context, autopilotID pgtype.UUID) ([]AutopilotSubscriber, error) {
rows, err := q.db.Query(ctx, listAutopilotSubscribers, autopilotID)
if err != nil {
return nil, err
}
defer rows.Close()
items := []AutopilotSubscriber{}
for rows.Next() {
var i AutopilotSubscriber
if err := rows.Scan(
&i.AutopilotID,
&i.UserType,
&i.UserID,
&i.CreatedAt,
); err != nil {
return nil, err
}
items = append(items, i)
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const listAutopilotTriggers = `-- name: ListAutopilotTriggers :many
SELECT id, autopilot_id, kind, enabled, cron_expression, timezone, next_run_at, webhook_token, label, last_fired_at, created_at, updated_at, provider, signing_secret, event_filters FROM autopilot_trigger

View File

@@ -151,6 +151,13 @@ type AutopilotRun struct {
SquadID pgtype.UUID `json:"squad_id"`
}
type AutopilotSubscriber struct {
AutopilotID pgtype.UUID `json:"autopilot_id"`
UserType string `json:"user_type"`
UserID pgtype.UUID `json:"user_id"`
CreatedAt pgtype.Timestamptz `json:"created_at"`
}
type AutopilotTrigger struct {
ID pgtype.UUID `json:"id"`
AutopilotID pgtype.UUID `json:"autopilot_id"`

View File

@@ -353,3 +353,24 @@ UPDATE autopilot
SET status = 'paused', updated_at = now()
WHERE id = $1 AND status = 'active'
RETURNING *;
-- =====================
-- Autopilot Subscribers
-- =====================
-- name: ListAutopilotSubscribers :many
-- ORDER BY created_at keeps chip rendering stable across refreshes.
SELECT * FROM autopilot_subscriber
WHERE autopilot_id = $1
ORDER BY created_at ASC, user_id ASC;
-- name: AddAutopilotSubscriber :exec
INSERT INTO autopilot_subscriber (autopilot_id, user_type, user_id)
VALUES ($1, $2, $3)
ON CONFLICT (autopilot_id, user_type, user_id) DO NOTHING;
-- name: DeleteAutopilotSubscribersForAutopilot :exec
-- Paired with a re-insert loop to implement full-replace PATCH semantics.
DELETE FROM autopilot_subscriber
WHERE autopilot_id = $1;