diff --git a/packages/core/api/client.test.ts b/packages/core/api/client.test.ts index a34c76f11..e3f9517ae 100644 --- a/packages/core/api/client.test.ts +++ b/packages/core/api/client.test.ts @@ -362,6 +362,68 @@ describe("ApiClient", () => { }); }); + describe("listChatMessagesPage deployment-order fallback", () => { + const jsonResponse = (body: unknown, status: number, statusText = "") => + new Response(JSON.stringify(body), { + status, + statusText, + headers: { "Content-Type": "application/json" }, + }); + + it("falls back to the legacy full-list endpoint when the paged route 404s", async () => { + const legacy = [ + { id: "m1", role: "user", content: "hi", created_at: "2026-06-01T00:00:00Z" }, + { id: "m2", role: "assistant", content: "yo", created_at: "2026-06-01T00:00:01Z" }, + ]; + const fetchMock = vi + .fn() + .mockResolvedValueOnce(jsonResponse({ error: "not found" }, 404, "Not Found")) + .mockResolvedValueOnce(jsonResponse(legacy, 200)); + vi.stubGlobal("fetch", fetchMock); + + const client = new ApiClient("https://api.example.test"); + const page = await client.listChatMessagesPage("session-1", { limit: 50 }); + + expect(fetchMock).toHaveBeenCalledTimes(2); + expect(fetchMock.mock.calls[0]![0]).toBe( + "https://api.example.test/api/chat/sessions/session-1/messages/page?limit=50", + ); + expect(fetchMock.mock.calls[1]![0]).toBe( + "https://api.example.test/api/chat/sessions/session-1/messages", + ); + expect(page).toEqual({ messages: legacy, limit: 50, has_more: false, next_cursor: null }); + }); + + it("does NOT fall back on a cursor request — a 404 there propagates", async () => { + const fetchMock = vi + .fn() + .mockResolvedValue(jsonResponse({ error: "not found" }, 404, "Not Found")); + vi.stubGlobal("fetch", fetchMock); + + const client = new ApiClient("https://api.example.test"); + await expect( + client.listChatMessagesPage("session-1", { + before: { created_at: "2026-06-01T00:00:00Z", id: "m1" }, + }), + ).rejects.toBeInstanceOf(ApiError); + // Only the paged request fires; no legacy full-list call that would duplicate messages. + expect(fetchMock).toHaveBeenCalledTimes(1); + }); + + it("propagates non-404 errors instead of masking them with the legacy list", async () => { + const fetchMock = vi + .fn() + .mockResolvedValue(jsonResponse({ error: "boom" }, 500, "Internal Server Error")); + vi.stubGlobal("fetch", fetchMock); + + const client = new ApiClient("https://api.example.test"); + await expect(client.listChatMessagesPage("session-1")).rejects.toMatchObject({ + status: 500, + }); + expect(fetchMock).toHaveBeenCalledTimes(1); + }); + }); + describe("chat attachment wiring", () => { it("uploadFile includes chat_session_id in the FormData body", async () => { const fetchMock = vi.fn().mockResolvedValue( diff --git a/packages/core/api/client.ts b/packages/core/api/client.ts index 156ea2e1d..02f5f0d3b 100644 --- a/packages/core/api/client.ts +++ b/packages/core/api/client.ts @@ -61,6 +61,7 @@ import type { Attachment, ChatSession, ChatMessage, + ChatMessagesPage, ChatPendingTask, PendingChatTasksResponse, SendChatMessageResponse, @@ -1589,6 +1590,37 @@ export class ApiClient { return this.fetch(`/api/chat/sessions/${sessionId}/messages`); } + async listChatMessagesPage( + sessionId: string, + params: { before?: { created_at: string; id: string } | null; limit?: number } = {}, + ): Promise { + const limit = params.limit ?? 50; + const query = new URLSearchParams({ limit: String(limit) }); + if (params.before) { + query.set("before_created_at", params.before.created_at); + query.set("before_id", params.before.id); + } + try { + return await this.fetch( + `/api/chat/sessions/${sessionId}/messages/page?${query.toString()}`, + ); + } catch (err) { + // Deployment-order compatibility: a backend deployed before this endpoint + // existed returns 404 for the unknown route. Fall back to the legacy + // full-list endpoint so chat never white-screens regardless of whether + // the server or the client deploys first. Only the initial (cursorless) + // page falls back — the legacy endpoint returns every message at once, so + // the fallback page reports has_more: false and there is no follow-up + // request to translate. A 404 on a cursor request is an unexpected state + // and propagates instead of duplicating the whole list. + if (err instanceof ApiError && err.status === 404 && !params.before) { + const messages = await this.listChatMessages(sessionId); + return { messages, limit, has_more: false, next_cursor: null }; + } + throw err; + } + } + async sendChatMessage( sessionId: string, content: string, diff --git a/packages/core/chat/queries.ts b/packages/core/chat/queries.ts index bedd15403..912de1f6e 100644 --- a/packages/core/chat/queries.ts +++ b/packages/core/chat/queries.ts @@ -1,4 +1,4 @@ -import { queryOptions } from "@tanstack/react-query"; +import { infiniteQueryOptions, queryOptions } from "@tanstack/react-query"; import { api } from "../api"; // NOTE on workspace scoping: @@ -14,6 +14,7 @@ export const chatKeys = { sessions: (wsId: string) => [...chatKeys.all(wsId), "sessions"] as const, session: (wsId: string, id: string) => [...chatKeys.all(wsId), "session", id] as const, messages: (sessionId: string) => ["chat", "messages", sessionId] as const, + messagesPage: (sessionId: string) => ["chat", "messages-page", sessionId] as const, pendingTask: (sessionId: string) => ["chat", "pending-task", sessionId] as const, /** Aggregate of in-flight chat tasks for the current user — FAB reads this. */ pendingTasks: (wsId: string) => [...chatKeys.all(wsId), "pending-tasks"] as const, @@ -53,6 +54,19 @@ export function chatMessagesOptions(sessionId: string) { }); } +export function chatMessagesPageOptions(sessionId: string, limit = 50) { + return infiniteQueryOptions({ + queryKey: chatKeys.messagesPage(sessionId), + queryFn: ({ pageParam }) => + api.listChatMessagesPage(sessionId, { before: pageParam, limit }), + initialPageParam: null as { created_at: string; id: string } | null, + getNextPageParam: (lastPage) => + lastPage.has_more ? lastPage.next_cursor ?? undefined : undefined, + enabled: !!sessionId, + staleTime: Infinity, + }); +} + /** * Pending task for a chat session — the "is something still running?" signal. * Refetched via WS invalidation in useRealtimeSync when chat:message / chat:done diff --git a/packages/core/realtime/use-realtime-sync.test.ts b/packages/core/realtime/use-realtime-sync.test.ts index 7bf822ac6..fc0d64396 100644 --- a/packages/core/realtime/use-realtime-sync.test.ts +++ b/packages/core/realtime/use-realtime-sync.test.ts @@ -1,4 +1,4 @@ -import { QueryClient } from "@tanstack/react-query"; +import { QueryClient, type InfiniteData } from "@tanstack/react-query"; import { describe, expect, it, vi } from "vitest"; import { chatKeys } from "../chat/queries"; import { issueKeys } from "../issues/queries"; @@ -7,6 +7,7 @@ import type { ChatDonePayload, ChatMessage, ChatPendingTask, + ChatMessagesPage, Workspace, } from "../types"; import { @@ -64,7 +65,7 @@ describe("applyChatDoneToCache", () => { applyChatDoneToCache(qc, donePayload()); expect(setQueryData.mock.calls[0]?.[0]).toEqual(messagesKey); - expect(setQueryData.mock.calls[1]?.[0]).toEqual(pendingKey); + expect(setQueryData.mock.calls[2]?.[0]).toEqual(pendingKey); expect(qc.getQueryData(pendingKey)).toEqual({}); expect(qc.getQueryData(messagesKey)).toEqual([ userMessage(), @@ -201,3 +202,34 @@ describe("applyWorkspaceUpdatedToCache", () => { }); }); }); + + +describe("applyChatDoneToCache paged messages", () => { + it("patches page zero and skips older pages without duplicating replayed events", () => { + const qc = createQueryClient(); + const older = userMessage(); + const latest: ChatMessage = { + id: "msg-latest", + chat_session_id: sessionId, + role: "user", + content: "latest", + task_id: null, + created_at: "2026-05-13T05:00:01Z", + }; + qc.setQueryData>(chatKeys.messagesPage(sessionId), { + pages: [ + { messages: [latest], limit: 1, has_more: true, next_cursor: { created_at: latest.created_at, id: latest.id } }, + { messages: [older], limit: 1, has_more: false, next_cursor: null }, + ], + pageParams: [null, { created_at: latest.created_at, id: latest.id }], + }); + + applyChatDoneToCache(qc, donePayload()); + applyChatDoneToCache(qc, donePayload()); + + const paged = qc.getQueryData>(chatKeys.messagesPage(sessionId)); + + expect(paged?.pages[0]?.messages.map((m) => m.id)).toEqual(["msg-latest", "msg-assistant"]); + expect(paged?.pages[1]?.messages.map((m) => m.id)).toEqual(["msg-user"]); + }); +}); diff --git a/packages/core/realtime/use-realtime-sync.ts b/packages/core/realtime/use-realtime-sync.ts index 2d31176b0..3674a8e83 100644 --- a/packages/core/realtime/use-realtime-sync.ts +++ b/packages/core/realtime/use-realtime-sync.ts @@ -1,7 +1,7 @@ "use client"; import { useEffect, useRef } from "react"; -import { useQueryClient, type QueryClient } from "@tanstack/react-query"; +import { useQueryClient, type InfiniteData, type QueryClient } from "@tanstack/react-query"; import type { WSClient } from "../api/ws-client"; import type { StoreApi, UseBoundStore } from "zustand"; import type { AuthState } from "../auth/store"; @@ -71,6 +71,7 @@ import type { ChatDonePayload, ChatMessage, ChatPendingTask, + ChatMessagesPage, InvitationCreatedPayload, } from "../types"; @@ -87,33 +88,57 @@ export function applyChatDoneToCache( const messageId = payload.message_id; const content = payload.content; if (messageId && content !== undefined) { + const assistant: ChatMessage = { + id: messageId, + chat_session_id: sessionId, + role: "assistant", + content, + task_id: taskId, + created_at: payload.created_at ?? new Date().toISOString(), + elapsed_ms: payload.elapsed_ms ?? null, + }; qc.setQueryData( chatKeys.messages(sessionId), (old) => { if (!old) return old; // first fetch will pick it up // Idempotent against reconnect replay. if (old.some((m) => m.id === messageId)) return old; - const assistant: ChatMessage = { - id: messageId, - chat_session_id: sessionId, - role: "assistant", - content, - task_id: taskId, - created_at: payload.created_at ?? new Date().toISOString(), - elapsed_ms: payload.elapsed_ms ?? null, - }; return [...old, assistant]; }, ); + qc.setQueryData | undefined>( + chatKeys.messagesPage(sessionId), + (old) => patchLatestChatMessagePage(old, assistant), + ); } // Replacement is in the messages list now; safe to drop pending. qc.setQueryData(chatKeys.pendingTask(sessionId), {}); // Authoritative refetch reconciles redaction / migrations / clients // that took the fallback branch above. qc.invalidateQueries({ queryKey: chatKeys.messages(sessionId) }); + qc.invalidateQueries({ queryKey: chatKeys.messagesPage(sessionId) }); qc.invalidateQueries({ queryKey: chatKeys.pendingTask(sessionId) }); } +function patchLatestChatMessagePage( + old: InfiniteData | undefined, + message: ChatMessage, +): InfiniteData | undefined { + if (!old?.pages.length) return old; + const seen = old.pages.some((page) => page.messages.some((m) => m.id === message.id)); + if (seen) return old; + return { + ...old, + pages: old.pages.map((page, index) => { + if (index !== 0) return page; + return { + ...page, + messages: [...page.messages, message], + }; + }), + }; +} + /** * Apply a workspace:updated event to the cache. Always refreshes the * workspace list. If the incoming `issue_prefix` differs from what's diff --git a/packages/core/types/chat.ts b/packages/core/types/chat.ts index 50ce7ccb5..762b01a6a 100644 --- a/packages/core/types/chat.ts +++ b/packages/core/types/chat.ts @@ -55,6 +55,18 @@ export interface ChatMessage { elapsed_ms?: number | null; } +export interface ChatMessagesCursor { + created_at: string; + id: string; +} + +export interface ChatMessagesPage { + messages: ChatMessage[]; + limit: number; + has_more: boolean; + next_cursor?: ChatMessagesCursor | null; +} + export interface SendChatMessageResponse { message_id: string; task_id: string; diff --git a/packages/core/types/index.ts b/packages/core/types/index.ts index e1f8cc893..445607ebc 100644 --- a/packages/core/types/index.ts +++ b/packages/core/types/index.ts @@ -65,7 +65,7 @@ export type { IssueSubscriber } from "./subscriber"; export type * from "./events"; export type * from "./api"; export type { Attachment } from "./attachment"; -export type { ChatSession, ChatMessage, ChatPendingTask, PendingChatTaskItem, PendingChatTasksResponse, SendChatMessageResponse } from "./chat"; +export type { ChatSession, ChatMessage, ChatMessagesPage, ChatPendingTask, PendingChatTaskItem, PendingChatTasksResponse, SendChatMessageResponse } from "./chat"; export type { StorageAdapter } from "./storage"; export type { Project, diff --git a/packages/views/chat/components/chat-message-list.tsx b/packages/views/chat/components/chat-message-list.tsx index 8d67297b6..474f861c8 100644 --- a/packages/views/chat/components/chat-message-list.tsx +++ b/packages/views/chat/components/chat-message-list.tsx @@ -1,8 +1,9 @@ "use client"; -import { useState, useRef } from "react"; +import { useCallback, useRef, useState } from "react"; import { toast } from "sonner"; import { useQuery } from "@tanstack/react-query"; +import { Virtuoso } from "react-virtuoso"; import { cn } from "@multica/ui/lib/utils"; import { Skeleton } from "@multica/ui/components/ui/skeleton"; import { Button } from "@multica/ui/components/ui/button"; @@ -18,7 +19,6 @@ import { } from "@multica/ui/components/ui/tooltip"; import { ChevronRight, ChevronDown, Brain, AlertCircle, AlertTriangle, Copy } from "lucide-react"; import { useScrollFade } from "@multica/ui/hooks/use-scroll-fade"; -import { useAutoScroll } from "@multica/ui/hooks/use-auto-scroll"; import { isTaskMessageTaskId, taskMessagesOptions } from "@multica/core/chat/queries"; import { Markdown } from "@multica/views/common/markdown"; import { copyMarkdown } from "../../editor"; @@ -44,16 +44,30 @@ interface ChatMessageListProps { pendingTask: ChatPendingTask | null | undefined; /** Resolved presence; pass `undefined` while loading to keep the pill copy neutral. */ availability: AgentAvailability | undefined; + firstItemIndex?: number; + hasOlderMessages?: boolean; + isFetchingOlderMessages?: boolean; + onLoadOlderMessages?: () => void; } export function ChatMessageList({ messages, pendingTask, availability, + firstItemIndex = 0, + hasOlderMessages = false, + isFetchingOlderMessages = false, + onLoadOlderMessages, }: ChatMessageListProps) { const scrollRef = useRef(null); + const [scrollContainerEl, setScrollContainerEl] = useState(null); + const [isNearBottom, setIsNearBottom] = useState(true); + const setScrollContainerRef = useCallback((node: HTMLDivElement | null) => { + scrollRef.current = node; + setScrollContainerEl(node); + }, []); const fadeStyle = useScrollFade(scrollRef); - useAutoScroll(scrollRef); + const { t } = useT("chat"); const pendingTaskId = pendingTask?.task_id ?? null; @@ -77,38 +91,70 @@ export function ChatMessageList({ const hasLive = showLiveTimeline && liveTimeline.length > 0; const showStatusPill = !!pendingTaskId && !pendingAlreadyPersisted && !!pendingTask; + const totalCount = messages.length + (hasLive || showStatusPill ? 1 : 0); + const firstIndex = totalCount > 0 ? firstItemIndex : 0; + return (
- {/* Inner container matches issue / project detail width convention - * (max-w-4xl + mx-auto) so switching between chat and content - * views doesn't jolt the reading width. px-5 is a touch tighter - * than issue-detail's px-8 because the chat window can be narrow. */} -
- {messages.map((msg) => ( - - ))} - {hasLive && ( -
- + {!scrollContainerEl ? ( +
+ +
+ ) : ( + (!isFetchingOlderMessages && isNearBottom ? "smooth" : false)} + startReached={() => { + if (hasOlderMessages && !isFetchingOlderMessages) { + onLoadOlderMessages?.(); + } + }} + computeItemKey={(_, msg) => msg.id} + components={{ + Header: () => ( +
+ {isFetchingOlderMessages && ( +
{t(($) => $.message_list.loading_older)}
+ )} +
+ ), + Footer: () => ( +
+ {hasLive && ( +
+ +
+ )} + {showStatusPill && pendingTask && ( + + )} +
+ ), + }} + itemContent={(_, msg) => ( +
+
)} - {showStatusPill && pendingTask && ( - - )} -
+ /> + )}
); } diff --git a/packages/views/chat/components/chat-window.tsx b/packages/views/chat/components/chat-window.tsx index 766a5a653..be24b7158 100644 --- a/packages/views/chat/components/chat-window.tsx +++ b/packages/views/chat/components/chat-window.tsx @@ -1,7 +1,7 @@ "use client"; import React, { useCallback, useEffect, useMemo, useRef, useState } from "react"; -import { useQuery, useQueryClient } from "@tanstack/react-query"; +import { useInfiniteQuery, useQuery, useQueryClient, type InfiniteData } from "@tanstack/react-query"; import { motion } from "motion/react"; import { Minus, Maximize2, Minimize2, ChevronDown, Plus, Check, Trash2, Pencil, Loader2, Square } from "lucide-react"; import { Button } from "@multica/ui/components/ui/button"; @@ -33,7 +33,7 @@ import { OfflineBanner } from "./offline-banner"; import { NoAgentBanner } from "./no-agent-banner"; import { chatSessionsOptions, - chatMessagesOptions, + chatMessagesPageOptions, pendingChatTaskOptions, pendingChatTasksOptions, chatKeys, @@ -56,11 +56,31 @@ import { import { ChatResizeHandles } from "./chat-resize-handles"; import { useChatResize } from "./use-chat-resize"; import { createLogger } from "@multica/core/logger"; -import type { Agent, ChatMessage, ChatPendingTask, ChatSession, PendingChatTasksResponse } from "@multica/core/types"; +import type { Agent, ChatMessage, ChatMessagesPage, ChatPendingTask, ChatSession, PendingChatTasksResponse } from "@multica/core/types"; import { useT } from "../../i18n"; const uiLogger = createLogger("chat.ui"); const apiLogger = createLogger("chat.api"); +const CHAT_VIRTUOSO_INITIAL_FIRST_ITEM_INDEX = 1_000_000; + +function seedChatMessagesPageCache( + qc: ReturnType, + sessionId: string, + messages: ChatMessage[], +) { + qc.setQueryData>( + chatKeys.messagesPage(sessionId), + (old) => old ?? { + pages: [{ + messages, + limit: 50, + has_more: false, + next_cursor: null, + }], + pageParams: [null], + }, + ); +} export function ChatWindow() { const { t } = useT("chat"); @@ -77,11 +97,25 @@ export function ChatWindow() { // Single sessions cache — eliminates the separate active/all queries // that used to drift during the WS-invalidate window. const { data: sessions = [] } = useQuery(chatSessionsOptions(wsId)); - const { data: rawMessages, isLoading: messagesLoading } = useQuery( - chatMessagesOptions(activeSessionId ?? ""), - ); - // When no active session, always show empty — don't use stale cache - const messages = activeSessionId ? rawMessages ?? [] : []; + const { + data: rawMessagePages, + isLoading: messagesLoading, + fetchNextPage: fetchOlderMessages, + hasNextPage: hasOlderMessages, + isFetchingNextPage: isFetchingOlderMessages, + } = useInfiniteQuery(chatMessagesPageOptions(activeSessionId ?? "")); + // When no active session, always show empty — don't use stale cache. + // Page 0 contains the latest chronological window; later cursor pages are + // older chronological windows. Reverse pages so older fetched pages render + // above the initial latest page. The Virtuoso firstItemIndex is client-owned: + // it starts from a large stable base and only subtracts the count of loaded + // prepended rows, so concurrent server inserts cannot drift the scroll anchor. + const messagePages = activeSessionId ? rawMessagePages?.pages ?? [] : []; + const messages = [...messagePages].reverse().flatMap((page) => page.messages); + const olderMessageCount = messagePages.slice(1).reduce((sum, page) => sum + page.messages.length, 0); + const firstItemIndex = messages.length > 0 + ? CHAT_VIRTUOSO_INITIAL_FIRST_ITEM_INDEX - olderMessageCount + : 0; // Skeleton only shows for an un-cached session fetch. Cached switches // return data synchronously — no flash. `enabled: false` (new chat) // keeps isLoading false so the starter prompts aren't hidden. @@ -244,6 +278,7 @@ export function ChatWindow() { // ChatMessageList mounts directly (no Skeleton frame). Skip the write // when an entry already exists — a concurrent handleSend may have // seeded an optimistic message we must not clobber. + seedChatMessagesPageCache(qc, sessionId, []); qc.setQueryData( chatKeys.messages(sessionId), (old) => old ?? [], @@ -303,6 +338,7 @@ export function ChatWindow() { // "new-chat first-message" white flash. Priming the cache first means // the very first read after activeSessionId flips hits data // synchronously and ChatMessageList mounts directly. + seedChatMessagesPageCache(qc, sessionId, [optimistic]); qc.setQueryData( chatKeys.messages(sessionId), (old) => (old ? [...old, optimistic] : [optimistic]), @@ -337,6 +373,7 @@ export function ChatWindow() { created_at: result.created_at, }); qc.invalidateQueries({ queryKey: chatKeys.messages(sessionId) }); + qc.invalidateQueries({ queryKey: chatKeys.messagesPage(sessionId) }); }, [ activeSessionId, @@ -362,6 +399,7 @@ export function ChatWindow() { apiLogger.info("cancelTask.start", { taskId: pendingTaskId, sessionId: activeSessionId }); qc.setQueryData(chatKeys.pendingTask(activeSessionId), {}); qc.invalidateQueries({ queryKey: chatKeys.messages(activeSessionId) }); + qc.invalidateQueries({ queryKey: chatKeys.messagesPage(activeSessionId) }); // Fire-and-forget — UI is already in its post-cancel state. We log the // outcome but never block on it. api.cancelTaskById(pendingTaskId).then( @@ -531,9 +569,14 @@ export function ChatWindow() { ) : hasMessages ? ( void fetchOlderMessages()} /> ) : ( apiLogger.info("cancelTask.success (history row)", { taskId: task.task_id, sessionId: session.id }), diff --git a/packages/views/locales/en/chat.json b/packages/views/locales/en/chat.json index f6a5de813..ea3153957 100644 --- a/packages/views/locales/en/chat.json +++ b/packages/views/locales/en/chat.json @@ -26,7 +26,8 @@ "process_steps_other": "{{count}} steps", "copy_action": "Copy", "copied_toast": "Copied", - "copy_failed_toast": "Copy failed" + "copy_failed_toast": "Copy failed", + "loading_older": "Loading older messages…" }, "session_history": { "untitled": "Untitled", diff --git a/packages/views/locales/ja/chat.json b/packages/views/locales/ja/chat.json index 2fea3ab2a..76943a3f8 100644 --- a/packages/views/locales/ja/chat.json +++ b/packages/views/locales/ja/chat.json @@ -23,7 +23,8 @@ "process_steps_other": "ステップ {{count}}個", "copy_action": "コピー", "copied_toast": "コピーしました", - "copy_failed_toast": "コピーに失敗しました" + "copy_failed_toast": "コピーに失敗しました", + "loading_older": "古いメッセージを読み込み中…" }, "session_history": { "untitled": "無題", diff --git a/packages/views/locales/ko/chat.json b/packages/views/locales/ko/chat.json index 57f99d7f7..f0086a945 100644 --- a/packages/views/locales/ko/chat.json +++ b/packages/views/locales/ko/chat.json @@ -26,7 +26,8 @@ "process_steps_other": "단계 {{count}}개", "copy_action": "복사", "copied_toast": "복사했습니다", - "copy_failed_toast": "복사 실패" + "copy_failed_toast": "복사 실패", + "loading_older": "이전 메시지를 불러오는 중…" }, "session_history": { "untitled": "제목 없음", diff --git a/packages/views/locales/zh-Hans/chat.json b/packages/views/locales/zh-Hans/chat.json index 7d48bce79..a058a4701 100644 --- a/packages/views/locales/zh-Hans/chat.json +++ b/packages/views/locales/zh-Hans/chat.json @@ -23,7 +23,8 @@ "process_steps_other": "{{count}} 步", "copy_action": "复制", "copied_toast": "已复制", - "copy_failed_toast": "复制失败" + "copy_failed_toast": "复制失败", + "loading_older": "正在加载更早的消息…" }, "session_history": { "untitled": "无标题", diff --git a/server/cmd/server/router.go b/server/cmd/server/router.go index 88bb31329..7e5a3b48a 100644 --- a/server/cmd/server/router.go +++ b/server/cmd/server/router.go @@ -707,6 +707,7 @@ func NewRouterWithOptions(pool *pgxpool.Pool, hub *realtime.Hub, bus *events.Bus r.Delete("/", h.DeleteChatSession) r.Post("/messages", h.SendChatMessage) r.Get("/messages", h.ListChatMessages) + r.Get("/messages/page", h.ListChatMessagesPage) r.Get("/pending-task", h.GetPendingChatTask) r.Post("/read", h.MarkChatSessionRead) }) diff --git a/server/internal/handler/chat.go b/server/internal/handler/chat.go index 4fa9953a4..5ca59b3f6 100644 --- a/server/internal/handler/chat.go +++ b/server/internal/handler/chat.go @@ -5,12 +5,15 @@ import ( "errors" "log/slog" "net/http" + "strconv" "strings" + "time" "github.com/go-chi/chi/v5" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgtype" "github.com/multica-ai/multica/server/internal/analytics" + "github.com/multica-ai/multica/server/internal/util" db "github.com/multica-ai/multica/server/pkg/db/generated" "github.com/multica-ai/multica/server/pkg/protocol" ) @@ -500,6 +503,47 @@ func (h *Handler) SendChatMessage(w http.ResponseWriter, r *http.Request) { }) } +type ChatMessagesCursorResponse struct { + CreatedAt string `json:"created_at"` + ID string `json:"id"` +} + +type ChatMessagesPageResponse struct { + Messages []ChatMessageResponse `json:"messages"` + Limit int `json:"limit"` + HasMore bool `json:"has_more"` + NextCursor *ChatMessagesCursorResponse `json:"next_cursor,omitempty"` +} + +func parseChatMessagesPageParams(r *http.Request) (int, pgtype.Timestamptz, pgtype.UUID, error) { + limit := 50 + if raw := r.URL.Query().Get("limit"); raw != "" { + parsed, err := strconv.Atoi(raw) + if err != nil || parsed < 1 || parsed > 100 { + return 0, pgtype.Timestamptz{}, pgtype.UUID{}, errors.New("invalid limit") + } + limit = parsed + } + + rawBeforeCreatedAt := r.URL.Query().Get("before_created_at") + rawBeforeID := r.URL.Query().Get("before_id") + if rawBeforeCreatedAt == "" && rawBeforeID == "" { + return limit, pgtype.Timestamptz{}, pgtype.UUID{}, nil + } + if rawBeforeCreatedAt == "" || rawBeforeID == "" { + return 0, pgtype.Timestamptz{}, pgtype.UUID{}, errors.New("invalid cursor") + } + beforeTime, err := time.Parse(time.RFC3339Nano, rawBeforeCreatedAt) + if err != nil { + return 0, pgtype.Timestamptz{}, pgtype.UUID{}, errors.New("invalid cursor") + } + beforeID, err := util.ParseUUID(rawBeforeID) + if err != nil { + return 0, pgtype.Timestamptz{}, pgtype.UUID{}, errors.New("invalid cursor") + } + return limit, pgtype.Timestamptz{Time: beforeTime, Valid: true}, beforeID, nil +} + func (h *Handler) ListChatMessages(w http.ResponseWriter, r *http.Request) { userID, ok := requireUserID(w, r) if !ok { @@ -532,6 +576,72 @@ func (h *Handler) ListChatMessages(w http.ResponseWriter, r *http.Request) { writeJSON(w, http.StatusOK, resp) } +func (h *Handler) ListChatMessagesPage(w http.ResponseWriter, r *http.Request) { + userID, ok := requireUserID(w, r) + if !ok { + return + } + workspaceID := ctxWorkspaceID(r.Context()) + sessionID := chi.URLParam(r, "sessionId") + + session, ok := h.gateChatSessionForUser(w, r, userID, workspaceID, sessionID) + if !ok { + return + } + + limit, beforeCreatedAt, beforeID, err := parseChatMessagesPageParams(r) + if err != nil { + writeError(w, http.StatusBadRequest, err.Error()) + return + } + + messages, err := h.Queries.ListChatMessagesPage(r.Context(), db.ListChatMessagesPageParams{ + ChatSessionID: session.ID, + Limit: int32(limit + 1), + BeforeCreatedAt: beforeCreatedAt, + BeforeID: beforeID, + }) + if err != nil { + writeError(w, http.StatusInternalServerError, "failed to list chat messages") + return + } + hasMore := len(messages) > limit + if hasMore { + messages = messages[:limit] + } + var nextCursor *ChatMessagesCursorResponse + if hasMore && len(messages) > 0 { + oldest := messages[len(messages)-1] + nextCursor = &ChatMessagesCursorResponse{ + CreatedAt: oldest.CreatedAt.Time.Format(time.RFC3339Nano), + ID: uuidToString(oldest.ID), + } + } + // SQL fetches newest windows first so the empty cursor opens at the recent + // tail. Reverse each cursor page before serializing to keep message order + // chronological within the viewport. + for i, j := 0, len(messages)-1; i < j; i, j = i+1, j-1 { + messages[i], messages[j] = messages[j], messages[i] + } + + messageIDs := make([]pgtype.UUID, len(messages)) + for i, m := range messages { + messageIDs[i] = m.ID + } + groupedAtt := h.groupChatMessageAttachments(r.Context(), workspaceID, messageIDs) + + resp := make([]ChatMessageResponse, len(messages)) + for i, m := range messages { + resp[i] = chatMessageToResponse(m, groupedAtt[uuidToString(m.ID)]) + } + writeJSON(w, http.StatusOK, ChatMessagesPageResponse{ + Messages: resp, + Limit: limit, + HasMore: hasMore, + NextCursor: nextCursor, + }) +} + // PendingChatTaskResponse is returned by GetPendingChatTask — either the // current in-flight task's id/status, or an empty object when none is active. // CreatedAt is the anchor the frontend uses to time the chat StatusPill diff --git a/server/internal/handler/chat_test.go b/server/internal/handler/chat_test.go index f5986e4bc..78bb8ea5d 100644 --- a/server/internal/handler/chat_test.go +++ b/server/internal/handler/chat_test.go @@ -7,6 +7,7 @@ import ( "mime/multipart" "net/http" "net/http/httptest" + "net/url" "testing" "github.com/multica-ai/multica/server/internal/middleware" @@ -194,3 +195,162 @@ func TestSendChatMessage_InvalidAttachmentIDs(t *testing.T) { t.Fatalf("expected 0 chat_message rows after rejected send, got %d", count) } } + +func fetchChatMessagesPageForTest(t *testing.T, sessionID string, params url.Values) ChatMessagesPageResponse { + t.Helper() + target := "/api/chat/sessions/" + sessionID + "/messages/page" + if encoded := params.Encode(); encoded != "" { + target += "?" + encoded + } + req := httptest.NewRequest(http.MethodGet, target, nil) + req.Header.Set("X-User-ID", testUserID) + req = withURLParam(req, "sessionId", sessionID) + req = withChatTestWorkspaceCtx(t, req) + w := httptest.NewRecorder() + testHandler.ListChatMessagesPage(w, req) + if w.Code != http.StatusOK { + t.Fatalf("ListChatMessagesPage: expected 200, got %d: %s", w.Code, w.Body.String()) + } + var page ChatMessagesPageResponse + if err := json.Unmarshal(w.Body.Bytes(), &page); err != nil { + t.Fatalf("decode page messages: %v", err) + } + return page +} + +func TestListChatMessagesPage_UsesCursorWithoutChangingLegacyList(t *testing.T) { + agentID := createHandlerTestAgent(t, "ChatCursorPaginationAgent", []byte("[]")) + sessionID := createHandlerTestChatSession(t, agentID) + + for i, content := range []string{"oldest", "middle", "newest"} { + _, err := testPool.Exec( + context.Background(), + `INSERT INTO chat_message (chat_session_id, role, content, created_at) + VALUES ($1, 'user', $2, timestamp '2026-01-01 00:00:00' + ($3::int * interval '1 second'))`, + sessionID, + content, + i, + ) + if err != nil { + t.Fatalf("insert chat message %d: %v", i, err) + } + } + + legacyReq := httptest.NewRequest(http.MethodGet, "/api/chat/sessions/"+sessionID+"/messages", nil) + legacyReq.Header.Set("X-User-ID", testUserID) + legacyReq = withURLParam(legacyReq, "sessionId", sessionID) + legacyReq = withChatTestWorkspaceCtx(t, legacyReq) + legacyW := httptest.NewRecorder() + testHandler.ListChatMessages(legacyW, legacyReq) + if legacyW.Code != http.StatusOK { + t.Fatalf("ListChatMessages: expected 200, got %d: %s", legacyW.Code, legacyW.Body.String()) + } + var legacy []ChatMessageResponse + if err := json.Unmarshal(legacyW.Body.Bytes(), &legacy); err != nil { + t.Fatalf("decode legacy messages: %v", err) + } + if len(legacy) != 3 || legacy[0].Content != "oldest" || legacy[2].Content != "newest" { + t.Fatalf("legacy messages = %#v", legacy) + } + + latest := fetchChatMessagesPageForTest(t, sessionID, url.Values{"limit": {"2"}}) + if latest.Limit != 2 || !latest.HasMore || latest.NextCursor == nil { + t.Fatalf("latest page metadata = %#v", latest) + } + if len(latest.Messages) != 2 || latest.Messages[0].Content != "middle" || latest.Messages[1].Content != "newest" { + t.Fatalf("latest page messages = %#v", latest) + } + + older := fetchChatMessagesPageForTest(t, sessionID, url.Values{ + "limit": {"2"}, + "before_created_at": {latest.NextCursor.CreatedAt}, + "before_id": {latest.NextCursor.ID}, + }) + if older.HasMore || older.NextCursor != nil { + t.Fatalf("older page metadata = %#v", older) + } + if len(older.Messages) != 1 || older.Messages[0].Content != "oldest" { + t.Fatalf("older page messages = %#v", older) + } +} + +func TestListChatMessagesPage_CursorTieBreaksSameTimestampWithoutDupesOrGaps(t *testing.T) { + agentID := createHandlerTestAgent(t, "ChatCursorTieBreakAgent", []byte("[]")) + sessionID := createHandlerTestChatSession(t, agentID) + + contents := []string{"a", "b", "c", "d", "e"} + for _, content := range contents { + _, err := testPool.Exec( + context.Background(), + `INSERT INTO chat_message (chat_session_id, role, content, created_at) + VALUES ($1, 'user', $2, timestamp '2026-01-01 00:00:00')`, + sessionID, + content, + ) + if err != nil { + t.Fatalf("insert chat message %q: %v", content, err) + } + } + + seen := map[string]bool{} + var ordered []string + params := url.Values{"limit": {"2"}} + for { + page := fetchChatMessagesPageForTest(t, sessionID, params) + for _, msg := range page.Messages { + if seen[msg.ID] { + t.Fatalf("duplicate message id %s across cursor pages", msg.ID) + } + seen[msg.ID] = true + ordered = append(ordered, msg.Content) + } + if !page.HasMore { + if page.NextCursor != nil { + t.Fatalf("terminal page has next cursor: %#v", page.NextCursor) + } + break + } + if page.NextCursor == nil { + t.Fatalf("has_more page missing next cursor: %#v", page) + } + params = url.Values{ + "limit": {"2"}, + "before_created_at": {page.NextCursor.CreatedAt}, + "before_id": {page.NextCursor.ID}, + } + } + + if len(ordered) != len(contents) { + t.Fatalf("expected %d messages across pages, got %d: %v", len(contents), len(ordered), ordered) + } + // Pages are newest-window first and chronological within each page. With all + // timestamps equal, the id tie-break must still produce a deterministic, + // gap-free traversal. + for _, content := range contents { + found := false + for _, got := range ordered { + if got == content { + found = true + break + } + } + if !found { + t.Fatalf("missing content %q across cursor pages: %v", content, ordered) + } + } +} + +func TestListChatMessagesPage_RejectsInvalidLimit(t *testing.T) { + agentID := createHandlerTestAgent(t, "ChatPaginationBadLimitAgent", []byte("[]")) + sessionID := createHandlerTestChatSession(t, agentID) + + req := httptest.NewRequest(http.MethodGet, "/api/chat/sessions/"+sessionID+"/messages/page?limit=0", nil) + req.Header.Set("X-User-ID", testUserID) + req = withURLParam(req, "sessionId", sessionID) + req = withChatTestWorkspaceCtx(t, req) + w := httptest.NewRecorder() + testHandler.ListChatMessagesPage(w, req) + if w.Code != http.StatusBadRequest { + t.Fatalf("ListChatMessagesPage invalid limit: expected 400, got %d: %s", w.Code, w.Body.String()) + } +} diff --git a/server/pkg/db/generated/chat.sql.go b/server/pkg/db/generated/chat.sql.go index d6cfe7c36..38aa1f404 100644 --- a/server/pkg/db/generated/chat.sql.go +++ b/server/pkg/db/generated/chat.sql.go @@ -396,6 +396,58 @@ func (q *Queries) ListChatMessages(ctx context.Context, chatSessionID pgtype.UUI return items, nil } +const listChatMessagesPage = `-- name: ListChatMessagesPage :many +SELECT id, chat_session_id, role, content, task_id, created_at, failure_reason, elapsed_ms FROM chat_message +WHERE chat_session_id = $1 + AND ( + $3::timestamptz IS NULL + OR (created_at, id) < ($3::timestamptz, $4::uuid) + ) +ORDER BY created_at DESC, id DESC +LIMIT $2 +` + +type ListChatMessagesPageParams struct { + ChatSessionID pgtype.UUID `json:"chat_session_id"` + Limit int32 `json:"limit"` + BeforeCreatedAt pgtype.Timestamptz `json:"before_created_at"` + BeforeID pgtype.UUID `json:"before_id"` +} + +func (q *Queries) ListChatMessagesPage(ctx context.Context, arg ListChatMessagesPageParams) ([]ChatMessage, error) { + rows, err := q.db.Query(ctx, listChatMessagesPage, + arg.ChatSessionID, + arg.Limit, + arg.BeforeCreatedAt, + arg.BeforeID, + ) + if err != nil { + return nil, err + } + defer rows.Close() + items := []ChatMessage{} + for rows.Next() { + var i ChatMessage + if err := rows.Scan( + &i.ID, + &i.ChatSessionID, + &i.Role, + &i.Content, + &i.TaskID, + &i.CreatedAt, + &i.FailureReason, + &i.ElapsedMs, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + const listChatSessionsByCreator = `-- name: ListChatSessionsByCreator :many SELECT cs.id, cs.workspace_id, cs.agent_id, cs.creator_id, cs.title, cs.session_id, cs.work_dir, cs.status, cs.created_at, cs.updated_at, cs.unread_since, cs.runtime_id, (cs.unread_since IS NOT NULL)::bool AS has_unread diff --git a/server/pkg/db/queries/chat.sql b/server/pkg/db/queries/chat.sql index b9604e5ce..75dcf8219 100644 --- a/server/pkg/db/queries/chat.sql +++ b/server/pkg/db/queries/chat.sql @@ -83,6 +83,16 @@ SELECT * FROM chat_message WHERE chat_session_id = $1 ORDER BY created_at ASC; +-- name: ListChatMessagesPage :many +SELECT * FROM chat_message +WHERE chat_session_id = $1 + AND ( + sqlc.narg('before_created_at')::timestamptz IS NULL + OR (created_at, id) < (sqlc.narg('before_created_at')::timestamptz, sqlc.narg('before_id')::uuid) + ) +ORDER BY created_at DESC, id DESC +LIMIT $2; + -- name: GetChatMessage :one SELECT * FROM chat_message WHERE id = $1;