mirror of
https://github.com/multica-ai/multica.git
synced 2026-06-17 03:38:32 +02:00
Optimize chat message loading (#3685)
* Optimize chat message loading Co-authored-by: multica-agent <github@multica.ai> * Fix chat history cursor pagination Co-authored-by: multica-agent <github@multica.ai> * Fix chat session list remount key Co-authored-by: multica-agent <github@multica.ai> * fix(chat): fall back to legacy /messages when paged endpoint 404s Deployment-order compatibility: a backend deployed before the /messages/page endpoint existed returns 404 for the unknown route. The cursorless initial page now falls back to the legacy full-list /messages endpoint and wraps it in a single has_more:false page, so chat never white-screens regardless of which side deploys first. A 404 on a cursor request still propagates to avoid duplicating the full list. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com> Co-authored-by: multica-agent <github@multica.ai> --------- Co-authored-by: multica-agent <github@multica.ai> Co-authored-by: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
@@ -707,6 +707,7 @@ func NewRouterWithOptions(pool *pgxpool.Pool, hub *realtime.Hub, bus *events.Bus
|
||||
r.Delete("/", h.DeleteChatSession)
|
||||
r.Post("/messages", h.SendChatMessage)
|
||||
r.Get("/messages", h.ListChatMessages)
|
||||
r.Get("/messages/page", h.ListChatMessagesPage)
|
||||
r.Get("/pending-task", h.GetPendingChatTask)
|
||||
r.Post("/read", h.MarkChatSessionRead)
|
||||
})
|
||||
|
||||
@@ -5,12 +5,15 @@ import (
|
||||
"errors"
|
||||
"log/slog"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/go-chi/chi/v5"
|
||||
"github.com/jackc/pgx/v5"
|
||||
"github.com/jackc/pgx/v5/pgtype"
|
||||
"github.com/multica-ai/multica/server/internal/analytics"
|
||||
"github.com/multica-ai/multica/server/internal/util"
|
||||
db "github.com/multica-ai/multica/server/pkg/db/generated"
|
||||
"github.com/multica-ai/multica/server/pkg/protocol"
|
||||
)
|
||||
@@ -500,6 +503,47 @@ func (h *Handler) SendChatMessage(w http.ResponseWriter, r *http.Request) {
|
||||
})
|
||||
}
|
||||
|
||||
type ChatMessagesCursorResponse struct {
|
||||
CreatedAt string `json:"created_at"`
|
||||
ID string `json:"id"`
|
||||
}
|
||||
|
||||
type ChatMessagesPageResponse struct {
|
||||
Messages []ChatMessageResponse `json:"messages"`
|
||||
Limit int `json:"limit"`
|
||||
HasMore bool `json:"has_more"`
|
||||
NextCursor *ChatMessagesCursorResponse `json:"next_cursor,omitempty"`
|
||||
}
|
||||
|
||||
func parseChatMessagesPageParams(r *http.Request) (int, pgtype.Timestamptz, pgtype.UUID, error) {
|
||||
limit := 50
|
||||
if raw := r.URL.Query().Get("limit"); raw != "" {
|
||||
parsed, err := strconv.Atoi(raw)
|
||||
if err != nil || parsed < 1 || parsed > 100 {
|
||||
return 0, pgtype.Timestamptz{}, pgtype.UUID{}, errors.New("invalid limit")
|
||||
}
|
||||
limit = parsed
|
||||
}
|
||||
|
||||
rawBeforeCreatedAt := r.URL.Query().Get("before_created_at")
|
||||
rawBeforeID := r.URL.Query().Get("before_id")
|
||||
if rawBeforeCreatedAt == "" && rawBeforeID == "" {
|
||||
return limit, pgtype.Timestamptz{}, pgtype.UUID{}, nil
|
||||
}
|
||||
if rawBeforeCreatedAt == "" || rawBeforeID == "" {
|
||||
return 0, pgtype.Timestamptz{}, pgtype.UUID{}, errors.New("invalid cursor")
|
||||
}
|
||||
beforeTime, err := time.Parse(time.RFC3339Nano, rawBeforeCreatedAt)
|
||||
if err != nil {
|
||||
return 0, pgtype.Timestamptz{}, pgtype.UUID{}, errors.New("invalid cursor")
|
||||
}
|
||||
beforeID, err := util.ParseUUID(rawBeforeID)
|
||||
if err != nil {
|
||||
return 0, pgtype.Timestamptz{}, pgtype.UUID{}, errors.New("invalid cursor")
|
||||
}
|
||||
return limit, pgtype.Timestamptz{Time: beforeTime, Valid: true}, beforeID, nil
|
||||
}
|
||||
|
||||
func (h *Handler) ListChatMessages(w http.ResponseWriter, r *http.Request) {
|
||||
userID, ok := requireUserID(w, r)
|
||||
if !ok {
|
||||
@@ -532,6 +576,72 @@ func (h *Handler) ListChatMessages(w http.ResponseWriter, r *http.Request) {
|
||||
writeJSON(w, http.StatusOK, resp)
|
||||
}
|
||||
|
||||
func (h *Handler) ListChatMessagesPage(w http.ResponseWriter, r *http.Request) {
|
||||
userID, ok := requireUserID(w, r)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
workspaceID := ctxWorkspaceID(r.Context())
|
||||
sessionID := chi.URLParam(r, "sessionId")
|
||||
|
||||
session, ok := h.gateChatSessionForUser(w, r, userID, workspaceID, sessionID)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
limit, beforeCreatedAt, beforeID, err := parseChatMessagesPageParams(r)
|
||||
if err != nil {
|
||||
writeError(w, http.StatusBadRequest, err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
messages, err := h.Queries.ListChatMessagesPage(r.Context(), db.ListChatMessagesPageParams{
|
||||
ChatSessionID: session.ID,
|
||||
Limit: int32(limit + 1),
|
||||
BeforeCreatedAt: beforeCreatedAt,
|
||||
BeforeID: beforeID,
|
||||
})
|
||||
if err != nil {
|
||||
writeError(w, http.StatusInternalServerError, "failed to list chat messages")
|
||||
return
|
||||
}
|
||||
hasMore := len(messages) > limit
|
||||
if hasMore {
|
||||
messages = messages[:limit]
|
||||
}
|
||||
var nextCursor *ChatMessagesCursorResponse
|
||||
if hasMore && len(messages) > 0 {
|
||||
oldest := messages[len(messages)-1]
|
||||
nextCursor = &ChatMessagesCursorResponse{
|
||||
CreatedAt: oldest.CreatedAt.Time.Format(time.RFC3339Nano),
|
||||
ID: uuidToString(oldest.ID),
|
||||
}
|
||||
}
|
||||
// SQL fetches newest windows first so the empty cursor opens at the recent
|
||||
// tail. Reverse each cursor page before serializing to keep message order
|
||||
// chronological within the viewport.
|
||||
for i, j := 0, len(messages)-1; i < j; i, j = i+1, j-1 {
|
||||
messages[i], messages[j] = messages[j], messages[i]
|
||||
}
|
||||
|
||||
messageIDs := make([]pgtype.UUID, len(messages))
|
||||
for i, m := range messages {
|
||||
messageIDs[i] = m.ID
|
||||
}
|
||||
groupedAtt := h.groupChatMessageAttachments(r.Context(), workspaceID, messageIDs)
|
||||
|
||||
resp := make([]ChatMessageResponse, len(messages))
|
||||
for i, m := range messages {
|
||||
resp[i] = chatMessageToResponse(m, groupedAtt[uuidToString(m.ID)])
|
||||
}
|
||||
writeJSON(w, http.StatusOK, ChatMessagesPageResponse{
|
||||
Messages: resp,
|
||||
Limit: limit,
|
||||
HasMore: hasMore,
|
||||
NextCursor: nextCursor,
|
||||
})
|
||||
}
|
||||
|
||||
// PendingChatTaskResponse is returned by GetPendingChatTask — either the
|
||||
// current in-flight task's id/status, or an empty object when none is active.
|
||||
// CreatedAt is the anchor the frontend uses to time the chat StatusPill
|
||||
|
||||
@@ -7,6 +7,7 @@ import (
|
||||
"mime/multipart"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"net/url"
|
||||
"testing"
|
||||
|
||||
"github.com/multica-ai/multica/server/internal/middleware"
|
||||
@@ -194,3 +195,162 @@ func TestSendChatMessage_InvalidAttachmentIDs(t *testing.T) {
|
||||
t.Fatalf("expected 0 chat_message rows after rejected send, got %d", count)
|
||||
}
|
||||
}
|
||||
|
||||
func fetchChatMessagesPageForTest(t *testing.T, sessionID string, params url.Values) ChatMessagesPageResponse {
|
||||
t.Helper()
|
||||
target := "/api/chat/sessions/" + sessionID + "/messages/page"
|
||||
if encoded := params.Encode(); encoded != "" {
|
||||
target += "?" + encoded
|
||||
}
|
||||
req := httptest.NewRequest(http.MethodGet, target, nil)
|
||||
req.Header.Set("X-User-ID", testUserID)
|
||||
req = withURLParam(req, "sessionId", sessionID)
|
||||
req = withChatTestWorkspaceCtx(t, req)
|
||||
w := httptest.NewRecorder()
|
||||
testHandler.ListChatMessagesPage(w, req)
|
||||
if w.Code != http.StatusOK {
|
||||
t.Fatalf("ListChatMessagesPage: expected 200, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
var page ChatMessagesPageResponse
|
||||
if err := json.Unmarshal(w.Body.Bytes(), &page); err != nil {
|
||||
t.Fatalf("decode page messages: %v", err)
|
||||
}
|
||||
return page
|
||||
}
|
||||
|
||||
func TestListChatMessagesPage_UsesCursorWithoutChangingLegacyList(t *testing.T) {
|
||||
agentID := createHandlerTestAgent(t, "ChatCursorPaginationAgent", []byte("[]"))
|
||||
sessionID := createHandlerTestChatSession(t, agentID)
|
||||
|
||||
for i, content := range []string{"oldest", "middle", "newest"} {
|
||||
_, err := testPool.Exec(
|
||||
context.Background(),
|
||||
`INSERT INTO chat_message (chat_session_id, role, content, created_at)
|
||||
VALUES ($1, 'user', $2, timestamp '2026-01-01 00:00:00' + ($3::int * interval '1 second'))`,
|
||||
sessionID,
|
||||
content,
|
||||
i,
|
||||
)
|
||||
if err != nil {
|
||||
t.Fatalf("insert chat message %d: %v", i, err)
|
||||
}
|
||||
}
|
||||
|
||||
legacyReq := httptest.NewRequest(http.MethodGet, "/api/chat/sessions/"+sessionID+"/messages", nil)
|
||||
legacyReq.Header.Set("X-User-ID", testUserID)
|
||||
legacyReq = withURLParam(legacyReq, "sessionId", sessionID)
|
||||
legacyReq = withChatTestWorkspaceCtx(t, legacyReq)
|
||||
legacyW := httptest.NewRecorder()
|
||||
testHandler.ListChatMessages(legacyW, legacyReq)
|
||||
if legacyW.Code != http.StatusOK {
|
||||
t.Fatalf("ListChatMessages: expected 200, got %d: %s", legacyW.Code, legacyW.Body.String())
|
||||
}
|
||||
var legacy []ChatMessageResponse
|
||||
if err := json.Unmarshal(legacyW.Body.Bytes(), &legacy); err != nil {
|
||||
t.Fatalf("decode legacy messages: %v", err)
|
||||
}
|
||||
if len(legacy) != 3 || legacy[0].Content != "oldest" || legacy[2].Content != "newest" {
|
||||
t.Fatalf("legacy messages = %#v", legacy)
|
||||
}
|
||||
|
||||
latest := fetchChatMessagesPageForTest(t, sessionID, url.Values{"limit": {"2"}})
|
||||
if latest.Limit != 2 || !latest.HasMore || latest.NextCursor == nil {
|
||||
t.Fatalf("latest page metadata = %#v", latest)
|
||||
}
|
||||
if len(latest.Messages) != 2 || latest.Messages[0].Content != "middle" || latest.Messages[1].Content != "newest" {
|
||||
t.Fatalf("latest page messages = %#v", latest)
|
||||
}
|
||||
|
||||
older := fetchChatMessagesPageForTest(t, sessionID, url.Values{
|
||||
"limit": {"2"},
|
||||
"before_created_at": {latest.NextCursor.CreatedAt},
|
||||
"before_id": {latest.NextCursor.ID},
|
||||
})
|
||||
if older.HasMore || older.NextCursor != nil {
|
||||
t.Fatalf("older page metadata = %#v", older)
|
||||
}
|
||||
if len(older.Messages) != 1 || older.Messages[0].Content != "oldest" {
|
||||
t.Fatalf("older page messages = %#v", older)
|
||||
}
|
||||
}
|
||||
|
||||
func TestListChatMessagesPage_CursorTieBreaksSameTimestampWithoutDupesOrGaps(t *testing.T) {
|
||||
agentID := createHandlerTestAgent(t, "ChatCursorTieBreakAgent", []byte("[]"))
|
||||
sessionID := createHandlerTestChatSession(t, agentID)
|
||||
|
||||
contents := []string{"a", "b", "c", "d", "e"}
|
||||
for _, content := range contents {
|
||||
_, err := testPool.Exec(
|
||||
context.Background(),
|
||||
`INSERT INTO chat_message (chat_session_id, role, content, created_at)
|
||||
VALUES ($1, 'user', $2, timestamp '2026-01-01 00:00:00')`,
|
||||
sessionID,
|
||||
content,
|
||||
)
|
||||
if err != nil {
|
||||
t.Fatalf("insert chat message %q: %v", content, err)
|
||||
}
|
||||
}
|
||||
|
||||
seen := map[string]bool{}
|
||||
var ordered []string
|
||||
params := url.Values{"limit": {"2"}}
|
||||
for {
|
||||
page := fetchChatMessagesPageForTest(t, sessionID, params)
|
||||
for _, msg := range page.Messages {
|
||||
if seen[msg.ID] {
|
||||
t.Fatalf("duplicate message id %s across cursor pages", msg.ID)
|
||||
}
|
||||
seen[msg.ID] = true
|
||||
ordered = append(ordered, msg.Content)
|
||||
}
|
||||
if !page.HasMore {
|
||||
if page.NextCursor != nil {
|
||||
t.Fatalf("terminal page has next cursor: %#v", page.NextCursor)
|
||||
}
|
||||
break
|
||||
}
|
||||
if page.NextCursor == nil {
|
||||
t.Fatalf("has_more page missing next cursor: %#v", page)
|
||||
}
|
||||
params = url.Values{
|
||||
"limit": {"2"},
|
||||
"before_created_at": {page.NextCursor.CreatedAt},
|
||||
"before_id": {page.NextCursor.ID},
|
||||
}
|
||||
}
|
||||
|
||||
if len(ordered) != len(contents) {
|
||||
t.Fatalf("expected %d messages across pages, got %d: %v", len(contents), len(ordered), ordered)
|
||||
}
|
||||
// Pages are newest-window first and chronological within each page. With all
|
||||
// timestamps equal, the id tie-break must still produce a deterministic,
|
||||
// gap-free traversal.
|
||||
for _, content := range contents {
|
||||
found := false
|
||||
for _, got := range ordered {
|
||||
if got == content {
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
t.Fatalf("missing content %q across cursor pages: %v", content, ordered)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestListChatMessagesPage_RejectsInvalidLimit(t *testing.T) {
|
||||
agentID := createHandlerTestAgent(t, "ChatPaginationBadLimitAgent", []byte("[]"))
|
||||
sessionID := createHandlerTestChatSession(t, agentID)
|
||||
|
||||
req := httptest.NewRequest(http.MethodGet, "/api/chat/sessions/"+sessionID+"/messages/page?limit=0", nil)
|
||||
req.Header.Set("X-User-ID", testUserID)
|
||||
req = withURLParam(req, "sessionId", sessionID)
|
||||
req = withChatTestWorkspaceCtx(t, req)
|
||||
w := httptest.NewRecorder()
|
||||
testHandler.ListChatMessagesPage(w, req)
|
||||
if w.Code != http.StatusBadRequest {
|
||||
t.Fatalf("ListChatMessagesPage invalid limit: expected 400, got %d: %s", w.Code, w.Body.String())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -396,6 +396,58 @@ func (q *Queries) ListChatMessages(ctx context.Context, chatSessionID pgtype.UUI
|
||||
return items, nil
|
||||
}
|
||||
|
||||
const listChatMessagesPage = `-- name: ListChatMessagesPage :many
|
||||
SELECT id, chat_session_id, role, content, task_id, created_at, failure_reason, elapsed_ms FROM chat_message
|
||||
WHERE chat_session_id = $1
|
||||
AND (
|
||||
$3::timestamptz IS NULL
|
||||
OR (created_at, id) < ($3::timestamptz, $4::uuid)
|
||||
)
|
||||
ORDER BY created_at DESC, id DESC
|
||||
LIMIT $2
|
||||
`
|
||||
|
||||
type ListChatMessagesPageParams struct {
|
||||
ChatSessionID pgtype.UUID `json:"chat_session_id"`
|
||||
Limit int32 `json:"limit"`
|
||||
BeforeCreatedAt pgtype.Timestamptz `json:"before_created_at"`
|
||||
BeforeID pgtype.UUID `json:"before_id"`
|
||||
}
|
||||
|
||||
func (q *Queries) ListChatMessagesPage(ctx context.Context, arg ListChatMessagesPageParams) ([]ChatMessage, error) {
|
||||
rows, err := q.db.Query(ctx, listChatMessagesPage,
|
||||
arg.ChatSessionID,
|
||||
arg.Limit,
|
||||
arg.BeforeCreatedAt,
|
||||
arg.BeforeID,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
items := []ChatMessage{}
|
||||
for rows.Next() {
|
||||
var i ChatMessage
|
||||
if err := rows.Scan(
|
||||
&i.ID,
|
||||
&i.ChatSessionID,
|
||||
&i.Role,
|
||||
&i.Content,
|
||||
&i.TaskID,
|
||||
&i.CreatedAt,
|
||||
&i.FailureReason,
|
||||
&i.ElapsedMs,
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
items = append(items, i)
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return items, nil
|
||||
}
|
||||
|
||||
const listChatSessionsByCreator = `-- name: ListChatSessionsByCreator :many
|
||||
SELECT cs.id, cs.workspace_id, cs.agent_id, cs.creator_id, cs.title, cs.session_id, cs.work_dir, cs.status, cs.created_at, cs.updated_at, cs.unread_since, cs.runtime_id,
|
||||
(cs.unread_since IS NOT NULL)::bool AS has_unread
|
||||
|
||||
@@ -83,6 +83,16 @@ SELECT * FROM chat_message
|
||||
WHERE chat_session_id = $1
|
||||
ORDER BY created_at ASC;
|
||||
|
||||
-- name: ListChatMessagesPage :many
|
||||
SELECT * FROM chat_message
|
||||
WHERE chat_session_id = $1
|
||||
AND (
|
||||
sqlc.narg('before_created_at')::timestamptz IS NULL
|
||||
OR (created_at, id) < (sqlc.narg('before_created_at')::timestamptz, sqlc.narg('before_id')::uuid)
|
||||
)
|
||||
ORDER BY created_at DESC, id DESC
|
||||
LIMIT $2;
|
||||
|
||||
-- name: GetChatMessage :one
|
||||
SELECT * FROM chat_message
|
||||
WHERE id = $1;
|
||||
|
||||
Reference in New Issue
Block a user