From 50a48cef1e24b726ef426220882b75e07cfbec05 Mon Sep 17 00:00:00 2001 From: Bohan Jiang <52446949+Bohan-J@users.noreply.github.com> Date: Tue, 30 Jun 2026 16:48:13 +0800 Subject: [PATCH] feat(slack): unified `multica chat history` pull for channel backfill (MUL-3871) (#4747) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat(slack): add unified `multica chat history` pull for channel backfill (MUL-3871) Agents @mentioned in a Slack thread/channel only saw the triggering message, never the prior conversation (GitHub #4717). Instead of force-assembling a recent-context block on every inbound (the Feishu approach), expose a single channel-agnostic pull command the agent runs on demand. - channel: normalized HistoryMessage/HistoryPage/HistoryOptions vocab so the agent sees one shape regardless of platform. - slack.History: resolves session -> binding -> installation -> bot token and reads conversations.replies (real thread) or conversations.history (DM / top-level channel, capturing sibling messages). thread_ts is recorded on the binding config at session creation to pick the right call. - handler GET /api/chat/history: authorized purely by the task-scoped token (stamped X-Task-ID -> the task's own chat session), so an agent can only read the conversation it is currently running for. - multica chat history CLI command (no args; same for every channel). - buildChatPrompt nudge so the agent discovers the command. Feishu is intentionally untouched. Adding a platform = implement the reader. Co-authored-by: multica-agent * fix(slack): require task-token actor source on chat history endpoint Niko's review caught a privilege-boundary hole: the endpoint trusted X-Task-ID, but it is mounted under the general Auth group where a normal JWT / mul_ PAT request does NOT strip a client-forged X-Task-ID — only the mat_ task-token branch stamps it. A workspace member who knew a chat task id could forge the header and read that task's Slack channel/DM/thread history. Gate on the server-set X-Actor-Source == "task_token" (the Auth middleware deletes any client-supplied value and re-stamps it only on the mat_ branch), then trust X-Task-ID. Adds a regression test: a forged X-Task-ID without the task-token actor source is rejected with 403 and never reaches the reader. Co-authored-by: multica-agent * fix(slack): thread-first history for follow-ups, channel for first turn (MUL-3871) A Slack conversation has two nested histories: the surrounding channel and the agent's own thread (the bot's first reply opens a thread on the @mention). The first version picked replies-vs-history from a thread_ts fixed at session creation, so a session started by a top-level @mention always read CHANNEL history — even on follow-ups inside the bot's thread, which should read THREAD history first. - Add a HistoryScope (auto|thread|channel). The handler resolves auto: first turn (no prior bot reply) -> channel; follow-up -> thread. The agent can override with --scope channel|thread, and the response reports the scope read. - The thread root is derived from the binding (last_thread_id / composite-key suffix), available for every engaged group session, instead of the creation-time thread_ts (now removed from the binding config). - A DM degrades a thread request to channel history (DMs have no threads). - Prompt guidance + CLI help updated to explain the policy. Tests: scope selection (thread/channel/DM-fallback/no-root), root derivation, and handler auto-resolution (first->channel, follow-up->thread, explicit override). Co-authored-by: multica-agent --------- Co-authored-by: J Co-authored-by: multica-agent --- server/cmd/multica/cmd_chat.go | 109 ++++++ server/cmd/multica/main.go | 2 + server/cmd/server/router.go | 12 + server/internal/daemon/prompt.go | 7 + server/internal/daemon/prompt_test.go | 5 + server/internal/handler/chat_history.go | 183 ++++++++++ server/internal/handler/chat_history_test.go | 275 ++++++++++++++ server/internal/handler/handler.go | 7 +- .../internal/integrations/channel/history.go | 100 ++++++ server/internal/integrations/slack/history.go | 334 ++++++++++++++++++ .../integrations/slack/history_test.go | 253 +++++++++++++ .../internal/integrations/slack/resolvers.go | 5 + 12 files changed, 1291 insertions(+), 1 deletion(-) create mode 100644 server/cmd/multica/cmd_chat.go create mode 100644 server/internal/handler/chat_history.go create mode 100644 server/internal/handler/chat_history_test.go create mode 100644 server/internal/integrations/channel/history.go create mode 100644 server/internal/integrations/slack/history.go create mode 100644 server/internal/integrations/slack/history_test.go diff --git a/server/cmd/multica/cmd_chat.go b/server/cmd/multica/cmd_chat.go new file mode 100644 index 000000000..7a30d9d71 --- /dev/null +++ b/server/cmd/multica/cmd_chat.go @@ -0,0 +1,109 @@ +package main + +import ( + "context" + "fmt" + "net/url" + "os" + "strconv" + + "github.com/spf13/cobra" + + "github.com/multica-ai/multica/server/internal/cli" +) + +var chatCmd = &cobra.Command{ + Use: "chat", + Short: "Work with the current chat conversation", +} + +var chatHistoryCmd = &cobra.Command{ + Use: "history", + Short: "Read prior messages from the chat channel this conversation came from", + Long: `Read the earlier messages of the chat channel (e.g. a Slack thread, channel, +or DM) that this conversation is connected to. + +When you are @mentioned in a Slack thread or channel you only receive the one +triggering message — not what was said before it. Run this to pull the +surrounding conversation so you understand the full context. + +A conversation has two nested histories: the surrounding CHANNEL and your own +THREAD within it (your first reply opens a thread on the @mention). By default +(--scope auto) the server reads the channel on your first reply — where the +prior context lives — and your thread on follow-ups. Use --scope channel to pull +the wider channel during a follow-up when the thread alone is not enough, or +--scope thread to force the thread. + +It is the SAME command regardless of which channel the conversation came from; +the server hides the per-platform differences. It reads only the conversation +you are currently running for — it cannot read any other session or channel.`, + Args: cobra.NoArgs, + RunE: runChatHistory, +} + +func init() { + chatHistoryCmd.Flags().String("scope", "auto", "Which history to read: auto, thread, or channel") + chatHistoryCmd.Flags().Int("limit", 0, "Maximum number of messages to return (the server clamps the range)") + chatHistoryCmd.Flags().String("before", "", "Opaque cursor (a next_cursor from a prior page) to read older messages") + chatHistoryCmd.Flags().String("output", "json", "Output format: table or json") + chatCmd.AddCommand(chatHistoryCmd) +} + +func runChatHistory(cmd *cobra.Command, _ []string) error { + client, err := newAPIClient(cmd) + if err != nil { + return err + } + + ctx, cancel := cli.APIContext(context.Background()) + defer cancel() + + scope, _ := cmd.Flags().GetString("scope") + limit, _ := cmd.Flags().GetInt("limit") + before, _ := cmd.Flags().GetString("before") + + q := url.Values{} + if scope != "" && scope != "auto" { + q.Set("scope", scope) + } + if limit > 0 { + q.Set("limit", strconv.Itoa(limit)) + } + if before != "" { + q.Set("before", before) + } + path := "/api/chat/history" + if encoded := q.Encode(); encoded != "" { + path += "?" + encoded + } + + var resp map[string]any + if err := client.GetJSON(ctx, path, &resp); err != nil { + return fmt.Errorf("read chat history: %w", err) + } + + output, _ := cmd.Flags().GetString("output") + if output == "table" { + if note := strVal(resp, "note"); note != "" { + fmt.Fprintln(os.Stdout, note) + return nil + } + if s := strVal(resp, "scope"); s != "" { + fmt.Fprintf(os.Stdout, "scope: %s\n", s) + } + msgs, _ := resp["messages"].([]any) + headers := []string{"TS", "ROLE", "AUTHOR", "TEXT"} + rows := make([][]string, 0, len(msgs)) + for _, mi := range msgs { + m, ok := mi.(map[string]any) + if !ok { + continue + } + rows = append(rows, []string{strVal(m, "ts"), strVal(m, "role"), strVal(m, "author"), strVal(m, "text")}) + } + cli.PrintTable(os.Stdout, headers, rows) + return nil + } + + return cli.PrintJSON(os.Stdout, resp) +} diff --git a/server/cmd/multica/main.go b/server/cmd/multica/main.go index 6d4169ce9..e0415ba82 100644 --- a/server/cmd/multica/main.go +++ b/server/cmd/multica/main.go @@ -52,6 +52,7 @@ func init() { repoCmd.GroupID = groupCore skillCmd.GroupID = groupCore squadCmd.GroupID = groupCore + chatCmd.GroupID = groupCore // Runtime commands daemonCmd.GroupID = groupRuntime @@ -76,6 +77,7 @@ func init() { rootCmd.AddCommand(repoCmd) rootCmd.AddCommand(skillCmd) rootCmd.AddCommand(squadCmd) + rootCmd.AddCommand(chatCmd) rootCmd.AddCommand(daemonCmd) rootCmd.AddCommand(runtimeCmd) rootCmd.AddCommand(authCmd) diff --git a/server/cmd/server/router.go b/server/cmd/server/router.go index f24d48d4c..9cc67c007 100644 --- a/server/cmd/server/router.go +++ b/server/cmd/server/router.go @@ -464,6 +464,11 @@ func NewRouterWithOptions(pool *pgxpool.Pool, hub *realtime.Hub, bus *events.Bus channelRouter.Register(slack.TypeSlack, slack.NewSlackResolverSet(queries, pool, slackReplier, slackTyping)) slack.NewOutbound(queries, box.Open, slog.Default()).Register(bus) + // On-demand history reader behind the unified `multica chat history` + // command (MUL-3871): pull the session's Slack conversation when the + // agent asks, instead of force-assembling it on every inbound. + h.SlackHistory = slack.NewHistory(queries, box.Open, slog.Default()) + // Per-installation inbound: the Supervisor builds + supervises one // Socket Mode connection per active Slack installation, authenticated // with that installation's OWN app-level token (xapp-, pasted at BYO @@ -1139,6 +1144,13 @@ func NewRouterWithOptions(pool *pgxpool.Pool, hub *realtime.Hub, bus *events.Bus }) r.Get("/api/chat/pending-tasks", h.ListPendingChatTasks) + // Agent-facing unified history read: `multica chat history` resolves + // the caller's task-scoped token to its own chat session and returns + // the bound channel's prior messages (MUL-3871). No session id is + // passed — the token IS the scope, so an agent can only read its own + // conversation. + r.Get("/api/chat/history", h.GetChatChannelHistory) + // Inbox r.Route("/api/inbox", func(r chi.Router) { r.Get("/", h.ListInbox) diff --git a/server/internal/daemon/prompt.go b/server/internal/daemon/prompt.go index 3ccf8a164..b29ea1958 100644 --- a/server/internal/daemon/prompt.go +++ b/server/internal/daemon/prompt.go @@ -190,6 +190,13 @@ func buildChatPrompt(task Task) string { var b strings.Builder b.WriteString("You are running as a chat assistant for a Multica workspace.\n") b.WriteString("A user is chatting with you directly. Respond to their message.\n\n") + // Discoverability nudge for the on-demand channel-history pull (MUL-3871). + // When this conversation came from an IM channel (e.g. Slack) the message + // below may be only the line that triggered the agent, not the surrounding + // thread/channel discussion. The agent has no other signal that earlier + // context exists, so point it at the unified command; it returns an empty + // list (with a note) for web-only sessions, so an always-on hint is safe. + b.WriteString("If this conversation came from a chat channel (e.g. a Slack thread, channel, or DM), the message below may be only what triggered you — not what was said earlier. Run `multica chat history --output json` before replying to read the prior conversation. It auto-selects the right scope — the surrounding channel on your first reply (where earlier context lives), your own thread on follow-ups — and you can add `--scope channel` to pull the wider channel when a follow-up needs more. It returns an empty list if there is no connected channel.\n\n") if task.Agent != nil && len(task.Agent.Skills) > 0 { refs := ExtractSlashSkills(task.ChatMessage) if len(refs) > 0 { diff --git a/server/internal/daemon/prompt_test.go b/server/internal/daemon/prompt_test.go index 00ffcc3af..9979e3824 100644 --- a/server/internal/daemon/prompt_test.go +++ b/server/internal/daemon/prompt_test.go @@ -286,6 +286,11 @@ func TestBuildChatPromptSlashSkills(t *testing.T) { if !strings.Contains(out, "User message:\nplease [/deploy](slash://skill/abc-123) this") { t.Fatalf("expected raw user message preserved, got:\n%s", out) } + // Every chat prompt points the agent at the on-demand channel-history + // pull so it can recover Slack thread/channel context (MUL-3871). + if !strings.Contains(out, "multica chat history") { + t.Fatalf("expected channel-history nudge, got:\n%s", out) + } }) t.Run("ignores skills not belonging to agent", func(t *testing.T) { diff --git a/server/internal/handler/chat_history.go b/server/internal/handler/chat_history.go new file mode 100644 index 000000000..1b0d63d1e --- /dev/null +++ b/server/internal/handler/chat_history.go @@ -0,0 +1,183 @@ +package handler + +import ( + "context" + "errors" + "log/slog" + "net/http" + "strconv" + + "github.com/jackc/pgx/v5/pgtype" + + "github.com/multica-ai/multica/server/internal/integrations/channel" + "github.com/multica-ai/multica/server/internal/integrations/slack" + "github.com/multica-ai/multica/server/internal/logger" + "github.com/multica-ai/multica/server/internal/util" +) + +// ChatChannelHistoryReader reads a chat session's bound IM-channel history. The +// Slack reader (slack.History) satisfies it; a future platform registers its +// own. Defined here as a narrow interface so the handler stays testable and so +// the channel-agnostic contract — one shape regardless of platform — is enforced +// at the boundary (MUL-3871). +type ChatChannelHistoryReader interface { + Fetch(ctx context.Context, chatSessionID pgtype.UUID, opts channel.HistoryOptions) (channel.HistoryPage, error) +} + +// ChatChannelHistoryResponse is the unified `multica chat history` payload. It +// is the SAME shape no matter which channel backs the session — the agent never +// sees a per-platform API. +type ChatChannelHistoryResponse struct { + ChannelType string `json:"channel_type"` + Scope channel.HistoryScope `json:"scope,omitempty"` + Messages []channel.HistoryMessage `json:"messages"` + NextCursor string `json:"next_cursor,omitempty"` + // Note carries a human-readable explanation when there is no history to + // read (e.g. the session is not connected to a chat channel), so the agent + // gets a clear answer instead of a bare empty list. + Note string `json:"note,omitempty"` +} + +// GetChatChannelHistory serves the agent-facing `multica chat history` command. +// It is authorized by the task-scoped token alone: middleware stamps the token's +// task into X-Task-ID (the client cannot forge it), and the endpoint reads the +// history of THAT task's chat session — so an agent can only ever read the +// conversation it is currently running for, never an arbitrary session/channel. +func (h *Handler) GetChatChannelHistory(w http.ResponseWriter, r *http.Request) { + // X-Actor-Source is server-set only: the Auth middleware deletes any + // client-supplied value and re-stamps "task_token" ONLY on the mat_ task + // token branch (along with the authoritative X-Task-ID). A normal JWT / mul_ + // PAT request leaves it empty and does NOT strip a client-forged X-Task-ID, + // so this gate is load-bearing: without it a member could forge X-Task-ID and + // read another session's channel history. Require the task-token actor here, + // THEN trust X-Task-ID. + if r.Header.Get("X-Actor-Source") != "task_token" { + writeError(w, http.StatusForbidden, "chat history is only available from within an agent task") + return + } + taskIDHeader := r.Header.Get("X-Task-ID") + if taskIDHeader == "" { + writeError(w, http.StatusBadRequest, "missing task context") + return + } + taskUUID, err := util.ParseUUID(taskIDHeader) + if err != nil { + writeError(w, http.StatusBadRequest, "invalid task id") + return + } + task, err := h.Queries.GetAgentTask(r.Context(), taskUUID) + if err != nil { + writeError(w, http.StatusNotFound, "task not found") + return + } + if !task.ChatSessionID.Valid { + writeError(w, http.StatusBadRequest, "this task is not a chat task") + return + } + // Defense in depth: load the session and confirm it lives in the token's + // stamped workspace. The token→task binding already guarantees the agent can + // only reach its own task here; this makes a future wiring regression fail + // closed instead of leaking another workspace's conversation. + session, err := h.Queries.GetChatSession(r.Context(), task.ChatSessionID) + if err != nil { + writeError(w, http.StatusNotFound, "chat session not found") + return + } + if ws := ctxWorkspaceID(r.Context()); ws != "" && uuidToString(session.WorkspaceID) != ws { + writeError(w, http.StatusForbidden, "chat session does not belong to this workspace") + return + } + + scope := parseHistoryScope(r.URL.Query().Get("scope")) + if scope == channel.HistoryScopeAuto { + // First turn — the bot has not replied yet, so no thread exists — reads + // the surrounding channel (where the prior context lives). A follow-up + // reads the agent's own thread. The agent can override with + // ?scope=channel|thread. + if h.chatSessionHasBotReply(r.Context(), task.ChatSessionID) { + scope = channel.HistoryScopeThread + } else { + scope = channel.HistoryScopeChannel + } + } + opts := channel.HistoryOptions{ + Scope: scope, + Limit: parseHistoryLimit(r.URL.Query().Get("limit")), + Before: r.URL.Query().Get("before"), + } + + empty := ChatChannelHistoryResponse{Messages: []channel.HistoryMessage{}} + if h.SlackHistory == nil { + empty.Note = "No chat channel integration is configured on this server." + writeJSON(w, http.StatusOK, empty) + return + } + + page, err := h.SlackHistory.Fetch(r.Context(), task.ChatSessionID, opts) + if err != nil { + if errors.Is(err, slack.ErrNoSlackSession) { + empty.Note = "This conversation is not connected to a chat channel, so there is no prior channel history to read." + writeJSON(w, http.StatusOK, empty) + return + } + slog.Error("chat channel history fetch failed", append(logger.RequestAttrs(r), + "error", err, "chat_session_id", uuidToString(task.ChatSessionID))...) + writeError(w, http.StatusBadGateway, "failed to read channel history") + return + } + + messages := page.Messages + if messages == nil { + messages = []channel.HistoryMessage{} + } + writeJSON(w, http.StatusOK, ChatChannelHistoryResponse{ + ChannelType: page.ChannelType, + Scope: page.Scope, + Messages: messages, + NextCursor: page.NextCursor, + }) +} + +// parseHistoryScope maps the ?scope query value to a HistoryScope, defaulting to +// auto for empty / unknown values. +func parseHistoryScope(raw string) channel.HistoryScope { + switch channel.HistoryScope(raw) { + case channel.HistoryScopeThread: + return channel.HistoryScopeThread + case channel.HistoryScopeChannel: + return channel.HistoryScopeChannel + default: + return channel.HistoryScopeAuto + } +} + +// chatSessionHasBotReply reports whether the bot has already replied in this +// session — i.e. this is a follow-up, not the first turn. On Slack the bot's +// first reply opens the thread, so an existing assistant message is the signal +// that a thread worth reading exists. Best-effort: a query error defaults to +// false (treat as first turn → channel), the safe, context-rich choice. +func (h *Handler) chatSessionHasBotReply(ctx context.Context, sessionID pgtype.UUID) bool { + msgs, err := h.Queries.ListChatMessages(ctx, sessionID) + if err != nil { + return false + } + for _, m := range msgs { + if m.Role == "assistant" { + return true + } + } + return false +} + +// parseHistoryLimit reads the ?limit query param, ignoring junk (the reader +// clamps the range). 0 means "use the reader's default". +func parseHistoryLimit(raw string) int { + if raw == "" { + return 0 + } + n, err := strconv.Atoi(raw) + if err != nil || n < 0 { + return 0 + } + return n +} diff --git a/server/internal/handler/chat_history_test.go b/server/internal/handler/chat_history_test.go new file mode 100644 index 000000000..0e2f43e03 --- /dev/null +++ b/server/internal/handler/chat_history_test.go @@ -0,0 +1,275 @@ +package handler + +import ( + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "testing" + + "github.com/jackc/pgx/v5/pgtype" + + "github.com/multica-ai/multica/server/internal/integrations/channel" + "github.com/multica-ai/multica/server/internal/integrations/slack" +) + +type fakeChatHistoryReader struct { + page channel.HistoryPage + err error + gotSession pgtype.UUID + gotOpts channel.HistoryOptions +} + +func (f *fakeChatHistoryReader) Fetch(_ context.Context, sid pgtype.UUID, opts channel.HistoryOptions) (channel.HistoryPage, error) { + f.gotSession = sid + f.gotOpts = opts + return f.page, f.err +} + +// newChatHistoryTask inserts a chat task bound to a fresh chat session and +// returns the task id and (for chat tasks) the session id. With +// chatSession=false it inserts a non-chat task and an empty session id. +func newChatHistoryTask(t *testing.T, chatSession bool) (taskID, sessionID string) { + t.Helper() + agentID := createHandlerTestAgent(t, "ChatHistoryAgent", []byte("[]")) + runtimeID := handlerTestRuntimeID(t) + var sessionArg any + if chatSession { + sessionID = createHandlerTestChatSession(t, agentID) + sessionArg = sessionID + } + if err := testPool.QueryRow(context.Background(), ` + INSERT INTO agent_task_queue (agent_id, runtime_id, status, priority, chat_session_id) + VALUES ($1, $2, 'completed', 0, $3) + RETURNING id + `, agentID, runtimeID, sessionArg).Scan(&taskID); err != nil { + t.Fatalf("insert chat history task: %v", err) + } + t.Cleanup(func() { + testPool.Exec(context.Background(), `DELETE FROM agent_task_queue WHERE id = $1`, taskID) + }) + return taskID, sessionID +} + +// addAssistantMessage records a prior bot reply in the session, so the endpoint +// classifies the next read as a follow-up. The chat_session cleanup cascades to +// chat_message, so no separate cleanup is needed. +func addAssistantMessage(t *testing.T, sessionID string) { + t.Helper() + if _, err := testPool.Exec(context.Background(), + `INSERT INTO chat_message (chat_session_id, role, content) VALUES ($1, 'assistant', 'prior reply')`, + sessionID); err != nil { + t.Fatalf("insert assistant message: %v", err) + } +} + +// taskActorRequest builds a /api/chat/history request as the Auth middleware +// would leave it for a mat_ task token: the server-set X-Actor-Source=task_token +// plus the authoritative X-Task-ID. +func taskActorRequest(taskID string) *http.Request { + req := newRequest("GET", "/api/chat/history", nil) + req.Header.Set("X-Actor-Source", "task_token") + req.Header.Set("X-Task-ID", taskID) + return req +} + +func withSlackHistory(t *testing.T, r ChatChannelHistoryReader) { + t.Helper() + orig := testHandler.SlackHistory + testHandler.SlackHistory = r + t.Cleanup(func() { testHandler.SlackHistory = orig }) +} + +func TestGetChatChannelHistory_Success(t *testing.T) { + if testHandler == nil { + t.Skip("requires test database") + } + taskID, _ := newChatHistoryTask(t, true) + fake := &fakeChatHistoryReader{page: channel.HistoryPage{ + ChannelType: "slack", + Messages: []channel.HistoryMessage{ + {ID: "100", Author: "Alice", Role: channel.HistoryRoleUser, Text: "alert", TS: "100"}, + {ID: "101", Author: "Bot", Role: channel.HistoryRoleAssistant, Text: "on it", TS: "101"}, + }, + NextCursor: "100", + }} + withSlackHistory(t, fake) + + req := taskActorRequest(taskID) + req.URL.RawQuery = "limit=10" + w := httptest.NewRecorder() + testHandler.GetChatChannelHistory(w, req) + + if w.Code != http.StatusOK { + t.Fatalf("status = %d, want 200: %s", w.Code, w.Body.String()) + } + var resp ChatChannelHistoryResponse + if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil { + t.Fatalf("decode: %v", err) + } + if resp.ChannelType != "slack" || len(resp.Messages) != 2 || resp.NextCursor != "100" { + t.Fatalf("unexpected response: %+v", resp) + } + if !fake.gotSession.Valid { + t.Errorf("reader was not called with a session id") + } +} + +func TestGetChatChannelHistory_NoBindingReturnsNote(t *testing.T) { + if testHandler == nil { + t.Skip("requires test database") + } + taskID, _ := newChatHistoryTask(t, true) + withSlackHistory(t, &fakeChatHistoryReader{err: slack.ErrNoSlackSession}) + + w := httptest.NewRecorder() + testHandler.GetChatChannelHistory(w, taskActorRequest(taskID)) + + if w.Code != http.StatusOK { + t.Fatalf("status = %d, want 200: %s", w.Code, w.Body.String()) + } + var resp ChatChannelHistoryResponse + _ = json.Unmarshal(w.Body.Bytes(), &resp) + if resp.Note == "" || len(resp.Messages) != 0 { + t.Fatalf("expected empty messages + a note, got %+v", resp) + } +} + +func TestGetChatChannelHistory_NilReaderReturnsNote(t *testing.T) { + if testHandler == nil { + t.Skip("requires test database") + } + taskID, _ := newChatHistoryTask(t, true) + withSlackHistory(t, nil) + + w := httptest.NewRecorder() + testHandler.GetChatChannelHistory(w, taskActorRequest(taskID)) + + if w.Code != http.StatusOK { + t.Fatalf("status = %d, want 200: %s", w.Code, w.Body.String()) + } + var resp ChatChannelHistoryResponse + _ = json.Unmarshal(w.Body.Bytes(), &resp) + if resp.Note == "" { + t.Fatalf("expected a note when no reader configured, got %+v", resp) + } +} + +// TestGetChatChannelHistory_RejectsForgedTaskID is the security regression test +// for Niko's must-fix: a normal request (no server-set X-Actor-Source) that +// forges X-Task-ID — exactly what a workspace member could do with a JWT / mul_ +// PAT, since the Auth middleware does NOT strip a client-sent X-Task-ID — must be +// rejected, never served another session's history. +func TestGetChatChannelHistory_RejectsForgedTaskID(t *testing.T) { + if testHandler == nil { + t.Skip("requires test database") + } + taskID, _ := newChatHistoryTask(t, true) + fake := &fakeChatHistoryReader{page: channel.HistoryPage{ChannelType: "slack"}} + withSlackHistory(t, fake) + + req := newRequest("GET", "/api/chat/history", nil) + req.Header.Set("X-Task-ID", taskID) // forged: no X-Actor-Source=task_token + w := httptest.NewRecorder() + testHandler.GetChatChannelHistory(w, req) + + if w.Code != http.StatusForbidden { + t.Fatalf("status = %d, want 403", w.Code) + } + if fake.gotSession.Valid { + t.Fatalf("reader must not be called for a forged X-Task-ID") + } +} + +func TestGetChatChannelHistory_MissingTaskHeader(t *testing.T) { + if testHandler == nil { + t.Skip("requires test database") + } + // Task-token actor source but no X-Task-ID: a defensive 400 (the mat_ branch + // always stamps both, so this should not happen in practice). + req := newRequest("GET", "/api/chat/history", nil) + req.Header.Set("X-Actor-Source", "task_token") + w := httptest.NewRecorder() + testHandler.GetChatChannelHistory(w, req) + if w.Code != http.StatusBadRequest { + t.Fatalf("status = %d, want 400", w.Code) + } +} + +func TestGetChatChannelHistory_NonChatTask(t *testing.T) { + if testHandler == nil { + t.Skip("requires test database") + } + taskID, _ := newChatHistoryTask(t, false) // task with no chat_session_id + withSlackHistory(t, &fakeChatHistoryReader{}) + + w := httptest.NewRecorder() + testHandler.GetChatChannelHistory(w, taskActorRequest(taskID)) + if w.Code != http.StatusBadRequest { + t.Fatalf("status = %d, want 400: %s", w.Code, w.Body.String()) + } +} + +// TestGetChatChannelHistory_AutoFirstTurnReadsChannel: with no prior bot reply, +// scope=auto resolves to channel (the surrounding context before the thread). +func TestGetChatChannelHistory_AutoFirstTurnReadsChannel(t *testing.T) { + if testHandler == nil { + t.Skip("requires test database") + } + taskID, _ := newChatHistoryTask(t, true) // no assistant message => first turn + fake := &fakeChatHistoryReader{page: channel.HistoryPage{ChannelType: "slack", Scope: channel.HistoryScopeChannel}} + withSlackHistory(t, fake) + + w := httptest.NewRecorder() + testHandler.GetChatChannelHistory(w, taskActorRequest(taskID)) // no ?scope => auto + if w.Code != http.StatusOK { + t.Fatalf("status = %d, want 200: %s", w.Code, w.Body.String()) + } + if fake.gotOpts.Scope != channel.HistoryScopeChannel { + t.Fatalf("auto first-turn scope = %q, want channel", fake.gotOpts.Scope) + } +} + +// TestGetChatChannelHistory_AutoFollowUpReadsThread: once the bot has replied, +// scope=auto resolves to thread. +func TestGetChatChannelHistory_AutoFollowUpReadsThread(t *testing.T) { + if testHandler == nil { + t.Skip("requires test database") + } + taskID, sessionID := newChatHistoryTask(t, true) + addAssistantMessage(t, sessionID) // bot already replied => follow-up + fake := &fakeChatHistoryReader{page: channel.HistoryPage{ChannelType: "slack", Scope: channel.HistoryScopeThread}} + withSlackHistory(t, fake) + + w := httptest.NewRecorder() + testHandler.GetChatChannelHistory(w, taskActorRequest(taskID)) + if w.Code != http.StatusOK { + t.Fatalf("status = %d, want 200: %s", w.Code, w.Body.String()) + } + if fake.gotOpts.Scope != channel.HistoryScopeThread { + t.Fatalf("auto follow-up scope = %q, want thread", fake.gotOpts.Scope) + } +} + +// TestGetChatChannelHistory_ExplicitChannelScope: ?scope=channel overrides the +// auto default even on a follow-up. +func TestGetChatChannelHistory_ExplicitChannelScope(t *testing.T) { + if testHandler == nil { + t.Skip("requires test database") + } + taskID, sessionID := newChatHistoryTask(t, true) + addAssistantMessage(t, sessionID) // follow-up, but explicit override below + fake := &fakeChatHistoryReader{page: channel.HistoryPage{ChannelType: "slack", Scope: channel.HistoryScopeChannel}} + withSlackHistory(t, fake) + + req := taskActorRequest(taskID) + req.URL.RawQuery = "scope=channel" + w := httptest.NewRecorder() + testHandler.GetChatChannelHistory(w, req) + if w.Code != http.StatusOK { + t.Fatalf("status = %d, want 200: %s", w.Code, w.Body.String()) + } + if fake.gotOpts.Scope != channel.HistoryScopeChannel { + t.Fatalf("explicit scope = %q, want channel", fake.gotOpts.Scope) + } +} diff --git a/server/internal/handler/handler.go b/server/internal/handler/handler.go index 0b6afb4b3..1d97efc3c 100644 --- a/server/internal/handler/handler.go +++ b/server/internal/handler/handler.go @@ -185,7 +185,12 @@ type Handler struct { // "link your Slack account" prompt (MUL-3666). Nil unless Slack is // configured (MULTICA_SLACK_SECRET_KEY set). SlackBindingTokens *slack.BindingTokenService - cfg Config + // SlackHistory backs the agent-facing `multica chat history` command: it + // reads a chat session's bound Slack conversation on demand (MUL-3871). Nil + // unless Slack is configured; GetChatChannelHistory then reports "no channel + // integration". A future platform satisfies the same reader interface. + SlackHistory ChatChannelHistoryReader + cfg Config } func New(queries *db.Queries, txStarter txStarter, hub *realtime.Hub, bus *events.Bus, emailService *service.EmailService, store storage.Storage, cfSigner *auth.CloudFrontSigner, analyticsClient analytics.Client, cfg Config, daemonHubs ...*daemonws.Hub) *Handler { diff --git a/server/internal/integrations/channel/history.go b/server/internal/integrations/channel/history.go new file mode 100644 index 000000000..1852a5244 --- /dev/null +++ b/server/internal/integrations/channel/history.go @@ -0,0 +1,100 @@ +package channel + +// This file defines the channel-agnostic vocabulary for ON-DEMAND history +// reads. Unlike the inbound push path (InboundMessage), history is PULLED by +// the agent through a single unified CLI (`multica chat history`): the agent +// asks for "the history of the conversation I'm in" and never sees a +// per-platform API. The server resolves the session's binding to a channel +// type and dispatches to that platform's reader, which returns these +// normalized shapes — so adding a platform is "implement a reader", and the +// agent-facing contract never changes (MUL-3871). + +// HistoryRole is the normalized author kind of a fetched message, mirroring the +// chat_message.role domain the agent already reasons about. +type HistoryRole string + +const ( + // HistoryRoleUser is a human (or a third-party bot, e.g. an alerting bot) + // message — context the agent should read. + HistoryRoleUser HistoryRole = "user" + // HistoryRoleAssistant is one of THIS bot's own prior messages in the + // conversation. + HistoryRoleAssistant HistoryRole = "assistant" +) + +// HistoryScope selects which slice of a conversation to read. A chat platform +// has two nested histories: the surrounding CHANNEL and the agent's own THREAD +// within it (on Slack the bot's first reply opens a thread on the @mention, so +// every engaged conversation has one). The agent's primary read on a follow-up +// is its thread; the wider channel is pulled only when needed. On the first +// turn there is no thread yet, so the channel is the relevant context. +type HistoryScope string + +const ( + // HistoryScopeAuto lets the server pick: the channel on the first turn (no + // thread exists yet), the thread on follow-ups. This is the default. + HistoryScopeAuto HistoryScope = "auto" + // HistoryScopeThread reads the agent's own thread (Slack + // conversations.replies). Falls back to the channel where the platform / + // conversation has no threads (e.g. a DM). + HistoryScopeThread HistoryScope = "thread" + // HistoryScopeChannel reads the surrounding channel (Slack + // conversations.history). + HistoryScopeChannel HistoryScope = "channel" +) + +// HistoryMessage is one normalized message from a conversation's history. It is +// the same shape regardless of platform so the agent reads a uniform list, +// exactly like `multica issue comment list --output json`. +type HistoryMessage struct { + // ID is the platform message identifier (Slack ts, Feishu message_id). + ID string `json:"id"` + // Author is a human-readable display label for the sender ("Alice", + // "Bot", or a positional "User 2" fallback when the name is unresolved). + Author string `json:"author"` + // AuthorID is the platform-native sender id, when available. Empty for + // some platform/bot messages. + AuthorID string `json:"author_id,omitempty"` + // Role distinguishes the bot's own turns from everyone else's. + Role HistoryRole `json:"role"` + // Text is the message body, flattened to plain text by the adapter. + Text string `json:"text"` + // TS is the platform timestamp string, sortable lexicographically within a + // platform (Slack "1700000000.000100"). It doubles as the paging cursor. + TS string `json:"ts"` +} + +// HistoryPage is one normalized page of history plus a cursor for paging +// further back. Messages are ordered OLDEST-FIRST so the transcript reads +// top-to-bottom like the chat does. +type HistoryPage struct { + // ChannelType is the platform the history came from ("slack"). Empty when + // the session is not bound to any channel (a web-only chat session). + ChannelType string `json:"channel_type,omitempty"` + // Scope is the scope actually read ("thread" or "channel") after resolving + // "auto" and any platform fallback (e.g. a DM has no thread). It lets the + // agent know what it got and decide whether to also pull the other scope. + Scope HistoryScope `json:"scope,omitempty"` + // Messages are the fetched messages, oldest-first. + Messages []HistoryMessage `json:"messages"` + // NextCursor, when non-empty, is an opaque cursor to pass as Before to + // page to OLDER messages. Empty means no older messages were available. + NextCursor string `json:"next_cursor,omitempty"` +} + +// HistoryOptions tune a history read. They are platform-neutral; each reader +// maps them onto its own API's paging primitives. +type HistoryOptions struct { + // Scope selects thread vs channel. The handler resolves "auto" to a + // concrete scope before calling the reader (it knows whether this is a + // first turn or a follow-up); the reader still degrades "thread" to channel + // where the conversation has no thread. An empty value reads the channel. + Scope HistoryScope + // Limit caps how many messages to return. A reader clamps it to its + // platform's per-page maximum and applies a sane default for <= 0. + Limit int + // Before is an opaque cursor (a NextCursor from a prior page); the reader + // returns only messages strictly older than it. Empty starts at the most + // recent messages. + Before string +} diff --git a/server/internal/integrations/slack/history.go b/server/internal/integrations/slack/history.go new file mode 100644 index 000000000..df7e407c6 --- /dev/null +++ b/server/internal/integrations/slack/history.go @@ -0,0 +1,334 @@ +package slack + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "log/slog" + "sort" + "strconv" + "strings" + + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgtype" + "github.com/slack-go/slack" + + "github.com/multica-ai/multica/server/internal/integrations/channel" + db "github.com/multica-ai/multica/server/pkg/db/generated" +) + +// ErrNoSlackSession reports that the chat session has no Slack channel binding — +// it is a Feishu or web-only session. Callers surface it as an empty (not +// failed) history read so the unified `multica chat history` command answers +// gracefully on a non-Slack conversation. +var ErrNoSlackSession = errors.New("slack: session has no slack channel binding") + +const ( + // defaultHistoryLimit is the page size used when the caller asks for none. + defaultHistoryLimit = 20 + // maxHistoryLimit caps a single page. Slack's own conversations.* limit is + // far higher; we self-cap so a pull can't dump an unbounded transcript into + // the agent's context (mirrors the Feishu recent-context clamp). + maxHistoryLimit = 50 +) + +// historyQueries is the slice of generated queries the history reader needs. +// *db.Queries satisfies it. It mirrors outboundQueries: resolve the session's +// Slack binding, then load the installation that owns the bot token. +type historyQueries interface { + GetChannelChatSessionBindingBySession(ctx context.Context, arg db.GetChannelChatSessionBindingBySessionParams) (db.ChannelChatSessionBinding, error) + GetChannelInstallation(ctx context.Context, arg db.GetChannelInstallationParams) (db.ChannelInstallation, error) +} + +// historyClient is the slice of the slack-go Web API the reader calls. The real +// *slack.Client satisfies it; tests inject a fake so the fetch/labeling logic is +// exercised without a live Slack. +type historyClient interface { + GetConversationHistoryContext(ctx context.Context, params *slack.GetConversationHistoryParameters) (*slack.GetConversationHistoryResponse, error) + GetConversationRepliesContext(ctx context.Context, params *slack.GetConversationRepliesParameters) ([]slack.Message, bool, string, error) + GetUsersInfoContext(ctx context.Context, users ...string) (*[]slack.User, error) +} + +// History reads a Slack conversation's prior messages on demand — the pull half +// of the unified `multica chat history` tool (MUL-3871). It mirrors Outbound: +// given a chat_session it finds the Slack binding, decrypts the installation's +// bot token, and calls conversations.replies (a real thread) or +// conversations.history (DM / top-level channel context). Sessions with no +// Slack binding return ErrNoSlackSession, so it coexists with Feishu sessions on +// the shared endpoint. +type History struct { + q historyQueries + decrypt Decrypter + logger *slog.Logger + newClient func(botToken string) historyClient +} + +// NewHistory builds the reader over the generated queries and the bot-token +// decrypter (box.Open at wiring time). +func NewHistory(q historyQueries, decrypt Decrypter, logger *slog.Logger) *History { + if logger == nil { + logger = slog.Default() + } + h := &History{q: q, decrypt: decrypt, logger: logger} + h.newClient = func(botToken string) historyClient { + // Only the bot token is needed to read history; the app-level token is + // for the inbound Socket Mode connection (slack_channel.go). + return slack.New(botToken) + } + return h +} + +// Fetch returns one normalized, oldest-first page of the session's Slack +// conversation. It returns ErrNoSlackSession when the session is not Slack-bound +// or its installation is inactive. +func (h *History) Fetch(ctx context.Context, chatSessionID pgtype.UUID, opts channel.HistoryOptions) (channel.HistoryPage, error) { + binding, err := h.q.GetChannelChatSessionBindingBySession(ctx, db.GetChannelChatSessionBindingBySessionParams{ + ChatSessionID: chatSessionID, + ChannelType: string(TypeSlack), + }) + if err != nil { + if errors.Is(err, pgx.ErrNoRows) { + return channel.HistoryPage{}, ErrNoSlackSession + } + return channel.HistoryPage{}, fmt.Errorf("lookup slack chat binding: %w", err) + } + inst, err := h.q.GetChannelInstallation(ctx, db.GetChannelInstallationParams{ + ID: binding.InstallationID, + ChannelType: string(TypeSlack), + }) + if err != nil { + return channel.HistoryPage{}, fmt.Errorf("load slack installation: %w", err) + } + if inst.Status != "active" { + return channel.HistoryPage{}, ErrNoSlackSession // revoked install: nothing to read + } + creds, err := decodeCredentials(inst.Config, h.decrypt) + if err != nil { + return channel.HistoryPage{}, fmt.Errorf("decode slack credentials: %w", err) + } + channelID, threadRoot := historyTarget(binding) + // Resolve the concrete scope to read. The handler resolves "auto" to + // thread/channel (it knows first-turn vs follow-up); here we additionally + // degrade "thread" to "channel" when there is no thread to read — a DM, or a + // group whose root could not be recovered. + scope := channel.HistoryScopeChannel + if opts.Scope == channel.HistoryScopeThread && + binding.ChatType == string(channel.ChatTypeGroup) && threadRoot != "" { + scope = channel.HistoryScopeThread + } + + limit := opts.Limit + if limit <= 0 { + limit = defaultHistoryLimit + } + if limit > maxHistoryLimit { + limit = maxHistoryLimit + } + + fetchThreadTS := "" + if scope == channel.HistoryScopeThread { + fetchThreadTS = threadRoot + } + client := h.newClient(creds.BotToken) + raw, err := fetchRaw(ctx, client, channelID, fetchThreadTS, opts.Before, limit) + if err != nil { + return channel.HistoryPage{}, fmt.Errorf("read slack history: %w", err) + } + + page := normalizeHistory(ctx, client, h.logger, raw, creds.BotUserID, limit) + page.ChannelType = string(TypeSlack) + page.Scope = scope + return page, nil +} + +// historyTarget recovers the real channel id and the thread root from the +// binding. The channel_chat_id may be a composite "channel:threadRoot" +// isolation key, so the real channel id is read from the binding config +// (slackBindingConfig). The thread root — present for every engaged group +// session, since the bot's first reply opens a thread on the @mention — is the +// recorded reply thread (last_thread_id), falling back to the composite-key +// suffix. It is empty for a DM (no threads). +func historyTarget(b db.ChannelChatSessionBinding) (channelID, threadRoot string) { + channelID = b.ChannelChatID + if len(b.Config) > 0 { + var cfg slackBindingConfig + if err := json.Unmarshal(b.Config, &cfg); err == nil && cfg.ChannelID != "" { + channelID = cfg.ChannelID + } + } + if b.LastThreadID.Valid && b.LastThreadID.String != "" { + threadRoot = b.LastThreadID.String + } else if i := strings.IndexByte(b.ChannelChatID, ':'); i >= 0 { + threadRoot = b.ChannelChatID[i+1:] + } + return channelID, threadRoot +} + +// fetchRaw pulls the most recent `limit` messages older than `before` (exclusive +// when set). A thread read uses conversations.replies anchored on the thread +// root; a channel read uses conversations.history. Both return newest-first; +// ordering is normalized downstream. +func fetchRaw(ctx context.Context, client historyClient, channelID, threadTS, before string, limit int) ([]slack.Message, error) { + if threadTS != "" { + msgs, _, _, err := client.GetConversationRepliesContext(ctx, &slack.GetConversationRepliesParameters{ + ChannelID: channelID, + Timestamp: threadTS, + Latest: before, + Inclusive: false, + Limit: limit, + }) + return msgs, err + } + resp, err := client.GetConversationHistoryContext(ctx, &slack.GetConversationHistoryParameters{ + ChannelID: channelID, + Latest: before, + Inclusive: false, + Limit: limit, + }) + if err != nil { + return nil, err + } + return resp.Messages, nil +} + +// normalizeHistory turns raw Slack messages into a normalized, oldest-first +// page: it resolves human display names in one batch, labels each sender, maps +// the role, and computes the back-paging cursor. +func normalizeHistory(ctx context.Context, client historyClient, logger *slog.Logger, raw []slack.Message, botUserID string, limit int) channel.HistoryPage { + // Oldest-first so the transcript reads top-to-bottom like the chat does. + sort.SliceStable(raw, func(i, j int) bool { return slackTSLess(raw[i].Timestamp, raw[j].Timestamp) }) + + names := resolveUserNames(ctx, client, logger, raw, botUserID) + labeler := newHistoryLabeler(names) + + out := make([]channel.HistoryMessage, 0, len(raw)) + for i := range raw { + m := raw[i] + text := m.Text + if text == "" { + continue // join/system/edit markers carry no readable body + } + own := m.User != "" && m.User == botUserID + role := channel.HistoryRoleUser + if own { + role = channel.HistoryRoleAssistant + } + out = append(out, channel.HistoryMessage{ + ID: m.Timestamp, + Author: labeler.label(m, own), + AuthorID: m.User, + Role: role, + Text: text, + TS: m.Timestamp, + }) + } + + page := channel.HistoryPage{Messages: out} + // Only advertise a cursor when the platform returned a full page (more may + // exist older than the oldest message we just returned). + if len(raw) >= limit && len(out) > 0 { + page.NextCursor = out[0].TS + } + return page +} + +// resolveUserNames batch-resolves the human senders' display names, best-effort. +// A failure (missing users:read scope, transport error) yields a nil map so the +// labeler falls back to positional "User N" rather than blocking the read. +func resolveUserNames(ctx context.Context, client historyClient, logger *slog.Logger, msgs []slack.Message, botUserID string) map[string]string { + seen := make(map[string]bool) + ids := make([]string, 0, len(msgs)) + for i := range msgs { + u := msgs[i].User + if u == "" || u == botUserID || seen[u] { + continue + } + seen[u] = true + ids = append(ids, u) + } + if len(ids) == 0 { + return nil + } + users, err := client.GetUsersInfoContext(ctx, ids...) + if err != nil || users == nil { + if err != nil { + logger.WarnContext(ctx, "slack history: user name resolution failed", "ids", len(ids), "error", err) + } + return nil + } + names := make(map[string]string, len(*users)) + for _, u := range *users { + if name := slackDisplayName(u); name != "" { + names[u.ID] = name + } + } + return names +} + +// slackDisplayName picks the friendliest available name for a Slack user. +func slackDisplayName(u slack.User) string { + switch { + case u.Profile.DisplayName != "": + return u.Profile.DisplayName + case u.RealName != "": + return u.RealName + default: + return u.Name + } +} + +// historyLabeler assigns stable, human-readable labels within one page, mirroring +// the Feishu speakerLabeler: this bot is "Bot"; a resolved human gets their real +// name; an unresolved human falls back to positional "User N"; a third-party bot +// uses its posted username. +type historyLabeler struct { + names map[string]string + seen map[string]string + n int +} + +func newHistoryLabeler(names map[string]string) *historyLabeler { + return &historyLabeler{names: names, seen: make(map[string]string)} +} + +func (l *historyLabeler) label(m slack.Message, own bool) string { + if own { + return "Bot" + } + key := m.User + if key == "" { + // A third-party bot (alerting app, …) posts with a bot_id and often a + // username but no user id; label it by that username when present. + if m.Username != "" { + return m.Username + } + key = "bot:" + m.BotID + } + if lbl, ok := l.seen[key]; ok { + return lbl + } + var lbl string + if name := l.names[m.User]; name != "" { + lbl = name + } else if m.Username != "" { + lbl = m.Username + } else { + l.n++ + lbl = fmt.Sprintf("User %d", l.n) + } + l.seen[key] = lbl + return lbl +} + +// slackTSLess orders two Slack timestamps ("secs.micros") chronologically. Slack +// ts strings are not safely comparable lexicographically across widths, so parse +// them; an unparseable value sorts as 0 (oldest). +func slackTSLess(a, b string) bool { + return parseSlackTS(a) < parseSlackTS(b) +} + +func parseSlackTS(ts string) float64 { + f, _ := strconv.ParseFloat(ts, 64) + return f +} diff --git a/server/internal/integrations/slack/history_test.go b/server/internal/integrations/slack/history_test.go new file mode 100644 index 000000000..341b7867a --- /dev/null +++ b/server/internal/integrations/slack/history_test.go @@ -0,0 +1,253 @@ +package slack + +import ( + "context" + "errors" + "testing" + + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgtype" + "github.com/slack-go/slack" + + "github.com/multica-ai/multica/server/internal/integrations/channel" + db "github.com/multica-ai/multica/server/pkg/db/generated" +) + +type fakeHistoryQueries struct { + binding db.ChannelChatSessionBinding + bindingErr error + inst db.ChannelInstallation + instErr error +} + +func (f *fakeHistoryQueries) GetChannelChatSessionBindingBySession(context.Context, db.GetChannelChatSessionBindingBySessionParams) (db.ChannelChatSessionBinding, error) { + return f.binding, f.bindingErr +} + +func (f *fakeHistoryQueries) GetChannelInstallation(context.Context, db.GetChannelInstallationParams) (db.ChannelInstallation, error) { + return f.inst, f.instErr +} + +type fakeHistoryClient struct { + historyMsgs []slack.Message + repliesMsgs []slack.Message + users []slack.User + historyCalls int + repliesCalls int + lastHistory *slack.GetConversationHistoryParameters + lastReplies *slack.GetConversationRepliesParameters +} + +func (f *fakeHistoryClient) GetConversationHistoryContext(_ context.Context, p *slack.GetConversationHistoryParameters) (*slack.GetConversationHistoryResponse, error) { + f.historyCalls++ + f.lastHistory = p + return &slack.GetConversationHistoryResponse{Messages: f.historyMsgs}, nil +} + +func (f *fakeHistoryClient) GetConversationRepliesContext(_ context.Context, p *slack.GetConversationRepliesParameters) ([]slack.Message, bool, string, error) { + f.repliesCalls++ + f.lastReplies = p + return f.repliesMsgs, false, "", nil +} + +func (f *fakeHistoryClient) GetUsersInfoContext(_ context.Context, _ ...string) (*[]slack.User, error) { + return &f.users, nil +} + +func msg(user, text, ts string) slack.Message { + return slack.Message{Msg: slack.Msg{User: user, Text: text, Timestamp: ts}} +} + +func activeSlackInstall() db.ChannelInstallation { + return db.ChannelInstallation{Status: "active", Config: slackInstallConfigJSON()} +} + +// groupBinding builds a group session binding rooted at threadRoot (the thread +// the bot's reply opened on the @mention). +func groupBinding(threadRoot string) db.ChannelChatSessionBinding { + b := db.ChannelChatSessionBinding{ + InstallationID: uid(2), + ChannelChatID: "C1:" + threadRoot, + ChatType: string(channel.ChatTypeGroup), + Config: []byte(`{"channel_id":"C1"}`), + } + if threadRoot != "" { + b.LastThreadID = pgtype.Text{String: threadRoot, Valid: true} + } + return b +} + +func dmBinding() db.ChannelChatSessionBinding { + return db.ChannelChatSessionBinding{ + InstallationID: uid(2), + ChannelChatID: "D1", + ChatType: string(channel.ChatTypeP2P), + Config: []byte(`{"channel_id":"D1"}`), + } +} + +func newTestHistory(q historyQueries, fc historyClient) *History { + h := NewHistory(q, nil, nil) // nil decrypter => stored bytes treated as plaintext + h.newClient = func(string) historyClient { return fc } + return h +} + +// TestHistoryFetchChannelScope verifies a channel-scope read uses +// conversations.history and normalizes oldest-first with roles + labels. +func TestHistoryFetchChannelScope(t *testing.T) { + q := &fakeHistoryQueries{binding: groupBinding("50.000000"), inst: activeSlackInstall()} + fc := &fakeHistoryClient{ + // Slack returns newest-first; the bot (UBOT) replied last. + historyMsgs: []slack.Message{ + msg("UBOT", "on it", "102.000000"), + msg("U1", "@bot look into this", "101.000000"), + msg("U2", "alert: 5xx spiking", "100.000000"), + }, + users: []slack.User{{ID: "U1", RealName: "Alice"}}, // U2 unresolved -> positional + } + h := newTestHistory(q, fc) + + page, err := h.Fetch(context.Background(), uid(9), channel.HistoryOptions{Scope: channel.HistoryScopeChannel}) + if err != nil { + t.Fatalf("Fetch: %v", err) + } + if fc.historyCalls != 1 || fc.repliesCalls != 0 { + t.Fatalf("expected conversations.history, got history=%d replies=%d", fc.historyCalls, fc.repliesCalls) + } + if fc.lastHistory.ChannelID != "C1" { + t.Errorf("channel id = %q, want C1", fc.lastHistory.ChannelID) + } + if page.ChannelType != "slack" || page.Scope != channel.HistoryScopeChannel { + t.Errorf("channel_type/scope = %q/%q, want slack/channel", page.ChannelType, page.Scope) + } + if len(page.Messages) != 3 || page.Messages[0].TS != "100.000000" || page.Messages[2].TS != "102.000000" { + t.Fatalf("expected 3 msgs oldest-first, got %+v", page.Messages) + } + if got := page.Messages[0]; got.Author != "User 1" || got.Role != channel.HistoryRoleUser { + t.Errorf("msg0 author/role = %q/%q, want User 1/user", got.Author, got.Role) + } + if got := page.Messages[1]; got.Author != "Alice" { + t.Errorf("msg1 author = %q, want Alice", got.Author) + } + if got := page.Messages[2]; got.Author != "Bot" || got.Role != channel.HistoryRoleAssistant { + t.Errorf("msg2 author/role = %q/%q, want Bot/assistant", got.Author, got.Role) + } +} + +// TestHistoryFetchThreadScope verifies a thread-scope read uses +// conversations.replies anchored on the session's thread root (from the binding). +func TestHistoryFetchThreadScope(t *testing.T) { + q := &fakeHistoryQueries{binding: groupBinding("50.000000"), inst: activeSlackInstall()} + fc := &fakeHistoryClient{repliesMsgs: []slack.Message{ + msg("U1", "second", "52.000000"), + msg("U1", "root", "50.000000"), + }} + h := newTestHistory(q, fc) + + page, err := h.Fetch(context.Background(), uid(9), channel.HistoryOptions{Scope: channel.HistoryScopeThread, Limit: 10}) + if err != nil { + t.Fatalf("Fetch: %v", err) + } + if fc.repliesCalls != 1 || fc.historyCalls != 0 { + t.Fatalf("expected conversations.replies, got history=%d replies=%d", fc.historyCalls, fc.repliesCalls) + } + if fc.lastReplies.Timestamp != "50.000000" || fc.lastReplies.ChannelID != "C1" { + t.Errorf("replies anchored at %q/%q, want C1/50.000000", fc.lastReplies.ChannelID, fc.lastReplies.Timestamp) + } + if page.Scope != channel.HistoryScopeThread { + t.Errorf("scope = %q, want thread", page.Scope) + } + if len(page.Messages) != 2 || page.Messages[0].TS != "50.000000" { + t.Fatalf("expected 2 msgs oldest-first, got %+v", page.Messages) + } +} + +// TestHistoryFetchDMIgnoresThreadScope confirms a DM (no threads) degrades a +// thread request to channel history. +func TestHistoryFetchDMIgnoresThreadScope(t *testing.T) { + q := &fakeHistoryQueries{binding: dmBinding(), inst: activeSlackInstall()} + fc := &fakeHistoryClient{historyMsgs: []slack.Message{msg("U1", "hi", "100.000000")}} + h := newTestHistory(q, fc) + + page, err := h.Fetch(context.Background(), uid(9), channel.HistoryOptions{Scope: channel.HistoryScopeThread}) + if err != nil { + t.Fatalf("Fetch: %v", err) + } + if fc.historyCalls != 1 || fc.repliesCalls != 0 { + t.Fatalf("DM must use conversations.history, got history=%d replies=%d", fc.historyCalls, fc.repliesCalls) + } + if page.Scope != channel.HistoryScopeChannel { + t.Errorf("scope = %q, want channel (DM has no thread)", page.Scope) + } +} + +// TestHistoryFetchThreadFallsBackWithoutRoot: a group binding with no recoverable +// thread root degrades a thread request to channel history. +func TestHistoryFetchThreadFallsBackWithoutRoot(t *testing.T) { + q := &fakeHistoryQueries{ + binding: db.ChannelChatSessionBinding{InstallationID: uid(2), ChannelChatID: "C1", ChatType: string(channel.ChatTypeGroup), Config: []byte(`{"channel_id":"C1"}`)}, + inst: activeSlackInstall(), + } + fc := &fakeHistoryClient{historyMsgs: []slack.Message{msg("U1", "x", "100.000000")}} + h := newTestHistory(q, fc) + + page, err := h.Fetch(context.Background(), uid(9), channel.HistoryOptions{Scope: channel.HistoryScopeThread}) + if err != nil { + t.Fatalf("Fetch: %v", err) + } + if fc.historyCalls != 1 || fc.repliesCalls != 0 { + t.Fatalf("expected fallback to history, got history=%d replies=%d", fc.historyCalls, fc.repliesCalls) + } + if page.Scope != channel.HistoryScopeChannel { + t.Errorf("scope = %q, want channel", page.Scope) + } +} + +// TestHistoryTargetDerivesRoot pins the channel + thread-root recovery from a +// binding: last_thread_id first, then the composite-key suffix, empty for a DM. +func TestHistoryTargetDerivesRoot(t *testing.T) { + if ch, root := historyTarget(groupBinding("50.0")); ch != "C1" || root != "50.0" { + t.Errorf("from last_thread_id: got %q/%q, want C1/50.0", ch, root) + } + keyOnly := db.ChannelChatSessionBinding{ChannelChatID: "C9:77.7", Config: []byte(`{"channel_id":"C9"}`)} + if ch, root := historyTarget(keyOnly); ch != "C9" || root != "77.7" { + t.Errorf("from key suffix: got %q/%q, want C9/77.7", ch, root) + } + if ch, root := historyTarget(dmBinding()); ch != "D1" || root != "" { + t.Errorf("dm: got %q/%q, want D1/", ch, root) + } +} + +// TestHistoryFetchNoBinding maps a missing Slack binding to ErrNoSlackSession. +func TestHistoryFetchNoBinding(t *testing.T) { + q := &fakeHistoryQueries{bindingErr: pgx.ErrNoRows} + h := newTestHistory(q, &fakeHistoryClient{}) + if _, err := h.Fetch(context.Background(), uid(9), channel.HistoryOptions{}); !errors.Is(err, ErrNoSlackSession) { + t.Fatalf("err = %v, want ErrNoSlackSession", err) + } +} + +// TestHistoryFetchInactiveInstall treats a revoked installation as empty. +func TestHistoryFetchInactiveInstall(t *testing.T) { + q := &fakeHistoryQueries{ + binding: groupBinding("50.0"), + inst: db.ChannelInstallation{Status: "revoked", Config: slackInstallConfigJSON()}, + } + h := newTestHistory(q, &fakeHistoryClient{}) + if _, err := h.Fetch(context.Background(), uid(9), channel.HistoryOptions{}); !errors.Is(err, ErrNoSlackSession) { + t.Fatalf("err = %v, want ErrNoSlackSession", err) + } +} + +// TestHistoryLimitClamp confirms an over-large limit is clamped before the call. +func TestHistoryLimitClamp(t *testing.T) { + q := &fakeHistoryQueries{binding: groupBinding("50.0"), inst: activeSlackInstall()} + fc := &fakeHistoryClient{} + h := newTestHistory(q, fc) + if _, err := h.Fetch(context.Background(), uid(9), channel.HistoryOptions{Scope: channel.HistoryScopeChannel, Limit: 5000}); err != nil { + t.Fatalf("Fetch: %v", err) + } + if fc.lastHistory.Limit != maxHistoryLimit { + t.Errorf("limit = %d, want clamp to %d", fc.lastHistory.Limit, maxHistoryLimit) + } +} diff --git a/server/internal/integrations/slack/resolvers.go b/server/internal/integrations/slack/resolvers.go index 02d9d1923..1c5d298f4 100644 --- a/server/internal/integrations/slack/resolvers.go +++ b/server/internal/integrations/slack/resolvers.go @@ -90,6 +90,11 @@ func slackSessionRouting(msg channel.InboundMessage) (bindingKey string, config if msg.Source.ChatType == channel.ChatTypeP2P { return chatID, cfg, msg.Source.ThreadID } + // The thread root is the inbound thread_ts when the @mention is a reply + // inside an existing thread, else the message's own ts (a top-level mention + // becomes the root the bot threads its reply under). Either way the root is + // recoverable later from the binding (channel_chat_id suffix / last_thread_id), + // which is what the history reader uses to read the thread. threadRoot := msg.Source.ThreadID if threadRoot == "" { threadRoot = msg.MessageID