Compare commits

...

3 Commits

Author SHA1 Message Date
J
8ebbd150b9 fix(slack): thread-first history for follow-ups, channel for first turn (MUL-3871)
A Slack conversation has two nested histories: the surrounding channel and the
agent's own thread (the bot's first reply opens a thread on the @mention). The
first version picked replies-vs-history from a thread_ts fixed at session
creation, so a session started by a top-level @mention always read CHANNEL
history — even on follow-ups inside the bot's thread, which should read THREAD
history first.

- Add a HistoryScope (auto|thread|channel). The handler resolves auto:
  first turn (no prior bot reply) -> channel; follow-up -> thread. The agent can
  override with --scope channel|thread, and the response reports the scope read.
- The thread root is derived from the binding (last_thread_id / composite-key
  suffix), available for every engaged group session, instead of the
  creation-time thread_ts (now removed from the binding config).
- A DM degrades a thread request to channel history (DMs have no threads).
- Prompt guidance + CLI help updated to explain the policy.

Tests: scope selection (thread/channel/DM-fallback/no-root), root derivation,
and handler auto-resolution (first->channel, follow-up->thread, explicit
override).

Co-authored-by: multica-agent <github@multica.ai>
2026-06-30 15:20:11 +08:00
J
0dabeeabf4 fix(slack): require task-token actor source on chat history endpoint
Niko's review caught a privilege-boundary hole: the endpoint trusted
X-Task-ID, but it is mounted under the general Auth group where a normal
JWT / mul_ PAT request does NOT strip a client-forged X-Task-ID — only the
mat_ task-token branch stamps it. A workspace member who knew a chat task id
could forge the header and read that task's Slack channel/DM/thread history.

Gate on the server-set X-Actor-Source == "task_token" (the Auth middleware
deletes any client-supplied value and re-stamps it only on the mat_ branch),
then trust X-Task-ID. Adds a regression test: a forged X-Task-ID without the
task-token actor source is rejected with 403 and never reaches the reader.

