mirror of
https://github.com/multica-ai/multica.git
synced 2026-06-30 10:59:31 +02:00
Compare commits
3 Commits
agent/lamb
...
agent/walt
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
13fa01bb0a | ||
|
|
f660d28b4c | ||
|
|
a08c4283c3 |
@@ -487,6 +487,109 @@ describe("ApiClient", () => {
|
||||
});
|
||||
});
|
||||
|
||||
describe("cancelTaskById response parsing", () => {
|
||||
const taskResponse = {
|
||||
id: "task-1",
|
||||
agent_id: "agent-1",
|
||||
runtime_id: "runtime-1",
|
||||
issue_id: "",
|
||||
status: "cancelled",
|
||||
priority: 0,
|
||||
dispatched_at: null,
|
||||
started_at: null,
|
||||
completed_at: "2026-06-12T06:40:00Z",
|
||||
result: null,
|
||||
error: null,
|
||||
created_at: "2026-06-12T06:39:00Z",
|
||||
};
|
||||
|
||||
it("parses the cancelled chat message payload", async () => {
|
||||
const fetchMock = vi.fn().mockResolvedValue(
|
||||
new Response(JSON.stringify({
|
||||
...taskResponse,
|
||||
cancelled_chat_message: {
|
||||
chat_session_id: "session-1",
|
||||
message_id: "message-1",
|
||||
content: "restore me",
|
||||
restore_to_input: true,
|
||||
},
|
||||
}), {
|
||||
status: 200,
|
||||
headers: { "Content-Type": "application/json" },
|
||||
}),
|
||||
);
|
||||
vi.stubGlobal("fetch", fetchMock);
|
||||
|
||||
const client = new ApiClient("https://api.example.test");
|
||||
const result = await client.cancelTaskById("task-1");
|
||||
|
||||
expect(fetchMock.mock.calls[0]).toMatchObject([
|
||||
"https://api.example.test/api/tasks/task-1/cancel",
|
||||
{ method: "POST" },
|
||||
]);
|
||||
expect(result.cancelled_chat_message).toEqual({
|
||||
chat_session_id: "session-1",
|
||||
message_id: "message-1",
|
||||
content: "restore me",
|
||||
restore_to_input: true,
|
||||
});
|
||||
});
|
||||
|
||||
it("treats a null cancelled chat message as absent", async () => {
|
||||
vi.stubGlobal(
|
||||
"fetch",
|
||||
vi.fn().mockResolvedValue(
|
||||
new Response(JSON.stringify({
|
||||
...taskResponse,
|
||||
cancelled_chat_message: null,
|
||||
}), {
|
||||
status: 200,
|
||||
headers: { "Content-Type": "application/json" },
|
||||
}),
|
||||
),
|
||||
);
|
||||
|
||||
const client = new ApiClient("https://api.example.test");
|
||||
const result = await client.cancelTaskById("task-1");
|
||||
|
||||
expect(result.id).toBe("task-1");
|
||||
expect(result.cancelled_chat_message).toBeUndefined();
|
||||
});
|
||||
|
||||
it.each([
|
||||
["a missing task id", { ...taskResponse, id: undefined }],
|
||||
[
|
||||
"a malformed cancelled chat message",
|
||||
{
|
||||
...taskResponse,
|
||||
cancelled_chat_message: {
|
||||
chat_session_id: "session-1",
|
||||
message_id: "message-1",
|
||||
content: "restore me",
|
||||
restore_to_input: "true",
|
||||
},
|
||||
},
|
||||
],
|
||||
["a null body", null],
|
||||
])("falls back for %s", async (_label, body) => {
|
||||
vi.stubGlobal(
|
||||
"fetch",
|
||||
vi.fn().mockResolvedValue(
|
||||
new Response(JSON.stringify(body), {
|
||||
status: 200,
|
||||
headers: { "Content-Type": "application/json" },
|
||||
}),
|
||||
),
|
||||
);
|
||||
|
||||
const client = new ApiClient("https://api.example.test");
|
||||
const result = await client.cancelTaskById("task-1");
|
||||
|
||||
expect(result.id).toBe("");
|
||||
expect(result.cancelled_chat_message).toBeUndefined();
|
||||
});
|
||||
});
|
||||
|
||||
describe("chat attachment wiring", () => {
|
||||
it("uploadFile includes chat_session_id in the FormData body", async () => {
|
||||
const fetchMock = vi.fn().mockResolvedValue(
|
||||
|
||||
@@ -66,6 +66,7 @@ import type {
|
||||
ChatPendingTask,
|
||||
PendingChatTasksResponse,
|
||||
SendChatMessageResponse,
|
||||
CancelTaskResponse,
|
||||
Project,
|
||||
CreateProjectRequest,
|
||||
UpdateProjectRequest,
|
||||
@@ -132,6 +133,7 @@ import {
|
||||
AgentTemplateSchema,
|
||||
AgentTemplateSummaryListSchema,
|
||||
AttachmentResponseSchema,
|
||||
CancelTaskResponseSchema,
|
||||
ChildIssuesResponseSchema,
|
||||
CommentsListSchema,
|
||||
CommentTriggerPreviewSchema,
|
||||
@@ -190,6 +192,7 @@ import {
|
||||
EMPTY_CREATE_BILLING_CHECKOUT_SESSION_RESPONSE,
|
||||
EMPTY_BILLING_CHECKOUT_SESSION_STATUS,
|
||||
EMPTY_CREATE_BILLING_PORTAL_SESSION_RESPONSE,
|
||||
EMPTY_CANCEL_TASK_RESPONSE,
|
||||
} from "./schemas";
|
||||
|
||||
/** Identifies the calling client to the server.
|
||||
@@ -1683,8 +1686,11 @@ export class ApiClient {
|
||||
await this.fetch(`/api/chat/sessions/${sessionId}/read`, { method: "POST" });
|
||||
}
|
||||
|
||||
async cancelTaskById(taskId: string): Promise<void> {
|
||||
await this.fetch(`/api/tasks/${taskId}/cancel`, { method: "POST" });
|
||||
async cancelTaskById(taskId: string): Promise<CancelTaskResponse> {
|
||||
const raw = await this.fetch<unknown>(`/api/tasks/${taskId}/cancel`, { method: "POST" });
|
||||
return parseWithFallback(raw, CancelTaskResponseSchema, EMPTY_CANCEL_TASK_RESPONSE, {
|
||||
endpoint: "POST /api/tasks/{taskId}/cancel",
|
||||
});
|
||||
}
|
||||
|
||||
async listAttachments(issueId: string): Promise<Attachment[]> {
|
||||
|
||||
@@ -10,6 +10,7 @@ import type {
|
||||
BillingPriceTier,
|
||||
BillingTopupsPage,
|
||||
BillingTransactionsPage,
|
||||
CancelTaskResponse,
|
||||
CreateAgentFromTemplateResponse,
|
||||
CreateBillingCheckoutSessionResponse,
|
||||
CreateBillingPortalSessionResponse,
|
||||
@@ -420,6 +421,67 @@ const RuntimeUsageByHourSchema = z.object({
|
||||
|
||||
export const RuntimeUsageByHourListSchema = z.array(RuntimeUsageByHourSchema);
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Task cancellation (`POST /api/tasks/:id/cancel`)
|
||||
//
|
||||
// This response is consumed directly by chat recovery. The embedded task
|
||||
// object stays loose so daemon/runtime fields can drift, but the optional
|
||||
// `cancelled_chat_message` payload must be well-formed before the UI deletes
|
||||
// a message from cache or restores text into the input.
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
const AgentTaskResponseSchema = z.object({
|
||||
id: z.string(),
|
||||
agent_id: z.string().default(""),
|
||||
runtime_id: z.string().default(""),
|
||||
issue_id: z.string().default(""),
|
||||
status: z.string().default("cancelled"),
|
||||
priority: z.number().default(0),
|
||||
dispatched_at: z.string().nullable().default(null),
|
||||
started_at: z.string().nullable().default(null),
|
||||
completed_at: z.string().nullable().default(null),
|
||||
result: z.unknown().default(null),
|
||||
error: z.string().nullable().default(null),
|
||||
failure_reason: z.string().optional(),
|
||||
created_at: z.string().default(""),
|
||||
chat_session_id: z.string().optional(),
|
||||
autopilot_run_id: z.string().optional(),
|
||||
parent_task_id: z.string().optional(),
|
||||
attempt: z.number().optional(),
|
||||
trigger_comment_id: z.string().optional(),
|
||||
trigger_summary: z.string().optional(),
|
||||
kind: z.string().optional(),
|
||||
work_dir: z.string().optional(),
|
||||
relative_work_dir: z.string().optional(),
|
||||
}).loose();
|
||||
|
||||
const CancelledChatMessageSchema = z.object({
|
||||
chat_session_id: z.string(),
|
||||
message_id: z.string(),
|
||||
content: z.string(),
|
||||
restore_to_input: z.boolean().default(false),
|
||||
}).loose();
|
||||
|
||||
export const CancelTaskResponseSchema = AgentTaskResponseSchema.extend({
|
||||
cancelled_chat_message: CancelledChatMessageSchema.nullish()
|
||||
.transform((value) => value ?? undefined),
|
||||
}).loose();
|
||||
|
||||
export const EMPTY_CANCEL_TASK_RESPONSE: CancelTaskResponse = {
|
||||
id: "",
|
||||
agent_id: "",
|
||||
runtime_id: "",
|
||||
issue_id: "",
|
||||
status: "cancelled",
|
||||
priority: 0,
|
||||
dispatched_at: null,
|
||||
started_at: null,
|
||||
completed_at: null,
|
||||
result: null,
|
||||
error: null,
|
||||
created_at: "",
|
||||
};
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Agent template catalog — `/api/agent-templates*` and the
|
||||
// create-from-template response. The desktop app's create-agent picker
|
||||
|
||||
@@ -19,6 +19,7 @@ import {
|
||||
applyChatDoneToCache,
|
||||
applyWorkspaceUpdatedToCache,
|
||||
handleInboxNew,
|
||||
invalidateChatMessageQueries,
|
||||
resolveInboxSourceSlug,
|
||||
} from "./use-realtime-sync";
|
||||
|
||||
@@ -134,6 +135,18 @@ describe("applyChatDoneToCache", () => {
|
||||
});
|
||||
});
|
||||
|
||||
describe("invalidateChatMessageQueries", () => {
|
||||
it("invalidates both legacy and paged chat message caches", () => {
|
||||
const qc = createQueryClient();
|
||||
const invalidate = vi.spyOn(qc, "invalidateQueries");
|
||||
|
||||
invalidateChatMessageQueries(qc, sessionId);
|
||||
|
||||
expect(invalidate).toHaveBeenCalledWith({ queryKey: chatKeys.messages(sessionId) });
|
||||
expect(invalidate).toHaveBeenCalledWith({ queryKey: chatKeys.messagesPage(sessionId) });
|
||||
});
|
||||
});
|
||||
|
||||
describe("applyWorkspaceUpdatedToCache", () => {
|
||||
const wsId = "ws-1";
|
||||
|
||||
|
||||
@@ -89,6 +89,14 @@ const chatWsLogger = createLogger("chat.ws");
|
||||
|
||||
const logger = createLogger("realtime-sync");
|
||||
|
||||
export function invalidateChatMessageQueries(
|
||||
qc: QueryClient,
|
||||
sessionId: string,
|
||||
) {
|
||||
qc.invalidateQueries({ queryKey: chatKeys.messages(sessionId) });
|
||||
qc.invalidateQueries({ queryKey: chatKeys.messagesPage(sessionId) });
|
||||
}
|
||||
|
||||
export function applyChatDoneToCache(
|
||||
qc: QueryClient,
|
||||
payload: ChatDonePayload,
|
||||
@@ -125,8 +133,7 @@ export function applyChatDoneToCache(
|
||||
qc.setQueryData(chatKeys.pendingTask(sessionId), {});
|
||||
// Authoritative refetch reconciles redaction / migrations / clients
|
||||
// that took the fallback branch above.
|
||||
qc.invalidateQueries({ queryKey: chatKeys.messages(sessionId) });
|
||||
qc.invalidateQueries({ queryKey: chatKeys.messagesPage(sessionId) });
|
||||
invalidateChatMessageQueries(qc, sessionId);
|
||||
qc.invalidateQueries({ queryKey: chatKeys.pendingTask(sessionId) });
|
||||
}
|
||||
|
||||
@@ -835,7 +842,7 @@ export function useRealtimeSync(
|
||||
const unsubChatMessage = ws.on("chat:message", (p) => {
|
||||
const payload = p as { chat_session_id: string };
|
||||
chatWsLogger.info("chat:message (global)", { chat_session_id: payload.chat_session_id });
|
||||
qc.invalidateQueries({ queryKey: chatKeys.messages(payload.chat_session_id) });
|
||||
invalidateChatMessageQueries(qc, payload.chat_session_id);
|
||||
qc.invalidateQueries({ queryKey: chatKeys.pendingTask(payload.chat_session_id) });
|
||||
invalidatePendingAggregate();
|
||||
});
|
||||
@@ -949,6 +956,9 @@ export function useRealtimeSync(
|
||||
// 2. another tab / admin / system cancels — this is the only path that
|
||||
// drops the pending pill in those cases. Without it the pill spins
|
||||
// forever in the second-tab scenario.
|
||||
// CancelTask also persists a best-effort assistant snapshot when the
|
||||
// stopped chat task had already streamed transcript rows, so refresh the
|
||||
// message page along with clearing pending.
|
||||
const unsubTaskCancelled = ws.on("task:cancelled", (p) => {
|
||||
const payload = p as TaskCancelledPayload;
|
||||
if (!payload.chat_session_id) return;
|
||||
@@ -957,6 +967,7 @@ export function useRealtimeSync(
|
||||
chat_session_id: payload.chat_session_id,
|
||||
});
|
||||
qc.setQueryData(chatKeys.pendingTask(payload.chat_session_id), {});
|
||||
invalidateChatMessageQueries(qc, payload.chat_session_id);
|
||||
invalidatePendingAggregate();
|
||||
});
|
||||
|
||||
@@ -990,7 +1001,7 @@ export function useRealtimeSync(
|
||||
// this branch only flipped pending — the comment "No new message"
|
||||
// was true then, but FailTask now persists a row.
|
||||
qc.setQueryData(chatKeys.pendingTask(payload.chat_session_id), {});
|
||||
qc.invalidateQueries({ queryKey: chatKeys.messages(payload.chat_session_id) });
|
||||
invalidateChatMessageQueries(qc, payload.chat_session_id);
|
||||
qc.invalidateQueries({ queryKey: chatKeys.pendingTask(payload.chat_session_id) });
|
||||
invalidatePendingAggregate();
|
||||
});
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
import type { AgentTask } from "./agent";
|
||||
|
||||
export interface ChatSession {
|
||||
id: string;
|
||||
workspace_id: string;
|
||||
@@ -79,6 +81,17 @@ export interface SendChatMessageResponse {
|
||||
created_at: string;
|
||||
}
|
||||
|
||||
export interface CancelledChatMessage {
|
||||
chat_session_id: string;
|
||||
message_id: string;
|
||||
content: string;
|
||||
restore_to_input: boolean;
|
||||
}
|
||||
|
||||
export interface CancelTaskResponse extends AgentTask {
|
||||
cancelled_chat_message?: CancelledChatMessage;
|
||||
}
|
||||
|
||||
/**
|
||||
* Response from GET /api/chat/sessions/{id}/pending-task.
|
||||
* All fields are absent when the session has no in-flight task.
|
||||
|
||||
@@ -68,7 +68,17 @@ export type * from "./events";
|
||||
export type * from "./api";
|
||||
export type { Attachment } from "./attachment";
|
||||
export { attachmentDownloadPath, attachmentIdFromDownloadURL, contentReferencesAttachment } from "./attachment-url";
|
||||
export type { ChatSession, ChatMessage, ChatMessagesPage, ChatPendingTask, PendingChatTaskItem, PendingChatTasksResponse, SendChatMessageResponse } from "./chat";
|
||||
export type {
|
||||
ChatSession,
|
||||
ChatMessage,
|
||||
ChatMessagesPage,
|
||||
ChatPendingTask,
|
||||
PendingChatTaskItem,
|
||||
PendingChatTasksResponse,
|
||||
SendChatMessageResponse,
|
||||
CancelledChatMessage,
|
||||
CancelTaskResponse,
|
||||
} from "./chat";
|
||||
export type { StorageAdapter } from "./storage";
|
||||
export type {
|
||||
Project,
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
import { forwardRef, useRef, useImperativeHandle } from "react";
|
||||
import { describe, it, expect, vi } from "vitest";
|
||||
import { beforeEach, describe, it, expect, vi } from "vitest";
|
||||
import { act, render, screen, fireEvent, waitFor } from "@testing-library/react";
|
||||
import { I18nProvider } from "@multica/core/i18n/react";
|
||||
import type { UploadResult } from "@multica/core/hooks/use-file-upload";
|
||||
@@ -130,6 +130,24 @@ vi.mock("@multica/core/chat", () => {
|
||||
});
|
||||
|
||||
import { ChatInput } from "./chat-input";
|
||||
import { useChatStore } from "@multica/core/chat";
|
||||
|
||||
beforeEach(() => {
|
||||
dropHandlers.onDrop = null;
|
||||
editorProps.last = null;
|
||||
const state = useChatStore.getState() as unknown as {
|
||||
activeSessionId: string | null;
|
||||
selectedAgentId: string;
|
||||
inputDrafts: Record<string, string>;
|
||||
setInputDraft: ReturnType<typeof vi.fn>;
|
||||
clearInputDraft: ReturnType<typeof vi.fn>;
|
||||
};
|
||||
state.activeSessionId = null;
|
||||
state.selectedAgentId = "agent-1";
|
||||
state.inputDrafts = {};
|
||||
state.setInputDraft.mockClear();
|
||||
state.clearInputDraft.mockClear();
|
||||
});
|
||||
|
||||
function renderInput(props: Partial<React.ComponentProps<typeof ChatInput>> = {}) {
|
||||
const onSend = props.onSend ?? vi.fn();
|
||||
@@ -313,3 +331,105 @@ describe("ChatInput attachment wiring", () => {
|
||||
expect(buttons.length).toBe(1);
|
||||
});
|
||||
});
|
||||
|
||||
describe("ChatInput async send", () => {
|
||||
it("restores a cancelled empty run draft into the editor", async () => {
|
||||
const onRestoreDraftConsumed = vi.fn();
|
||||
renderInput({
|
||||
restoreDraftRequest: {
|
||||
id: "msg-restored",
|
||||
content: "bring this back",
|
||||
},
|
||||
onRestoreDraftConsumed,
|
||||
});
|
||||
|
||||
await waitFor(() => {
|
||||
expect(useChatStore.getState().setInputDraft).toHaveBeenCalledWith(
|
||||
"__draft_new__:agent-1",
|
||||
"bring this back",
|
||||
);
|
||||
expect(editorProps.last?.defaultValue).toBe("bring this back");
|
||||
expect(onRestoreDraftConsumed).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
});
|
||||
|
||||
it("consumes a restore request even when an existing draft blocks restore", async () => {
|
||||
const state = useChatStore.getState() as unknown as {
|
||||
inputDrafts: Record<string, string>;
|
||||
setInputDraft: ReturnType<typeof vi.fn>;
|
||||
};
|
||||
state.inputDrafts["__draft_new__:agent-1"] = "already typing";
|
||||
const onRestoreDraftConsumed = vi.fn();
|
||||
|
||||
renderInput({
|
||||
restoreDraftRequest: {
|
||||
id: "msg-restored",
|
||||
content: "bring this back",
|
||||
},
|
||||
onRestoreDraftConsumed,
|
||||
});
|
||||
|
||||
await waitFor(() => {
|
||||
expect(onRestoreDraftConsumed).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
expect(state.setInputDraft).not.toHaveBeenCalledWith(
|
||||
"__draft_new__:agent-1",
|
||||
"bring this back",
|
||||
);
|
||||
});
|
||||
|
||||
it("keeps the draft while send is pending and clears after acceptance", async () => {
|
||||
let resolveSend: (accepted: boolean) => void;
|
||||
const sendPromise = new Promise<boolean>((res) => {
|
||||
resolveSend = res;
|
||||
});
|
||||
const onSend = vi.fn(() => sendPromise);
|
||||
renderInput({ onSend });
|
||||
|
||||
fireEvent.change(screen.getByTestId("editor"), { target: { value: "slow network" } });
|
||||
|
||||
let sendButton: HTMLElement;
|
||||
await waitFor(() => {
|
||||
const buttons = screen.getAllByRole("button");
|
||||
sendButton = buttons[buttons.length - 1]!;
|
||||
expect(sendButton).not.toBeDisabled();
|
||||
});
|
||||
|
||||
fireEvent.click(sendButton!);
|
||||
|
||||
expect(onSend).toHaveBeenCalledWith("slow network", undefined);
|
||||
expect(useChatStore.getState().clearInputDraft).not.toHaveBeenCalled();
|
||||
await waitFor(() => expect(sendButton!).toBeDisabled());
|
||||
|
||||
await act(async () => {
|
||||
resolveSend!(true);
|
||||
await sendPromise;
|
||||
});
|
||||
|
||||
await waitFor(() => {
|
||||
expect(useChatStore.getState().clearInputDraft).toHaveBeenCalledWith("__draft_new__:agent-1");
|
||||
});
|
||||
});
|
||||
|
||||
it("keeps the draft when send is rejected by the owner", async () => {
|
||||
const onSend = vi.fn(async () => false);
|
||||
renderInput({ onSend });
|
||||
|
||||
fireEvent.change(screen.getByTestId("editor"), { target: { value: "retry me" } });
|
||||
|
||||
let sendButton: HTMLElement;
|
||||
await waitFor(() => {
|
||||
const buttons = screen.getAllByRole("button");
|
||||
sendButton = buttons[buttons.length - 1]!;
|
||||
expect(sendButton).not.toBeDisabled();
|
||||
});
|
||||
|
||||
await act(async () => {
|
||||
fireEvent.click(sendButton!);
|
||||
await Promise.resolve();
|
||||
});
|
||||
|
||||
expect(onSend).toHaveBeenCalledWith("retry me", undefined);
|
||||
expect(useChatStore.getState().clearInputDraft).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
"use client";
|
||||
|
||||
import type { ReactNode } from "react";
|
||||
import { useCallback, useRef, useState } from "react";
|
||||
import { useCallback, useEffect, useRef, useState } from "react";
|
||||
import { cn } from "@multica/ui/lib/utils";
|
||||
import {
|
||||
ContentEditor,
|
||||
@@ -21,7 +21,9 @@ import { useT } from "../../i18n";
|
||||
const logger = createLogger("chat.ui");
|
||||
|
||||
interface ChatInputProps {
|
||||
onSend: (content: string, attachmentIds?: string[]) => void;
|
||||
onSend: (content: string, attachmentIds?: string[]) => void | boolean | Promise<void | boolean>;
|
||||
restoreDraftRequest?: { id: string; content: string } | null;
|
||||
onRestoreDraftConsumed?: () => void;
|
||||
/** Receives a File and returns the attachment row (with id + CDN link).
|
||||
* The wrapper owner (ChatWindow) lazy-creates a chat_session if needed
|
||||
* and forwards `chatSessionId` to the upload — chat-input only cares
|
||||
@@ -46,6 +48,8 @@ interface ChatInputProps {
|
||||
|
||||
export function ChatInput({
|
||||
onSend,
|
||||
restoreDraftRequest,
|
||||
onRestoreDraftConsumed,
|
||||
onUploadFile,
|
||||
onStop,
|
||||
isRunning,
|
||||
@@ -67,9 +71,10 @@ export function ChatInput({
|
||||
// mid-compose gives each agent its own draft. This is a STORAGE key, not
|
||||
// a React identity.
|
||||
//
|
||||
// `editorKey` — React `key` on the ContentEditor. Used ONLY to force a
|
||||
// `editorKey` — React `key` on the ContentEditor. Used to force a
|
||||
// remount when the user explicitly switches agent (so Tiptap's
|
||||
// Placeholder, which only reads on mount, refreshes to "Tell {agent}…").
|
||||
// Placeholder, which only reads on mount, refreshes to "Tell {agent}…")
|
||||
// or when a cancelled empty run restores a draft from the server.
|
||||
// Crucially this does NOT include `activeSessionId`: when the user
|
||||
// uploads a file in a brand-new chat, `handleUploadFile` first awaits
|
||||
// `ensureSession` which lazily creates the session and flips
|
||||
@@ -82,12 +87,19 @@ export function ChatInput({
|
||||
// first-upload-creates-session work the same as second-upload.
|
||||
const draftKey =
|
||||
activeSessionId ?? `${DRAFT_NEW_SESSION}:${selectedAgentId ?? ""}`;
|
||||
const editorKey = selectedAgentId ?? "no-agent";
|
||||
// Select a primitive — empty-string fallback keeps referential stability.
|
||||
const inputDraft = useChatStore((s) => s.inputDrafts[draftKey] ?? "");
|
||||
const setInputDraft = useChatStore((s) => s.setInputDraft);
|
||||
const clearInputDraft = useChatStore((s) => s.clearInputDraft);
|
||||
const [isEmpty, setIsEmpty] = useState(!inputDraft.trim());
|
||||
const [isSubmitting, setIsSubmitting] = useState(false);
|
||||
const [editorRestore, setEditorRestore] = useState<{
|
||||
id: string;
|
||||
content: string;
|
||||
draftKey: string;
|
||||
} | null>(null);
|
||||
const activeRestore = editorRestore?.draftKey === draftKey ? editorRestore : null;
|
||||
const editorKey = `${selectedAgentId ?? "no-agent"}:${activeRestore?.id ?? "base"}`;
|
||||
// Number of in-flight uploads. We track this explicitly (rather than
|
||||
// peeking at the editor on every render) so the SubmitButton visibly
|
||||
// disables the instant an upload starts and re-enables the instant it
|
||||
@@ -110,6 +122,26 @@ export function ChatInput({
|
||||
// attachment never binds to the chat message.
|
||||
const uploadMapRef = useRef<Map<string, string>>(new Map());
|
||||
|
||||
useEffect(() => {
|
||||
if (!restoreDraftRequest) return;
|
||||
if (inputDraft.trim()) {
|
||||
logger.info("input.restore skipped: draft already has content", {
|
||||
draftKey,
|
||||
restoreId: restoreDraftRequest.id,
|
||||
});
|
||||
onRestoreDraftConsumed?.();
|
||||
return;
|
||||
}
|
||||
setInputDraft(draftKey, restoreDraftRequest.content);
|
||||
setIsEmpty(!restoreDraftRequest.content.trim());
|
||||
setEditorRestore({
|
||||
id: restoreDraftRequest.id,
|
||||
content: restoreDraftRequest.content,
|
||||
draftKey,
|
||||
});
|
||||
onRestoreDraftConsumed?.();
|
||||
}, [draftKey, inputDraft, onRestoreDraftConsumed, restoreDraftRequest, setInputDraft]);
|
||||
|
||||
const handleUpload = useCallback(
|
||||
async (file: File): Promise<UploadResult | null> => {
|
||||
if (!onUploadFile) return null;
|
||||
@@ -135,12 +167,13 @@ export function ChatInput({
|
||||
onDrop: (files) => files.forEach((f) => editorRef.current?.uploadFile(f)),
|
||||
});
|
||||
|
||||
const handleSend = () => {
|
||||
const handleSend = async () => {
|
||||
const content = editorRef.current?.getMarkdown()?.replace(/(\n\s*)+$/, "").trim();
|
||||
if (!content || isRunning || disabled || noAgent) {
|
||||
if (!content || isRunning || isSubmitting || disabled || noAgent) {
|
||||
logger.debug("input.send skipped", {
|
||||
emptyContent: !content,
|
||||
isRunning,
|
||||
isSubmitting,
|
||||
disabled,
|
||||
noAgent,
|
||||
});
|
||||
@@ -172,7 +205,17 @@ export function ChatInput({
|
||||
draftKey: keyAtSend,
|
||||
attachmentCount: activeIds.length,
|
||||
});
|
||||
onSend(content, activeIds.length > 0 ? activeIds : undefined);
|
||||
setIsSubmitting(true);
|
||||
let accepted: void | boolean;
|
||||
try {
|
||||
accepted = await onSend(content, activeIds.length > 0 ? activeIds : undefined);
|
||||
} catch (err) {
|
||||
logger.warn("input.send failed", err);
|
||||
setIsSubmitting(false);
|
||||
return;
|
||||
}
|
||||
setIsSubmitting(false);
|
||||
if (accepted === false) return;
|
||||
editorRef.current?.clearContent();
|
||||
// Drop focus so the caret doesn't keep blinking under the StatusPill /
|
||||
// streaming reply that's about to take over the user's attention. The
|
||||
@@ -228,7 +271,7 @@ export function ChatInput({
|
||||
// intentionally does not depend on activeSessionId.
|
||||
key={editorKey}
|
||||
ref={editorRef}
|
||||
defaultValue={inputDraft}
|
||||
defaultValue={activeRestore?.content ?? inputDraft}
|
||||
placeholder={placeholder}
|
||||
onUpdate={(md) => {
|
||||
setIsEmpty(!md.trim());
|
||||
@@ -264,7 +307,7 @@ export function ChatInput({
|
||||
)}
|
||||
<SubmitButton
|
||||
onClick={handleSend}
|
||||
disabled={isEmpty || !!disabled || !!noAgent || pendingUploads > 0}
|
||||
disabled={isEmpty || isSubmitting || !!disabled || !!noAgent || pendingUploads > 0}
|
||||
running={isRunning}
|
||||
onStop={onStop}
|
||||
tooltip={`${t(($) => $.input.send_tooltip)} · ${formatShortcut(modKey, enterKey)}`}
|
||||
|
||||
@@ -12,6 +12,7 @@ import {
|
||||
PopoverContent,
|
||||
PopoverTrigger,
|
||||
} from "@multica/ui/components/ui/popover";
|
||||
import { toast } from "sonner";
|
||||
import { useWorkspaceId } from "@multica/core/hooks";
|
||||
import { useAuthStore } from "@multica/core/auth";
|
||||
import { agentListOptions, memberListOptions } from "@multica/core/workspace/queries";
|
||||
@@ -35,6 +36,7 @@ import {
|
||||
pendingChatTaskOptions,
|
||||
pendingChatTasksOptions,
|
||||
chatKeys,
|
||||
isTaskMessageTaskId,
|
||||
} from "@multica/core/chat/queries";
|
||||
import {
|
||||
useCreateChatSession,
|
||||
@@ -75,6 +77,106 @@ function seedChatMessagesPageCache(
|
||||
);
|
||||
}
|
||||
|
||||
function appendChatMessageToLatestPageCache(
|
||||
qc: ReturnType<typeof useQueryClient>,
|
||||
sessionId: string,
|
||||
message: ChatMessage,
|
||||
) {
|
||||
qc.setQueryData<InfiniteData<ChatMessagesPage>>(
|
||||
chatKeys.messagesPage(sessionId),
|
||||
(old) => {
|
||||
if (!old) {
|
||||
return {
|
||||
pages: [{
|
||||
messages: [message],
|
||||
limit: 50,
|
||||
has_more: false,
|
||||
next_cursor: null,
|
||||
}],
|
||||
pageParams: [null],
|
||||
};
|
||||
}
|
||||
if (old.pages.some((page) => page.messages.some((m) => m.id === message.id))) {
|
||||
return old;
|
||||
}
|
||||
return {
|
||||
...old,
|
||||
pages: old.pages.map((page, index) =>
|
||||
index === 0 ? { ...page, messages: [...page.messages, message] } : page,
|
||||
),
|
||||
};
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
function removeChatMessageFromPageCache(
|
||||
qc: ReturnType<typeof useQueryClient>,
|
||||
sessionId: string,
|
||||
messageId: string,
|
||||
) {
|
||||
qc.setQueryData<InfiniteData<ChatMessagesPage> | undefined>(
|
||||
chatKeys.messagesPage(sessionId),
|
||||
(old) => {
|
||||
if (!old) return old;
|
||||
return {
|
||||
...old,
|
||||
pages: old.pages.map((page) => ({
|
||||
...page,
|
||||
messages: page.messages.filter((m) => m.id !== messageId),
|
||||
})),
|
||||
};
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
function removeChatMessageFromCaches(
|
||||
qc: ReturnType<typeof useQueryClient>,
|
||||
sessionId: string,
|
||||
messageId: string,
|
||||
) {
|
||||
qc.setQueryData<ChatMessage[]>(
|
||||
chatKeys.messages(sessionId),
|
||||
(old) => old?.filter((m) => m.id !== messageId) ?? old,
|
||||
);
|
||||
removeChatMessageFromPageCache(qc, sessionId, messageId);
|
||||
}
|
||||
|
||||
function replaceOptimisticChatMessageId(
|
||||
qc: ReturnType<typeof useQueryClient>,
|
||||
sessionId: string,
|
||||
optimisticId: string,
|
||||
messageId: string,
|
||||
taskId: string,
|
||||
) {
|
||||
const replace = (messages: ChatMessage[] | undefined) => {
|
||||
if (!messages) return messages;
|
||||
if (messages.some((m) => m.id === messageId)) {
|
||||
return messages.filter((m) => m.id !== optimisticId);
|
||||
}
|
||||
return messages.map((m) =>
|
||||
m.id === optimisticId ? { ...m, id: messageId, task_id: taskId } : m,
|
||||
);
|
||||
};
|
||||
|
||||
qc.setQueryData<ChatMessage[]>(
|
||||
chatKeys.messages(sessionId),
|
||||
replace,
|
||||
);
|
||||
qc.setQueryData<InfiniteData<ChatMessagesPage> | undefined>(
|
||||
chatKeys.messagesPage(sessionId),
|
||||
(old) => {
|
||||
if (!old) return old;
|
||||
return {
|
||||
...old,
|
||||
pages: old.pages.map((page) => ({
|
||||
...page,
|
||||
messages: replace(page.messages) ?? page.messages,
|
||||
})),
|
||||
};
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
export function ChatWindow() {
|
||||
const { t } = useT("chat");
|
||||
const wsId = useWorkspaceId();
|
||||
@@ -123,6 +225,14 @@ export function ChatWindow() {
|
||||
pendingChatTaskOptions(activeSessionId ?? ""),
|
||||
);
|
||||
const pendingTaskId = pendingTask?.task_id ?? null;
|
||||
const stopRequestedBeforeTaskRef = useRef(false);
|
||||
const [restoreDraftRequest, setRestoreDraftRequest] = useState<{
|
||||
id: string;
|
||||
content: string;
|
||||
} | null>(null);
|
||||
const handleRestoreDraftConsumed = useCallback(() => {
|
||||
setRestoreDraftRequest(null);
|
||||
}, []);
|
||||
|
||||
// Legacy archived sessions (the old soft-archive feature was removed but
|
||||
// pre-existing rows with status='archived' may still exist) are excluded
|
||||
@@ -277,11 +387,58 @@ export function ChatWindow() {
|
||||
[ensureSession, uploadWithToast, qc, setActiveSession],
|
||||
);
|
||||
|
||||
const cancelChatTask = useCallback(
|
||||
async (
|
||||
taskId: string,
|
||||
sessionId: string,
|
||||
options: { restoreDraftToInput: boolean; source: string },
|
||||
) => {
|
||||
apiLogger.info("cancelTask.start", {
|
||||
taskId,
|
||||
sessionId,
|
||||
source: options.source,
|
||||
});
|
||||
qc.setQueryData(chatKeys.pendingTask(sessionId), {});
|
||||
|
||||
try {
|
||||
const result = await api.cancelTaskById(taskId);
|
||||
const restored = result.cancelled_chat_message;
|
||||
if (restored?.restore_to_input) {
|
||||
removeChatMessageFromCaches(qc, restored.chat_session_id, restored.message_id);
|
||||
if (options.restoreDraftToInput && restored.chat_session_id === sessionId) {
|
||||
setRestoreDraftRequest({
|
||||
id: restored.message_id,
|
||||
content: restored.content,
|
||||
});
|
||||
}
|
||||
}
|
||||
qc.invalidateQueries({ queryKey: chatKeys.messages(sessionId) });
|
||||
qc.invalidateQueries({ queryKey: chatKeys.messagesPage(sessionId) });
|
||||
apiLogger.info("cancelTask.success", {
|
||||
taskId,
|
||||
sessionId,
|
||||
restoredToInput: !!restored?.restore_to_input && options.restoreDraftToInput,
|
||||
});
|
||||
return result;
|
||||
} catch (err) {
|
||||
apiLogger.warn("cancelTask.error (task may have already finished)", {
|
||||
taskId,
|
||||
sessionId,
|
||||
err,
|
||||
});
|
||||
qc.invalidateQueries({ queryKey: chatKeys.messages(sessionId) });
|
||||
qc.invalidateQueries({ queryKey: chatKeys.messagesPage(sessionId) });
|
||||
return null;
|
||||
}
|
||||
},
|
||||
[qc],
|
||||
);
|
||||
|
||||
const handleSend = useCallback(
|
||||
async (content: string, attachmentIds?: string[]) => {
|
||||
async (content: string, attachmentIds?: string[]): Promise<boolean> => {
|
||||
if (!activeAgent) {
|
||||
apiLogger.warn("sendChatMessage skipped: no active agent");
|
||||
return;
|
||||
return false;
|
||||
}
|
||||
|
||||
const finalContent = content;
|
||||
@@ -296,10 +453,17 @@ export function ChatWindow() {
|
||||
attachmentCount: attachmentIds?.length ?? 0,
|
||||
});
|
||||
|
||||
const sessionId = await ensureSession(finalContent);
|
||||
let sessionId: string | null = null;
|
||||
try {
|
||||
sessionId = await ensureSession(finalContent);
|
||||
} catch (err) {
|
||||
apiLogger.error("sendChatMessage.ensureSession.error", err);
|
||||
toast.error(t(($) => $.input.send_failed_toast));
|
||||
return false;
|
||||
}
|
||||
if (!sessionId) {
|
||||
apiLogger.warn("sendChatMessage aborted: ensureSession returned null");
|
||||
return;
|
||||
return false;
|
||||
}
|
||||
|
||||
// Optimistic burst — everything that gives the user "I sent a message
|
||||
@@ -322,7 +486,7 @@ export function ChatWindow() {
|
||||
// "new-chat first-message" white flash. Priming the cache first means
|
||||
// the very first read after activeSessionId flips hits data
|
||||
// synchronously and ChatMessageList mounts directly.
|
||||
seedChatMessagesPageCache(qc, sessionId, [optimistic]);
|
||||
appendChatMessageToLatestPageCache(qc, sessionId, optimistic);
|
||||
qc.setQueryData<ChatMessage[]>(
|
||||
chatKeys.messages(sessionId),
|
||||
(old) => (old ? [...old, optimistic] : [optimistic]),
|
||||
@@ -342,12 +506,23 @@ export function ChatWindow() {
|
||||
setActiveSession(sessionId);
|
||||
apiLogger.debug("sendChatMessage.optimistic", { sessionId, optimisticId: optimistic.id });
|
||||
|
||||
const result = await api.sendChatMessage(sessionId, finalContent, attachmentIds);
|
||||
let result;
|
||||
try {
|
||||
result = await api.sendChatMessage(sessionId, finalContent, attachmentIds);
|
||||
} catch (err) {
|
||||
apiLogger.error("sendChatMessage.error.rollback", { sessionId, optimisticId: optimistic.id, err });
|
||||
stopRequestedBeforeTaskRef.current = false;
|
||||
removeChatMessageFromCaches(qc, sessionId, optimistic.id);
|
||||
qc.setQueryData(chatKeys.pendingTask(sessionId), {});
|
||||
toast.error(t(($) => $.input.send_failed_toast));
|
||||
return false;
|
||||
}
|
||||
apiLogger.info("sendChatMessage.success", {
|
||||
sessionId,
|
||||
messageId: result.message_id,
|
||||
taskId: result.task_id,
|
||||
});
|
||||
replaceOptimisticChatMessageId(qc, sessionId, optimistic.id, result.message_id, result.task_id);
|
||||
// Replace the temporary task_id with the server's real one (so the WS
|
||||
// task: handlers can match against it) and snap the anchor to the
|
||||
// server's created_at — keeping the elapsed-seconds reading stable.
|
||||
@@ -356,15 +531,26 @@ export function ChatWindow() {
|
||||
status: "queued",
|
||||
created_at: result.created_at,
|
||||
});
|
||||
if (stopRequestedBeforeTaskRef.current) {
|
||||
stopRequestedBeforeTaskRef.current = false;
|
||||
await cancelChatTask(result.task_id, sessionId, {
|
||||
restoreDraftToInput: true,
|
||||
source: "deferred-send",
|
||||
});
|
||||
return false;
|
||||
}
|
||||
qc.invalidateQueries({ queryKey: chatKeys.messages(sessionId) });
|
||||
qc.invalidateQueries({ queryKey: chatKeys.messagesPage(sessionId) });
|
||||
return true;
|
||||
},
|
||||
[
|
||||
activeSessionId,
|
||||
activeAgent,
|
||||
ensureSession,
|
||||
cancelChatTask,
|
||||
qc,
|
||||
setActiveSession,
|
||||
t,
|
||||
],
|
||||
);
|
||||
|
||||
@@ -373,27 +559,19 @@ export function ChatWindow() {
|
||||
apiLogger.debug("cancelTask skipped: no pending task");
|
||||
return;
|
||||
}
|
||||
// Optimistic clear — pill disappears + input unlocks the moment the
|
||||
// user clicks Stop, instead of after the HTTP roundtrip. WS
|
||||
// task:cancelled will confirm later (no-op if cache is already empty);
|
||||
// if the cancel POST fails because the task already finished, the
|
||||
// assistant message arrives via task:completed → chat:done and renders
|
||||
// normally. Either way the UI is in sync with reality without latency.
|
||||
apiLogger.info("cancelTask.start", { taskId: pendingTaskId, sessionId: activeSessionId });
|
||||
qc.setQueryData(chatKeys.pendingTask(activeSessionId), {});
|
||||
qc.invalidateQueries({ queryKey: chatKeys.messages(activeSessionId) });
|
||||
qc.invalidateQueries({ queryKey: chatKeys.messagesPage(activeSessionId) });
|
||||
// Fire-and-forget — UI is already in its post-cancel state. We log the
|
||||
// outcome but never block on it.
|
||||
api.cancelTaskById(pendingTaskId).then(
|
||||
() => apiLogger.info("cancelTask.success", { taskId: pendingTaskId }),
|
||||
(err) =>
|
||||
apiLogger.warn("cancelTask.error (task may have already finished)", {
|
||||
taskId: pendingTaskId,
|
||||
err,
|
||||
}),
|
||||
);
|
||||
}, [pendingTaskId, activeSessionId, qc]);
|
||||
if (!isTaskMessageTaskId(pendingTaskId)) {
|
||||
stopRequestedBeforeTaskRef.current = true;
|
||||
apiLogger.info("cancelTask.deferred until server task id", {
|
||||
taskId: pendingTaskId,
|
||||
sessionId: activeSessionId,
|
||||
});
|
||||
return;
|
||||
}
|
||||
void cancelChatTask(pendingTaskId, activeSessionId, {
|
||||
restoreDraftToInput: true,
|
||||
source: "active-input",
|
||||
});
|
||||
}, [pendingTaskId, activeSessionId, cancelChatTask]);
|
||||
|
||||
const handleSelectAgent = useCallback(
|
||||
(agent: Agent) => {
|
||||
@@ -590,6 +768,8 @@ export function ChatWindow() {
|
||||
* when there's no agent (the EmptyState above carries the CTA). */}
|
||||
<ChatInput
|
||||
onSend={handleSend}
|
||||
restoreDraftRequest={restoreDraftRequest}
|
||||
onRestoreDraftConsumed={handleRestoreDraftConsumed}
|
||||
onUploadFile={handleUploadFile}
|
||||
onStop={handleStop}
|
||||
isRunning={!!pendingTaskId}
|
||||
@@ -914,7 +1094,13 @@ function SessionDropdown({
|
||||
queryClient.invalidateQueries({ queryKey: chatKeys.messagesPage(session.id) });
|
||||
|
||||
api.cancelTaskById(task.task_id).then(
|
||||
() => apiLogger.info("cancelTask.success (history row)", { taskId: task.task_id, sessionId: session.id }),
|
||||
(result) => {
|
||||
const restored = result.cancelled_chat_message;
|
||||
if (restored?.restore_to_input) {
|
||||
removeChatMessageFromCaches(queryClient, restored.chat_session_id, restored.message_id);
|
||||
}
|
||||
apiLogger.info("cancelTask.success (history row)", { taskId: task.task_id, sessionId: session.id });
|
||||
},
|
||||
(err) =>
|
||||
apiLogger.warn("cancelTask.error (history row; task may have already finished)", {
|
||||
taskId: task.task_id,
|
||||
|
||||
@@ -11,7 +11,8 @@
|
||||
"placeholder_named": "Message {{name}}…",
|
||||
"placeholder_default": "Start a message…",
|
||||
"send_tooltip": "Send",
|
||||
"stop_tooltip": "Stop"
|
||||
"stop_tooltip": "Stop",
|
||||
"send_failed_toast": "Failed to send message"
|
||||
},
|
||||
"message_list": {
|
||||
"show_details": "Show details",
|
||||
|
||||
@@ -10,7 +10,8 @@
|
||||
"placeholder_named": "{{name}} にメッセージ…",
|
||||
"placeholder_default": "メッセージを入力…",
|
||||
"send_tooltip": "送信",
|
||||
"stop_tooltip": "停止"
|
||||
"stop_tooltip": "停止",
|
||||
"send_failed_toast": "メッセージを送信できませんでした"
|
||||
},
|
||||
"message_list": {
|
||||
"show_details": "詳細を表示",
|
||||
|
||||
@@ -11,7 +11,8 @@
|
||||
"placeholder_named": "{{name}}에게 메시지 보내기…",
|
||||
"placeholder_default": "메시지 입력…",
|
||||
"send_tooltip": "보내기",
|
||||
"stop_tooltip": "중지"
|
||||
"stop_tooltip": "중지",
|
||||
"send_failed_toast": "메시지를 보내지 못했습니다"
|
||||
},
|
||||
"message_list": {
|
||||
"show_details": "세부 정보 보기",
|
||||
|
||||
@@ -10,7 +10,8 @@
|
||||
"placeholder_named": "给 {{name}} 发消息…",
|
||||
"placeholder_default": "输入消息…",
|
||||
"send_tooltip": "发送",
|
||||
"stop_tooltip": "停止"
|
||||
"stop_tooltip": "停止",
|
||||
"send_failed_toast": "发送消息失败"
|
||||
},
|
||||
"message_list": {
|
||||
"show_details": "查看详情",
|
||||
|
||||
@@ -2,6 +2,7 @@ package handler
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
@@ -313,6 +314,133 @@ func TestCancelTaskByUser_ChatTask_NonCreator_Returns403(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestCancelTaskByUser_ChatTaskWithTranscript_PersistsAssistantSnapshot(t *testing.T) {
|
||||
if testHandler == nil {
|
||||
t.Skip("database not available")
|
||||
}
|
||||
|
||||
agentID := createHandlerTestAgent(t, "CancelChatTranscriptAgent", []byte("[]"))
|
||||
sessionID := createHandlerTestChatSession(t, agentID)
|
||||
|
||||
var taskID string
|
||||
if err := testPool.QueryRow(context.Background(), `
|
||||
INSERT INTO agent_task_queue (agent_id, runtime_id, status, priority, issue_id, chat_session_id, created_at)
|
||||
VALUES ($1, (SELECT runtime_id FROM agent WHERE id = $1), 'running', 0, NULL, $2, now() - interval '5 seconds')
|
||||
RETURNING id
|
||||
`, agentID, sessionID).Scan(&taskID); err != nil {
|
||||
t.Fatalf("create chat task: %v", err)
|
||||
}
|
||||
t.Cleanup(func() { testPool.Exec(context.Background(), `DELETE FROM agent_task_queue WHERE id = $1`, taskID) })
|
||||
|
||||
if _, err := testPool.Exec(context.Background(), `
|
||||
INSERT INTO chat_message (chat_session_id, role, content, task_id)
|
||||
VALUES ($1, 'user', 'please answer', $2)
|
||||
`, sessionID, taskID); err != nil {
|
||||
t.Fatalf("create linked user chat message: %v", err)
|
||||
}
|
||||
if _, err := testPool.Exec(context.Background(), `
|
||||
INSERT INTO task_message (task_id, seq, type, content)
|
||||
VALUES ($1, 1, 'text', 'partial answer')
|
||||
`, taskID); err != nil {
|
||||
t.Fatalf("create task message: %v", err)
|
||||
}
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
testHandler.CancelTaskByUser(w, cancelTaskByUserRequest(t, testUserID, taskID))
|
||||
if w.Code != http.StatusOK {
|
||||
t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
var resp CancelTaskByUserResponse
|
||||
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
|
||||
t.Fatalf("decode cancel response: %v", err)
|
||||
}
|
||||
if resp.CancelledChatMessage != nil {
|
||||
t.Fatalf("expected no restore payload when transcript exists, got %#v", resp.CancelledChatMessage)
|
||||
}
|
||||
if got := taskStatus(t, taskID); got != "cancelled" {
|
||||
t.Fatalf("task not cancelled: status = %q", got)
|
||||
}
|
||||
|
||||
var role, content, messageTaskID string
|
||||
if err := testPool.QueryRow(context.Background(), `
|
||||
SELECT role, content, COALESCE(task_id::text, '')
|
||||
FROM chat_message
|
||||
WHERE chat_session_id = $1 AND role = 'assistant'
|
||||
`, sessionID).Scan(&role, &content, &messageTaskID); err != nil {
|
||||
t.Fatalf("read cancelled assistant chat message: %v", err)
|
||||
}
|
||||
if role != "assistant" || content != "Stopped." || messageTaskID != taskID {
|
||||
t.Fatalf("assistant snapshot mismatch: role=%q content=%q task_id=%q", role, content, messageTaskID)
|
||||
}
|
||||
}
|
||||
|
||||
func TestCancelTaskByUser_ChatTaskWithoutTranscript_RestoresUserDraft(t *testing.T) {
|
||||
if testHandler == nil {
|
||||
t.Skip("database not available")
|
||||
}
|
||||
|
||||
agentID := createHandlerTestAgent(t, "CancelChatNoTranscriptAgent", []byte("[]"))
|
||||
sessionID := createHandlerTestChatSession(t, agentID)
|
||||
|
||||
var taskID string
|
||||
if err := testPool.QueryRow(context.Background(), `
|
||||
INSERT INTO agent_task_queue (agent_id, runtime_id, status, priority, issue_id, chat_session_id)
|
||||
VALUES ($1, (SELECT runtime_id FROM agent WHERE id = $1), 'running', 0, NULL, $2)
|
||||
RETURNING id
|
||||
`, agentID, sessionID).Scan(&taskID); err != nil {
|
||||
t.Fatalf("create chat task: %v", err)
|
||||
}
|
||||
t.Cleanup(func() { testPool.Exec(context.Background(), `DELETE FROM agent_task_queue WHERE id = $1`, taskID) })
|
||||
|
||||
var userMessageID string
|
||||
const userContent = "keep this prompt"
|
||||
if err := testPool.QueryRow(context.Background(), `
|
||||
INSERT INTO chat_message (chat_session_id, role, content, task_id)
|
||||
VALUES ($1, 'user', $2, $3)
|
||||
RETURNING id
|
||||
`, sessionID, userContent, taskID).Scan(&userMessageID); err != nil {
|
||||
t.Fatalf("create linked user chat message: %v", err)
|
||||
}
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
testHandler.CancelTaskByUser(w, cancelTaskByUserRequest(t, testUserID, taskID))
|
||||
if w.Code != http.StatusOK {
|
||||
t.Fatalf("expected 200, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
var resp CancelTaskByUserResponse
|
||||
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
|
||||
t.Fatalf("decode cancel response: %v", err)
|
||||
}
|
||||
if resp.CancelledChatMessage == nil {
|
||||
t.Fatal("expected restore payload for empty transcript cancel")
|
||||
}
|
||||
if resp.CancelledChatMessage.MessageID != userMessageID ||
|
||||
resp.CancelledChatMessage.Content != userContent ||
|
||||
!resp.CancelledChatMessage.RestoreToInput {
|
||||
t.Fatalf("restore payload mismatch: %#v", resp.CancelledChatMessage)
|
||||
}
|
||||
|
||||
var count int
|
||||
if err := testPool.QueryRow(context.Background(), `
|
||||
SELECT count(*) FROM chat_message
|
||||
WHERE chat_session_id = $1 AND role = 'assistant'
|
||||
`, sessionID).Scan(&count); err != nil {
|
||||
t.Fatalf("count assistant chat messages: %v", err)
|
||||
}
|
||||
if count != 0 {
|
||||
t.Fatalf("expected no assistant snapshot for empty transcript, got %d", count)
|
||||
}
|
||||
if err := testPool.QueryRow(context.Background(), `
|
||||
SELECT count(*) FROM chat_message
|
||||
WHERE id = $1
|
||||
`, userMessageID).Scan(&count); err != nil {
|
||||
t.Fatalf("count deleted user chat message: %v", err)
|
||||
}
|
||||
if count != 0 {
|
||||
t.Fatalf("expected linked user message to be deleted, got %d", count)
|
||||
}
|
||||
}
|
||||
|
||||
// TestCancelTaskByUser_PrivateAgent_PlainMember_Returns403 verifies the cancel
|
||||
// endpoint mirrors the agent Activity / snapshot visibility gate: a plain
|
||||
// member who cannot see a private agent's tasks cannot cancel them either.
|
||||
|
||||
@@ -473,6 +473,19 @@ func (h *Handler) SendChatMessage(w http.ResponseWriter, r *http.Request) {
|
||||
writeError(w, http.StatusInternalServerError, "failed to enqueue chat task: "+err.Error())
|
||||
return
|
||||
}
|
||||
if err := h.Queries.LinkChatMessageToTask(r.Context(), db.LinkChatMessageToTaskParams{
|
||||
ID: msg.ID,
|
||||
TaskID: task.ID,
|
||||
}); err != nil {
|
||||
// Don't fail the send: the task already exists and the user message
|
||||
// is persisted. The link is only needed for precise empty-cancel
|
||||
// cleanup; older/unlinked rows simply keep the historical behavior.
|
||||
slog.Warn("link user chat message to task failed",
|
||||
"message_id", uuidToString(msg.ID),
|
||||
"task_id", uuidToString(task.ID),
|
||||
"error", err,
|
||||
)
|
||||
}
|
||||
|
||||
// Touch session updated_at.
|
||||
if err := h.Queries.TouchChatSession(r.Context(), session.ID); err != nil {
|
||||
@@ -700,6 +713,18 @@ type PendingChatTaskItem struct {
|
||||
ChatSessionID string `json:"chat_session_id"`
|
||||
}
|
||||
|
||||
type CancelledChatMessageResponse struct {
|
||||
ChatSessionID string `json:"chat_session_id"`
|
||||
MessageID string `json:"message_id"`
|
||||
Content string `json:"content"`
|
||||
RestoreToInput bool `json:"restore_to_input"`
|
||||
}
|
||||
|
||||
type CancelTaskByUserResponse struct {
|
||||
AgentTaskResponse
|
||||
CancelledChatMessage *CancelledChatMessageResponse `json:"cancelled_chat_message,omitempty"`
|
||||
}
|
||||
|
||||
// ListPendingChatTasks returns every in-flight chat task owned by the current
|
||||
// user in this workspace. Drives the FAB's "running" indicator when the chat
|
||||
// window is closed (no per-session query is subscribed). Tasks belonging to
|
||||
@@ -879,13 +904,25 @@ func (h *Handler) CancelTaskByUser(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
}
|
||||
|
||||
cancelled, err := h.TaskService.CancelTask(r.Context(), taskUUID)
|
||||
cancelled, err := h.TaskService.CancelTaskWithResult(r.Context(), taskUUID)
|
||||
if err != nil {
|
||||
writeError(w, http.StatusBadRequest, err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
writeJSON(w, http.StatusOK, taskToResponse(*cancelled, workspaceID))
|
||||
resp := CancelTaskByUserResponse{
|
||||
AgentTaskResponse: taskToResponse(cancelled.Task, workspaceID),
|
||||
}
|
||||
if cancelled.CancelledChatMessage != nil {
|
||||
resp.CancelledChatMessage = &CancelledChatMessageResponse{
|
||||
ChatSessionID: cancelled.CancelledChatMessage.ChatSessionID,
|
||||
MessageID: cancelled.CancelledChatMessage.MessageID,
|
||||
Content: cancelled.CancelledChatMessage.Content,
|
||||
RestoreToInput: cancelled.CancelledChatMessage.RestoreToInput,
|
||||
}
|
||||
}
|
||||
|
||||
writeJSON(w, http.StatusOK, resp)
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
@@ -90,6 +90,21 @@ func TestSendChatMessage_LinksAttachments(t *testing.T) {
|
||||
if sendResp.MessageID == "" {
|
||||
t.Fatal("expected non-empty message_id in send response")
|
||||
}
|
||||
if sendResp.TaskID == "" {
|
||||
t.Fatal("expected non-empty task_id in send response")
|
||||
}
|
||||
|
||||
var messageTaskID string
|
||||
if err := testPool.QueryRow(
|
||||
context.Background(),
|
||||
`SELECT COALESCE(task_id::text, '') FROM chat_message WHERE id = $1`,
|
||||
sendResp.MessageID,
|
||||
).Scan(&messageTaskID); err != nil {
|
||||
t.Fatalf("query chat message task id: %v", err)
|
||||
}
|
||||
if messageTaskID != sendResp.TaskID {
|
||||
t.Fatalf("chat message task_id mismatch: want %s, got %s", sendResp.TaskID, messageTaskID)
|
||||
}
|
||||
|
||||
// 3. Verify the attachment row now points at the new message.
|
||||
var dbMessageID *string
|
||||
|
||||
@@ -812,16 +812,38 @@ func (s *TaskService) CaptureCancelledTasks(ctx context.Context, cancelled []db.
|
||||
}
|
||||
}
|
||||
|
||||
type CancelledChatMessageResult struct {
|
||||
ChatSessionID string
|
||||
MessageID string
|
||||
Content string
|
||||
RestoreToInput bool
|
||||
}
|
||||
|
||||
type CancelTaskResult struct {
|
||||
Task db.AgentTaskQueue
|
||||
CancelledChatMessage *CancelledChatMessageResult
|
||||
}
|
||||
|
||||
// CancelTask cancels a single task by ID. It broadcasts a task:cancelled event
|
||||
// so frontends can update immediately.
|
||||
func (s *TaskService) CancelTask(ctx context.Context, taskID pgtype.UUID) (*db.AgentTaskQueue, error) {
|
||||
result, err := s.CancelTaskWithResult(ctx, taskID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &result.Task, nil
|
||||
}
|
||||
|
||||
// CancelTaskWithResult cancels a single task and returns any chat-specific
|
||||
// cleanup result needed by user-facing callers.
|
||||
func (s *TaskService) CancelTaskWithResult(ctx context.Context, taskID pgtype.UUID) (*CancelTaskResult, error) {
|
||||
task, err := s.Queries.CancelAgentTask(ctx, taskID)
|
||||
if errors.Is(err, pgx.ErrNoRows) {
|
||||
existing, err := s.Queries.GetAgentTask(ctx, taskID)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cancel task: %w", err)
|
||||
}
|
||||
return &existing, nil
|
||||
return &CancelTaskResult{Task: existing}, nil
|
||||
}
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cancel task: %w", err)
|
||||
@@ -829,6 +851,7 @@ func (s *TaskService) CancelTask(ctx context.Context, taskID pgtype.UUID) (*db.A
|
||||
|
||||
slog.Info("task cancelled", "task_id", util.UUIDToString(task.ID), "issue_id", util.UUIDToString(task.IssueID))
|
||||
s.captureTaskCancelled(ctx, task)
|
||||
cancelledChatMessage := s.finalizeCancelledChatMessage(ctx, task)
|
||||
|
||||
// Reconcile agent status
|
||||
s.ReconcileAgentStatus(ctx, task.AgentID)
|
||||
@@ -836,7 +859,57 @@ func (s *TaskService) CancelTask(ctx context.Context, taskID pgtype.UUID) (*db.A
|
||||
// Broadcast cancellation as a task:failed event so frontends clear the live card
|
||||
s.broadcastTaskEvent(ctx, protocol.EventTaskCancelled, task)
|
||||
|
||||
return &task, nil
|
||||
return &CancelTaskResult{
|
||||
Task: task,
|
||||
CancelledChatMessage: cancelledChatMessage,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *TaskService) finalizeCancelledChatMessage(ctx context.Context, task db.AgentTaskQueue) *CancelledChatMessageResult {
|
||||
if !task.ChatSessionID.Valid {
|
||||
return nil
|
||||
}
|
||||
var cancelled *CancelledChatMessageResult
|
||||
if err := s.runInTx(ctx, func(qtx *db.Queries) error {
|
||||
messages, err := qtx.ListTaskMessages(ctx, task.ID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("list cancelled chat task messages: %w", err)
|
||||
}
|
||||
if len(messages) == 0 {
|
||||
deleted, err := qtx.DeleteUserChatMessageByTask(ctx, task.ID)
|
||||
if errors.Is(err, pgx.ErrNoRows) {
|
||||
return nil
|
||||
}
|
||||
if err != nil {
|
||||
return fmt.Errorf("delete empty cancelled chat user message: %w", err)
|
||||
}
|
||||
cancelled = &CancelledChatMessageResult{
|
||||
ChatSessionID: util.UUIDToString(deleted.ChatSessionID),
|
||||
MessageID: util.UUIDToString(deleted.ID),
|
||||
Content: deleted.Content,
|
||||
RestoreToInput: true,
|
||||
}
|
||||
return nil
|
||||
}
|
||||
if _, err := qtx.CreateChatMessage(ctx, db.CreateChatMessageParams{
|
||||
ChatSessionID: task.ChatSessionID,
|
||||
Role: "assistant",
|
||||
Content: "Stopped.",
|
||||
TaskID: task.ID,
|
||||
ElapsedMs: computeChatElapsedMs(task),
|
||||
}); err != nil {
|
||||
return fmt.Errorf("create cancelled chat message: %w", err)
|
||||
}
|
||||
return nil
|
||||
}); err != nil {
|
||||
slog.Error("failed to finalize cancelled chat message",
|
||||
"task_id", util.UUIDToString(task.ID),
|
||||
"chat_session_id", util.UUIDToString(task.ChatSessionID),
|
||||
"error", err,
|
||||
)
|
||||
return nil
|
||||
}
|
||||
return cancelled
|
||||
}
|
||||
|
||||
// ClaimTask atomically claims the next queued task for an agent,
|
||||
|
||||
@@ -164,6 +164,28 @@ func (q *Queries) DeleteChatSession(ctx context.Context, arg DeleteChatSessionPa
|
||||
return err
|
||||
}
|
||||
|
||||
const deleteUserChatMessageByTask = `-- name: DeleteUserChatMessageByTask :one
|
||||
DELETE FROM chat_message
|
||||
WHERE task_id = $1 AND role = 'user'
|
||||
RETURNING id, chat_session_id, role, content, task_id, created_at, failure_reason, elapsed_ms
|
||||
`
|
||||
|
||||
func (q *Queries) DeleteUserChatMessageByTask(ctx context.Context, taskID pgtype.UUID) (ChatMessage, error) {
|
||||
row := q.db.QueryRow(ctx, deleteUserChatMessageByTask, taskID)
|
||||
var i ChatMessage
|
||||
err := row.Scan(
|
||||
&i.ID,
|
||||
&i.ChatSessionID,
|
||||
&i.Role,
|
||||
&i.Content,
|
||||
&i.TaskID,
|
||||
&i.CreatedAt,
|
||||
&i.FailureReason,
|
||||
&i.ElapsedMs,
|
||||
)
|
||||
return i, err
|
||||
}
|
||||
|
||||
const getChatMessage = `-- name: GetChatMessage :one
|
||||
SELECT id, chat_session_id, role, content, task_id, created_at, failure_reason, elapsed_ms FROM chat_message
|
||||
WHERE id = $1
|
||||
@@ -329,6 +351,22 @@ func (q *Queries) GetPendingChatTask(ctx context.Context, chatSessionID pgtype.U
|
||||
return i, err
|
||||
}
|
||||
|
||||
const linkChatMessageToTask = `-- name: LinkChatMessageToTask :exec
|
||||
UPDATE chat_message
|
||||
SET task_id = $2
|
||||
WHERE id = $1 AND role = 'user'
|
||||
`
|
||||
|
||||
type LinkChatMessageToTaskParams struct {
|
||||
ID pgtype.UUID `json:"id"`
|
||||
TaskID pgtype.UUID `json:"task_id"`
|
||||
}
|
||||
|
||||
func (q *Queries) LinkChatMessageToTask(ctx context.Context, arg LinkChatMessageToTaskParams) error {
|
||||
_, err := q.db.Exec(ctx, linkChatMessageToTask, arg.ID, arg.TaskID)
|
||||
return err
|
||||
}
|
||||
|
||||
const listAllChatSessionsByCreator = `-- name: ListAllChatSessionsByCreator :many
|
||||
SELECT cs.id, cs.workspace_id, cs.agent_id, cs.creator_id, cs.title, cs.session_id, cs.work_dir, cs.status, cs.created_at, cs.updated_at, cs.unread_since, cs.runtime_id,
|
||||
(cs.unread_since IS NOT NULL)::bool AS has_unread
|
||||
|
||||
@@ -78,6 +78,16 @@ INSERT INTO chat_message (chat_session_id, role, content, task_id, failure_reaso
|
||||
VALUES ($1, $2, $3, sqlc.narg(task_id), sqlc.narg(failure_reason), sqlc.narg(elapsed_ms))
|
||||
RETURNING *;
|
||||
|
||||
-- name: LinkChatMessageToTask :exec
|
||||
UPDATE chat_message
|
||||
SET task_id = $2
|
||||
WHERE id = $1 AND role = 'user';
|
||||
|
||||
-- name: DeleteUserChatMessageByTask :one
|
||||
DELETE FROM chat_message
|
||||
WHERE task_id = $1 AND role = 'user'
|
||||
RETURNING *;
|
||||
|
||||
-- name: ListChatMessages :many
|
||||
SELECT * FROM chat_message
|
||||
WHERE chat_session_id = $1
|
||||
|
||||
Reference in New Issue
Block a user