mirror of
https://github.com/multica-ai/multica.git
synced 2026-07-05 13:29:44 +02:00
* MUL-2771: feat(transcript): server-derived relative work_dir chip Adds a privacy-safe `relative_work_dir` field to the agent task wire shape so the transcript dialog can show where a task ran without leaking the user's home directory. Standard tasks strip the daemon's workspaces root to `<wsUUID>/<taskShort>/workdir`; local_directory tasks fall back to the trailing two path segments (`repos/foo`), which keeps enough context for the user to recognise the directory without exposing $HOME or the username. The derivation lives in `taskToResponse` so every endpoint that serves a task — list, snapshot, claim, rerun, cancel, complete, fail — fills the field consistently. taskToResponse now also populates `workspace_id`, which the prior shape declared but never set. shortTaskID mirrors execenv.shortID; a colocated test pins the two helpers together so future daemon-side layout changes don't silently degrade the chip into the local_directory fallback. Replaces the front-end stripping attempt in PR #3379, which passed issue_id where workspace_id was required and therefore rendered the full absolute path on every standard task. Co-authored-by: multica-agent <github@multica.ai> * MUL-2771: harden privacy guards on transcript work_dir chip Address second-round review feedback from PR #3428: 1. Drop the `title={task.work_dir}` tooltip in the transcript dialog. The visible chip was safe but native browser tooltips re-rendered the absolute `/Users/<name>/...` on hover, leaking into screen shares, screenshots, and recordings — defeating the stated goal of the chip. The absolute path now never reaches the DOM (no title, aria, or data attribute). 2. Replace the "tail two segments" fallback for local_directory paths with explicit home-prefix stripping plus a basename-only final fallback. The old behaviour leaked the username on shallow paths like `/Users/alice/foo`, `/home/alice/project`, and `C:\Users\alice\foo`. The new behaviour recognises common per-user home layouts on macOS, Linux, and Windows (case-insensitive), strips them down to the remainder, and falls back to the basename for any path under an unrecognised root — a single segment can never carry the home prefix. 3. Align the Go and TypeScript field comments with the real fallback policy so future readers see "strip home / basename" instead of the outdated "tail two segments" description. Tests: expanded `TestRelativeWorkDir` to cover shallow `/Users/...`, `/home/...`, and `C:\Users\...` paths, the exact-home edge cases, case-insensitive matching, and the non-home basename-only fallback. Co-authored-by: multica-agent <github@multica.ai> --------- Co-authored-by: J <j@multica.ai> Co-authored-by: multica-agent <github@multica.ai>
806 lines
27 KiB
Go
806 lines
27 KiB
Go
package handler
|
|
|
|
import (
|
|
"encoding/json"
|
|
"errors"
|
|
"log/slog"
|
|
"net/http"
|
|
"strings"
|
|
|
|
"github.com/go-chi/chi/v5"
|
|
"github.com/jackc/pgx/v5"
|
|
"github.com/jackc/pgx/v5/pgtype"
|
|
"github.com/multica-ai/multica/server/internal/analytics"
|
|
db "github.com/multica-ai/multica/server/pkg/db/generated"
|
|
"github.com/multica-ai/multica/server/pkg/protocol"
|
|
)
|
|
|
|
// chatSessionTitleMaxLen caps the rename input. Long enough to fit a
|
|
// meaningful summary, short enough to keep the dropdown row scannable.
|
|
const chatSessionTitleMaxLen = 200
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Chat Sessions
|
|
// ---------------------------------------------------------------------------
|
|
|
|
type CreateChatSessionRequest struct {
|
|
AgentID string `json:"agent_id"`
|
|
Title string `json:"title"`
|
|
}
|
|
|
|
func (h *Handler) CreateChatSession(w http.ResponseWriter, r *http.Request) {
|
|
userID, ok := requireUserID(w, r)
|
|
if !ok {
|
|
return
|
|
}
|
|
workspaceID := ctxWorkspaceID(r.Context())
|
|
|
|
var req CreateChatSessionRequest
|
|
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
|
writeError(w, http.StatusBadRequest, "invalid request body")
|
|
return
|
|
}
|
|
if req.AgentID == "" {
|
|
writeError(w, http.StatusBadRequest, "agent_id is required")
|
|
return
|
|
}
|
|
agentID, ok := parseUUIDOrBadRequest(w, req.AgentID, "agent_id")
|
|
if !ok {
|
|
return
|
|
}
|
|
workspaceUUID, ok := parseUUIDOrBadRequest(w, workspaceID, "workspace id")
|
|
if !ok {
|
|
return
|
|
}
|
|
|
|
// Verify agent exists in workspace.
|
|
agent, err := h.Queries.GetAgentInWorkspace(r.Context(), db.GetAgentInWorkspaceParams{
|
|
ID: agentID,
|
|
WorkspaceID: workspaceUUID,
|
|
})
|
|
if err != nil {
|
|
writeError(w, http.StatusNotFound, "agent not found")
|
|
return
|
|
}
|
|
if agent.ArchivedAt.Valid {
|
|
writeError(w, http.StatusBadRequest, "agent is archived")
|
|
return
|
|
}
|
|
// Private-agent gate: members must be in allowed_principals to start
|
|
// a chat with a private agent. Agent-to-agent chat sessions bypass
|
|
// the gate so A2A collaboration still works.
|
|
actorType, actorID := h.resolveActor(r, userID, workspaceID)
|
|
if !h.canAccessPrivateAgent(r.Context(), agent, actorType, actorID, workspaceID) {
|
|
writeError(w, http.StatusForbidden, "you do not have access to this agent")
|
|
return
|
|
}
|
|
|
|
session, err := h.Queries.CreateChatSession(r.Context(), db.CreateChatSessionParams{
|
|
WorkspaceID: workspaceUUID,
|
|
AgentID: agentID,
|
|
CreatorID: parseUUID(userID),
|
|
Title: req.Title,
|
|
})
|
|
if err != nil {
|
|
writeError(w, http.StatusInternalServerError, "failed to create chat session")
|
|
return
|
|
}
|
|
|
|
writeJSON(w, http.StatusCreated, chatSessionToResponse(session))
|
|
}
|
|
|
|
func (h *Handler) ListChatSessions(w http.ResponseWriter, r *http.Request) {
|
|
userID, ok := requireUserID(w, r)
|
|
if !ok {
|
|
return
|
|
}
|
|
workspaceID := ctxWorkspaceID(r.Context())
|
|
|
|
// Compute the accessible-agents set once and use it to drop sessions
|
|
// whose target agent the caller no longer has access to — without this,
|
|
// a member whose role was downgraded would still see the session list
|
|
// (and transcripts via ListChatMessages) for any private agent they
|
|
// previously had access to. Falls back to the user's role from the
|
|
// workspace member context.
|
|
member, ok := h.workspaceMember(w, r, workspaceID)
|
|
if !ok {
|
|
return
|
|
}
|
|
actorType, actorID := h.resolveActor(r, userID, workspaceID)
|
|
allowed, ok := h.accessibleAgentIDs(r.Context(), workspaceID, actorType, actorID, member.Role)
|
|
if !ok {
|
|
writeError(w, http.StatusInternalServerError, "failed to resolve agent access")
|
|
return
|
|
}
|
|
|
|
status := r.URL.Query().Get("status")
|
|
|
|
// Two call sites → two row types with identical shape. Collect into a
|
|
// common response slice via small per-branch loops.
|
|
var resp []ChatSessionResponse
|
|
if status == "all" {
|
|
rows, err := h.Queries.ListAllChatSessionsByCreator(r.Context(), db.ListAllChatSessionsByCreatorParams{
|
|
WorkspaceID: parseUUID(workspaceID),
|
|
CreatorID: parseUUID(userID),
|
|
})
|
|
if err != nil {
|
|
writeError(w, http.StatusInternalServerError, "failed to list chat sessions")
|
|
return
|
|
}
|
|
resp = make([]ChatSessionResponse, 0, len(rows))
|
|
for _, s := range rows {
|
|
if _, ok := allowed[uuidToString(s.AgentID)]; !ok {
|
|
continue
|
|
}
|
|
resp = append(resp, ChatSessionResponse{
|
|
ID: uuidToString(s.ID),
|
|
WorkspaceID: uuidToString(s.WorkspaceID),
|
|
AgentID: uuidToString(s.AgentID),
|
|
CreatorID: uuidToString(s.CreatorID),
|
|
Title: s.Title,
|
|
Status: s.Status,
|
|
HasUnread: s.HasUnread,
|
|
CreatedAt: timestampToString(s.CreatedAt),
|
|
UpdatedAt: timestampToString(s.UpdatedAt),
|
|
})
|
|
}
|
|
} else {
|
|
rows, err := h.Queries.ListChatSessionsByCreator(r.Context(), db.ListChatSessionsByCreatorParams{
|
|
WorkspaceID: parseUUID(workspaceID),
|
|
CreatorID: parseUUID(userID),
|
|
})
|
|
if err != nil {
|
|
writeError(w, http.StatusInternalServerError, "failed to list chat sessions")
|
|
return
|
|
}
|
|
resp = make([]ChatSessionResponse, 0, len(rows))
|
|
for _, s := range rows {
|
|
if _, ok := allowed[uuidToString(s.AgentID)]; !ok {
|
|
continue
|
|
}
|
|
resp = append(resp, ChatSessionResponse{
|
|
ID: uuidToString(s.ID),
|
|
WorkspaceID: uuidToString(s.WorkspaceID),
|
|
AgentID: uuidToString(s.AgentID),
|
|
CreatorID: uuidToString(s.CreatorID),
|
|
Title: s.Title,
|
|
Status: s.Status,
|
|
HasUnread: s.HasUnread,
|
|
CreatedAt: timestampToString(s.CreatedAt),
|
|
UpdatedAt: timestampToString(s.UpdatedAt),
|
|
})
|
|
}
|
|
}
|
|
writeJSON(w, http.StatusOK, resp)
|
|
}
|
|
|
|
func (h *Handler) loadChatSessionForUser(w http.ResponseWriter, r *http.Request, userID, workspaceID, sessionID string) (db.ChatSession, bool) {
|
|
sessionUUID, ok := parseUUIDOrBadRequest(w, sessionID, "chat session id")
|
|
if !ok {
|
|
return db.ChatSession{}, false
|
|
}
|
|
workspaceUUID, ok := parseUUIDOrBadRequest(w, workspaceID, "workspace id")
|
|
if !ok {
|
|
return db.ChatSession{}, false
|
|
}
|
|
session, err := h.Queries.GetChatSessionInWorkspace(r.Context(), db.GetChatSessionInWorkspaceParams{
|
|
ID: sessionUUID,
|
|
WorkspaceID: workspaceUUID,
|
|
})
|
|
if err != nil {
|
|
writeError(w, http.StatusNotFound, "chat session not found")
|
|
return db.ChatSession{}, false
|
|
}
|
|
if uuidToString(session.CreatorID) != userID {
|
|
writeError(w, http.StatusForbidden, "not your chat session")
|
|
return db.ChatSession{}, false
|
|
}
|
|
return session, true
|
|
}
|
|
|
|
// gateChatSessionForUser combines the session ownership check with the
|
|
// private-agent access gate so a member who has lost access to the target
|
|
// agent (role downgrade, ownership transfer, agent flipped to private)
|
|
// cannot continue reading the chat transcript even though they remain the
|
|
// session creator. Returns ok=false after writing the error response.
|
|
func (h *Handler) gateChatSessionForUser(w http.ResponseWriter, r *http.Request, userID, workspaceID, sessionID string) (db.ChatSession, bool) {
|
|
session, ok := h.loadChatSessionForUser(w, r, userID, workspaceID, sessionID)
|
|
if !ok {
|
|
return db.ChatSession{}, false
|
|
}
|
|
agent, err := h.Queries.GetAgent(r.Context(), session.AgentID)
|
|
if err != nil {
|
|
writeError(w, http.StatusNotFound, "agent not found")
|
|
return db.ChatSession{}, false
|
|
}
|
|
actorType, actorID := h.resolveActor(r, userID, workspaceID)
|
|
if !h.canAccessPrivateAgent(r.Context(), agent, actorType, actorID, workspaceID) {
|
|
writeError(w, http.StatusForbidden, "you do not have access to this agent")
|
|
return db.ChatSession{}, false
|
|
}
|
|
return session, true
|
|
}
|
|
|
|
func (h *Handler) GetChatSession(w http.ResponseWriter, r *http.Request) {
|
|
userID, ok := requireUserID(w, r)
|
|
if !ok {
|
|
return
|
|
}
|
|
workspaceID := ctxWorkspaceID(r.Context())
|
|
sessionID := chi.URLParam(r, "sessionId")
|
|
|
|
session, ok := h.gateChatSessionForUser(w, r, userID, workspaceID, sessionID)
|
|
if !ok {
|
|
return
|
|
}
|
|
|
|
writeJSON(w, http.StatusOK, chatSessionToResponse(session))
|
|
}
|
|
|
|
type UpdateChatSessionRequest struct {
|
|
Title *string `json:"title"`
|
|
}
|
|
|
|
// UpdateChatSession updates user-editable fields on a chat session — today
|
|
// just `title`, surfaced by the inline rename affordance in the session
|
|
// dropdown. Title is the only field accepted: `status` is legacy + read-only,
|
|
// agent/creator/workspace are immutable, the resume pointers
|
|
// (session_id / work_dir / runtime_id) are daemon-owned.
|
|
func (h *Handler) UpdateChatSession(w http.ResponseWriter, r *http.Request) {
|
|
userID, ok := requireUserID(w, r)
|
|
if !ok {
|
|
return
|
|
}
|
|
workspaceID := ctxWorkspaceID(r.Context())
|
|
sessionID := chi.URLParam(r, "sessionId")
|
|
|
|
var req UpdateChatSessionRequest
|
|
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
|
writeError(w, http.StatusBadRequest, "invalid request body")
|
|
return
|
|
}
|
|
if req.Title == nil {
|
|
writeError(w, http.StatusBadRequest, "title is required")
|
|
return
|
|
}
|
|
title := strings.TrimSpace(*req.Title)
|
|
if title == "" {
|
|
writeError(w, http.StatusBadRequest, "title is required")
|
|
return
|
|
}
|
|
if len([]rune(title)) > chatSessionTitleMaxLen {
|
|
writeError(w, http.StatusBadRequest, "title is too long")
|
|
return
|
|
}
|
|
|
|
session, ok := h.gateChatSessionForUser(w, r, userID, workspaceID, sessionID)
|
|
if !ok {
|
|
return
|
|
}
|
|
|
|
updated, err := h.Queries.UpdateChatSessionTitle(r.Context(), db.UpdateChatSessionTitleParams{
|
|
ID: session.ID,
|
|
Title: title,
|
|
})
|
|
if err != nil {
|
|
writeError(w, http.StatusInternalServerError, "failed to update chat session")
|
|
return
|
|
}
|
|
|
|
resolvedSessionID := uuidToString(updated.ID)
|
|
h.publishChat(protocol.EventChatSessionUpdated, workspaceID, "member", userID, resolvedSessionID, protocol.ChatSessionUpdatedPayload{
|
|
ChatSessionID: resolvedSessionID,
|
|
Title: updated.Title,
|
|
UpdatedAt: timestampToString(updated.UpdatedAt),
|
|
})
|
|
|
|
writeJSON(w, http.StatusOK, chatSessionToResponse(updated))
|
|
}
|
|
|
|
// DeleteChatSession hard-deletes a chat session owned by the caller. The
|
|
// row lock + cancel + delete run inside a single tx so a concurrent
|
|
// SendChatMessage cannot enqueue a task that would later be orphaned by
|
|
// the FK ON DELETE SET NULL on agent_task_queue.chat_session_id. Cancel
|
|
// failure aborts the delete; events fire only after commit.
|
|
func (h *Handler) DeleteChatSession(w http.ResponseWriter, r *http.Request) {
|
|
userID, ok := requireUserID(w, r)
|
|
if !ok {
|
|
return
|
|
}
|
|
workspaceID := ctxWorkspaceID(r.Context())
|
|
sessionID := chi.URLParam(r, "sessionId")
|
|
|
|
session, ok := h.loadChatSessionForUser(w, r, userID, workspaceID, sessionID)
|
|
if !ok {
|
|
return
|
|
}
|
|
|
|
tx, err := h.TxStarter.Begin(r.Context())
|
|
if err != nil {
|
|
writeError(w, http.StatusInternalServerError, "failed to start transaction")
|
|
return
|
|
}
|
|
defer tx.Rollback(r.Context())
|
|
qtx := h.Queries.WithTx(tx)
|
|
|
|
// FOR UPDATE on the chat_session row blocks any concurrent INSERT into
|
|
// agent_task_queue that references it (the FK validation needs a
|
|
// KEY SHARE lock). After we commit the delete, the blocked INSERT
|
|
// fails its FK check, so it can't land an orphaned task.
|
|
if _, err := qtx.LockChatSessionForDelete(r.Context(), session.ID); err != nil {
|
|
if errors.Is(err, pgx.ErrNoRows) {
|
|
// Already gone — treat as idempotent success.
|
|
w.WriteHeader(http.StatusNoContent)
|
|
return
|
|
}
|
|
writeError(w, http.StatusInternalServerError, "failed to lock chat session")
|
|
return
|
|
}
|
|
|
|
cancelled, err := qtx.CancelAgentTasksByChatSession(r.Context(), session.ID)
|
|
if err != nil {
|
|
writeError(w, http.StatusInternalServerError, "failed to cancel chat session tasks")
|
|
return
|
|
}
|
|
|
|
if err := qtx.DeleteChatSession(r.Context(), db.DeleteChatSessionParams{
|
|
ID: session.ID,
|
|
WorkspaceID: session.WorkspaceID,
|
|
}); err != nil {
|
|
writeError(w, http.StatusInternalServerError, "failed to delete chat session")
|
|
return
|
|
}
|
|
|
|
if err := tx.Commit(r.Context()); err != nil {
|
|
slog.Warn("commit chat session delete failed", "session_id", sessionID, "error", err)
|
|
writeError(w, http.StatusInternalServerError, "failed to commit chat session delete")
|
|
return
|
|
}
|
|
|
|
// Post-commit broadcasts. Subscribers should never observe events for a
|
|
// tx that didn't actually persist.
|
|
h.TaskService.BroadcastCancelledTasks(r.Context(), cancelled)
|
|
|
|
resolvedSessionID := uuidToString(session.ID)
|
|
h.publishChat(protocol.EventChatSessionDeleted, workspaceID, "member", userID, resolvedSessionID, protocol.ChatSessionDeletedPayload{
|
|
ChatSessionID: resolvedSessionID,
|
|
})
|
|
|
|
w.WriteHeader(http.StatusNoContent)
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Chat Messages
|
|
// ---------------------------------------------------------------------------
|
|
|
|
type SendChatMessageRequest struct {
|
|
Content string `json:"content"`
|
|
AttachmentIDs []string `json:"attachment_ids"`
|
|
}
|
|
|
|
type SendChatMessageResponse struct {
|
|
MessageID string `json:"message_id"`
|
|
TaskID string `json:"task_id"`
|
|
// CreatedAt anchors the chat StatusPill timer the instant the user
|
|
// hits send. Without it the front-end falls back to its local clock
|
|
// and the timer "snaps backwards" later when WS events deliver the
|
|
// real created_at. Returning it here means the pill renders 0s from
|
|
// the start with a stable anchor.
|
|
CreatedAt string `json:"created_at"`
|
|
}
|
|
|
|
func (h *Handler) SendChatMessage(w http.ResponseWriter, r *http.Request) {
|
|
userID, ok := requireUserID(w, r)
|
|
if !ok {
|
|
return
|
|
}
|
|
workspaceID := ctxWorkspaceID(r.Context())
|
|
sessionID := chi.URLParam(r, "sessionId")
|
|
|
|
var req SendChatMessageRequest
|
|
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
|
writeError(w, http.StatusBadRequest, "invalid request body")
|
|
return
|
|
}
|
|
if req.Content == "" {
|
|
writeError(w, http.StatusBadRequest, "content is required")
|
|
return
|
|
}
|
|
|
|
// Pre-validate attachment ids early so invalid input returns 400 before
|
|
// any state mutation. The actual link runs after CreateChatMessage so we
|
|
// have a message_id to back-fill into the attachment rows.
|
|
attachmentIDs, ok := parseUUIDSliceOrBadRequest(w, req.AttachmentIDs, "attachment_ids")
|
|
if !ok {
|
|
return
|
|
}
|
|
|
|
// Load chat session and re-check the private-agent gate on every send.
|
|
// The session's creator passed the gate at create time, but their
|
|
// workspace role (or the agent's owner) may have changed since — keep
|
|
// stale sessions from being a back-door into a private agent the user
|
|
// can no longer reach. Agent senders bypass to preserve A2A collaboration.
|
|
session, ok := h.gateChatSessionForUser(w, r, userID, workspaceID, sessionID)
|
|
if !ok {
|
|
return
|
|
}
|
|
// New archive flow doesn't exist anymore, but legacy rows with
|
|
// status='archived' may still be in the DB from before the feature
|
|
// was removed. Refuse to enqueue new agent work for them — frontend
|
|
// surfaces these as read-only.
|
|
if session.Status != "active" {
|
|
writeError(w, http.StatusBadRequest, "chat session is archived")
|
|
return
|
|
}
|
|
|
|
// Create the user message first so the daemon can always find it.
|
|
msg, err := h.Queries.CreateChatMessage(r.Context(), db.CreateChatMessageParams{
|
|
ChatSessionID: session.ID,
|
|
Role: "user",
|
|
Content: req.Content,
|
|
})
|
|
if err != nil {
|
|
writeError(w, http.StatusInternalServerError, "failed to create chat message")
|
|
return
|
|
}
|
|
|
|
// Back-fill chat_message_id on attachments that were uploaded against
|
|
// this session while the user was composing. The query only touches rows
|
|
// where chat_session_id matches AND chat_message_id IS NULL, so it cannot
|
|
// rebind an attachment that already belongs to an earlier message.
|
|
if len(attachmentIDs) > 0 {
|
|
if err := h.Queries.LinkAttachmentsToChatMessage(r.Context(), db.LinkAttachmentsToChatMessageParams{
|
|
ChatMessageID: msg.ID,
|
|
ChatSessionID: session.ID,
|
|
Column3: attachmentIDs,
|
|
}); err != nil {
|
|
// Don't fail the send — the message content is already saved and
|
|
// the attachments remain on the session (still downloadable).
|
|
slog.Warn("link chat attachments failed", "error", err, "message_id", uuidToString(msg.ID))
|
|
}
|
|
}
|
|
|
|
// Enqueue a chat task after the message exists.
|
|
task, err := h.TaskService.EnqueueChatTask(r.Context(), session)
|
|
if err != nil {
|
|
writeError(w, http.StatusInternalServerError, "failed to enqueue chat task: "+err.Error())
|
|
return
|
|
}
|
|
|
|
// Touch session updated_at.
|
|
if err := h.Queries.TouchChatSession(r.Context(), session.ID); err != nil {
|
|
slog.Warn("failed to touch chat session", "session_id", sessionID, "error", err)
|
|
}
|
|
taskContext := h.TaskService.AnalyticsContextForTask(r.Context(), task)
|
|
h.Analytics.Capture(analytics.ChatMessageSent(
|
|
userID,
|
|
workspaceID,
|
|
uuidToString(session.ID),
|
|
uuidToString(task.ID),
|
|
uuidToString(session.AgentID),
|
|
taskContext.RuntimeMode,
|
|
taskContext.Provider,
|
|
))
|
|
|
|
// Broadcast the user message.
|
|
resolvedSessionID := uuidToString(session.ID)
|
|
h.publishChat(protocol.EventChatMessage, workspaceID, "member", userID, resolvedSessionID, protocol.ChatMessagePayload{
|
|
ChatSessionID: resolvedSessionID,
|
|
MessageID: uuidToString(msg.ID),
|
|
Role: "user",
|
|
Content: req.Content,
|
|
TaskID: uuidToString(task.ID),
|
|
CreatedAt: timestampToString(msg.CreatedAt),
|
|
})
|
|
|
|
writeJSON(w, http.StatusCreated, SendChatMessageResponse{
|
|
MessageID: uuidToString(msg.ID),
|
|
TaskID: uuidToString(task.ID),
|
|
CreatedAt: timestampToString(task.CreatedAt),
|
|
})
|
|
}
|
|
|
|
func (h *Handler) ListChatMessages(w http.ResponseWriter, r *http.Request) {
|
|
userID, ok := requireUserID(w, r)
|
|
if !ok {
|
|
return
|
|
}
|
|
workspaceID := ctxWorkspaceID(r.Context())
|
|
sessionID := chi.URLParam(r, "sessionId")
|
|
|
|
session, ok := h.gateChatSessionForUser(w, r, userID, workspaceID, sessionID)
|
|
if !ok {
|
|
return
|
|
}
|
|
|
|
messages, err := h.Queries.ListChatMessages(r.Context(), session.ID)
|
|
if err != nil {
|
|
writeError(w, http.StatusInternalServerError, "failed to list chat messages")
|
|
return
|
|
}
|
|
|
|
messageIDs := make([]pgtype.UUID, len(messages))
|
|
for i, m := range messages {
|
|
messageIDs[i] = m.ID
|
|
}
|
|
groupedAtt := h.groupChatMessageAttachments(r.Context(), workspaceID, messageIDs)
|
|
|
|
resp := make([]ChatMessageResponse, len(messages))
|
|
for i, m := range messages {
|
|
resp[i] = chatMessageToResponse(m, groupedAtt[uuidToString(m.ID)])
|
|
}
|
|
writeJSON(w, http.StatusOK, resp)
|
|
}
|
|
|
|
// PendingChatTaskResponse is returned by GetPendingChatTask — either the
|
|
// current in-flight task's id/status, or an empty object when none is active.
|
|
// CreatedAt is the anchor the frontend uses to time the chat StatusPill
|
|
// (elapsed seconds = now - CreatedAt). It must come from the server because
|
|
// optimistic seeds don't have a real task created_at and the timer needs to
|
|
// survive refresh / reopen.
|
|
type PendingChatTaskResponse struct {
|
|
TaskID string `json:"task_id,omitempty"`
|
|
Status string `json:"status,omitempty"`
|
|
CreatedAt string `json:"created_at,omitempty"`
|
|
}
|
|
|
|
// MarkChatSessionRead clears the session's unread_since (→ has_unread=false)
|
|
// and broadcasts chat:session_read so other devices of the same user drop
|
|
// their badges.
|
|
func (h *Handler) MarkChatSessionRead(w http.ResponseWriter, r *http.Request) {
|
|
userID, ok := requireUserID(w, r)
|
|
if !ok {
|
|
return
|
|
}
|
|
workspaceID := ctxWorkspaceID(r.Context())
|
|
sessionID := chi.URLParam(r, "sessionId")
|
|
|
|
session, ok := h.gateChatSessionForUser(w, r, userID, workspaceID, sessionID)
|
|
if !ok {
|
|
return
|
|
}
|
|
|
|
if err := h.Queries.MarkChatSessionRead(r.Context(), session.ID); err != nil {
|
|
writeError(w, http.StatusInternalServerError, "failed to mark session read")
|
|
return
|
|
}
|
|
|
|
resolvedSessionID := uuidToString(session.ID)
|
|
h.publishChat(protocol.EventChatSessionRead, workspaceID, "member", userID, resolvedSessionID, protocol.ChatSessionReadPayload{
|
|
ChatSessionID: resolvedSessionID,
|
|
})
|
|
|
|
w.WriteHeader(http.StatusNoContent)
|
|
}
|
|
|
|
// PendingChatTasksResponse is the aggregate view consumed by the FAB.
|
|
type PendingChatTasksResponse struct {
|
|
Tasks []PendingChatTaskItem `json:"tasks"`
|
|
}
|
|
|
|
type PendingChatTaskItem struct {
|
|
TaskID string `json:"task_id"`
|
|
Status string `json:"status"`
|
|
ChatSessionID string `json:"chat_session_id"`
|
|
}
|
|
|
|
// ListPendingChatTasks returns every in-flight chat task owned by the current
|
|
// user in this workspace. Drives the FAB's "running" indicator when the chat
|
|
// window is closed (no per-session query is subscribed). Tasks belonging to
|
|
// private agents the caller has lost access to are dropped from the response.
|
|
func (h *Handler) ListPendingChatTasks(w http.ResponseWriter, r *http.Request) {
|
|
userID, ok := requireUserID(w, r)
|
|
if !ok {
|
|
return
|
|
}
|
|
workspaceID := ctxWorkspaceID(r.Context())
|
|
|
|
member, ok := h.workspaceMember(w, r, workspaceID)
|
|
if !ok {
|
|
return
|
|
}
|
|
actorType, actorID := h.resolveActor(r, userID, workspaceID)
|
|
allowed, ok := h.accessibleAgentIDs(r.Context(), workspaceID, actorType, actorID, member.Role)
|
|
if !ok {
|
|
writeError(w, http.StatusInternalServerError, "failed to resolve agent access")
|
|
return
|
|
}
|
|
|
|
rows, err := h.Queries.ListPendingChatTasksByCreator(r.Context(), db.ListPendingChatTasksByCreatorParams{
|
|
WorkspaceID: parseUUID(workspaceID),
|
|
CreatorID: parseUUID(userID),
|
|
})
|
|
if err != nil {
|
|
writeError(w, http.StatusInternalServerError, "failed to list pending chat tasks")
|
|
return
|
|
}
|
|
|
|
// Map session → agent so we can filter without an N+1. The user's own
|
|
// session list is small, so one extra query is cheaper than per-row
|
|
// lookups.
|
|
sessions, err := h.Queries.ListAllChatSessionsByCreator(r.Context(), db.ListAllChatSessionsByCreatorParams{
|
|
WorkspaceID: parseUUID(workspaceID),
|
|
CreatorID: parseUUID(userID),
|
|
})
|
|
if err != nil {
|
|
writeError(w, http.StatusInternalServerError, "failed to resolve chat session agents")
|
|
return
|
|
}
|
|
sessionAgent := make(map[string]string, len(sessions))
|
|
for _, s := range sessions {
|
|
sessionAgent[uuidToString(s.ID)] = uuidToString(s.AgentID)
|
|
}
|
|
|
|
items := make([]PendingChatTaskItem, 0, len(rows))
|
|
for _, row := range rows {
|
|
sessionID := uuidToString(row.ChatSessionID)
|
|
agentID, hasAgent := sessionAgent[sessionID]
|
|
if !hasAgent {
|
|
continue
|
|
}
|
|
if _, ok := allowed[agentID]; !ok {
|
|
continue
|
|
}
|
|
items = append(items, PendingChatTaskItem{
|
|
TaskID: uuidToString(row.TaskID),
|
|
Status: row.Status,
|
|
ChatSessionID: sessionID,
|
|
})
|
|
}
|
|
writeJSON(w, http.StatusOK, PendingChatTasksResponse{Tasks: items})
|
|
}
|
|
|
|
// GetPendingChatTask returns the most recent in-flight task (queued / dispatched
|
|
// / running) for a chat session. The frontend polls this on mount / session
|
|
// switch so pending UI state survives refresh and reopen.
|
|
func (h *Handler) GetPendingChatTask(w http.ResponseWriter, r *http.Request) {
|
|
userID, ok := requireUserID(w, r)
|
|
if !ok {
|
|
return
|
|
}
|
|
workspaceID := ctxWorkspaceID(r.Context())
|
|
sessionID := chi.URLParam(r, "sessionId")
|
|
|
|
session, ok := h.gateChatSessionForUser(w, r, userID, workspaceID, sessionID)
|
|
if !ok {
|
|
return
|
|
}
|
|
|
|
task, err := h.Queries.GetPendingChatTask(r.Context(), session.ID)
|
|
if err != nil {
|
|
// No in-flight task — return an empty object, not an error.
|
|
writeJSON(w, http.StatusOK, PendingChatTaskResponse{})
|
|
return
|
|
}
|
|
|
|
writeJSON(w, http.StatusOK, PendingChatTaskResponse{
|
|
TaskID: uuidToString(task.ID),
|
|
Status: task.Status,
|
|
CreatedAt: timestampToString(task.CreatedAt),
|
|
})
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Task cancellation (user-facing, with ownership check)
|
|
// ---------------------------------------------------------------------------
|
|
|
|
// CancelTaskByUser cancels a task after verifying the requesting user owns
|
|
// the associated chat session or issue within the current workspace.
|
|
func (h *Handler) CancelTaskByUser(w http.ResponseWriter, r *http.Request) {
|
|
userID, ok := requireUserID(w, r)
|
|
if !ok {
|
|
return
|
|
}
|
|
workspaceID := ctxWorkspaceID(r.Context())
|
|
taskID := chi.URLParam(r, "taskId")
|
|
taskUUID, ok := parseUUIDOrBadRequest(w, taskID, "task id")
|
|
if !ok {
|
|
return
|
|
}
|
|
|
|
task, err := h.Queries.GetAgentTask(r.Context(), taskUUID)
|
|
if err != nil {
|
|
writeError(w, http.StatusNotFound, "task not found")
|
|
return
|
|
}
|
|
|
|
// Verify ownership: for chat tasks, check workspace + creator;
|
|
// for issue tasks, verify the issue belongs to the current workspace.
|
|
if task.ChatSessionID.Valid {
|
|
cs, err := h.Queries.GetChatSessionInWorkspace(r.Context(), db.GetChatSessionInWorkspaceParams{
|
|
ID: task.ChatSessionID,
|
|
WorkspaceID: parseUUID(workspaceID),
|
|
})
|
|
if err != nil {
|
|
writeError(w, http.StatusNotFound, "task not found")
|
|
return
|
|
}
|
|
if uuidToString(cs.CreatorID) != userID {
|
|
writeError(w, http.StatusForbidden, "not your task")
|
|
return
|
|
}
|
|
} else if task.IssueID.Valid {
|
|
issue, err := h.Queries.GetIssue(r.Context(), task.IssueID)
|
|
if err != nil || uuidToString(issue.WorkspaceID) != workspaceID {
|
|
writeError(w, http.StatusNotFound, "task not found")
|
|
return
|
|
}
|
|
} else {
|
|
writeError(w, http.StatusNotFound, "task not found")
|
|
return
|
|
}
|
|
|
|
cancelled, err := h.TaskService.CancelTask(r.Context(), taskUUID)
|
|
if err != nil {
|
|
writeError(w, http.StatusBadRequest, err.Error())
|
|
return
|
|
}
|
|
|
|
writeJSON(w, http.StatusOK, taskToResponse(*cancelled, workspaceID))
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Response types & helpers
|
|
// ---------------------------------------------------------------------------
|
|
|
|
type ChatSessionResponse struct {
|
|
ID string `json:"id"`
|
|
WorkspaceID string `json:"workspace_id"`
|
|
AgentID string `json:"agent_id"`
|
|
CreatorID string `json:"creator_id"`
|
|
Title string `json:"title"`
|
|
Status string `json:"status"`
|
|
// Only populated by list endpoints — single-session fetches return false.
|
|
HasUnread bool `json:"has_unread"`
|
|
CreatedAt string `json:"created_at"`
|
|
UpdatedAt string `json:"updated_at"`
|
|
}
|
|
|
|
type ChatMessageResponse struct {
|
|
ID string `json:"id"`
|
|
ChatSessionID string `json:"chat_session_id"`
|
|
Role string `json:"role"`
|
|
Content string `json:"content"`
|
|
TaskID *string `json:"task_id"`
|
|
CreatedAt string `json:"created_at"`
|
|
// FailureReason flags an assistant row synthesized by FailTask's chat
|
|
// fallback. Front-end uses it to switch to the destructive bubble.
|
|
FailureReason *string `json:"failure_reason"`
|
|
// ElapsedMs is the wall-clock duration from task creation to terminal
|
|
// state. Drives "Replied in 38s" / "Failed after 12s" captions.
|
|
ElapsedMs *int64 `json:"elapsed_ms"`
|
|
// Attachments linked to this message via chat_message_id. The chat
|
|
// bubble renders file cards from these, and the daemon claim path
|
|
// (daemon.go) pulls structured metadata from the same source so the
|
|
// agent can `multica attachment download <id>` rather than guessing
|
|
// from a markdown URL that may expire.
|
|
Attachments []AttachmentResponse `json:"attachments,omitempty"`
|
|
}
|
|
|
|
func chatSessionToResponse(s db.ChatSession) ChatSessionResponse {
|
|
return ChatSessionResponse{
|
|
ID: uuidToString(s.ID),
|
|
WorkspaceID: uuidToString(s.WorkspaceID),
|
|
AgentID: uuidToString(s.AgentID),
|
|
CreatorID: uuidToString(s.CreatorID),
|
|
Title: s.Title,
|
|
Status: s.Status,
|
|
CreatedAt: timestampToString(s.CreatedAt),
|
|
UpdatedAt: timestampToString(s.UpdatedAt),
|
|
}
|
|
}
|
|
|
|
func chatMessageToResponse(m db.ChatMessage, attachments []AttachmentResponse) ChatMessageResponse {
|
|
return ChatMessageResponse{
|
|
ID: uuidToString(m.ID),
|
|
ChatSessionID: uuidToString(m.ChatSessionID),
|
|
Role: m.Role,
|
|
Content: m.Content,
|
|
TaskID: uuidToPtr(m.TaskID),
|
|
CreatedAt: timestampToString(m.CreatedAt),
|
|
FailureReason: textToPtr(m.FailureReason),
|
|
ElapsedMs: int8ToPtr(m.ElapsedMs),
|
|
Attachments: attachments,
|
|
}
|
|
}
|