Co-authored-by: multica-agent <github@multica.ai>
2026-06-30 14:56:31 +08:00
J
21a8ae91d5 feat(slack): add unified multica chat history pull for channel backfill (MUL-3871)
Agents @mentioned in a Slack thread/channel only saw the triggering message,
never the prior conversation (GitHub #4717). Instead of force-assembling a
recent-context block on every inbound (the Feishu approach), expose a single
channel-agnostic pull command the agent runs on demand.

- channel: normalized HistoryMessage/HistoryPage/HistoryOptions vocab so the
  agent sees one shape regardless of platform.
- slack.History: resolves session -> binding -> installation -> bot token and
  reads conversations.replies (real thread) or conversations.history (DM /
  top-level channel, capturing sibling messages). thread_ts is recorded on the
  binding config at session creation to pick the right call.
- handler GET /api/chat/history: authorized purely by the task-scoped token
  (stamped X-Task-ID -> the task's own chat session), so an agent can only read
  the conversation it is currently running for.
- multica chat history CLI command (no args; same for every channel).
- buildChatPrompt nudge so the agent discovers the command.

Feishu is intentionally untouched. Adding a platform = implement the reader.

Co-authored-by: multica-agent <github@multica.ai>
2026-06-30 14:40:10 +08:00
12 changed files with 1291 additions and 1 deletions

View File

@@ -0,0 +1,109 @@
package main
import (
"context"
"fmt"
"net/url"
"os"
"strconv"
"github.com/spf13/cobra"
"github.com/multica-ai/multica/server/internal/cli"
)
var chatCmd = &cobra.Command{
Use: "chat",
Short: "Work with the current chat conversation",
}
var chatHistoryCmd = &cobra.Command{
Use: "history",
Short: "Read prior messages from the chat channel this conversation came from",
Long: `Read the earlier messages of the chat channel (e.g. a Slack thread, channel,
or DM) that this conversation is connected to.
When you are @mentioned in a Slack thread or channel you only receive the one
triggering message — not what was said before it. Run this to pull the
surrounding conversation so you understand the full context.
A conversation has two nested histories: the surrounding CHANNEL and your own
THREAD within it (your first reply opens a thread on the @mention). By default
(--scope auto) the server reads the channel on your first reply — where the
prior context lives — and your thread on follow-ups. Use --scope channel to pull
the wider channel during a follow-up when the thread alone is not enough, or
--scope thread to force the thread.
It is the SAME command regardless of which channel the conversation came from;
the server hides the per-platform differences. It reads only the conversation
you are currently running for — it cannot read any other session or channel.`,
Args: cobra.NoArgs,
RunE: runChatHistory,
}
func init() {
chatHistoryCmd.Flags().String("scope", "auto", "Which history to read: auto, thread, or channel")
chatHistoryCmd.Flags().Int("limit", 0, "Maximum number of messages to return (the server clamps the range)")
chatHistoryCmd.Flags().String("before", "", "Opaque cursor (a next_cursor from a prior page) to read older messages")
chatHistoryCmd.Flags().String("output", "json", "Output format: table or json")
chatCmd.AddCommand(chatHistoryCmd)
}
func runChatHistory(cmd *cobra.Command, _ []string) error {
client, err := newAPIClient(cmd)
if err != nil {
return err
}
ctx, cancel := cli.APIContext(context.Background())
defer cancel()
scope, _ := cmd.Flags().GetString("scope")
limit, _ := cmd.Flags().GetInt("limit")
before, _ := cmd.Flags().GetString("before")
q := url.Values{}
if scope != "" && scope != "auto" {
q.Set("scope", scope)
}
if limit > 0 {
q.Set("limit", strconv.Itoa(limit))
}
if before != "" {
q.Set("before", before)
}
path := "/api/chat/history"
if encoded := q.Encode(); encoded != "" {
path += "?" + encoded
}
var resp map[string]any
if err := client.GetJSON(ctx, path, &resp); err != nil {
return fmt.Errorf("read chat history: %w", err)
}
output, _ := cmd.Flags().GetString("output")
if output == "table" {
if note := strVal(resp, "note"); note != "" {
fmt.Fprintln(os.Stdout, note)
return nil
}
if s := strVal(resp, "scope"); s != "" {
fmt.Fprintf(os.Stdout, "scope: %s\n", s)
}
msgs, _ := resp["messages"].([]any)
headers := []string{"TS", "ROLE", "AUTHOR", "TEXT"}
rows := make([][]string, 0, len(msgs))
for _, mi := range msgs {
m, ok := mi.(map[string]any)
if !ok {
continue
}
rows = append(rows, []string{strVal(m, "ts"), strVal(m, "role"), strVal(m, "author"), strVal(m, "text")})
}
cli.PrintTable(os.Stdout, headers, rows)
return nil
}
return cli.PrintJSON(os.Stdout, resp)
}

View File

@@ -52,6 +52,7 @@ func init() {
repoCmd.GroupID = groupCore
skillCmd.GroupID = groupCore
squadCmd.GroupID = groupCore
chatCmd.GroupID = groupCore
// Runtime commands
daemonCmd.GroupID = groupRuntime
@@ -76,6 +77,7 @@ func init() {
rootCmd.AddCommand(repoCmd)
rootCmd.AddCommand(skillCmd)
rootCmd.AddCommand(squadCmd)
rootCmd.AddCommand(chatCmd)
rootCmd.AddCommand(daemonCmd)
rootCmd.AddCommand(runtimeCmd)
rootCmd.AddCommand(authCmd)

View File

@@ -455,6 +455,11 @@ func NewRouterWithOptions(pool *pgxpool.Pool, hub *realtime.Hub, bus *events.Bus
channelRouter.Register(slack.TypeSlack, slack.NewSlackResolverSet(queries, pool, slackReplier))
slack.NewOutbound(queries, box.Open, slog.Default()).Register(bus)
// On-demand history reader behind the unified `multica chat history`
// command (MUL-3871): pull the session's Slack conversation when the
// agent asks, instead of force-assembling it on every inbound.
h.SlackHistory = slack.NewHistory(queries, box.Open, slog.Default())
// Per-installation inbound: the Supervisor builds + supervises one
// Socket Mode connection per active Slack installation, authenticated
// with that installation's OWN app-level token (xapp-, pasted at BYO
@@ -1130,6 +1135,13 @@ func NewRouterWithOptions(pool *pgxpool.Pool, hub *realtime.Hub, bus *events.Bus
})
r.Get("/api/chat/pending-tasks", h.ListPendingChatTasks)
// Agent-facing unified history read: `multica chat history` resolves
// the caller's task-scoped token to its own chat session and returns
// the bound channel's prior messages (MUL-3871). No session id is
// passed — the token IS the scope, so an agent can only read its own
// conversation.
r.Get("/api/chat/history", h.GetChatChannelHistory)
// Inbox
r.Route("/api/inbox", func(r chi.Router) {
r.Get("/", h.ListInbox)

View File

@@ -190,6 +190,13 @@ func buildChatPrompt(task Task) string {
var b strings.Builder
b.WriteString("You are running as a chat assistant for a Multica workspace.\n")
b.WriteString("A user is chatting with you directly. Respond to their message.\n\n")
// Discoverability nudge for the on-demand channel-history pull (MUL-3871).
// When this conversation came from an IM channel (e.g. Slack) the message
// below may be only the line that triggered the agent, not the surrounding
// thread/channel discussion. The agent has no other signal that earlier
// context exists, so point it at the unified command; it returns an empty
// list (with a note) for web-only sessions, so an always-on hint is safe.
b.WriteString("If this conversation came from a chat channel (e.g. a Slack thread, channel, or DM), the message below may be only what triggered you — not what was said earlier. Run `multica chat history --output json` before replying to read the prior conversation. It auto-selects the right scope — the surrounding channel on your first reply (where earlier context lives), your own thread on follow-ups — and you can add `--scope channel` to pull the wider channel when a follow-up needs more. It returns an empty list if there is no connected channel.\n\n")
if task.Agent != nil && len(task.Agent.Skills) > 0 {
refs := ExtractSlashSkills(task.ChatMessage)
if len(refs) > 0 {

View File

@@ -286,6 +286,11 @@ func TestBuildChatPromptSlashSkills(t *testing.T) {
if !strings.Contains(out, "User message:\nplease [/deploy](slash://skill/abc-123) this") {
t.Fatalf("expected raw user message preserved, got:\n%s", out)
}
// Every chat prompt points the agent at the on-demand channel-history
// pull so it can recover Slack thread/channel context (MUL-3871).
if !strings.Contains(out, "multica chat history") {
t.Fatalf("expected channel-history nudge, got:\n%s", out)
}
})
t.Run("ignores skills not belonging to agent", func(t *testing.T) {

View File

@@ -0,0 +1,183 @@
package handler
import (
"context"
"errors"
"log/slog"
"net/http"
"strconv"
"github.com/jackc/pgx/v5/pgtype"
"github.com/multica-ai/multica/server/internal/integrations/channel"
"github.com/multica-ai/multica/server/internal/integrations/slack"
"github.com/multica-ai/multica/server/internal/logger"
"github.com/multica-ai/multica/server/internal/util"
)
// ChatChannelHistoryReader reads a chat session's bound IM-channel history. The
// Slack reader (slack.History) satisfies it; a future platform registers its
// own. Defined here as a narrow interface so the handler stays testable and so
// the channel-agnostic contract — one shape regardless of platform — is enforced
// at the boundary (MUL-3871).
type ChatChannelHistoryReader interface {
Fetch(ctx context.Context, chatSessionID pgtype.UUID, opts channel.HistoryOptions) (channel.HistoryPage, error)
}
// ChatChannelHistoryResponse is the unified `multica chat history` payload. It
// is the SAME shape no matter which channel backs the session — the agent never
// sees a per-platform API.
type ChatChannelHistoryResponse struct {
ChannelType string `json:"channel_type"`
Scope channel.HistoryScope `json:"scope,omitempty"`
Messages []channel.HistoryMessage `json:"messages"`
NextCursor string `json:"next_cursor,omitempty"`
// Note carries a human-readable explanation when there is no history to
// read (e.g. the session is not connected to a chat channel), so the agent
// gets a clear answer instead of a bare empty list.
Note string `json:"note,omitempty"`
}
// GetChatChannelHistory serves the agent-facing `multica chat history` command.
// It is authorized by the task-scoped token alone: middleware stamps the token's
// task into X-Task-ID (the client cannot forge it), and the endpoint reads the
// history of THAT task's chat session — so an agent can only ever read the
// conversation it is currently running for, never an arbitrary session/channel.
func (h *Handler) GetChatChannelHistory(w http.ResponseWriter, r *http.Request) {
// X-Actor-Source is server-set only: the Auth middleware deletes any
// client-supplied value and re-stamps "task_token" ONLY on the mat_ task
// token branch (along with the authoritative X-Task-ID). A normal JWT / mul_
// PAT request leaves it empty and does NOT strip a client-forged X-Task-ID,
// so this gate is load-bearing: without it a member could forge X-Task-ID and
// read another session's channel history. Require the task-token actor here,
// THEN trust X-Task-ID.
if r.Header.Get("X-Actor-Source") != "task_token" {
writeError(w, http.StatusForbidden, "chat history is only available from within an agent task")
return
}
taskIDHeader := r.Header.Get("X-Task-ID")
if taskIDHeader == "" {
writeError(w, http.StatusBadRequest, "missing task context")
return
}
taskUUID, err := util.ParseUUID(taskIDHeader)
if err != nil {
writeError(w, http.StatusBadRequest, "invalid task id")
return
}
task, err := h.Queries.GetAgentTask(r.Context(), taskUUID)
if err != nil {
writeError(w, http.StatusNotFound, "task not found")
return
}
if !task.ChatSessionID.Valid {
writeError(w, http.StatusBadRequest, "this task is not a chat task")
return
}
// Defense in depth: load the session and confirm it lives in the token's
// stamped workspace. The token→task binding already guarantees the agent can
// only reach its own task here; this makes a future wiring regression fail
// closed instead of leaking another workspace's conversation.
session, err := h.Queries.GetChatSession(r.Context(), task.ChatSessionID)
if err != nil {
writeError(w, http.StatusNotFound, "chat session not found")
return
}
if ws := ctxWorkspaceID(r.Context()); ws != "" && uuidToString(session.WorkspaceID) != ws {
writeError(w, http.StatusForbidden, "chat session does not belong to this workspace")
return
}
scope := parseHistoryScope(r.URL.Query().Get("scope"))
if scope == channel.HistoryScopeAuto {
// First turn — the bot has not replied yet, so no thread exists — reads
// the surrounding channel (where the prior context lives). A follow-up
// reads the agent's own thread. The agent can override with
// ?scope=channel|thread.
if h.chatSessionHasBotReply(r.Context(), task.ChatSessionID) {
scope = channel.HistoryScopeThread
} else {
scope = channel.HistoryScopeChannel
}
}
opts := channel.HistoryOptions{
Scope: scope,
Limit: parseHistoryLimit(r.URL.Query().Get("limit")),
Before: r.URL.Query().Get("before"),
}
empty := ChatChannelHistoryResponse{Messages: []channel.HistoryMessage{}}
if h.SlackHistory == nil {
empty.Note = "No chat channel integration is configured on this server."
writeJSON(w, http.StatusOK, empty)
return
}
page, err := h.SlackHistory.Fetch(r.Context(), task.ChatSessionID, opts)
if err != nil {
if errors.Is(err, slack.ErrNoSlackSession) {
empty.Note = "This conversation is not connected to a chat channel, so there is no prior channel history to read."
writeJSON(w, http.StatusOK, empty)
return
}
slog.Error("chat channel history fetch failed", append(logger.RequestAttrs(r),
"error", err, "chat_session_id", uuidToString(task.ChatSessionID))...)
writeError(w, http.StatusBadGateway, "failed to read channel history")
return
}
messages := page.Messages
if messages == nil {
messages = []channel.HistoryMessage{}
}
writeJSON(w, http.StatusOK, ChatChannelHistoryResponse{
ChannelType: page.ChannelType,
Scope: page.Scope,
Messages: messages,
NextCursor: page.NextCursor,
})
}
// parseHistoryScope maps the ?scope query value to a HistoryScope, defaulting to
// auto for empty / unknown values.
func parseHistoryScope(raw string) channel.HistoryScope {
switch channel.HistoryScope(raw) {
case channel.HistoryScopeThread:
return channel.HistoryScopeThread
case channel.HistoryScopeChannel:
return channel.HistoryScopeChannel
default:
return channel.HistoryScopeAuto
}
}
// chatSessionHasBotReply reports whether the bot has already replied in this
// session — i.e. this is a follow-up, not the first turn. On Slack the bot's
// first reply opens the thread, so an existing assistant message is the signal
// that a thread worth reading exists. Best-effort: a query error defaults to
// false (treat as first turn → channel), the safe, context-rich choice.
func (h *Handler) chatSessionHasBotReply(ctx context.Context, sessionID pgtype.UUID) bool {
msgs, err := h.Queries.ListChatMessages(ctx, sessionID)
if err != nil {
return false
}
for _, m := range msgs {
if m.Role == "assistant" {
return true
}
}
return false
}
// parseHistoryLimit reads the ?limit query param, ignoring junk (the reader
// clamps the range). 0 means "use the reader's default".
func parseHistoryLimit(raw string) int {
if raw == "" {
return 0
}
n, err := strconv.Atoi(raw)
if err != nil || n < 0 {
return 0
}
return n
}

View File

@@ -0,0 +1,275 @@
package handler
import (
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"testing"
"github.com/jackc/pgx/v5/pgtype"
"github.com/multica-ai/multica/server/internal/integrations/channel"
"github.com/multica-ai/multica/server/internal/integrations/slack"
)
type fakeChatHistoryReader struct {
page channel.HistoryPage
err error
gotSession pgtype.UUID
gotOpts channel.HistoryOptions
}
func (f *fakeChatHistoryReader) Fetch(_ context.Context, sid pgtype.UUID, opts channel.HistoryOptions) (channel.HistoryPage, error) {
f.gotSession = sid
f.gotOpts = opts
return f.page, f.err
}
// newChatHistoryTask inserts a chat task bound to a fresh chat session and
// returns the task id and (for chat tasks) the session id. With
// chatSession=false it inserts a non-chat task and an empty session id.
func newChatHistoryTask(t *testing.T, chatSession bool) (taskID, sessionID string) {
t.Helper()
agentID := createHandlerTestAgent(t, "ChatHistoryAgent", []byte("[]"))
runtimeID := handlerTestRuntimeID(t)
var sessionArg any
if chatSession {
sessionID = createHandlerTestChatSession(t, agentID)
sessionArg = sessionID
}
if err := testPool.QueryRow(context.Background(), `
INSERT INTO agent_task_queue (agent_id, runtime_id, status, priority, chat_session_id)
VALUES ($1, $2, 'completed', 0, $3)
RETURNING id
`, agentID, runtimeID, sessionArg).Scan(&taskID); err != nil {
t.Fatalf("insert chat history task: %v", err)
}
t.Cleanup(func() {
testPool.Exec(context.Background(), `DELETE FROM agent_task_queue WHERE id = $1`, taskID)
})
return taskID, sessionID
}
// addAssistantMessage records a prior bot reply in the session, so the endpoint
// classifies the next read as a follow-up. The chat_session cleanup cascades to
// chat_message, so no separate cleanup is needed.
func addAssistantMessage(t *testing.T, sessionID string) {
t.Helper()
if _, err := testPool.Exec(context.Background(),
`INSERT INTO chat_message (chat_session_id, role, content) VALUES ($1, 'assistant', 'prior reply')`,
sessionID); err != nil {
t.Fatalf("insert assistant message: %v", err)
}
}
// taskActorRequest builds a /api/chat/history request as the Auth middleware
// would leave it for a mat_ task token: the server-set X-Actor-Source=task_token
// plus the authoritative X-Task-ID.
func taskActorRequest(taskID string) *http.Request {
req := newRequest("GET", "/api/chat/history", nil)
req.Header.Set("X-Actor-Source", "task_token")
req.Header.Set("X-Task-ID", taskID)
return req
}
func withSlackHistory(t *testing.T, r ChatChannelHistoryReader) {
t.Helper()
orig := testHandler.SlackHistory
testHandler.SlackHistory = r
t.Cleanup(func() { testHandler.SlackHistory = orig })
}
func TestGetChatChannelHistory_Success(t *testing.T) {
if testHandler == nil {
t.Skip("requires test database")
}
taskID, _ := newChatHistoryTask(t, true)
fake := &fakeChatHistoryReader{page: channel.HistoryPage{
ChannelType: "slack",
Messages: []channel.HistoryMessage{
{ID: "100", Author: "Alice", Role: channel.HistoryRoleUser, Text: "alert", TS: "100"},
{ID: "101", Author: "Bot", Role: channel.HistoryRoleAssistant, Text: "on it", TS: "101"},
},
NextCursor: "100",
}}
withSlackHistory(t, fake)
req := taskActorRequest(taskID)
req.URL.RawQuery = "limit=10"
w := httptest.NewRecorder()
testHandler.GetChatChannelHistory(w, req)
if w.Code != http.StatusOK {
t.Fatalf("status = %d, want 200: %s", w.Code, w.Body.String())
}
var resp ChatChannelHistoryResponse
if err := json.Unmarshal(w.Body.Bytes(), &resp); err != nil {
t.Fatalf("decode: %v", err)
}
if resp.ChannelType != "slack" || len(resp.Messages) != 2 || resp.NextCursor != "100" {
t.Fatalf("unexpected response: %+v", resp)
}
if !fake.gotSession.Valid {
t.Errorf("reader was not called with a session id")
}
}
func TestGetChatChannelHistory_NoBindingReturnsNote(t *testing.T) {
if testHandler == nil {
t.Skip("requires test database")
}
taskID, _ := newChatHistoryTask(t, true)
withSlackHistory(t, &fakeChatHistoryReader{err: slack.ErrNoSlackSession})
w := httptest.NewRecorder()
testHandler.GetChatChannelHistory(w, taskActorRequest(taskID))
if w.Code != http.StatusOK {
t.Fatalf("status = %d, want 200: %s", w.Code, w.Body.String())
}
var resp ChatChannelHistoryResponse
_ = json.Unmarshal(w.Body.Bytes(), &resp)
if resp.Note == "" || len(resp.Messages) != 0 {
t.Fatalf("expected empty messages + a note, got %+v", resp)
}
}
func TestGetChatChannelHistory_NilReaderReturnsNote(t *testing.T) {
if testHandler == nil {
t.Skip("requires test database")
}
taskID, _ := newChatHistoryTask(t, true)
withSlackHistory(t, nil)
w := httptest.NewRecorder()
testHandler.GetChatChannelHistory(w, taskActorRequest(taskID))
if w.Code != http.StatusOK {
t.Fatalf("status = %d, want 200: %s", w.Code, w.Body.String())
}
var resp ChatChannelHistoryResponse
_ = json.Unmarshal(w.Body.Bytes(), &resp)
if resp.Note == "" {
t.Fatalf("expected a note when no reader configured, got %+v", resp)
}
}
// TestGetChatChannelHistory_RejectsForgedTaskID is the security regression test
// for Niko's must-fix: a normal request (no server-set X-Actor-Source) that
// forges X-Task-ID — exactly what a workspace member could do with a JWT / mul_
// PAT, since the Auth middleware does NOT strip a client-sent X-Task-ID — must be
// rejected, never served another session's history.
func TestGetChatChannelHistory_RejectsForgedTaskID(t *testing.T) {
if testHandler == nil {
t.Skip("requires test database")
}
taskID, _ := newChatHistoryTask(t, true)
fake := &fakeChatHistoryReader{page: channel.HistoryPage{ChannelType: "slack"}}
withSlackHistory(t, fake)
req := newRequest("GET", "/api/chat/history", nil)
req.Header.Set("X-Task-ID", taskID) // forged: no X-Actor-Source=task_token
w := httptest.NewRecorder()
testHandler.GetChatChannelHistory(w, req)
if w.Code != http.StatusForbidden {
t.Fatalf("status = %d, want 403", w.Code)
}
if fake.gotSession.Valid {
t.Fatalf("reader must not be called for a forged X-Task-ID")
}
}
func TestGetChatChannelHistory_MissingTaskHeader(t *testing.T) {
if testHandler == nil {
t.Skip("requires test database")
}
// Task-token actor source but no X-Task-ID: a defensive 400 (the mat_ branch
// always stamps both, so this should not happen in practice).
req := newRequest("GET", "/api/chat/history", nil)
req.Header.Set("X-Actor-Source", "task_token")
w := httptest.NewRecorder()
testHandler.GetChatChannelHistory(w, req)
if w.Code != http.StatusBadRequest {
t.Fatalf("status = %d, want 400", w.Code)
}
}
func TestGetChatChannelHistory_NonChatTask(t *testing.T) {
if testHandler == nil {
t.Skip("requires test database")
}
taskID, _ := newChatHistoryTask(t, false) // task with no chat_session_id
withSlackHistory(t, &fakeChatHistoryReader{})
w := httptest.NewRecorder()
testHandler.GetChatChannelHistory(w, taskActorRequest(taskID))
if w.Code != http.StatusBadRequest {
t.Fatalf("status = %d, want 400: %s", w.Code, w.Body.String())
}
}
// TestGetChatChannelHistory_AutoFirstTurnReadsChannel: with no prior bot reply,
// scope=auto resolves to channel (the surrounding context before the thread).
func TestGetChatChannelHistory_AutoFirstTurnReadsChannel(t *testing.T) {
if testHandler == nil {
t.Skip("requires test database")
}
taskID, _ := newChatHistoryTask(t, true) // no assistant message => first turn
fake := &fakeChatHistoryReader{page: channel.HistoryPage{ChannelType: "slack", Scope: channel.HistoryScopeChannel}}
withSlackHistory(t, fake)
w := httptest.NewRecorder()
testHandler.GetChatChannelHistory(w, taskActorRequest(taskID)) // no ?scope => auto
if w.Code != http.StatusOK {
t.Fatalf("status = %d, want 200: %s", w.Code, w.Body.String())
}
if fake.gotOpts.Scope != channel.HistoryScopeChannel {
t.Fatalf("auto first-turn scope = %q, want channel", fake.gotOpts.Scope)
}
}
// TestGetChatChannelHistory_AutoFollowUpReadsThread: once the bot has replied,
// scope=auto resolves to thread.
func TestGetChatChannelHistory_AutoFollowUpReadsThread(t *testing.T) {
if testHandler == nil {
t.Skip("requires test database")
}
taskID, sessionID := newChatHistoryTask(t, true)
addAssistantMessage(t, sessionID) // bot already replied => follow-up
fake := &fakeChatHistoryReader{page: channel.HistoryPage{ChannelType: "slack", Scope: channel.HistoryScopeThread}}
withSlackHistory(t, fake)
w := httptest.NewRecorder()
testHandler.GetChatChannelHistory(w, taskActorRequest(taskID))
if w.Code != http.StatusOK {
t.Fatalf("status = %d, want 200: %s", w.Code, w.Body.String())
}
if fake.gotOpts.Scope != channel.HistoryScopeThread {
t.Fatalf("auto follow-up scope = %q, want thread", fake.gotOpts.Scope)
}
}
// TestGetChatChannelHistory_ExplicitChannelScope: ?scope=channel overrides the
// auto default even on a follow-up.
func TestGetChatChannelHistory_ExplicitChannelScope(t *testing.T) {
if testHandler == nil {
t.Skip("requires test database")
}
taskID, sessionID := newChatHistoryTask(t, true)
addAssistantMessage(t, sessionID) // follow-up, but explicit override below
fake := &fakeChatHistoryReader{page: channel.HistoryPage{ChannelType: "slack", Scope: channel.HistoryScopeChannel}}
withSlackHistory(t, fake)
req := taskActorRequest(taskID)
req.URL.RawQuery = "scope=channel"
w := httptest.NewRecorder()
testHandler.GetChatChannelHistory(w, req)
if w.Code != http.StatusOK {
t.Fatalf("status = %d, want 200: %s", w.Code, w.Body.String())
}
if fake.gotOpts.Scope != channel.HistoryScopeChannel {
t.Fatalf("explicit scope = %q, want channel", fake.gotOpts.Scope)
}
}

View File

@@ -185,7 +185,12 @@ type Handler struct {
// "link your Slack account" prompt (MUL-3666). Nil unless Slack is
// configured (MULTICA_SLACK_SECRET_KEY set).
SlackBindingTokens *slack.BindingTokenService
cfg Config
// SlackHistory backs the agent-facing `multica chat history` command: it
// reads a chat session's bound Slack conversation on demand (MUL-3871). Nil
// unless Slack is configured; GetChatChannelHistory then reports "no channel
// integration". A future platform satisfies the same reader interface.
SlackHistory ChatChannelHistoryReader
cfg Config
}
func New(queries *db.Queries, txStarter txStarter, hub *realtime.Hub, bus *events.Bus, emailService *service.EmailService, store storage.Storage, cfSigner *auth.CloudFrontSigner, analyticsClient analytics.Client, cfg Config, daemonHubs ...*daemonws.Hub) *Handler {

View File

@@ -0,0 +1,100 @@
package channel
// This file defines the channel-agnostic vocabulary for ON-DEMAND history
// reads. Unlike the inbound push path (InboundMessage), history is PULLED by
// the agent through a single unified CLI (`multica chat history`): the agent
// asks for "the history of the conversation I'm in" and never sees a
// per-platform API. The server resolves the session's binding to a channel
// type and dispatches to that platform's reader, which returns these
// normalized shapes — so adding a platform is "implement a reader", and the
// agent-facing contract never changes (MUL-3871).
// HistoryRole is the normalized author kind of a fetched message, mirroring the
// chat_message.role domain the agent already reasons about.
type HistoryRole string
const (
// HistoryRoleUser is a human (or a third-party bot, e.g. an alerting bot)
// message — context the agent should read.
HistoryRoleUser HistoryRole = "user"
// HistoryRoleAssistant is one of THIS bot's own prior messages in the
// conversation.
HistoryRoleAssistant HistoryRole = "assistant"
)
// HistoryScope selects which slice of a conversation to read. A chat platform
// has two nested histories: the surrounding CHANNEL and the agent's own THREAD
// within it (on Slack the bot's first reply opens a thread on the @mention, so
// every engaged conversation has one). The agent's primary read on a follow-up
// is its thread; the wider channel is pulled only when needed. On the first
// turn there is no thread yet, so the channel is the relevant context.
type HistoryScope string
const (
// HistoryScopeAuto lets the server pick: the channel on the first turn (no
// thread exists yet), the thread on follow-ups. This is the default.
HistoryScopeAuto HistoryScope = "auto"
// HistoryScopeThread reads the agent's own thread (Slack
// conversations.replies). Falls back to the channel where the platform /
// conversation has no threads (e.g. a DM).
HistoryScopeThread HistoryScope = "thread"
// HistoryScopeChannel reads the surrounding channel (Slack
// conversations.history).
HistoryScopeChannel HistoryScope = "channel"
)
// HistoryMessage is one normalized message from a conversation's history. It is
// the same shape regardless of platform so the agent reads a uniform list,
// exactly like `multica issue comment list --output json`.
type HistoryMessage struct {
// ID is the platform message identifier (Slack ts, Feishu message_id).
ID string `json:"id"`
// Author is a human-readable display label for the sender ("Alice",
// "Bot", or a positional "User 2" fallback when the name is unresolved).
Author string `json:"author"`
// AuthorID is the platform-native sender id, when available. Empty for
// some platform/bot messages.
AuthorID string `json:"author_id,omitempty"`
// Role distinguishes the bot's own turns from everyone else's.
Role HistoryRole `json:"role"`
// Text is the message body, flattened to plain text by the adapter.
Text string `json:"text"`
// TS is the platform timestamp string, sortable lexicographically within a
// platform (Slack "1700000000.000100"). It doubles as the paging cursor.
TS string `json:"ts"`
}
// HistoryPage is one normalized page of history plus a cursor for paging
// further back. Messages are ordered OLDEST-FIRST so the transcript reads
// top-to-bottom like the chat does.
type HistoryPage struct {
// ChannelType is the platform the history came from ("slack"). Empty when
// the session is not bound to any channel (a web-only chat session).
ChannelType string `json:"channel_type,omitempty"`
// Scope is the scope actually read ("thread" or "channel") after resolving
// "auto" and any platform fallback (e.g. a DM has no thread). It lets the
// agent know what it got and decide whether to also pull the other scope.
Scope HistoryScope `json:"scope,omitempty"`
// Messages are the fetched messages, oldest-first.
Messages []HistoryMessage `json:"messages"`
// NextCursor, when non-empty, is an opaque cursor to pass as Before to
// page to OLDER messages. Empty means no older messages were available.
NextCursor string `json:"next_cursor,omitempty"`
}
// HistoryOptions tune a history read. They are platform-neutral; each reader
// maps them onto its own API's paging primitives.
type HistoryOptions struct {
// Scope selects thread vs channel. The handler resolves "auto" to a
// concrete scope before calling the reader (it knows whether this is a
// first turn or a follow-up); the reader still degrades "thread" to channel
// where the conversation has no thread. An empty value reads the channel.
Scope HistoryScope
// Limit caps how many messages to return. A reader clamps it to its
// platform's per-page maximum and applies a sane default for <= 0.
Limit int
// Before is an opaque cursor (a NextCursor from a prior page); the reader
// returns only messages strictly older than it. Empty starts at the most
// recent messages.
Before string
}

View File

@@ -0,0 +1,334 @@
package slack
import (
"context"
"encoding/json"
"errors"
"fmt"
"log/slog"
"sort"
"strconv"
"strings"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgtype"
"github.com/slack-go/slack"
"github.com/multica-ai/multica/server/internal/integrations/channel"
db "github.com/multica-ai/multica/server/pkg/db/generated"
)
// ErrNoSlackSession reports that the chat session has no Slack channel binding —
// it is a Feishu or web-only session. Callers surface it as an empty (not
// failed) history read so the unified `multica chat history` command answers
// gracefully on a non-Slack conversation.
var ErrNoSlackSession = errors.New("slack: session has no slack channel binding")
const (
// defaultHistoryLimit is the page size used when the caller asks for none.
defaultHistoryLimit = 20
// maxHistoryLimit caps a single page. Slack's own conversations.* limit is
// far higher; we self-cap so a pull can't dump an unbounded transcript into
// the agent's context (mirrors the Feishu recent-context clamp).
maxHistoryLimit = 50
)
// historyQueries is the slice of generated queries the history reader needs.
// *db.Queries satisfies it. It mirrors outboundQueries: resolve the session's
// Slack binding, then load the installation that owns the bot token.
type historyQueries interface {
GetChannelChatSessionBindingBySession(ctx context.Context, arg db.GetChannelChatSessionBindingBySessionParams) (db.ChannelChatSessionBinding, error)
GetChannelInstallation(ctx context.Context, arg db.GetChannelInstallationParams) (db.ChannelInstallation, error)
}
// historyClient is the slice of the slack-go Web API the reader calls. The real
// *slack.Client satisfies it; tests inject a fake so the fetch/labeling logic is
// exercised without a live Slack.
type historyClient interface {
GetConversationHistoryContext(ctx context.Context, params *slack.GetConversationHistoryParameters) (*slack.GetConversationHistoryResponse, error)
GetConversationRepliesContext(ctx context.Context, params *slack.GetConversationRepliesParameters) ([]slack.Message, bool, string, error)
GetUsersInfoContext(ctx context.Context, users ...string) (*[]slack.User, error)
}
// History reads a Slack conversation's prior messages on demand — the pull half
// of the unified `multica chat history` tool (MUL-3871). It mirrors Outbound:
// given a chat_session it finds the Slack binding, decrypts the installation's
// bot token, and calls conversations.replies (a real thread) or
// conversations.history (DM / top-level channel context). Sessions with no
// Slack binding return ErrNoSlackSession, so it coexists with Feishu sessions on
// the shared endpoint.
type History struct {
q historyQueries
decrypt Decrypter
logger *slog.Logger
newClient func(botToken string) historyClient
}
// NewHistory builds the reader over the generated queries and the bot-token
// decrypter (box.Open at wiring time).
func NewHistory(q historyQueries, decrypt Decrypter, logger *slog.Logger) *History {
if logger == nil {
logger = slog.Default()
}
h := &History{q: q, decrypt: decrypt, logger: logger}
h.newClient = func(botToken string) historyClient {
// Only the bot token is needed to read history; the app-level token is
// for the inbound Socket Mode connection (slack_channel.go).
return slack.New(botToken)
}
return h
}
// Fetch returns one normalized, oldest-first page of the session's Slack
// conversation. It returns ErrNoSlackSession when the session is not Slack-bound
// or its installation is inactive.
func (h *History) Fetch(ctx context.Context, chatSessionID pgtype.UUID, opts channel.HistoryOptions) (channel.HistoryPage, error) {
binding, err := h.q.GetChannelChatSessionBindingBySession(ctx, db.GetChannelChatSessionBindingBySessionParams{
ChatSessionID: chatSessionID,
ChannelType: string(TypeSlack),
})
if err != nil {
if errors.Is(err, pgx.ErrNoRows) {
return channel.HistoryPage{}, ErrNoSlackSession
}
return channel.HistoryPage{}, fmt.Errorf("lookup slack chat binding: %w", err)
}
inst, err := h.q.GetChannelInstallation(ctx, db.GetChannelInstallationParams{
ID: binding.InstallationID,
ChannelType: string(TypeSlack),
})
if err != nil {
return channel.HistoryPage{}, fmt.Errorf("load slack installation: %w", err)
}
if inst.Status != "active" {
return channel.HistoryPage{}, ErrNoSlackSession // revoked install: nothing to read
}
creds, err := decodeCredentials(inst.Config, h.decrypt)
if err != nil {
return channel.HistoryPage{}, fmt.Errorf("decode slack credentials: %w", err)
}
channelID, threadRoot := historyTarget(binding)
// Resolve the concrete scope to read. The handler resolves "auto" to
// thread/channel (it knows first-turn vs follow-up); here we additionally
// degrade "thread" to "channel" when there is no thread to read — a DM, or a
// group whose root could not be recovered.
scope := channel.HistoryScopeChannel
if opts.Scope == channel.HistoryScopeThread &&
binding.ChatType == string(channel.ChatTypeGroup) && threadRoot != "" {
scope = channel.HistoryScopeThread
}
limit := opts.Limit
if limit <= 0 {
limit = defaultHistoryLimit
}
if limit > maxHistoryLimit {
limit = maxHistoryLimit
}
fetchThreadTS := ""
if scope == channel.HistoryScopeThread {
fetchThreadTS = threadRoot
}
client := h.newClient(creds.BotToken)
raw, err := fetchRaw(ctx, client, channelID, fetchThreadTS, opts.Before, limit)
if err != nil {
return channel.HistoryPage{}, fmt.Errorf("read slack history: %w", err)
}
page := normalizeHistory(ctx, client, h.logger, raw, creds.BotUserID, limit)
page.ChannelType = string(TypeSlack)
page.Scope = scope
return page, nil
}
// historyTarget recovers the real channel id and the thread root from the
// binding. The channel_chat_id may be a composite "channel:threadRoot"
// isolation key, so the real channel id is read from the binding config
// (slackBindingConfig). The thread root — present for every engaged group
// session, since the bot's first reply opens a thread on the @mention — is the
// recorded reply thread (last_thread_id), falling back to the composite-key
// suffix. It is empty for a DM (no threads).
func historyTarget(b db.ChannelChatSessionBinding) (channelID, threadRoot string) {
channelID = b.ChannelChatID
if len(b.Config) > 0 {
var cfg slackBindingConfig
if err := json.Unmarshal(b.Config, &cfg); err == nil && cfg.ChannelID != "" {
channelID = cfg.ChannelID
}
}
if b.LastThreadID.Valid && b.LastThreadID.String != "" {
threadRoot = b.LastThreadID.String
} else if i := strings.IndexByte(b.ChannelChatID, ':'); i >= 0 {
threadRoot = b.ChannelChatID[i+1:]
}
return channelID, threadRoot
}
// fetchRaw pulls the most recent `limit` messages older than `before` (exclusive
// when set). A thread read uses conversations.replies anchored on the thread
// root; a channel read uses conversations.history. Both return newest-first;
// ordering is normalized downstream.
func fetchRaw(ctx context.Context, client historyClient, channelID, threadTS, before string, limit int) ([]slack.Message, error) {
if threadTS != "" {
msgs, _, _, err := client.GetConversationRepliesContext(ctx, &slack.GetConversationRepliesParameters{
ChannelID: channelID,
Timestamp: threadTS,
Latest: before,
Inclusive: false,
Limit: limit,
})
return msgs, err
}
resp, err := client.GetConversationHistoryContext(ctx, &slack.GetConversationHistoryParameters{
ChannelID: channelID,
Latest: before,
Inclusive: false,
Limit: limit,
})
if err != nil {
return nil, err
}
return resp.Messages, nil
}
// normalizeHistory turns raw Slack messages into a normalized, oldest-first
// page: it resolves human display names in one batch, labels each sender, maps
// the role, and computes the back-paging cursor.
func normalizeHistory(ctx context.Context, client historyClient, logger *slog.Logger, raw []slack.Message, botUserID string, limit int) channel.HistoryPage {
// Oldest-first so the transcript reads top-to-bottom like the chat does.
sort.SliceStable(raw, func(i, j int) bool { return slackTSLess(raw[i].Timestamp, raw[j].Timestamp) })
names := resolveUserNames(ctx, client, logger, raw, botUserID)
labeler := newHistoryLabeler(names)
out := make([]channel.HistoryMessage, 0, len(raw))
for i := range raw {
m := raw[i]
text := m.Text
if text == "" {
continue // join/system/edit markers carry no readable body
}
own := m.User != "" && m.User == botUserID
role := channel.HistoryRoleUser
if own {
role = channel.HistoryRoleAssistant
}
out = append(out, channel.HistoryMessage{
ID: m.Timestamp,
Author: labeler.label(m, own),
AuthorID: m.User,
Role: role,
Text: text,
TS: m.Timestamp,
})
}
page := channel.HistoryPage{Messages: out}
// Only advertise a cursor when the platform returned a full page (more may
// exist older than the oldest message we just returned).
if len(raw) >= limit && len(out) > 0 {
page.NextCursor = out[0].TS
}
return page
}
// resolveUserNames batch-resolves the human senders' display names, best-effort.
// A failure (missing users:read scope, transport error) yields a nil map so the
// labeler falls back to positional "User N" rather than blocking the read.
func resolveUserNames(ctx context.Context, client historyClient, logger *slog.Logger, msgs []slack.Message, botUserID string) map[string]string {
seen := make(map[string]bool)
ids := make([]string, 0, len(msgs))
for i := range msgs {
u := msgs[i].User
if u == "" || u == botUserID || seen[u] {
continue
}
seen[u] = true
ids = append(ids, u)
}
if len(ids) == 0 {
return nil
}
users, err := client.GetUsersInfoContext(ctx, ids...)
if err != nil || users == nil {
if err != nil {
logger.WarnContext(ctx, "slack history: user name resolution failed", "ids", len(ids), "error", err)
}
return nil
}
names := make(map[string]string, len(*users))
for _, u := range *users {
if name := slackDisplayName(u); name != "" {
names[u.ID] = name
}
}
return names
}
// slackDisplayName picks the friendliest available name for a Slack user.
func slackDisplayName(u slack.User) string {
switch {
case u.Profile.DisplayName != "":
return u.Profile.DisplayName
case u.RealName != "":
return u.RealName
default:
return u.Name
}
}
// historyLabeler assigns stable, human-readable labels within one page, mirroring
// the Feishu speakerLabeler: this bot is "Bot"; a resolved human gets their real
// name; an unresolved human falls back to positional "User N"; a third-party bot
// uses its posted username.
type historyLabeler struct {
names map[string]string
seen map[string]string
n int
}
func newHistoryLabeler(names map[string]string) *historyLabeler {
return &historyLabeler{names: names, seen: make(map[string]string)}
}
func (l *historyLabeler) label(m slack.Message, own bool) string {
if own {
return "Bot"
}
key := m.User
if key == "" {
// A third-party bot (alerting app, …) posts with a bot_id and often a
// username but no user id; label it by that username when present.
if m.Username != "" {
return m.Username
}
key = "bot:" + m.BotID
}
if lbl, ok := l.seen[key]; ok {
return lbl
}
var lbl string
if name := l.names[m.User]; name != "" {
lbl = name
} else if m.Username != "" {
lbl = m.Username
} else {
l.n++
lbl = fmt.Sprintf("User %d", l.n)
}
l.seen[key] = lbl
return lbl
}
// slackTSLess orders two Slack timestamps ("secs.micros") chronologically. Slack
// ts strings are not safely comparable lexicographically across widths, so parse
// them; an unparseable value sorts as 0 (oldest).
func slackTSLess(a, b string) bool {
return parseSlackTS(a) < parseSlackTS(b)
}
func parseSlackTS(ts string) float64 {
f, _ := strconv.ParseFloat(ts, 64)
return f
}

View File

@@ -0,0 +1,253 @@
package slack
import (
"context"
"errors"
"testing"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgtype"
"github.com/slack-go/slack"
"github.com/multica-ai/multica/server/internal/integrations/channel"
db "github.com/multica-ai/multica/server/pkg/db/generated"
)
type fakeHistoryQueries struct {
binding db.ChannelChatSessionBinding
bindingErr error
inst db.ChannelInstallation
instErr error
}
func (f *fakeHistoryQueries) GetChannelChatSessionBindingBySession(context.Context, db.GetChannelChatSessionBindingBySessionParams) (db.ChannelChatSessionBinding, error) {
return f.binding, f.bindingErr
}
func (f *fakeHistoryQueries) GetChannelInstallation(context.Context, db.GetChannelInstallationParams) (db.ChannelInstallation, error) {
return f.inst, f.instErr
}
type fakeHistoryClient struct {
historyMsgs []slack.Message
repliesMsgs []slack.Message
users []slack.User
historyCalls int
repliesCalls int
lastHistory *slack.GetConversationHistoryParameters
lastReplies *slack.GetConversationRepliesParameters
}
func (f *fakeHistoryClient) GetConversationHistoryContext(_ context.Context, p *slack.GetConversationHistoryParameters) (*slack.GetConversationHistoryResponse, error) {
f.historyCalls++
f.lastHistory = p
return &slack.GetConversationHistoryResponse{Messages: f.historyMsgs}, nil
}
func (f *fakeHistoryClient) GetConversationRepliesContext(_ context.Context, p *slack.GetConversationRepliesParameters) ([]slack.Message, bool, string, error) {
f.repliesCalls++
f.lastReplies = p
return f.repliesMsgs, false, "", nil
}
func (f *fakeHistoryClient) GetUsersInfoContext(_ context.Context, _ ...string) (*[]slack.User, error) {
return &f.users, nil
}
func msg(user, text, ts string) slack.Message {
return slack.Message{Msg: slack.Msg{User: user, Text: text, Timestamp: ts}}
}
func activeSlackInstall() db.ChannelInstallation {
return db.ChannelInstallation{Status: "active", Config: slackInstallConfigJSON()}
}
// groupBinding builds a group session binding rooted at threadRoot (the thread
// the bot's reply opened on the @mention).
func groupBinding(threadRoot string) db.ChannelChatSessionBinding {
b := db.ChannelChatSessionBinding{
InstallationID: uid(2),
ChannelChatID: "C1:" + threadRoot,
ChatType: string(channel.ChatTypeGroup),
Config: []byte(`{"channel_id":"C1"}`),
}
if threadRoot != "" {
b.LastThreadID = pgtype.Text{String: threadRoot, Valid: true}
}
return b
}
func dmBinding() db.ChannelChatSessionBinding {
return db.ChannelChatSessionBinding{
InstallationID: uid(2),
ChannelChatID: "D1",
ChatType: string(channel.ChatTypeP2P),
Config: []byte(`{"channel_id":"D1"}`),
}
}
func newTestHistory(q historyQueries, fc historyClient) *History {
h := NewHistory(q, nil, nil) // nil decrypter => stored bytes treated as plaintext
h.newClient = func(string) historyClient { return fc }
return h
}
// TestHistoryFetchChannelScope verifies a channel-scope read uses
// conversations.history and normalizes oldest-first with roles + labels.
func TestHistoryFetchChannelScope(t *testing.T) {
q := &fakeHistoryQueries{binding: groupBinding("50.000000"), inst: activeSlackInstall()}
fc := &fakeHistoryClient{
// Slack returns newest-first; the bot (UBOT) replied last.
historyMsgs: []slack.Message{
msg("UBOT", "on it", "102.000000"),
msg("U1", "@bot look into this", "101.000000"),
msg("U2", "alert: 5xx spiking", "100.000000"),
},
users: []slack.User{{ID: "U1", RealName: "Alice"}}, // U2 unresolved -> positional
}
h := newTestHistory(q, fc)
page, err := h.Fetch(context.Background(), uid(9), channel.HistoryOptions{Scope: channel.HistoryScopeChannel})
if err != nil {
t.Fatalf("Fetch: %v", err)
}
if fc.historyCalls != 1 || fc.repliesCalls != 0 {
t.Fatalf("expected conversations.history, got history=%d replies=%d", fc.historyCalls, fc.repliesCalls)
}
if fc.lastHistory.ChannelID != "C1" {
t.Errorf("channel id = %q, want C1", fc.lastHistory.ChannelID)
}
if page.ChannelType != "slack" || page.Scope != channel.HistoryScopeChannel {
t.Errorf("channel_type/scope = %q/%q, want slack/channel", page.ChannelType, page.Scope)
}
if len(page.Messages) != 3 || page.Messages[0].TS != "100.000000" || page.Messages[2].TS != "102.000000" {
t.Fatalf("expected 3 msgs oldest-first, got %+v", page.Messages)
}
if got := page.Messages[0]; got.Author != "User 1" || got.Role != channel.HistoryRoleUser {
t.Errorf("msg0 author/role = %q/%q, want User 1/user", got.Author, got.Role)
}
if got := page.Messages[1]; got.Author != "Alice" {
t.Errorf("msg1 author = %q, want Alice", got.Author)
}
if got := page.Messages[2]; got.Author != "Bot" || got.Role != channel.HistoryRoleAssistant {
t.Errorf("msg2 author/role = %q/%q, want Bot/assistant", got.Author, got.Role)
}
}
// TestHistoryFetchThreadScope verifies a thread-scope read uses
// conversations.replies anchored on the session's thread root (from the binding).
func TestHistoryFetchThreadScope(t *testing.T) {
q := &fakeHistoryQueries{binding: groupBinding("50.000000"), inst: activeSlackInstall()}
fc := &fakeHistoryClient{repliesMsgs: []slack.Message{
msg("U1", "second", "52.000000"),
msg("U1", "root", "50.000000"),
}}
h := newTestHistory(q, fc)
page, err := h.Fetch(context.Background(), uid(9), channel.HistoryOptions{Scope: channel.HistoryScopeThread, Limit: 10})
if err != nil {
t.Fatalf("Fetch: %v", err)
}
if fc.repliesCalls != 1 || fc.historyCalls != 0 {
t.Fatalf("expected conversations.replies, got history=%d replies=%d", fc.historyCalls, fc.repliesCalls)
}
if fc.lastReplies.Timestamp != "50.000000" || fc.lastReplies.ChannelID != "C1" {
t.Errorf("replies anchored at %q/%q, want C1/50.000000", fc.lastReplies.ChannelID, fc.lastReplies.Timestamp)
}
if page.Scope != channel.HistoryScopeThread {
t.Errorf("scope = %q, want thread", page.Scope)
}
if len(page.Messages) != 2 || page.Messages[0].TS != "50.000000" {
t.Fatalf("expected 2 msgs oldest-first, got %+v", page.Messages)
}
}
// TestHistoryFetchDMIgnoresThreadScope confirms a DM (no threads) degrades a
// thread request to channel history.
func TestHistoryFetchDMIgnoresThreadScope(t *testing.T) {
q := &fakeHistoryQueries{binding: dmBinding(), inst: activeSlackInstall()}
fc := &fakeHistoryClient{historyMsgs: []slack.Message{msg("U1", "hi", "100.000000")}}
h := newTestHistory(q, fc)
page, err := h.Fetch(context.Background(), uid(9), channel.HistoryOptions{Scope: channel.HistoryScopeThread})
if err != nil {
t.Fatalf("Fetch: %v", err)
}
if fc.historyCalls != 1 || fc.repliesCalls != 0 {
t.Fatalf("DM must use conversations.history, got history=%d replies=%d", fc.historyCalls, fc.repliesCalls)
}
if page.Scope != channel.HistoryScopeChannel {
t.Errorf("scope = %q, want channel (DM has no thread)", page.Scope)
}
}
// TestHistoryFetchThreadFallsBackWithoutRoot: a group binding with no recoverable
// thread root degrades a thread request to channel history.
func TestHistoryFetchThreadFallsBackWithoutRoot(t *testing.T) {
q := &fakeHistoryQueries{
binding: db.ChannelChatSessionBinding{InstallationID: uid(2), ChannelChatID: "C1", ChatType: string(channel.ChatTypeGroup), Config: []byte(`{"channel_id":"C1"}`)},
inst: activeSlackInstall(),
}
fc := &fakeHistoryClient{historyMsgs: []slack.Message{msg("U1", "x", "100.000000")}}
h := newTestHistory(q, fc)
page, err := h.Fetch(context.Background(), uid(9), channel.HistoryOptions{Scope: channel.HistoryScopeThread})
if err != nil {
t.Fatalf("Fetch: %v", err)
}
if fc.historyCalls != 1 || fc.repliesCalls != 0 {
t.Fatalf("expected fallback to history, got history=%d replies=%d", fc.historyCalls, fc.repliesCalls)
}
if page.Scope != channel.HistoryScopeChannel {
t.Errorf("scope = %q, want channel", page.Scope)
}
}
// TestHistoryTargetDerivesRoot pins the channel + thread-root recovery from a
// binding: last_thread_id first, then the composite-key suffix, empty for a DM.
func TestHistoryTargetDerivesRoot(t *testing.T) {
if ch, root := historyTarget(groupBinding("50.0")); ch != "C1" || root != "50.0" {
t.Errorf("from last_thread_id: got %q/%q, want C1/50.0", ch, root)
}
keyOnly := db.ChannelChatSessionBinding{ChannelChatID: "C9:77.7", Config: []byte(`{"channel_id":"C9"}`)}
if ch, root := historyTarget(keyOnly); ch != "C9" || root != "77.7" {
t.Errorf("from key suffix: got %q/%q, want C9/77.7", ch, root)
}
if ch, root := historyTarget(dmBinding()); ch != "D1" || root != "" {
t.Errorf("dm: got %q/%q, want D1/<empty>", ch, root)
}
}
// TestHistoryFetchNoBinding maps a missing Slack binding to ErrNoSlackSession.
func TestHistoryFetchNoBinding(t *testing.T) {
q := &fakeHistoryQueries{bindingErr: pgx.ErrNoRows}
h := newTestHistory(q, &fakeHistoryClient{})
if _, err := h.Fetch(context.Background(), uid(9), channel.HistoryOptions{}); !errors.Is(err, ErrNoSlackSession) {
t.Fatalf("err = %v, want ErrNoSlackSession", err)
}
}
// TestHistoryFetchInactiveInstall treats a revoked installation as empty.
func TestHistoryFetchInactiveInstall(t *testing.T) {
q := &fakeHistoryQueries{
binding: groupBinding("50.0"),
inst: db.ChannelInstallation{Status: "revoked", Config: slackInstallConfigJSON()},
}
h := newTestHistory(q, &fakeHistoryClient{})
if _, err := h.Fetch(context.Background(), uid(9), channel.HistoryOptions{}); !errors.Is(err, ErrNoSlackSession) {
t.Fatalf("err = %v, want ErrNoSlackSession", err)
}
}
// TestHistoryLimitClamp confirms an over-large limit is clamped before the call.
func TestHistoryLimitClamp(t *testing.T) {
q := &fakeHistoryQueries{binding: groupBinding("50.0"), inst: activeSlackInstall()}
fc := &fakeHistoryClient{}
h := newTestHistory(q, fc)
if _, err := h.Fetch(context.Background(), uid(9), channel.HistoryOptions{Scope: channel.HistoryScopeChannel, Limit: 5000}); err != nil {
t.Fatalf("Fetch: %v", err)
}
if fc.lastHistory.Limit != maxHistoryLimit {
t.Errorf("limit = %d, want clamp to %d", fc.lastHistory.Limit, maxHistoryLimit)
}
}

View File

@@ -82,6 +82,11 @@ func slackSessionRouting(msg channel.InboundMessage) (bindingKey string, config
if msg.Source.ChatType == channel.ChatTypeP2P {
return chatID, cfg, msg.Source.ThreadID
}
// The thread root is the inbound thread_ts when the @mention is a reply
// inside an existing thread, else the message's own ts (a top-level mention
// becomes the root the bot threads its reply under). Either way the root is
// recoverable later from the binding (channel_chat_id suffix / last_thread_id),
// which is what the history reader uses to read the thread.
threadRoot := msg.Source.ThreadID
if threadRoot == "" {
threadRoot = msg.MessageID