Compare commits

...

3 Commits

Author SHA1 Message Date
Naiyuan Qing
13fa01bb0a Guard cancelled chat restore on tx failure
Co-authored-by: multica-agent <github@multica.ai>
2026-06-12 15:11:40 +08:00
Naiyuan Qing
f660d28b4c Fix chat cancel recovery follow-ups
Co-authored-by: multica-agent <github@multica.ai>
2026-06-12 15:01:39 +08:00
Naiyuan Qing
a08c4283c3 Fix chat stop and send recovery
Co-authored-by: multica-agent <github@multica.ai>
2026-06-12 14:38:32 +08:00
20 changed files with 926 additions and 54 deletions

View File

@@ -487,6 +487,109 @@ describe("ApiClient", () => {
});
});
describe("cancelTaskById response parsing", () => {
const taskResponse = {
id: "task-1",
agent_id: "agent-1",
runtime_id: "runtime-1",
issue_id: "",
status: "cancelled",
priority: 0,
dispatched_at: null,
started_at: null,
completed_at: "2026-06-12T06:40:00Z",
result: null,
error: null,
created_at: "2026-06-12T06:39:00Z",
};
it("parses the cancelled chat message payload", async () => {
const fetchMock = vi.fn().mockResolvedValue(
new Response(JSON.stringify({
...taskResponse,
cancelled_chat_message: {
chat_session_id: "session-1",
message_id: "message-1",
content: "restore me",
restore_to_input: true,
},
}), {
status: 200,
headers: { "Content-Type": "application/json" },
}),
);
vi.stubGlobal("fetch", fetchMock);
const client = new ApiClient("https://api.example.test");
const result = await client.cancelTaskById("task-1");
expect(fetchMock.mock.calls[0]).toMatchObject([
"https://api.example.test/api/tasks/task-1/cancel",
{ method: "POST" },
]);
expect(result.cancelled_chat_message).toEqual({
chat_session_id: "session-1",
message_id: "message-1",
content: "restore me",
restore_to_input: true,
});
});
it("treats a null cancelled chat message as absent", async () => {
vi.stubGlobal(
"fetch",
vi.fn().mockResolvedValue(
new Response(JSON.stringify({
...taskResponse,
cancelled_chat_message: null,
}), {
status: 200,
headers: { "Content-Type": "application/json" },
}),
),
);
const client = new ApiClient("https://api.example.test");
const result = await client.cancelTaskById("task-1");
expect(result.id).toBe("task-1");
expect(result.cancelled_chat_message).toBeUndefined();
});
it.each([
["a missing task id", { ...taskResponse, id: undefined }],
[
"a malformed cancelled chat message",
{
...taskResponse,
cancelled_chat_message: {
chat_session_id: "session-1",
message_id: "message-1",
content: "restore me",
restore_to_input: "true",
},
},
],
["a null body", null],
])("falls back for %s", async (_label, body) => {
vi.stubGlobal(
"fetch",
vi.fn().mockResolvedValue(
new Response(JSON.stringify(body), {
status: 200,
headers: { "Content-Type": "application/json" },
}),
),
);
const client = new ApiClient("https://api.example.test");
const result = await client.cancelTaskById("task-1");
expect(result.id).toBe("");
expect(result.cancelled_chat_message).toBeUndefined();
});
});
describe("chat attachment wiring", () => {
it("uploadFile includes chat_session_id in the FormData body", async () => {
const fetchMock = vi.fn().mockResolvedValue(

View File

@@ -66,6 +66,7 @@ import type {
ChatPendingTask,
PendingChatTasksResponse,
SendChatMessageResponse,
CancelTaskResponse,
Project,
CreateProjectRequest,
UpdateProjectRequest,
@@ -132,6 +133,7 @@ import {
AgentTemplateSchema,
AgentTemplateSummaryListSchema,
AttachmentResponseSchema,
CancelTaskResponseSchema,
ChildIssuesResponseSchema,
CommentsListSchema,
CommentTriggerPreviewSchema,
@@ -190,6 +192,7 @@ import {
EMPTY_CREATE_BILLING_CHECKOUT_SESSION_RESPONSE,
EMPTY_BILLING_CHECKOUT_SESSION_STATUS,
EMPTY_CREATE_BILLING_PORTAL_SESSION_RESPONSE,
EMPTY_CANCEL_TASK_RESPONSE,
} from "./schemas";
/** Identifies the calling client to the server.
@@ -1683,8 +1686,11 @@ export class ApiClient {
await this.fetch(`/api/chat/sessions/${sessionId}/read`, { method: "POST" });
}
async cancelTaskById(taskId: string): Promise<void> {
await this.fetch(`/api/tasks/${taskId}/cancel`, { method: "POST" });
async cancelTaskById(taskId: string): Promise<CancelTaskResponse> {
const raw = await this.fetch<unknown>(`/api/tasks/${taskId}/cancel`, { method: "POST" });
return parseWithFallback(raw, CancelTaskResponseSchema, EMPTY_CANCEL_TASK_RESPONSE, {
endpoint: "POST /api/tasks/{taskId}/cancel",
});
}
async listAttachments(issueId: string): Promise<Attachment[]> {

View File

@@ -10,6 +10,7 @@ import type {
BillingPriceTier,
BillingTopupsPage,
BillingTransactionsPage,
CancelTaskResponse,
CreateAgentFromTemplateResponse,
CreateBillingCheckoutSessionResponse,
CreateBillingPortalSessionResponse,
@@ -420,6 +421,67 @@ const RuntimeUsageByHourSchema = z.object({
export const RuntimeUsageByHourListSchema = z.array(RuntimeUsageByHourSchema);
// ---------------------------------------------------------------------------
// Task cancellation (`POST /api/tasks/:id/cancel`)
//
// This response is consumed directly by chat recovery. The embedded task
// object stays loose so daemon/runtime fields can drift, but the optional
// `cancelled_chat_message` payload must be well-formed before the UI deletes
// a message from cache or restores text into the input.
// ---------------------------------------------------------------------------
const AgentTaskResponseSchema = z.object({
id: z.string(),
agent_id: z.string().default(""),
runtime_id: z.string().default(""),
issue_id: z.string().default(""),
status: z.string().default("cancelled"),
priority: z.number().default(0),
dispatched_at: z.string().nullable().default(null),
started_at: z.string().nullable().default(null),
completed_at: z.string().nullable().default(null),
result: z.unknown().default(null),
error: z.string().nullable().default(null),
failure_reason: z.string().optional(),
created_at: z.string().default(""),
chat_session_id: z.string().optional(),
autopilot_run_id: z.string().optional(),
parent_task_id: z.string().optional(),
attempt: z.number().optional(),
trigger_comment_id: z.string().optional(),
trigger_summary: z.string().optional(),
kind: z.string().optional(),
work_dir: z.string().optional(),
relative_work_dir: z.string().optional(),
}).loose();
const CancelledChatMessageSchema = z.object({
chat_session_id: z.string(),
message_id: z.string(),
content: z.string(),
restore_to_input: z.boolean().default(false),
}).loose();
export const CancelTaskResponseSchema = AgentTaskResponseSchema.extend({
cancelled_chat_message: CancelledChatMessageSchema.nullish()
.transform((value) => value ?? undefined),
}).loose();
export const EMPTY_CANCEL_TASK_RESPONSE: CancelTaskResponse = {
id: "",
agent_id: "",
runtime_id: "",
issue_id: "",
status: "cancelled",
priority: 0,
dispatched_at: null,
started_at: null,
completed_at: null,
result: null,
error: null,
created_at: "",
};
// ---------------------------------------------------------------------------
// Agent template catalog — `/api/agent-templates*` and the
// create-from-template response. The desktop app's create-agent picker

View File

@@ -19,6 +19,7 @@ import {
applyChatDoneToCache,
applyWorkspaceUpdatedToCache,
handleInboxNew,
invalidateChatMessageQueries,
resolveInboxSourceSlug,
} from "./use-realtime-sync";
@@ -134,6 +135,18 @@ describe("applyChatDoneToCache", () => {
});
});
describe("invalidateChatMessageQueries", () => {
it("invalidates both legacy and paged chat message caches", () => {
const qc = createQueryClient();
const invalidate = vi.spyOn(qc, "invalidateQueries");
invalidateChatMessageQueries(qc, sessionId);
expect(invalidate).toHaveBeenCalledWith({ queryKey: chatKeys.messages(sessionId) });
expect(invalidate).toHaveBeenCalledWith({ queryKey: chatKeys.messagesPage(sessionId) });
});
});
describe("applyWorkspaceUpdatedToCache", () => {
const wsId = "ws-1";

View File

@@ -89,6 +89,14 @@ const chatWsLogger = createLogger("chat.ws");
const logger = createLogger("realtime-sync");
export function invalidateChatMessageQueries(
qc: QueryClient,
sessionId: string,
) {
qc.invalidateQueries({ queryKey: chatKeys.messages(sessionId) });
qc.invalidateQueries({ queryKey: chatKeys.messagesPage(sessionId) });
}
export function applyChatDoneToCache(
qc: QueryClient,
payload: ChatDonePayload,
@@ -125,8 +133,7 @@ export function applyChatDoneToCache(
qc.setQueryData(chatKeys.pendingTask(sessionId), {});
// Authoritative refetch reconciles redaction / migrations / clients
// that took the fallback branch above.
qc.invalidateQueries({ queryKey: chatKeys.messages(sessionId) });
qc.invalidateQueries({ queryKey: chatKeys.messagesPage(sessionId) });
invalidateChatMessageQueries(qc, sessionId);
qc.invalidateQueries({ queryKey: chatKeys.pendingTask(sessionId) });
}
@@ -835,7 +842,7 @@ export function useRealtimeSync(
const unsubChatMessage = ws.on("chat:message", (p) => {
const payload = p as { chat_session_id: string };
chatWsLogger.info("chat:message (global)", { chat_session_id: payload.chat_session_id });
qc.invalidateQueries({ queryKey: chatKeys.messages(payload.chat_session_id) });
invalidateChatMessageQueries(qc, payload.chat_session_id);
qc.invalidateQueries({ queryKey: chatKeys.pendingTask(payload.chat_session_id) });
invalidatePendingAggregate();
});
@@ -949,6 +956,9 @@ export function useRealtimeSync(
// 2. another tab / admin / system cancels — this is the only path that
// drops the pending pill in those cases. Without it the pill spins
// forever in the second-tab scenario.
// CancelTask also persists a best-effort assistant snapshot when the
// stopped chat task had already streamed transcript rows, so refresh the
// message page along with clearing pending.
const unsubTaskCancelled = ws.on("task:cancelled", (p) => {
const payload = p as TaskCancelledPayload;
if (!payload.chat_session_id) return;
@@ -957,6 +967,7 @@ export function useRealtimeSync(
chat_session_id: payload.chat_session_id,
});
qc.setQueryData(chatKeys.pendingTask(payload.chat_session_id), {});
invalidateChatMessageQueries(qc, payload.chat_session_id);
invalidatePendingAggregate();
});
@@ -990,7 +1001,7 @@ export function useRealtimeSync(
// this branch only flipped pending — the comment "No new message"
// was true then, but FailTask now persists a row.
qc.setQueryData(chatKeys.pendingTask(payload.chat_session_id), {});
qc.invalidateQueries({ queryKey: chatKeys.messages(payload.chat_session_id) });
invalidateChatMessageQueries(qc, payload.chat_session_id);
qc.invalidateQueries({ queryKey: chatKeys.pendingTask(payload.chat_session_id) });
invalidatePendingAggregate();
});

View File

@@ -1,3 +1,5 @@
import type { AgentTask } from "./agent";
export interface ChatSession {
id: string;
workspace_id: string;
@@ -79,6 +81,17 @@ export interface SendChatMessageResponse {
created_at: string;
}
export interface CancelledChatMessage {
chat_session_id: string;
message_id: string;
content: string;
restore_to_input: boolean;
}
export interface CancelTaskResponse extends AgentTask {
cancelled_chat_message?: CancelledChatMessage;
}
/**
* Response from GET /api/chat/sessions/{id}/pending-task.
* All fields are absent when the session has no in-flight task.

View File

@@ -68,7 +68,17 @@ export type * from "./events";
export type * from "./api";
export type { Attachment } from "./attachment";
export { attachmentDownloadPath, attachmentIdFromDownloadURL, contentReferencesAttachment } from "./attachment-url";
export type { ChatSession, ChatMessage, ChatMessagesPage, ChatPendingTask, PendingChatTaskItem, PendingChatTasksResponse, SendChatMessageResponse } from "./chat";
export type {
ChatSession,
ChatMessage,
ChatMessagesPage,
ChatPendingTask,
PendingChatTaskItem,
PendingChatTasksResponse,
SendChatMessageResponse,
CancelledChatMessage,
CancelTaskResponse,
} from "./chat";
export type { StorageAdapter } from "./storage";
export type {
Project,

View File

@@ -1,5 +1,5 @@
import { forwardRef, useRef, useImperativeHandle } from "react";
import { describe, it, expect, vi } from "vitest";
import { beforeEach, describe, it, expect, vi } from "vitest";
import { act, render, screen, fireEvent, waitFor } from "@testing-library/react";
import { I18nProvider } from "@multica/core/i18n/react";
import type { UploadResult } from "@multica/core/hooks/use-file-upload";
@@ -130,6 +130,24 @@ vi.mock("@multica/core/chat", () => {
});
import { ChatInput } from "./chat-input";
import { useChatStore } from "@multica/core/chat";
beforeEach(() => {
dropHandlers.onDrop = null;
editorProps.last = null;
const state = useChatStore.getState() as unknown as {
activeSessionId: string | null;
selectedAgentId: string;
inputDrafts: Record<string, string>;
setInputDraft: ReturnType<typeof vi.fn>;
clearInputDraft: ReturnType<typeof vi.fn>;
};
state.activeSessionId = null;
state.selectedAgentId = "agent-1";
state.inputDrafts = {};
state.setInputDraft.mockClear();
state.clearInputDraft.mockClear();
});
function renderInput(props: Partial<React.ComponentProps<typeof ChatInput>> = {}) {
const onSend = props.onSend ?? vi.fn();
@@ -313,3 +331,105 @@ describe("ChatInput attachment wiring", () => {
expect(buttons.length).toBe(1);
});
});
describe("ChatInput async send", () => {
it("restores a cancelled empty run draft into the editor", async () => {
const onRestoreDraftConsumed = vi.fn();
renderInput({
restoreDraftRequest: {
id: "msg-restored",
content: "bring this back",
},
onRestoreDraftConsumed,
});
await waitFor(() => {
expect(useChatStore.getState().setInputDraft).toHaveBeenCalledWith(
"__draft_new__:agent-1",
"bring this back",
);
expect(editorProps.last?.defaultValue).toBe("bring this back");
expect(onRestoreDraftConsumed).toHaveBeenCalledTimes(1);
});
});
it("consumes a restore request even when an existing draft blocks restore", async () => {
const state = useChatStore.getState() as unknown as {
inputDrafts: Record<string, string>;
setInputDraft: ReturnType<typeof vi.fn>;
};
state.inputDrafts["__draft_new__:agent-1"] = "already typing";
const onRestoreDraftConsumed = vi.fn();
renderInput({
restoreDraftRequest: {
id: "msg-restored",
content: "bring this back",
},
onRestoreDraftConsumed,
});
await waitFor(() => {
expect(onRestoreDraftConsumed).toHaveBeenCalledTimes(1);
});
expect(state.setInputDraft).not.toHaveBeenCalledWith(
"__draft_new__:agent-1",
"bring this back",
);
});
it("keeps the draft while send is pending and clears after acceptance", async () => {
let resolveSend: (accepted: boolean) => void;
const sendPromise = new Promise<boolean>((res) => {
resolveSend = res;
});
const onSend = vi.fn(() => sendPromise);
renderInput({ onSend });
fireEvent.change(screen.getByTestId("editor"), { target: { value: "slow network" } });
let sendButton: HTMLElement;
await waitFor(() => {
const buttons = screen.getAllByRole("button");
sendButton = buttons[buttons.length - 1]!;
expect(sendButton).not.toBeDisabled();
});
fireEvent.click(sendButton!);
expect(onSend).toHaveBeenCalledWith("slow network", undefined);
expect(useChatStore.getState().clearInputDraft).not.toHaveBeenCalled();
await waitFor(() => expect(sendButton!).toBeDisabled());
await act(async () => {
resolveSend!(true);
await sendPromise;
});
await waitFor(() => {
expect(useChatStore.getState().clearInputDraft).toHaveBeenCalledWith("__draft_new__:agent-1");
});
});
it("keeps the draft when send is rejected by the owner", async () => {
const onSend = vi.fn(async () => false);
renderInput({ onSend });
fireEvent.change(screen.getByTestId("editor"), { target: { value: "retry me" } });
let sendButton: HTMLElement;
await waitFor(() => {
const buttons = screen.getAllByRole("button");
sendButton = buttons[buttons.length - 1]!;
expect(sendButton).not.toBeDisabled();
});
await act(async () => {
fireEvent.click(sendButton!);
await Promise.resolve();
});
expect(onSend).toHaveBeenCalledWith("retry me", undefined);
expect(useChatStore.getState().clearInputDraft).not.toHaveBeenCalled();
});
});

View File

@@ -1,7 +1,7 @@
"use client";
import type { ReactNode } from "react";
import { useCallback, useRef, useState } from "react";
import { useCallback, useEffect, useRef, useState } from "react";
import { cn } from "@multica/ui/lib/utils";
import {
ContentEditor,
@@ -21,7 +21,9 @@ import { useT } from "../../i18n";
const logger = createLogger("chat.ui");
interface ChatInputProps {
onSend: (content: string, attachmentIds?: string[]) => void;
onSend: (content: string, attachmentIds?: string[]) => void | boolean | Promise<void | boolean>;
restoreDraftRequest?: { id: string; content: string } | null;
onRestoreDraftConsumed?: () => void;
/** Receives a File and returns the attachment row (with id + CDN link).
* The wrapper owner (ChatWindow) lazy-creates a chat_session if needed
* and forwards `chatSessionId` to the upload — chat-input only cares
@@ -46,6 +48,8 @@ interface ChatInputProps {
export function ChatInput({
onSend,
restoreDraftRequest,
onRestoreDraftConsumed,
onUploadFile,
onStop,
isRunning,
@@ -67,9 +71,10 @@ export function ChatInput({
// mid-compose gives each agent its own draft. This is a STORAGE key, not
// a React identity.
//
// `editorKey` — React `key` on the ContentEditor. Used ONLY to force a
// `editorKey` — React `key` on the ContentEditor. Used to force a
// remount when the user explicitly switches agent (so Tiptap's
// Placeholder, which only reads on mount, refreshes to "Tell {agent}…").
// Placeholder, which only reads on mount, refreshes to "Tell {agent}…")
// or when a cancelled empty run restores a draft from the server.
// Crucially this does NOT include `activeSessionId`: when the user
// uploads a file in a brand-new chat, `handleUploadFile` first awaits
// `ensureSession` which lazily creates the session and flips
@@ -82,12 +87,19 @@ export function ChatInput({
// first-upload-creates-session work the same as second-upload.
const draftKey =
activeSessionId ?? `${DRAFT_NEW_SESSION}:${selectedAgentId ?? ""}`;
const editorKey = selectedAgentId ?? "no-agent";
// Select a primitive — empty-string fallback keeps referential stability.
const inputDraft = useChatStore((s) => s.inputDrafts[draftKey] ?? "");
const setInputDraft = useChatStore((s) => s.setInputDraft);
const clearInputDraft = useChatStore((s) => s.clearInputDraft);
const [isEmpty, setIsEmpty] = useState(!inputDraft.trim());
const [isSubmitting, setIsSubmitting] = useState(false);
const [editorRestore, setEditorRestore] = useState<{
id: string;
content: string;
draftKey: string;
} | null>(null);
const activeRestore = editorRestore?.draftKey === draftKey ? editorRestore : null;
const editorKey = `${selectedAgentId ?? "no-agent"}:${activeRestore?.id ?? "base"}`;
// Number of in-flight uploads. We track this explicitly (rather than
// peeking at the editor on every render) so the SubmitButton visibly
// disables the instant an upload starts and re-enables the instant it
@@ -110,6 +122,26 @@ export function ChatInput({
// attachment never binds to the chat message.
const uploadMapRef = useRef<Map<string, string>>(new Map());
useEffect(() => {
if (!restoreDraftRequest) return;
if (inputDraft.trim()) {
logger.info("input.restore skipped: draft already has content", {
draftKey,
restoreId: restoreDraftRequest.id,
});
onRestoreDraftConsumed?.();
return;
}
setInputDraft(draftKey, restoreDraftRequest.content);
setIsEmpty(!restoreDraftRequest.content.trim());
setEditorRestore({
id: restoreDraftRequest.id,
content: restoreDraftRequest.content,
draftKey,
});
onRestoreDraftConsumed?.();
}, [draftKey, inputDraft, onRestoreDraftConsumed, restoreDraftRequest, setInputDraft]);
const handleUpload = useCallback(
async (file: File): Promise<UploadResult | null> => {
if (!onUploadFile) return null;
@@ -135,12 +167,13 @@ export function ChatInput({
onDrop: (files) => files.forEach((f) => editorRef.current?.uploadFile(f)),
});
const handleSend = () => {
const handleSend = async () => {
const content = editorRef.current?.getMarkdown()?.replace(/(\n\s*)+$/, "").trim();
if (!content || isRunning || disabled || noAgent) {
if (!content || isRunning || isSubmitting || disabled || noAgent) {
logger.debug("input.send skipped", {
emptyContent: !content,
isRunning,
isSubmitting,
disabled,
noAgent,
});
@@ -172,7 +205,17 @@ export function ChatInput({
draftKey: keyAtSend,
attachmentCount: activeIds.length,
});
onSend(content, activeIds.length > 0 ? activeIds : undefined);
setIsSubmitting(true);
let accepted: void | boolean;
try {
accepted = await onSend(content, activeIds.length > 0 ? activeIds : undefined);
} catch (err) {
logger.warn("input.send failed", err);
setIsSubmitting(false);
return;
}
setIsSubmitting(false);
if (accepted === false) return;
editorRef.current?.clearContent();
// Drop focus so the caret doesn't keep blinking under the StatusPill /
// streaming reply that's about to take over the user's attention. The
@@ -228,7 +271,7 @@ export function ChatInput({
// intentionally does not depend on activeSessionId.
key={editorKey}
ref={editorRef}
defaultValue={inputDraft}
defaultValue={activeRestore?.content ?? inputDraft}
placeholder={placeholder}
onUpdate={(md) => {
setIsEmpty(!md.trim());
@@ -264,7 +307,7 @@ export function ChatInput({
)}
<SubmitButton
onClick={handleSend}
disabled={isEmpty || !!disabled || !!noAgent || pendingUploads > 0}
disabled={isEmpty || isSubmitting || !!disabled || !!noAgent || pendingUploads > 0}
running={isRunning}
onStop={onStop}
tooltip={`${t(($) => $.input.send_tooltip)} · ${formatShortcut(modKey, enterKey)}`}

View File

@@ -12,6 +12,7 @@ import {
PopoverContent,
PopoverTrigger,
} from "@multica/ui/components/ui/popover";
import { toast } from "sonner";
import { useWorkspaceId } from "@multica/core/hooks";
import { useAuthStore } from "@multica/core/auth";
import { agentListOptions, memberListOptions } from "@multica/core/workspace/queries";
@@ -35,6 +36,7 @@ import {
pendingChatTaskOptions,
pendingChatTasksOptions,
chatKeys,
isTaskMessageTaskId,
} from "@multica/core/chat/queries";
import {
useCreateChatSession,
@@ -75,6 +77,106 @@ function seedChatMessagesPageCache(
);
}
function appendChatMessageToLatestPageCache(
qc: ReturnType<typeof useQueryClient>,
sessionId: string,
message: ChatMessage,
) {
qc.setQueryData<InfiniteData<ChatMessagesPage>>(
chatKeys.messagesPage(sessionId),
(old) => {
if (!old) {
return {
pages: [{
messages: [message],
limit: 50,
has_more: false,
next_cursor: null,
}],
pageParams: [null],
};
}
if (old.pages.some((page) => page.messages.some((m) => m.id === message.id))) {
return old;
}
return {
...old,
pages: old.pages.map((page, index) =>
index === 0 ? { ...page, messages: [...page.messages, message] } : page,
),
};
},
);
}
function removeChatMessageFromPageCache(
qc: ReturnType<typeof useQueryClient>,
sessionId: string,
messageId: string,
) {
qc.setQueryData<InfiniteData<ChatMessagesPage> | undefined>(
chatKeys.messagesPage(sessionId),
(old) => {
if (!old) return old;
return {
...old,
pages: old.pages.map((page) => ({
...page,
messages: page.messages.filter((m) => m.id !== messageId),
})),
};
},
);
}
function removeChatMessageFromCaches(
qc: ReturnType<typeof useQueryClient>,
sessionId: string,
messageId: string,
) {
qc.setQueryData<ChatMessage[]>(
chatKeys.messages(sessionId),
(old) => old?.filter((m) => m.id !== messageId) ?? old,
);
removeChatMessageFromPageCache(qc, sessionId, messageId);
}
function replaceOptimisticChatMessageId(
qc: ReturnType<typeof useQueryClient>,
sessionId: string,
optimisticId: string,
messageId: string,
taskId: string,
) {
const replace = (messages: ChatMessage[] | undefined) => {
if (!messages) return messages;
if (messages.some((m) => m.id === messageId)) {
return messages.filter((m) => m.id !== optimisticId);
}
return messages.map((m) =>
m.id === optimisticId ? { ...m, id: messageId, task_id: taskId } : m,
);
};
qc.setQueryData<ChatMessage[]>(
chatKeys.messages(sessionId),
replace,
);
qc.setQueryData<InfiniteData<ChatMessagesPage> | undefined>(
chatKeys.messagesPage(sessionId),
(old) => {
if (!old) return old;
return {
...old,
pages: old.pages.map((page) => ({
...page,
messages: replace(page.messages) ?? page.messages,
})),
};
},
);
}
export function ChatWindow() {
const { t } = useT("chat");
const wsId = useWorkspaceId();
@@ -123,6 +225,14 @@ export function ChatWindow() {
pendingChatTaskOptions(activeSessionId ?? ""),
);
const pendingTaskId = pendingTask?.task_id ?? null;
const stopRequestedBeforeTaskRef = useRef(false);
const [restoreDraftRequest, setRestoreDraftRequest] = useState<{
id: string;
content: string;
} | null>(null);
const handleRestoreDraftConsumed = useCallback(() => {
setRestoreDraftRequest(null);
}, []);
// Legacy archived sessions (the old soft-archive feature was removed but
// pre-existing rows with status='archived' may still exist) are excluded
@@ -277,11 +387,58 @@ export function ChatWindow() {
[ensureSession, uploadWithToast, qc, setActiveSession],
);
const cancelChatTask = useCallback(
async (
taskId: string,
sessionId: string,
options: { restoreDraftToInput: boolean; source: string },
) => {
apiLogger.info("cancelTask.start", {
taskId,
sessionId,
source: options.source,
});
qc.setQueryData(chatKeys.pendingTask(sessionId), {});
try {
const result = await api.cancelTaskById(taskId);
const restored = result.cancelled_chat_message;
if (restored?.restore_to_input) {
removeChatMessageFromCaches(qc, restored.chat_session_id, restored.message_id);
if (options.restoreDraftToInput && restored.chat_session_id === sessionId) {
setRestoreDraftRequest({
id: restored.message_id,
content: restored.content,
});
}
}
qc.invalidateQueries({ queryKey: chatKeys.messages(sessionId) });
qc.invalidateQueries({ queryKey: chatKeys.messagesPage(sessionId) });
apiLogger.info("cancelTask.success", {
taskId,
sessionId,
restoredToInput: !!restored?.restore_to_input && options.restoreDraftToInput,
});
return result;
} catch (err) {
apiLogger.warn("cancelTask.error (task may have already finished)", {
taskId,
sessionId,
err,
});
qc.invalidateQueries({ queryKey: chatKeys.messages(sessionId) });
qc.invalidateQueries({ queryKey: chatKeys.messagesPage(sessionId) });
return null;
}
},
[qc],
);
const handleSend = useCallback(
async (content: string, attachmentIds?: string[]) => {
async (content: string, attachmentIds?: string[]): Promise<boolean> => {
if (!activeAgent) {
apiLogger.warn("sendChatMessage skipped: no active agent");
return;
return false;
}
const finalContent = content;
@@ -296,10 +453,17 @@ export function ChatWindow() {
attachmentCount: attachmentIds?.length ?? 0,
});
const sessionId = await ensureSession(finalContent);
let sessionId: string | null = null;
try {
sessionId = await ensureSession(finalContent);
} catch (err) {
apiLogger.error("sendChatMessage.ensureSession.error", err);
toast.error(t(($) => $.input.send_failed_toast));
return false;
}
if (!sessionId) {
apiLogger.warn("sendChatMessage aborted: ensureSession returned null");
return;
return false;
}
// Optimistic burst — everything that gives the user "I sent a message
@@ -322,7 +486,7 @@ export function ChatWindow() {
// "new-chat first-message" white flash. Priming the cache first means
// the very first read after activeSessionId flips hits data
// synchronously and ChatMessageList mounts directly.
seedChatMessagesPageCache(qc, sessionId, [optimistic]);
appendChatMessageToLatestPageCache(qc, sessionId, optimistic);
qc.setQueryData<ChatMessage[]>(
chatKeys.messages(sessionId),
(old) => (old ? [...old, optimistic] : [optimistic]),
@@ -342,12 +506,23 @@ export function ChatWindow() {
setActiveSession(sessionId);
apiLogger.debug("sendChatMessage.optimistic", { sessionId, optimisticId: optimistic.id });
const result = await api.sendChatMessage(sessionId, finalContent, attachmentIds);
let result;
try {
result = await api.sendChatMessage(sessionId, finalContent, attachmentIds);
} catch (err) {
apiLogger.error("sendChatMessage.error.rollback", { sessionId, optimisticId: optimistic.id, err });
stopRequestedBeforeTaskRef.current = false;
removeChatMessageFromCaches(qc, sessionId, optimistic.id);
qc.setQueryData(chatKeys.pendingTask(sessionId), {});
toast.error(t(($) => $.input.send_failed_toast));
return false;
}
apiLogger.info("sendChatMessage.success", {
sessionId,
messageId: result.message_id,
taskId: result.task_id,
});
replaceOptimisticChatMessageId(qc, sessionId, optimistic.id, result.message_id, result.task_id);
// Replace the temporary task_id with the server's real one (so the WS
// task: handlers can match against it) and snap the anchor to the
// server's created_at — keeping the elapsed-seconds reading stable.
@@ -356,15 +531,26 @@ export function ChatWindow() {
status: "queued",
created_at: result.created_at,
});
if (stopRequestedBeforeTaskRef.current) {
stopRequestedBeforeTaskRef.current = false;
await cancelChatTask(result.task_id, sessionId, {
restoreDraftToInput: true,
source: "deferred-send",
});
return false;
}
qc.invalidateQueries({ queryKey: chatKeys.messages(sessionId) });
qc.invalidateQueries({ queryKey: chatKeys.messagesPage(sessionId) });
return true;
},
[
activeSessionId,
activeAgent,
ensureSession,
cancelChatTask,
qc,
setActiveSession,
t,
],
);
@@ -373,27 +559,19 @@ export function ChatWindow() {
apiLogger.debug("cancelTask skipped: no pending task");
return;
}
// Optimistic clear — pill disappears + input unlocks the moment the
// user clicks Stop, instead of after the HTTP roundtrip. WS
// task:cancelled will confirm later (no-op if cache is already empty);
// if the cancel POST fails because the task already finished, the
// assistant message arrives via task:completed → chat:done and renders
// normally. Either way the UI is in sync with reality without latency.
apiLogger.info("cancelTask.start", { taskId: pendingTaskId, sessionId: activeSessionId });
qc.setQueryData(chatKeys.pendingTask(activeSessionId), {});
qc.invalidateQueries({ queryKey: chatKeys.messages(activeSessionId) });
qc.invalidateQueries({ queryKey: chatKeys.messagesPage(activeSessionId) });
// Fire-and-forget — UI is already in its post-cancel state. We log the
// outcome but never block on it.
api.cancelTaskById(pendingTaskId).then(
() => apiLogger.info("cancelTask.success", { taskId: pendingTaskId }),
(err) =>
apiLogger.warn("cancelTask.error (task may have already finished)", {
taskId: pendingTaskId,
err,
}),
);
}, [pendingTaskId, activeSessionId, qc]);
if (!isTaskMessageTaskId(pendingTaskId)) {
stopRequestedBeforeTaskRef.current = true;
apiLogger.info("cancelTask.deferred until server task id", {
taskId: pendingTaskId,
sessionId: activeSessionId,
});
return;
}
void cancelChatTask(pendingTaskId, activeSessionId, {
restoreDraftToInput: true,
source: "active-input",
});
}, [pendingTaskId, activeSessionId, cancelChatTask]);
const handleSelectAgent = useCallback(
(agent: Agent) => {
@@ -590,6 +768,8 @@ export function ChatWindow() {
* when there's no agent (the EmptyState above carries the CTA). */}
<ChatInput
onSend={handleSend}
restoreDraftRequest={restoreDraftRequest}
onRestoreDraftConsumed={handleRestoreDraftConsumed}
onUploadFile={handleUploadFile}
onStop={handleStop}
isRunning={!!pendingTaskId}
@@ -914,7 +1094,13 @@ function SessionDropdown({
queryClient.invalidateQueries({ queryKey: chatKeys.messagesPage(session.id) });
api.cancelTaskById(task.task_id).then(
() => apiLogger.info("cancelTask.success (history row)", { taskId: task.task_id, sessionId: session.id }),
(result) => {
const restored = result.cancelled_chat_message;
if (restored?.restore_to_input) {
removeChatMessageFromCaches(queryClient, restored.chat_session_id, restored.message_id);
}
apiLogger.info("cancelTask.success (history row)", { taskId: task.task_id, sessionId: session.id });
},
(err) =>
apiLogger.warn("cancelTask.error (history row; task may have already finished)", {
taskId: task.task_id,

View File

@@ -11,7 +11,8 @@
"placeholder_named": "Message {{name}}…",
"placeholder_default": "Start a message…",
"send_tooltip": "Send",
"stop_tooltip": "Stop"
"stop_tooltip": "Stop",
"send_failed_toast": "Failed to send message"
},
"message_list": {
"show_details": "Show details",

View File

@@ -10,7 +10,8 @@
"placeholder_named": "{{name}} にメッセージ…",
"placeholder_default": "メッセージを入力…",
"send_tooltip": "送信",
"stop_tooltip": "停止"
"stop_tooltip": "停止",
"send_failed_toast": "メッセージを送信できませんでした"
},
"message_list": {
"show_details": "詳細を表示",

View File

@@ -11,7 +11,8 @@
"placeholder_named": "{{name}}에게 메시지 보내기…",
"placeholder_default": "메시지 입력…",
"send_tooltip": "보내기",
"stop_tooltip": "중지"
"stop_tooltip": "중지",
"send_failed_toast": "메시지를 보내지 못했습니다"
},
"message_list": {
"show_details": "세부 정보 보기",

View File

@@ -10,7 +10,8 @@
"placeholder_named": "给 {{name}} 发消息…",
"placeholder_default": "输入消息…",
"send_tooltip": "发送",
"stop_tooltip": "停止"
"stop_tooltip": "停止",
"send_failed_toast": "发送消息失败"
},
"message_list": {
"show_details": "查看详情",

View File

@@ -2,6 +2,7 @@ package handler
import (
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"testing"
@@ -313,6 +314,133 @@ func TestCancelTaskByUser_ChatTask_NonCreator_Returns403(t *testing.T) {
}
}
func TestCancelTaskByUser_ChatTaskWithTranscript_PersistsAssistantSnapshot(t *testing.T) {
if testHandler == nil {
t.Skip("database not available")
}
agentID := createHandlerTestAgent(t, "CancelChatTranscriptAgent", []byte("[]"))
sessionID := createHandlerTestChatSession(t, agentID)
var taskID string
if err := testPool.QueryRow(context.Background(), `
INSERT INTO agent_task_queue (agent_id, runtime_id, status, priority, issue_id, chat_session_id, created_at)
VALUES ($1, (SELECT runtime_id FROM agent WHERE id = $1), 'running', 0, NULL, $2, now() - interval '5 seconds')
RETURNING id
`, agentID, sessionID).Scan(&taskID); err != nil {
t.Fatalf("create chat task: %v", err)
}
t.Cleanup(func() { testPool.Exec(context.Background(), `DELETE FROM agent_task_queue WHERE id = $1`, taskID) })
if _, err := testPool.Exec(context.Background(), `
INSERT INTO chat_message (chat_session_id, role, content, task_id)
VALUES ($1, 'user', 'please answer', $2)
`, sessionID, taskID); err != nil {
t.Fatalf("create linked user chat message: %v", err)
}
if _, err := testPool.Exec(context.Background(), `
INSERT INTO task_message (task_id, seq, type, content)
VALUES ($1, 1, 'text', 'partial answer')
`, taskID); err != nil {
t.Fatalf("create task message: %v", err)
}
w := httptest.NewRecorder()
testHandler.CancelTaskByUser(w, cancelTaskByUserRequest(t, testUserID, taskID))
if w.Code != http.StatusOK {
t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
}
var resp CancelTaskByUserResponse
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
t.Fatalf("decode cancel response: %v", err)
}
if resp.CancelledChatMessage != nil {
t.Fatalf("expected no restore payload when transcript exists, got %#v", resp.CancelledChatMessage)
}
if got := taskStatus(t, taskID); got != "cancelled" {
t.Fatalf("task not cancelled: status = %q", got)
}
var role, content, messageTaskID string
if err := testPool.QueryRow(context.Background(), `
SELECT role, content, COALESCE(task_id::text, '')
FROM chat_message
WHERE chat_session_id = $1 AND role = 'assistant'
`, sessionID).Scan(&role, &content, &messageTaskID); err != nil {
t.Fatalf("read cancelled assistant chat message: %v", err)
}
if role != "assistant" || content != "Stopped." || messageTaskID != taskID {
t.Fatalf("assistant snapshot mismatch: role=%q content=%q task_id=%q", role, content, messageTaskID)
}
}
func TestCancelTaskByUser_ChatTaskWithoutTranscript_RestoresUserDraft(t *testing.T) {
if testHandler == nil {
t.Skip("database not available")
}
agentID := createHandlerTestAgent(t, "CancelChatNoTranscriptAgent", []byte("[]"))
sessionID := createHandlerTestChatSession(t, agentID)
var taskID string
if err := testPool.QueryRow(context.Background(), `
INSERT INTO agent_task_queue (agent_id, runtime_id, status, priority, issue_id, chat_session_id)
VALUES ($1, (SELECT runtime_id FROM agent WHERE id = $1), 'running', 0, NULL, $2)
RETURNING id
`, agentID, sessionID).Scan(&taskID); err != nil {
t.Fatalf("create chat task: %v", err)
}
t.Cleanup(func() { testPool.Exec(context.Background(), `DELETE FROM agent_task_queue WHERE id = $1`, taskID) })
var userMessageID string
const userContent = "keep this prompt"
if err := testPool.QueryRow(context.Background(), `
INSERT INTO chat_message (chat_session_id, role, content, task_id)
VALUES ($1, 'user', $2, $3)
RETURNING id
`, sessionID, userContent, taskID).Scan(&userMessageID); err != nil {
t.Fatalf("create linked user chat message: %v", err)
}
w := httptest.NewRecorder()
testHandler.CancelTaskByUser(w, cancelTaskByUserRequest(t, testUserID, taskID))
if w.Code != http.StatusOK {
t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
}
var resp CancelTaskByUserResponse
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
t.Fatalf("decode cancel response: %v", err)
}
if resp.CancelledChatMessage == nil {
t.Fatal("expected restore payload for empty transcript cancel")
}
if resp.CancelledChatMessage.MessageID != userMessageID ||
resp.CancelledChatMessage.Content != userContent ||
!resp.CancelledChatMessage.RestoreToInput {
t.Fatalf("restore payload mismatch: %#v", resp.CancelledChatMessage)
}
var count int
if err := testPool.QueryRow(context.Background(), `
SELECT count(*) FROM chat_message
WHERE chat_session_id = $1 AND role = 'assistant'
`, sessionID).Scan(&count); err != nil {
t.Fatalf("count assistant chat messages: %v", err)
}
if count != 0 {
t.Fatalf("expected no assistant snapshot for empty transcript, got %d", count)
}
if err := testPool.QueryRow(context.Background(), `
SELECT count(*) FROM chat_message
WHERE id = $1
`, userMessageID).Scan(&count); err != nil {
t.Fatalf("count deleted user chat message: %v", err)
}
if count != 0 {
t.Fatalf("expected linked user message to be deleted, got %d", count)
}
}
// TestCancelTaskByUser_PrivateAgent_PlainMember_Returns403 verifies the cancel
// endpoint mirrors the agent Activity / snapshot visibility gate: a plain
// member who cannot see a private agent's tasks cannot cancel them either.

View File

@@ -473,6 +473,19 @@ func (h *Handler) SendChatMessage(w http.ResponseWriter, r *http.Request) {
writeError(w, http.StatusInternalServerError, "failed to enqueue chat task: "+err.Error())
return
}
if err := h.Queries.LinkChatMessageToTask(r.Context(), db.LinkChatMessageToTaskParams{
ID: msg.ID,
TaskID: task.ID,
}); err != nil {
// Don't fail the send: the task already exists and the user message
// is persisted. The link is only needed for precise empty-cancel
// cleanup; older/unlinked rows simply keep the historical behavior.
slog.Warn("link user chat message to task failed",
"message_id", uuidToString(msg.ID),
"task_id", uuidToString(task.ID),
"error", err,
)
}
// Touch session updated_at.
if err := h.Queries.TouchChatSession(r.Context(), session.ID); err != nil {
@@ -700,6 +713,18 @@ type PendingChatTaskItem struct {
ChatSessionID string `json:"chat_session_id"`
}
type CancelledChatMessageResponse struct {
ChatSessionID string `json:"chat_session_id"`
MessageID string `json:"message_id"`
Content string `json:"content"`
RestoreToInput bool `json:"restore_to_input"`
}
type CancelTaskByUserResponse struct {
AgentTaskResponse
CancelledChatMessage *CancelledChatMessageResponse `json:"cancelled_chat_message,omitempty"`
}
// ListPendingChatTasks returns every in-flight chat task owned by the current
// user in this workspace. Drives the FAB's "running" indicator when the chat
// window is closed (no per-session query is subscribed). Tasks belonging to
@@ -879,13 +904,25 @@ func (h *Handler) CancelTaskByUser(w http.ResponseWriter, r *http.Request) {
}
}
cancelled, err := h.TaskService.CancelTask(r.Context(), taskUUID)
cancelled, err := h.TaskService.CancelTaskWithResult(r.Context(), taskUUID)
if err != nil {
writeError(w, http.StatusBadRequest, err.Error())
return
}
writeJSON(w, http.StatusOK, taskToResponse(*cancelled, workspaceID))
resp := CancelTaskByUserResponse{
AgentTaskResponse: taskToResponse(cancelled.Task, workspaceID),
}
if cancelled.CancelledChatMessage != nil {
resp.CancelledChatMessage = &CancelledChatMessageResponse{
ChatSessionID: cancelled.CancelledChatMessage.ChatSessionID,
MessageID: cancelled.CancelledChatMessage.MessageID,
Content: cancelled.CancelledChatMessage.Content,
RestoreToInput: cancelled.CancelledChatMessage.RestoreToInput,
}
}
writeJSON(w, http.StatusOK, resp)
}
// ---------------------------------------------------------------------------

View File

@@ -90,6 +90,21 @@ func TestSendChatMessage_LinksAttachments(t *testing.T) {
if sendResp.MessageID == "" {
t.Fatal("expected non-empty message_id in send response")
}
if sendResp.TaskID == "" {
t.Fatal("expected non-empty task_id in send response")
}
var messageTaskID string
if err := testPool.QueryRow(
context.Background(),
`SELECT COALESCE(task_id::text, '') FROM chat_message WHERE id = $1`,
sendResp.MessageID,
).Scan(&messageTaskID); err != nil {
t.Fatalf("query chat message task id: %v", err)
}
if messageTaskID != sendResp.TaskID {
t.Fatalf("chat message task_id mismatch: want %s, got %s", sendResp.TaskID, messageTaskID)
}
// 3. Verify the attachment row now points at the new message.
var dbMessageID *string

View File

@@ -812,16 +812,38 @@ func (s *TaskService) CaptureCancelledTasks(ctx context.Context, cancelled []db.
}
}
type CancelledChatMessageResult struct {
ChatSessionID string
MessageID string
Content string
RestoreToInput bool
}
type CancelTaskResult struct {
Task db.AgentTaskQueue
CancelledChatMessage *CancelledChatMessageResult
}
// CancelTask cancels a single task by ID. It broadcasts a task:cancelled event
// so frontends can update immediately.
func (s *TaskService) CancelTask(ctx context.Context, taskID pgtype.UUID) (*db.AgentTaskQueue, error) {
result, err := s.CancelTaskWithResult(ctx, taskID)
if err != nil {
return nil, err
}
return &result.Task, nil
}
// CancelTaskWithResult cancels a single task and returns any chat-specific
// cleanup result needed by user-facing callers.
func (s *TaskService) CancelTaskWithResult(ctx context.Context, taskID pgtype.UUID) (*CancelTaskResult, error) {
task, err := s.Queries.CancelAgentTask(ctx, taskID)
if errors.Is(err, pgx.ErrNoRows) {
existing, err := s.Queries.GetAgentTask(ctx, taskID)
if err != nil {
return nil, fmt.Errorf("cancel task: %w", err)
}
return &existing, nil
return &CancelTaskResult{Task: existing}, nil
}
if err != nil {
return nil, fmt.Errorf("cancel task: %w", err)
@@ -829,6 +851,7 @@ func (s *TaskService) CancelTask(ctx context.Context, taskID pgtype.UUID) (*db.A
slog.Info("task cancelled", "task_id", util.UUIDToString(task.ID), "issue_id", util.UUIDToString(task.IssueID))
s.captureTaskCancelled(ctx, task)
cancelledChatMessage := s.finalizeCancelledChatMessage(ctx, task)
// Reconcile agent status
s.ReconcileAgentStatus(ctx, task.AgentID)
@@ -836,7 +859,57 @@ func (s *TaskService) CancelTask(ctx context.Context, taskID pgtype.UUID) (*db.A
// Broadcast cancellation as a task:failed event so frontends clear the live card
s.broadcastTaskEvent(ctx, protocol.EventTaskCancelled, task)
return &task, nil
return &CancelTaskResult{
Task: task,
CancelledChatMessage: cancelledChatMessage,
}, nil
}
func (s *TaskService) finalizeCancelledChatMessage(ctx context.Context, task db.AgentTaskQueue) *CancelledChatMessageResult {
if !task.ChatSessionID.Valid {
return nil
}
var cancelled *CancelledChatMessageResult
if err := s.runInTx(ctx, func(qtx *db.Queries) error {
messages, err := qtx.ListTaskMessages(ctx, task.ID)
if err != nil {
return fmt.Errorf("list cancelled chat task messages: %w", err)
}
if len(messages) == 0 {
deleted, err := qtx.DeleteUserChatMessageByTask(ctx, task.ID)
if errors.Is(err, pgx.ErrNoRows) {
return nil
}
if err != nil {
return fmt.Errorf("delete empty cancelled chat user message: %w", err)
}
cancelled = &CancelledChatMessageResult{
ChatSessionID: util.UUIDToString(deleted.ChatSessionID),
MessageID: util.UUIDToString(deleted.ID),
Content: deleted.Content,
RestoreToInput: true,
}
return nil
}
if _, err := qtx.CreateChatMessage(ctx, db.CreateChatMessageParams{
ChatSessionID: task.ChatSessionID,
Role: "assistant",
Content: "Stopped.",
TaskID: task.ID,
ElapsedMs: computeChatElapsedMs(task),
}); err != nil {
return fmt.Errorf("create cancelled chat message: %w", err)
}
return nil
}); err != nil {
slog.Error("failed to finalize cancelled chat message",
"task_id", util.UUIDToString(task.ID),
"chat_session_id", util.UUIDToString(task.ChatSessionID),
"error", err,
)
return nil
}
return cancelled
}
// ClaimTask atomically claims the next queued task for an agent,

View File

@@ -164,6 +164,28 @@ func (q *Queries) DeleteChatSession(ctx context.Context, arg DeleteChatSessionPa
return err
}
const deleteUserChatMessageByTask = `-- name: DeleteUserChatMessageByTask :one
DELETE FROM chat_message
WHERE task_id = $1 AND role = 'user'
RETURNING id, chat_session_id, role, content, task_id, created_at, failure_reason, elapsed_ms
`
func (q *Queries) DeleteUserChatMessageByTask(ctx context.Context, taskID pgtype.UUID) (ChatMessage, error) {
row := q.db.QueryRow(ctx, deleteUserChatMessageByTask, taskID)
var i ChatMessage
err := row.Scan(
&i.ID,
&i.ChatSessionID,
&i.Role,
&i.Content,
&i.TaskID,
&i.CreatedAt,
&i.FailureReason,
&i.ElapsedMs,
)
return i, err
}
const getChatMessage = `-- name: GetChatMessage :one
SELECT id, chat_session_id, role, content, task_id, created_at, failure_reason, elapsed_ms FROM chat_message
WHERE id = $1
@@ -329,6 +351,22 @@ func (q *Queries) GetPendingChatTask(ctx context.Context, chatSessionID pgtype.U
return i, err
}
const linkChatMessageToTask = `-- name: LinkChatMessageToTask :exec
UPDATE chat_message
SET task_id = $2
WHERE id = $1 AND role = 'user'
`
type LinkChatMessageToTaskParams struct {
ID pgtype.UUID `json:"id"`
TaskID pgtype.UUID `json:"task_id"`
}
func (q *Queries) LinkChatMessageToTask(ctx context.Context, arg LinkChatMessageToTaskParams) error {
_, err := q.db.Exec(ctx, linkChatMessageToTask, arg.ID, arg.TaskID)
return err
}
const listAllChatSessionsByCreator = `-- name: ListAllChatSessionsByCreator :many
SELECT cs.id, cs.workspace_id, cs.agent_id, cs.creator_id, cs.title, cs.session_id, cs.work_dir, cs.status, cs.created_at, cs.updated_at, cs.unread_since, cs.runtime_id,
(cs.unread_since IS NOT NULL)::bool AS has_unread

View File

@@ -78,6 +78,16 @@ INSERT INTO chat_message (chat_session_id, role, content, task_id, failure_reaso
VALUES ($1, $2, $3, sqlc.narg(task_id), sqlc.narg(failure_reason), sqlc.narg(elapsed_ms))
RETURNING *;
-- name: LinkChatMessageToTask :exec
UPDATE chat_message
SET task_id = $2
WHERE id = $1 AND role = 'user';
-- name: DeleteUserChatMessageByTask :one
DELETE FROM chat_message
WHERE task_id = $1 AND role = 'user'
RETURNING *;
-- name: ListChatMessages :many
SELECT * FROM chat_message
WHERE chat_session_id = $1