Files
multica/server/internal/handler/chat.go
Bohan Jiang 24b162cdbc feat(daemon): surface the real task initiator to the agent runtime (MUL-2645) (#3899)
* feat(daemon): surface the real task initiator to the agent runtime (MUL-2645)

In a multi-person workspace the agent runtime only ever saw the runtime
OWNER identity: the brief's `## Requesting User` is sourced from
runtime.OwnerID and the task-scoped token is owner-bound, so every
requester (whoever commented, @mentioned, or chatted) appeared to the
agent as the owner. Agents that route by initiator for permission,
privacy, or audit purposes therefore all misidentified the requester.

Resolve the real task initiator at claim time and surface it distinctly
from the owner:
- comment / mention trigger -> triggering comment's author (member or agent)
- chat task -> chat session creator (sessions are creator-only)
- on-assign / autopilot / quick-create -> no attributable initiator (omitted)

Adds initiator_{type,id,name,email} to the claim response, the daemon
Task, and TaskContextForEnv, rendered into the brief as a new
`## Task Initiator` section. The section documents the privacy boundary:
the agent's credentials stay owner-scoped, so this is an attested
identity for the agent's own routing/privacy logic, not act-as. No DB
migration — both paths are derivable from existing rows.

Tests: brief rendering (member/agent/omit/sanitize) + email guard unit
tests, and claim-handler tests for the comment and chat paths.

Co-authored-by: multica-agent <github@multica.ai>

* fix(chat): store real sender as task initiator, not chat_session creator (MUL-2645)

Review fix (Niko, PR #3899). v1 resolved the chat task initiator from
chat_session.creator_id at claim time. That is correct for web chat and
Lark p2p (creator == sender), but WRONG for Lark group chats: the group
session creator is deliberately the installer (stable identity across
member churn), not the message sender. So in a Lark group, every member
who triggered the agent showed up in the brief as the installer/owner —
the exact bug this issue is about, still live at that entry point.

Capture the real sender at enqueue time instead of deriving it from the
session creator at claim time:

- migration 117: agent_task_queue.initiator_user_id (FK user, ON DELETE
  SET NULL); NULL for non-chat and pre-migration rows.
- EnqueueChatTask now takes an explicit initiatorUserID. Web chat passes
  the authenticated request user; the Lark dispatcher threads the inbound
  sender (binding.MulticaUserID) through scheduleRun -> flushChatRun. The
  debouncer keeps the latest scheduled flush per session, so in a multi-
  sender silence window the LATEST sender wins (documented + tested).
- claim handler resolves the initiator from task.initiator_user_id and
  drops the creator_id fallback entirely.

The Lark group session creator stays the installer (unchanged) — only the
task initiator is corrected, keeping the two concepts cleanly separate.

Tests: dispatcher group regression (initiator = sender, not installer),
latest-sender-wins, p2p initiator assertion; the chat claim handler test
now sets creator != initiator and asserts the stored sender wins.

Co-authored-by: multica-agent <github@multica.ai>

---------

Co-authored-by: J <j@multica.ai>
Co-authored-by: multica-agent <github@multica.ai>
2026-06-08 19:29:57 +08:00

955 lines
32 KiB
Go

package handler
import (
"encoding/json"
"errors"
"log/slog"
"net/http"
"strconv"
"strings"
"time"
"github.com/go-chi/chi/v5"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgtype"
"github.com/multica-ai/multica/server/internal/analytics"
obsmetrics "github.com/multica-ai/multica/server/internal/metrics"
"github.com/multica-ai/multica/server/internal/middleware"
"github.com/multica-ai/multica/server/internal/util"
db "github.com/multica-ai/multica/server/pkg/db/generated"
"github.com/multica-ai/multica/server/pkg/protocol"
)
// chatSessionTitleMaxLen caps the rename input. Long enough to fit a
// meaningful summary, short enough to keep the dropdown row scannable.
// UpdateChatSession compares it against the rune count of the trimmed title,
// so the limit is expressed in Unicode code points rather than bytes.
const chatSessionTitleMaxLen = 200
// ---------------------------------------------------------------------------
// Chat Sessions
// ---------------------------------------------------------------------------
// CreateChatSessionRequest is the JSON body decoded by CreateChatSession.
type CreateChatSessionRequest struct {
// AgentID is the target agent's UUID in string form. CreateChatSession
// answers 400 when it is empty or does not parse as a UUID.
AgentID string `json:"agent_id"`
// Title is passed to the insert query as sent; CreateChatSession does not
// trim it or check its length.
Title string `json:"title"`
}
// CreateChatSession opens a new chat session between the caller and an agent
// of the current workspace and replies 201 with the stored session.
//
// Failure responses:
//   - 400: unreadable body, missing/invalid agent_id, invalid workspace id,
//     or the agent is archived;
//   - 404: the agent lookup in this workspace failed;
//   - 403: the private-agent gate rejected the caller;
//   - 500: the insert failed.
func (h *Handler) CreateChatSession(w http.ResponseWriter, r *http.Request) {
	userID, ok := requireUserID(w, r)
	if !ok {
		return
	}
	ctx := r.Context()
	workspaceID := ctxWorkspaceID(ctx)

	var body CreateChatSessionRequest
	if decodeErr := json.NewDecoder(r.Body).Decode(&body); decodeErr != nil {
		writeError(w, http.StatusBadRequest, "invalid request body")
		return
	}
	if body.AgentID == "" {
		writeError(w, http.StatusBadRequest, "agent_id is required")
		return
	}

	agentUUID, ok := parseUUIDOrBadRequest(w, body.AgentID, "agent_id")
	if !ok {
		return
	}
	wsUUID, ok := parseUUIDOrBadRequest(w, workspaceID, "workspace id")
	if !ok {
		return
	}

	// The agent has to exist inside this workspace and must not be archived.
	target, err := h.Queries.GetAgentInWorkspace(ctx, db.GetAgentInWorkspaceParams{
		ID:          agentUUID,
		WorkspaceID: wsUUID,
	})
	switch {
	case err != nil:
		writeError(w, http.StatusNotFound, "agent not found")
		return
	case target.ArchivedAt.Valid:
		writeError(w, http.StatusBadRequest, "agent is archived")
		return
	}

	// Private agents only accept chats from members listed in
	// allowed_principals. Agent-to-agent sessions are let through so A2A
	// collaboration keeps working.
	actorType, actorID := h.resolveActor(r, userID, workspaceID)
	if allowed := h.canAccessPrivateAgent(ctx, target, actorType, actorID, workspaceID); !allowed {
		writeError(w, http.StatusForbidden, "you do not have access to this agent")
		return
	}

	created, err := h.Queries.CreateChatSession(ctx, db.CreateChatSessionParams{
		WorkspaceID: wsUUID,
		AgentID:     agentUUID,
		CreatorID:   parseUUID(userID),
		Title:       body.Title,
	})
	if err != nil {
		writeError(w, http.StatusInternalServerError, "failed to create chat session")
		return
	}
	writeJSON(w, http.StatusCreated, chatSessionToResponse(created))
}
// ListChatSessions returns the caller's own chat sessions in the current
// workspace, restricted to agents the caller can still reach.
//
// The accessible-agent set is resolved once and applied to every row.
// Without that filter a member whose role was downgraded would keep seeing
// sessions (and, through ListChatMessages, transcripts) for private agents
// they previously had access to.
//
// The `status` query parameter selects the query: "all" runs
// ListAllChatSessionsByCreator, anything else runs ListChatSessionsByCreator.
// The result slice is always allocated, never nil.
func (h *Handler) ListChatSessions(w http.ResponseWriter, r *http.Request) {
	userID, ok := requireUserID(w, r)
	if !ok {
		return
	}
	ctx := r.Context()
	workspaceID := ctxWorkspaceID(ctx)

	member, ok := h.workspaceMember(w, r, workspaceID)
	if !ok {
		return
	}
	actorType, actorID := h.resolveActor(r, userID, workspaceID)
	reachable, ok := h.accessibleAgentIDs(ctx, workspaceID, actorType, actorID, member.Role)
	if !ok {
		writeError(w, http.StatusInternalServerError, "failed to resolve agent access")
		return
	}
	canSee := func(agentID string) bool {
		_, found := reachable[agentID]
		return found
	}

	// The two queries return distinct row types with the same fields, so each
	// case carries its own small conversion loop.
	var sessions []ChatSessionResponse
	switch r.URL.Query().Get("status") {
	case "all":
		rows, err := h.Queries.ListAllChatSessionsByCreator(ctx, db.ListAllChatSessionsByCreatorParams{
			WorkspaceID: parseUUID(workspaceID),
			CreatorID:   parseUUID(userID),
		})
		if err != nil {
			writeError(w, http.StatusInternalServerError, "failed to list chat sessions")
			return
		}
		sessions = make([]ChatSessionResponse, 0, len(rows))
		for _, row := range rows {
			agentID := uuidToString(row.AgentID)
			if !canSee(agentID) {
				continue
			}
			sessions = append(sessions, ChatSessionResponse{
				ID:          uuidToString(row.ID),
				WorkspaceID: uuidToString(row.WorkspaceID),
				AgentID:     agentID,
				CreatorID:   uuidToString(row.CreatorID),
				Title:       row.Title,
				Status:      row.Status,
				HasUnread:   row.HasUnread,
				CreatedAt:   timestampToString(row.CreatedAt),
				UpdatedAt:   timestampToString(row.UpdatedAt),
			})
		}
	default:
		rows, err := h.Queries.ListChatSessionsByCreator(ctx, db.ListChatSessionsByCreatorParams{
			WorkspaceID: parseUUID(workspaceID),
			CreatorID:   parseUUID(userID),
		})
		if err != nil {
			writeError(w, http.StatusInternalServerError, "failed to list chat sessions")
			return
		}
		sessions = make([]ChatSessionResponse, 0, len(rows))
		for _, row := range rows {
			agentID := uuidToString(row.AgentID)
			if !canSee(agentID) {
				continue
			}
			sessions = append(sessions, ChatSessionResponse{
				ID:          uuidToString(row.ID),
				WorkspaceID: uuidToString(row.WorkspaceID),
				AgentID:     agentID,
				CreatorID:   uuidToString(row.CreatorID),
				Title:       row.Title,
				Status:      row.Status,
				HasUnread:   row.HasUnread,
				CreatedAt:   timestampToString(row.CreatedAt),
				UpdatedAt:   timestampToString(row.UpdatedAt),
			})
		}
	}
	writeJSON(w, http.StatusOK, sessions)
}
// loadChatSessionForUser fetches a chat session scoped to the workspace and
// checks that userID is its creator. On any failure it writes the error
// response itself (400 for a malformed id, 404 when the lookup fails, 403 when
// the caller is not the creator) and returns ok=false with a zero session.
func (h *Handler) loadChatSessionForUser(w http.ResponseWriter, r *http.Request, userID, workspaceID, sessionID string) (db.ChatSession, bool) {
	var none db.ChatSession

	sid, ok := parseUUIDOrBadRequest(w, sessionID, "chat session id")
	if !ok {
		return none, false
	}
	wsid, ok := parseUUIDOrBadRequest(w, workspaceID, "workspace id")
	if !ok {
		return none, false
	}

	found, err := h.Queries.GetChatSessionInWorkspace(r.Context(), db.GetChatSessionInWorkspaceParams{
		ID:          sid,
		WorkspaceID: wsid,
	})
	if err != nil {
		writeError(w, http.StatusNotFound, "chat session not found")
		return none, false
	}

	// Sessions are creator-only: anyone else is refused.
	if owner := uuidToString(found.CreatorID); owner != userID {
		writeError(w, http.StatusForbidden, "not your chat session")
		return none, false
	}
	return found, true
}
// gateChatSessionForUser layers the private-agent access gate on top of the
// session ownership check. A member who lost access to the target agent
// (role downgrade, ownership transfer, agent flipped to private) is refused
// even though they are still the session's creator, so the transcript stays
// out of reach. When ok is false the error response has already been written.
func (h *Handler) gateChatSessionForUser(w http.ResponseWriter, r *http.Request, userID, workspaceID, sessionID string) (db.ChatSession, bool) {
	var none db.ChatSession

	owned, ok := h.loadChatSessionForUser(w, r, userID, workspaceID, sessionID)
	if !ok {
		return none, false
	}

	target, err := h.Queries.GetAgent(r.Context(), owned.AgentID)
	if err != nil {
		writeError(w, http.StatusNotFound, "agent not found")
		return none, false
	}

	actorType, actorID := h.resolveActor(r, userID, workspaceID)
	if permitted := h.canAccessPrivateAgent(r.Context(), target, actorType, actorID, workspaceID); !permitted {
		writeError(w, http.StatusForbidden, "you do not have access to this agent")
		return none, false
	}
	return owned, true
}
// GetChatSession returns a single chat session identified by the `sessionId`
// URL parameter, after the ownership and private-agent checks performed by
// gateChatSessionForUser have passed.
func (h *Handler) GetChatSession(w http.ResponseWriter, r *http.Request) {
	userID, ok := requireUserID(w, r)
	if !ok {
		return
	}

	found, ok := h.gateChatSessionForUser(
		w, r,
		userID,
		ctxWorkspaceID(r.Context()),
		chi.URLParam(r, "sessionId"),
	)
	if ok {
		writeJSON(w, http.StatusOK, chatSessionToResponse(found))
	}
}
// UpdateChatSessionRequest is the JSON body decoded by UpdateChatSession.
type UpdateChatSessionRequest struct {
// Title is a pointer so an absent field (nil) can be told apart from an
// explicit value; UpdateChatSession rejects nil with "title is required".
Title *string `json:"title"`
}
// UpdateChatSession changes the user-editable part of a chat session, which
// today means only `title` (the inline rename in the session dropdown).
// Nothing else is accepted: `status` is legacy and read-only,
// agent/creator/workspace are immutable, and the resume pointers
// (session_id / work_dir / runtime_id) belong to the daemon.
//
// The title is trimmed, must be non-empty, and may hold at most
// chatSessionTitleMaxLen runes. On success the change is published as a
// chat-session-updated event and the updated session is returned.
func (h *Handler) UpdateChatSession(w http.ResponseWriter, r *http.Request) {
	userID, ok := requireUserID(w, r)
	if !ok {
		return
	}
	ctx := r.Context()
	workspaceID := ctxWorkspaceID(ctx)
	sessionID := chi.URLParam(r, "sessionId")

	var body UpdateChatSessionRequest
	if decodeErr := json.NewDecoder(r.Body).Decode(&body); decodeErr != nil {
		writeError(w, http.StatusBadRequest, "invalid request body")
		return
	}
	if body.Title == nil {
		writeError(w, http.StatusBadRequest, "title is required")
		return
	}

	newTitle := strings.TrimSpace(*body.Title)
	switch {
	case newTitle == "":
		writeError(w, http.StatusBadRequest, "title is required")
		return
	case len([]rune(newTitle)) > chatSessionTitleMaxLen:
		writeError(w, http.StatusBadRequest, "title is too long")
		return
	}

	current, ok := h.gateChatSessionForUser(w, r, userID, workspaceID, sessionID)
	if !ok {
		return
	}

	renamed, err := h.Queries.UpdateChatSessionTitle(ctx, db.UpdateChatSessionTitleParams{
		ID:    current.ID,
		Title: newTitle,
	})
	if err != nil {
		writeError(w, http.StatusInternalServerError, "failed to update chat session")
		return
	}

	id := uuidToString(renamed.ID)
	event := protocol.ChatSessionUpdatedPayload{
		ChatSessionID: id,
		Title:         renamed.Title,
		UpdatedAt:     timestampToString(renamed.UpdatedAt),
	}
	h.publishChat(protocol.EventChatSessionUpdated, workspaceID, "member", userID, id, event)
	writeJSON(w, http.StatusOK, chatSessionToResponse(renamed))
}
// DeleteChatSession hard-deletes a chat session owned by the caller. The
// row lock + cancel + delete run inside a single tx so a concurrent
// SendChatMessage cannot enqueue a task that would later be orphaned by
// the FK ON DELETE SET NULL on agent_task_queue.chat_session_id. Cancel
// failure aborts the delete; events fire only after commit.
//
// Access is checked with loadChatSessionForUser only, i.e. the creator check;
// the private-agent gate used by the read/send handlers is not consulted here.
//
// Responses: 204 on success, and also when the row is already gone by the
// time the lock is attempted; 500 when beginning the tx, locking, cancelling,
// deleting, or committing fails. Lookup/ownership failures are answered by
// loadChatSessionForUser.
func (h *Handler) DeleteChatSession(w http.ResponseWriter, r *http.Request) {
userID, ok := requireUserID(w, r)
if !ok {
return
}
workspaceID := ctxWorkspaceID(r.Context())
sessionID := chi.URLParam(r, "sessionId")
session, ok := h.loadChatSessionForUser(w, r, userID, workspaceID, sessionID)
if !ok {
return
}
tx, err := h.TxStarter.Begin(r.Context())
if err != nil {
writeError(w, http.StatusInternalServerError, "failed to start transaction")
return
}
// Deferred unconditionally so every early return below releases the tx.
// NOTE(review): presumably a no-op once Commit has succeeded — confirm
// against the TxStarter implementation.
defer tx.Rollback(r.Context())
// All three statements below go through qtx so they share the transaction.
qtx := h.Queries.WithTx(tx)
// FOR UPDATE on the chat_session row blocks any concurrent INSERT into
// agent_task_queue that references it (the FK validation needs a
// KEY SHARE lock). After we commit the delete, the blocked INSERT
// fails its FK check, so it can't land an orphaned task.
if _, err := qtx.LockChatSessionForDelete(r.Context(), session.ID); err != nil {
if errors.Is(err, pgx.ErrNoRows) {
// Already gone — treat as idempotent success.
w.WriteHeader(http.StatusNoContent)
return
}
writeError(w, http.StatusInternalServerError, "failed to lock chat session")
return
}
// Cancel first and keep the returned rows: they are only broadcast after
// the commit succeeds.
cancelled, err := qtx.CancelAgentTasksByChatSession(r.Context(), session.ID)
if err != nil {
writeError(w, http.StatusInternalServerError, "failed to cancel chat session tasks")
return
}
if err := qtx.DeleteChatSession(r.Context(), db.DeleteChatSessionParams{
ID: session.ID,
WorkspaceID: session.WorkspaceID,
}); err != nil {
writeError(w, http.StatusInternalServerError, "failed to delete chat session")
return
}
if err := tx.Commit(r.Context()); err != nil {
slog.Warn("commit chat session delete failed", "session_id", sessionID, "error", err)
writeError(w, http.StatusInternalServerError, "failed to commit chat session delete")
return
}
// Post-commit broadcasts. Subscribers should never observe events for a
// tx that didn't actually persist.
h.TaskService.BroadcastCancelledTasks(r.Context(), cancelled)
resolvedSessionID := uuidToString(session.ID)
h.publishChat(protocol.EventChatSessionDeleted, workspaceID, "member", userID, resolvedSessionID, protocol.ChatSessionDeletedPayload{
ChatSessionID: resolvedSessionID,
})
w.WriteHeader(http.StatusNoContent)
}
// ---------------------------------------------------------------------------
// Chat Messages
// ---------------------------------------------------------------------------
// SendChatMessageRequest is the JSON body decoded by SendChatMessage.
type SendChatMessageRequest struct {
// Content is the user's message text; SendChatMessage rejects the empty
// string with 400.
Content string `json:"content"`
// AttachmentIDs lists attachment UUIDs (as strings) to link to the new
// message. They are validated before any state is written.
AttachmentIDs []string `json:"attachment_ids"`
}
// SendChatMessageResponse is the 201 body returned by SendChatMessage.
type SendChatMessageResponse struct {
// MessageID is the id of the stored user message.
MessageID string `json:"message_id"`
// TaskID is the id of the chat task enqueued for that message.
TaskID string `json:"task_id"`
// CreatedAt anchors the chat StatusPill timer the instant the user
// hits send. Without it the front-end falls back to its local clock
// and the timer "snaps backwards" later when WS events deliver the
// real created_at. Returning it here means the pill renders 0s from
// the start with a stable anchor. SendChatMessage fills it from the
// task's created_at, not the message's.
CreatedAt string `json:"created_at"`
}
// SendChatMessage stores a user message in a chat session, enqueues the chat
// task that will answer it, and replies 201 with the message id, task id and
// the task's created_at.
//
// Sequence (the order is significant):
//  1. validate the body and attachment ids before touching any state;
//  2. re-run the ownership + private-agent gate and refuse non-active
//     (legacy archived) sessions;
//  3. persist the message, then link attachments (best effort);
//  4. enqueue the task with the request user as initiator;
//  5. touch the session, record analytics, broadcast the message.
func (h *Handler) SendChatMessage(w http.ResponseWriter, r *http.Request) {
	userID, ok := requireUserID(w, r)
	if !ok {
		return
	}
	ctx := r.Context()
	workspaceID := ctxWorkspaceID(ctx)
	sessionID := chi.URLParam(r, "sessionId")

	var payload SendChatMessageRequest
	if decodeErr := json.NewDecoder(r.Body).Decode(&payload); decodeErr != nil {
		writeError(w, http.StatusBadRequest, "invalid request body")
		return
	}
	if payload.Content == "" {
		writeError(w, http.StatusBadRequest, "content is required")
		return
	}

	// Bad attachment ids must yield 400 before anything is mutated. Linking
	// itself waits until the message row exists, because the attachment rows
	// are back-filled with its id.
	attachmentIDs, ok := parseUUIDSliceOrBadRequest(w, payload.AttachmentIDs, "attachment_ids")
	if !ok {
		return
	}

	// The private-agent gate is re-evaluated on every send. The creator
	// passed it when the session was opened, but their workspace role (or the
	// agent's owner) may have changed since, and a stale session must not
	// become a back door into a private agent. Agent senders bypass the gate
	// to preserve A2A collaboration.
	session, ok := h.gateChatSessionForUser(w, r, userID, workspaceID, sessionID)
	if !ok {
		return
	}

	// There is no archive flow anymore, yet legacy rows with
	// status='archived' can still exist. No new agent work is enqueued for
	// them; the frontend shows such sessions as read-only.
	if session.Status != "active" {
		writeError(w, http.StatusBadRequest, "chat session is archived")
		return
	}

	// The message is written before the task so the daemon can always find it.
	userMsg, err := h.Queries.CreateChatMessage(ctx, db.CreateChatMessageParams{
		ChatSessionID: session.ID,
		Role:          "user",
		Content:       payload.Content,
	})
	if err != nil {
		writeError(w, http.StatusInternalServerError, "failed to create chat message")
		return
	}
	messageID := uuidToString(userMsg.ID)

	// Attach files uploaded against this session while the user was typing.
	// The query only updates rows whose chat_session_id matches and whose
	// chat_message_id IS NULL, so an attachment already bound to an earlier
	// message cannot be rebound.
	if len(attachmentIDs) > 0 {
		linkErr := h.Queries.LinkAttachmentsToChatMessage(ctx, db.LinkAttachmentsToChatMessageParams{
			ChatMessageID: userMsg.ID,
			ChatSessionID: session.ID,
			Column3:       attachmentIDs,
		})
		if linkErr != nil {
			// Not fatal: the content is saved and the attachments stay on
			// the session, still downloadable.
			slog.Warn("link chat attachments failed", "error", linkErr, "message_id", messageID)
		}
	}

	// For web chat the sender is the authenticated request user (sessions are
	// creator-only), which makes them the task initiator shown to the agent
	// under `## Task Initiator`.
	chatTask, err := h.TaskService.EnqueueChatTask(ctx, session, parseUUID(userID))
	if err != nil {
		writeError(w, http.StatusInternalServerError, "failed to enqueue chat task: "+err.Error())
		return
	}
	taskID := uuidToString(chatTask.ID)

	// Bump the session's updated_at; a failure is only logged.
	if touchErr := h.Queries.TouchChatSession(ctx, session.ID); touchErr != nil {
		slog.Warn("failed to touch chat session", "session_id", sessionID, "error", touchErr)
	}

	chatSessionID := uuidToString(session.ID)
	taskCtx := h.TaskService.AnalyticsContextForTask(ctx, chatTask)
	platform, _, _ := middleware.ClientMetadataFromContext(ctx)
	sentEvent := analytics.ChatMessageSent(
		userID,
		workspaceID,
		chatSessionID,
		taskID,
		uuidToString(session.AgentID),
		taskCtx.RuntimeMode,
		taskCtx.Provider,
		platform,
	)
	obsmetrics.RecordEvent(h.Analytics, h.Metrics, sentEvent)

	// Tell subscribers about the new user message.
	h.publishChat(protocol.EventChatMessage, workspaceID, "member", userID, chatSessionID, protocol.ChatMessagePayload{
		ChatSessionID: chatSessionID,
		MessageID:     messageID,
		Role:          "user",
		Content:       payload.Content,
		TaskID:        taskID,
		CreatedAt:     timestampToString(userMsg.CreatedAt),
	})

	writeJSON(w, http.StatusCreated, SendChatMessageResponse{
		MessageID: messageID,
		TaskID:    taskID,
		CreatedAt: timestampToString(chatTask.CreatedAt),
	})
}
// ChatMessagesCursorResponse identifies the oldest message of a returned page.
// ListChatMessagesPage fills CreatedAt in RFC3339Nano form, and
// parseChatMessagesPageParams reads the pair back from the
// `before_created_at` / `before_id` query parameters.
type ChatMessagesCursorResponse struct {
CreatedAt string `json:"created_at"`
ID string `json:"id"`
}
// ChatMessagesPageResponse is the body returned by ListChatMessagesPage.
type ChatMessagesPageResponse struct {
// Messages holds the page in chronological order.
Messages []ChatMessageResponse `json:"messages"`
// Limit echoes the effective page size used for the query.
Limit int `json:"limit"`
// HasMore reports whether the query returned more rows than Limit.
HasMore bool `json:"has_more"`
// NextCursor is set only when HasMore is true and is omitted otherwise.
NextCursor *ChatMessagesCursorResponse `json:"next_cursor,omitempty"`
}
// parseChatMessagesPageParams reads the pagination query parameters of a chat
// message listing request.
//
//   - `limit`: optional, defaults to 50, must be an integer in [1, 100];
//     otherwise the error is "invalid limit".
//   - `before_created_at` + `before_id`: the cursor. Both absent means "no
//     cursor" (zero-valued results). Supplying only one of them, a timestamp
//     that is not RFC3339Nano-parsable, or an id that is not a UUID yields
//     "invalid cursor".
//
// On error the returned limit is 0 and the cursor values are zero.
func parseChatMessagesPageParams(r *http.Request) (int, pgtype.Timestamptz, pgtype.UUID, error) {
	const (
		defaultLimit = 50
		maxLimit     = 100
	)
	var (
		noTime pgtype.Timestamptz
		noID   pgtype.UUID
	)
	query := r.URL.Query()

	pageSize := defaultLimit
	if rawLimit := query.Get("limit"); rawLimit != "" {
		n, convErr := strconv.Atoi(rawLimit)
		if convErr != nil || n < 1 || n > maxLimit {
			return 0, noTime, noID, errors.New("invalid limit")
		}
		pageSize = n
	}

	cursorTime := query.Get("before_created_at")
	cursorID := query.Get("before_id")
	switch {
	case cursorTime == "" && cursorID == "":
		// No cursor at all: first page.
		return pageSize, noTime, noID, nil
	case cursorTime == "" || cursorID == "":
		// Half a cursor is not usable.
		return 0, noTime, noID, errors.New("invalid cursor")
	}

	parsedTime, err := time.Parse(time.RFC3339Nano, cursorTime)
	if err != nil {
		return 0, noTime, noID, errors.New("invalid cursor")
	}
	parsedID, err := util.ParseUUID(cursorID)
	if err != nil {
		return 0, noTime, noID, errors.New("invalid cursor")
	}

	before := pgtype.Timestamptz{Time: parsedTime, Valid: true}
	return pageSize, before, parsedID, nil
}
// ListChatMessages returns every message of a chat session, each paired with
// the attachments grouped under its id. The caller must pass
// gateChatSessionForUser (creator check plus private-agent gate).
func (h *Handler) ListChatMessages(w http.ResponseWriter, r *http.Request) {
	userID, ok := requireUserID(w, r)
	if !ok {
		return
	}
	ctx := r.Context()
	workspaceID := ctxWorkspaceID(ctx)

	session, ok := h.gateChatSessionForUser(w, r, userID, workspaceID, chi.URLParam(r, "sessionId"))
	if !ok {
		return
	}

	rows, err := h.Queries.ListChatMessages(ctx, session.ID)
	if err != nil {
		writeError(w, http.StatusInternalServerError, "failed to list chat messages")
		return
	}

	// Collect the ids first so attachments are fetched in one grouped call.
	ids := make([]pgtype.UUID, 0, len(rows))
	for _, row := range rows {
		ids = append(ids, row.ID)
	}
	attachmentsByMessage := h.groupChatMessageAttachments(ctx, workspaceID, ids)

	out := make([]ChatMessageResponse, 0, len(rows))
	for _, row := range rows {
		out = append(out, chatMessageToResponse(row, attachmentsByMessage[uuidToString(row.ID)]))
	}
	writeJSON(w, http.StatusOK, out)
}
// ListChatMessagesPage returns one cursor-paginated page of a chat session's
// messages. It asks the query for limit+1 rows; the extra row only signals
// that more pages exist and is dropped. When more pages exist, the cursor
// points at the last row of the fetched window (named "oldest" by the query
// ordering). The page itself is serialized in reverse fetch order so it reads
// chronologically.
func (h *Handler) ListChatMessagesPage(w http.ResponseWriter, r *http.Request) {
	userID, ok := requireUserID(w, r)
	if !ok {
		return
	}
	ctx := r.Context()
	workspaceID := ctxWorkspaceID(ctx)

	session, ok := h.gateChatSessionForUser(w, r, userID, workspaceID, chi.URLParam(r, "sessionId"))
	if !ok {
		return
	}

	limit, beforeCreatedAt, beforeID, err := parseChatMessagesPageParams(r)
	if err != nil {
		writeError(w, http.StatusBadRequest, err.Error())
		return
	}

	rows, err := h.Queries.ListChatMessagesPage(ctx, db.ListChatMessagesPageParams{
		ChatSessionID:   session.ID,
		Limit:           int32(limit + 1),
		BeforeCreatedAt: beforeCreatedAt,
		BeforeID:        beforeID,
	})
	if err != nil {
		writeError(w, http.StatusInternalServerError, "failed to list chat messages")
		return
	}

	page := ChatMessagesPageResponse{
		Limit:   limit,
		HasMore: len(rows) > limit,
	}
	if page.HasMore {
		rows = rows[:limit]
	}
	if page.HasMore && len(rows) > 0 {
		tail := rows[len(rows)-1]
		page.NextCursor = &ChatMessagesCursorResponse{
			CreatedAt: tail.CreatedAt.Time.Format(time.RFC3339Nano),
			ID:        uuidToString(tail.ID),
		}
	}

	// SQL hands back the newest window first so an empty cursor opens at the
	// recent tail. Walk the rows back to front so the serialized page is in
	// chronological order within the viewport.
	total := len(rows)
	ids := make([]pgtype.UUID, total)
	for i := range ids {
		ids[i] = rows[total-1-i].ID
	}
	attachmentsByMessage := h.groupChatMessageAttachments(ctx, workspaceID, ids)

	page.Messages = make([]ChatMessageResponse, total)
	for i := range page.Messages {
		row := rows[total-1-i]
		page.Messages[i] = chatMessageToResponse(row, attachmentsByMessage[uuidToString(row.ID)])
	}
	writeJSON(w, http.StatusOK, page)
}
// PendingChatTaskResponse is returned by GetPendingChatTask — either the
// current in-flight task's id/status, or an empty object when none is active.
// CreatedAt is the anchor the frontend uses to time the chat StatusPill
// (elapsed seconds = now - CreatedAt). It must come from the server because
// optimistic seeds don't have a real task created_at and the timer needs to
// survive refresh / reopen.
//
// Every field carries `omitempty`, which is what makes the zero value
// serialize as an empty JSON object.
type PendingChatTaskResponse struct {
TaskID string `json:"task_id,omitempty"`
Status string `json:"status,omitempty"`
CreatedAt string `json:"created_at,omitempty"`
}
// MarkChatSessionRead resets the session's unread_since (so has_unread
// becomes false) and publishes chat:session_read, letting the same user's
// other devices clear their badges. Replies 204 on success.
func (h *Handler) MarkChatSessionRead(w http.ResponseWriter, r *http.Request) {
	userID, ok := requireUserID(w, r)
	if !ok {
		return
	}
	ctx := r.Context()
	workspaceID := ctxWorkspaceID(ctx)

	session, ok := h.gateChatSessionForUser(w, r, userID, workspaceID, chi.URLParam(r, "sessionId"))
	if !ok {
		return
	}

	markErr := h.Queries.MarkChatSessionRead(ctx, session.ID)
	if markErr != nil {
		writeError(w, http.StatusInternalServerError, "failed to mark session read")
		return
	}

	id := uuidToString(session.ID)
	readEvent := protocol.ChatSessionReadPayload{ChatSessionID: id}
	h.publishChat(protocol.EventChatSessionRead, workspaceID, "member", userID, id, readEvent)
	w.WriteHeader(http.StatusNoContent)
}
// PendingChatTasksResponse is the aggregate view consumed by the FAB.
// ListPendingChatTasks always sets Tasks to an allocated slice, so it is
// never nil in that handler's response.
type PendingChatTasksResponse struct {
Tasks []PendingChatTaskItem `json:"tasks"`
}
// PendingChatTaskItem is one entry of PendingChatTasksResponse: an in-flight
// chat task together with the chat session it belongs to.
type PendingChatTaskItem struct {
TaskID string `json:"task_id"`
Status string `json:"status"`
ChatSessionID string `json:"chat_session_id"`
}
// ListPendingChatTasks lists every in-flight chat task the current user owns
// in this workspace. It powers the FAB's "running" indicator while the chat
// window is closed (when no per-session query is subscribed). Tasks whose
// session targets a private agent the caller can no longer access are left
// out, as are tasks whose session is not among the caller's own sessions.
func (h *Handler) ListPendingChatTasks(w http.ResponseWriter, r *http.Request) {
	userID, ok := requireUserID(w, r)
	if !ok {
		return
	}
	ctx := r.Context()
	workspaceID := ctxWorkspaceID(ctx)

	member, ok := h.workspaceMember(w, r, workspaceID)
	if !ok {
		return
	}
	actorType, actorID := h.resolveActor(r, userID, workspaceID)
	reachable, ok := h.accessibleAgentIDs(ctx, workspaceID, actorType, actorID, member.Role)
	if !ok {
		writeError(w, http.StatusInternalServerError, "failed to resolve agent access")
		return
	}

	pending, err := h.Queries.ListPendingChatTasksByCreator(ctx, db.ListPendingChatTasksByCreatorParams{
		WorkspaceID: parseUUID(workspaceID),
		CreatorID:   parseUUID(userID),
	})
	if err != nil {
		writeError(w, http.StatusInternalServerError, "failed to list pending chat tasks")
		return
	}

	// Build a session -> agent lookup with a single extra query instead of
	// resolving the agent per task row; a user's own session list is small.
	owned, err := h.Queries.ListAllChatSessionsByCreator(ctx, db.ListAllChatSessionsByCreatorParams{
		WorkspaceID: parseUUID(workspaceID),
		CreatorID:   parseUUID(userID),
	})
	if err != nil {
		writeError(w, http.StatusInternalServerError, "failed to resolve chat session agents")
		return
	}
	agentBySession := make(map[string]string, len(owned))
	for i := range owned {
		agentBySession[uuidToString(owned[i].ID)] = uuidToString(owned[i].AgentID)
	}

	tasks := make([]PendingChatTaskItem, 0, len(pending))
	for _, entry := range pending {
		chatSessionID := uuidToString(entry.ChatSessionID)
		if agentID, known := agentBySession[chatSessionID]; known {
			if _, visible := reachable[agentID]; visible {
				tasks = append(tasks, PendingChatTaskItem{
					TaskID:        uuidToString(entry.TaskID),
					Status:        entry.Status,
					ChatSessionID: chatSessionID,
				})
			}
		}
	}
	writeJSON(w, http.StatusOK, PendingChatTasksResponse{Tasks: tasks})
}
// GetPendingChatTask returns the most recent in-flight task (queued / dispatched
// / running) for a chat session. The frontend polls this on mount / session
// switch so pending UI state survives refresh and reopen.
//
// Responses:
//   - 200 with task_id / status / created_at when a task is in flight;
//   - 200 with an empty object when the query finds no row;
//   - 500 when the lookup itself fails, so a database error is not reported
//     to the client as "nothing is running".
//
// Ownership and private-agent access are enforced by gateChatSessionForUser,
// which writes its own error response.
func (h *Handler) GetPendingChatTask(w http.ResponseWriter, r *http.Request) {
	userID, ok := requireUserID(w, r)
	if !ok {
		return
	}
	workspaceID := ctxWorkspaceID(r.Context())
	sessionID := chi.URLParam(r, "sessionId")
	session, ok := h.gateChatSessionForUser(w, r, userID, workspaceID, sessionID)
	if !ok {
		return
	}

	task, err := h.Queries.GetPendingChatTask(r.Context(), session.ID)
	switch {
	case errors.Is(err, pgx.ErrNoRows):
		// No in-flight task — return an empty object, not an error.
		writeJSON(w, http.StatusOK, PendingChatTaskResponse{})
		return
	case err != nil:
		// Any other failure is a real lookup error and must not pass for
		// "no pending task".
		writeError(w, http.StatusInternalServerError, "failed to load pending chat task")
		return
	}

	writeJSON(w, http.StatusOK, PendingChatTaskResponse{
		TaskID:    uuidToString(task.ID),
		Status:    task.Status,
		CreatedAt: timestampToString(task.CreatedAt),
	})
}
// ---------------------------------------------------------------------------
// Task cancellation (user-facing, with ownership check)
// ---------------------------------------------------------------------------
// CancelTaskByUser cancels a task that the caller may act on inside the
// current workspace.
//
// Tenancy goes through the task's owning agent. Every agent_task_queue row
// has a NOT NULL agent_id (ON DELETE CASCADE, so the agent always exists) and
// agents are workspace-scoped, which makes GetAgentTaskInWorkspace the one
// tenant guard that works no matter which optional source FK (issue /
// chat_session / autopilot_run) is populated. That is what lets run_only
// autopilot tasks and quick_create tasks (whose issue does not exist yet) be
// cancelled at all; keying off issue_id / chat_session_id alone is what made
// those tasks 404 before (MUL-2827).
//
// Two privacy models sit on top of tenancy:
//   - a chat task is private to the member who started the conversation, so
//     only that creator may cancel it;
//   - all other tasks appear on the agent Activity tab and the workspace task
//     snapshot, both of which hide private agents from members without
//     access. The same gate (canAccessPrivateAgent) is applied here so this
//     id-only endpoint is never more permissive than the surfaces that expose
//     the task.
func (h *Handler) CancelTaskByUser(w http.ResponseWriter, r *http.Request) {
	userID, ok := requireUserID(w, r)
	if !ok {
		return
	}
	ctx := r.Context()
	workspaceID := ctxWorkspaceID(ctx)
	workspaceUUID, ok := parseUUIDOrBadRequest(w, workspaceID, "workspace id")
	if !ok {
		return
	}
	taskUUID, ok := parseUUIDOrBadRequest(w, chi.URLParam(r, "taskId"), "task id")
	if !ok {
		return
	}

	target, err := h.Queries.GetAgentTaskInWorkspace(ctx, db.GetAgentTaskInWorkspaceParams{
		ID:          taskUUID,
		WorkspaceID: workspaceUUID,
	})
	if err != nil {
		writeError(w, http.StatusNotFound, "task not found")
		return
	}

	if !target.ChatSessionID.Valid {
		// Issue / autopilot / quick_create tasks all show up on the agent
		// Activity tab and the workspace snapshot, which gate private
		// agents; apply the same gate.
		owner, agentErr := h.Queries.GetAgentInWorkspace(ctx, db.GetAgentInWorkspaceParams{
			ID:          target.AgentID,
			WorkspaceID: workspaceUUID,
		})
		if agentErr != nil {
			writeError(w, http.StatusNotFound, "task not found")
			return
		}
		actorType, actorID := h.resolveActor(r, userID, workspaceID)
		if permitted := h.canAccessPrivateAgent(ctx, owner, actorType, actorID, workspaceID); !permitted {
			writeError(w, http.StatusForbidden, "you do not have access to this agent")
			return
		}
	} else {
		// Chat privacy: the workspace is shared, but only the member who
		// opened the conversation may cancel its task.
		chat, chatErr := h.Queries.GetChatSessionInWorkspace(ctx, db.GetChatSessionInWorkspaceParams{
			ID:          target.ChatSessionID,
			WorkspaceID: workspaceUUID,
		})
		if chatErr != nil {
			writeError(w, http.StatusNotFound, "task not found")
			return
		}
		if creator := uuidToString(chat.CreatorID); creator != userID {
			writeError(w, http.StatusForbidden, "not your task")
			return
		}
	}

	result, err := h.TaskService.CancelTask(ctx, taskUUID)
	if err != nil {
		writeError(w, http.StatusBadRequest, err.Error())
		return
	}
	writeJSON(w, http.StatusOK, taskToResponse(*result, workspaceID))
}
// ---------------------------------------------------------------------------
// Response types & helpers
// ---------------------------------------------------------------------------
// ChatSessionResponse is the JSON shape of a chat session as returned by the
// session endpoints. IDs are UUIDs rendered through uuidToString and the
// timestamps are rendered through timestampToString.
type ChatSessionResponse struct {
ID string `json:"id"`
WorkspaceID string `json:"workspace_id"`
AgentID string `json:"agent_id"`
CreatorID string `json:"creator_id"`
Title string `json:"title"`
Status string `json:"status"`
// Only populated by list endpoints — single-session fetches return false.
HasUnread bool `json:"has_unread"`
CreatedAt string `json:"created_at"`
UpdatedAt string `json:"updated_at"`
}
// ChatMessageResponse is the JSON shape of one chat message. The pointer
// fields have no `omitempty`, so they serialize as JSON null when unset.
type ChatMessageResponse struct {
ID string `json:"id"`
ChatSessionID string `json:"chat_session_id"`
// Role is copied from the stored message; SendChatMessage writes "user"
// for messages it creates.
Role string `json:"role"`
Content string `json:"content"`
TaskID *string `json:"task_id"`
CreatedAt string `json:"created_at"`
// FailureReason flags an assistant row synthesized by FailTask's chat
// fallback. Front-end uses it to switch to the destructive bubble.
FailureReason *string `json:"failure_reason"`
// ElapsedMs is the wall-clock duration from task creation to terminal
// state. Drives "Replied in 38s" / "Failed after 12s" captions.
ElapsedMs *int64 `json:"elapsed_ms"`
// Attachments linked to this message via chat_message_id. The chat
// bubble renders file cards from these, and the daemon claim path
// (daemon.go) pulls structured metadata from the same source so the
// agent can `multica attachment download <id>` rather than guessing
// from a markdown URL that may expire. Omitted from the JSON when empty.
Attachments []AttachmentResponse `json:"attachments,omitempty"`
}
// chatSessionToResponse converts a stored chat session into its API shape.
// HasUnread is not derived from the row and stays false.
func chatSessionToResponse(s db.ChatSession) ChatSessionResponse {
	var out ChatSessionResponse

	// Identifiers.
	out.ID = uuidToString(s.ID)
	out.WorkspaceID = uuidToString(s.WorkspaceID)
	out.AgentID = uuidToString(s.AgentID)
	out.CreatorID = uuidToString(s.CreatorID)

	// Plain fields are copied through unchanged.
	out.Title = s.Title
	out.Status = s.Status

	// Timestamps.
	out.CreatedAt = timestampToString(s.CreatedAt)
	out.UpdatedAt = timestampToString(s.UpdatedAt)

	return out
}
// chatMessageToResponse converts a stored chat message into its API shape and
// attaches the given attachment list as-is. Nullable columns (task id,
// failure reason, elapsed ms) go through the pointer helpers.
func chatMessageToResponse(m db.ChatMessage, attachments []AttachmentResponse) ChatMessageResponse {
	var out ChatMessageResponse

	out.ID = uuidToString(m.ID)
	out.ChatSessionID = uuidToString(m.ChatSessionID)
	out.Role = m.Role
	out.Content = m.Content
	out.CreatedAt = timestampToString(m.CreatedAt)

	// Optional columns.
	out.TaskID = uuidToPtr(m.TaskID)
	out.FailureReason = textToPtr(m.FailureReason)
	out.ElapsedMs = int8ToPtr(m.ElapsedMs)

	out.Attachments = attachments
	return out
}