mirror of
https://github.com/multica-ai/multica.git
synced 2026-07-02 11:59:54 +02:00
Compare commits
2 Commits
codex/comm
...
agent/j/79
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
cff2567c8a | ||
|
|
0dac35d49f |
@@ -43,6 +43,7 @@ oauth_config:
|
||||
- im:history
|
||||
- mpim:history
|
||||
- chat:write
|
||||
- reactions:write
|
||||
- users:read
|
||||
settings:
|
||||
event_subscriptions:
|
||||
@@ -66,6 +67,7 @@ settings:
|
||||
| `app_home.messages_tab_enabled: true` | メンバーが Bot を開いて **DM** できるようにします。これがないと、Bot に直接メッセージを送れません。 |
|
||||
| `bot_user` | @ メンションされ、返信を投稿する Bot のアイデンティティを作成します。 |
|
||||
| `chat:write` | エージェントの返信を Slack に投稿し返します。 |
|
||||
| `reactions:write` | エージェントの処理中、あなたのメッセージに 👀 リアクションを付け、返信時に外します。このスコープがないとインジケーターは黙ってスキップされます——他の機能はすべて動作します。 |
|
||||
| `app_mentions:read` + `app_mention` イベント | チャンネルでの @ メンションを受け取ります。 |
|
||||
| `im:history` + `message.im` | Bot への **DM** を受け取ります(すべての DM メッセージが読み取られます)。 |
|
||||
| `channels:history` / `groups:history` / `mpim:history` + 対応する `message.*` イベント | パブリックチャンネル、プライベートチャンネル、グループ DM のメッセージを受け取ります。これらの中では、Bot は自分を **@ メンション**したメッセージにのみ反応します。 |
|
||||
@@ -75,6 +77,10 @@ settings:
|
||||
|
||||
**OAuth リダイレクト URL はありません**。BYO は OAuth を使わないからです。
|
||||
|
||||
<Callout type="warning">
|
||||
以前のマニフェストでアプリを作成済みですか?**OAuth & Permissions → Bot Token Scopes** で **`reactions:write`** スコープを追加し、新しいスコープを反映させるために**アプリをワークスペースに再インストール**してください。それまではエージェントは通常どおり返信します——👀「処理中」リアクションがスキップされるだけです。
|
||||
</Callout>
|
||||
|
||||
<Callout type="info">
|
||||
Slack で特定の名前を表示したいですか? 作成前に `display_information.name` と `features.bot_user.display_name`(たとえばエージェントの名前に)を変更するか、あとで **App Home** で編集してください。Slack は Bot をその **bot display name** で表示しますが、これはアプリ名と異なる場合があります。
|
||||
</Callout>
|
||||
|
||||
@@ -43,6 +43,7 @@ oauth_config:
|
||||
- im:history
|
||||
- mpim:history
|
||||
- chat:write
|
||||
- reactions:write
|
||||
- users:read
|
||||
settings:
|
||||
event_subscriptions:
|
||||
@@ -66,6 +67,7 @@ settings:
|
||||
| `app_home.messages_tab_enabled: true` | 멤버가 봇을 열어 **DM**할 수 있게 합니다. 이것이 없으면 봇에게 직접 메시지를 보낼 수 없습니다. |
|
||||
| `bot_user` | `@`로 멘션되고 답변을 게시하는 봇 신원을 생성합니다. |
|
||||
| `chat:write` | 에이전트의 답변을 Slack으로 다시 게시합니다. |
|
||||
| `reactions:write` | 에이전트가 처리하는 동안 메시지에 👀 반응을 추가하고, 답변하면 제거합니다. 이 스코프가 없으면 표시가 조용히 건너뛰어집니다 — 나머지 기능은 모두 정상 동작합니다. |
|
||||
| `app_mentions:read` + `app_mention` 이벤트 | 채널에서 `@`-멘션을 받습니다. |
|
||||
| `im:history` + `message.im` | 봇에게 보내는 **DM**을 받습니다(모든 DM 메시지를 읽습니다). |
|
||||
| `channels:history` / `groups:history` / `mpim:history` + 대응하는 `message.*` 이벤트 | 공개 채널, 비공개 채널, 그룹 DM의 메시지를 받습니다. 이런 곳에서 봇은 자신을 **@로 멘션한** 메시지에만 반응합니다. |
|
||||
@@ -75,6 +77,10 @@ settings:
|
||||
|
||||
**OAuth redirect URL은 없습니다.** BYO는 OAuth를 사용하지 않기 때문입니다.
|
||||
|
||||
<Callout type="warning">
|
||||
이전 매니페스트로 이미 앱을 만들었나요? **OAuth & Permissions → Bot Token Scopes**에서 **`reactions:write`** 스코프를 추가한 뒤, 새 스코프가 적용되도록 **앱을 워크스페이스에 다시 설치**하세요. 그 전까지 에이전트는 정상적으로 답변하며 — 👀 "처리 중" 반응만 건너뜁니다.
|
||||
</Callout>
|
||||
|
||||
<Callout type="info">
|
||||
Slack에서 특정 이름을 쓰고 싶나요? 생성하기 전에 `display_information.name`과 `features.bot_user.display_name`을 (예: 에이전트 이름으로) 변경하거나, 나중에 **App Home**에서 편집하세요. Slack은 봇을 **bot display name**으로 표시하며, 이는 앱 이름과 다를 수 있습니다.
|
||||
</Callout>
|
||||
|
||||
@@ -43,6 +43,7 @@ oauth_config:
|
||||
- im:history
|
||||
- mpim:history
|
||||
- chat:write
|
||||
- reactions:write
|
||||
- users:read
|
||||
settings:
|
||||
event_subscriptions:
|
||||
@@ -66,6 +67,7 @@ This manifest configures everything Multica needs, so you don't set anything by
|
||||
| `app_home.messages_tab_enabled: true` | Lets members open the bot and **DM** it. Without it, the bot can't be messaged directly. |
|
||||
| `bot_user` | Creates the bot identity that gets @-mentioned and posts replies. |
|
||||
| `chat:write` | Post the agent's replies back into Slack. |
|
||||
| `reactions:write` | Add a 👀 reaction to your message while the agent is working, removed when it replies. Without this scope the indicator is silently skipped — everything else still works. |
|
||||
| `app_mentions:read` + `app_mention` event | Receive @-mentions in channels. |
|
||||
| `im:history` + `message.im` | Receive **DMs** to the bot (every DM message is read). |
|
||||
| `channels:history` / `groups:history` / `mpim:history` + the matching `message.*` events | Receive messages in public channels, private channels, and group DMs. In these, the bot only acts on messages that **@-mention** it. |
|
||||
@@ -75,6 +77,10 @@ This manifest configures everything Multica needs, so you don't set anything by
|
||||
|
||||
There is **no OAuth redirect URL**, because BYO doesn't use OAuth.
|
||||
|
||||
<Callout type="warning">
|
||||
Already created your app with an earlier manifest? Add the **`reactions:write`** scope under **OAuth & Permissions → Bot Token Scopes**, then **reinstall the app to your workspace** so the new scope takes effect. Until you do, the agent still replies normally — only the 👀 "processing" reaction is skipped.
|
||||
</Callout>
|
||||
|
||||
<Callout type="info">
|
||||
Want a specific name in Slack? Change `display_information.name` and `features.bot_user.display_name` (e.g. to your agent's name) before creating, or edit it later under **App Home**. Slack shows the bot by its **bot display name**, which can differ from the app name.
|
||||
</Callout>
|
||||
|
||||
@@ -43,6 +43,7 @@ oauth_config:
|
||||
- im:history
|
||||
- mpim:history
|
||||
- chat:write
|
||||
- reactions:write
|
||||
- users:read
|
||||
settings:
|
||||
event_subscriptions:
|
||||
@@ -66,6 +67,7 @@ settings:
|
||||
| `app_home.messages_tab_enabled: true` | 让成员能打开 Bot 并**私聊**它。没有它,Bot 就无法被直接发消息。 |
|
||||
| `bot_user` | 创建被 @ 和发回复用的那个 Bot 身份。 |
|
||||
| `chat:write` | 把智能体的回复发回 Slack。 |
|
||||
| `reactions:write` | 智能体处理期间在你的消息上加一个 👀 表情,回复后再移除。没有这个权限时,该提示会被静默跳过——其他功能都不受影响。 |
|
||||
| `app_mentions:read` + `app_mention` 事件 | 接收频道里的 @ 提及。 |
|
||||
| `im:history` + `message.im` | 接收发给 Bot 的**私聊**(每一条私聊消息都会被读取)。 |
|
||||
| `channels:history` / `groups:history` / `mpim:history` + 对应的 `message.*` 事件 | 接收公开频道、私有频道和群组私聊里的消息。在这些场景里,Bot 只对 **@ 了**它的消息做出响应。 |
|
||||
@@ -75,6 +77,10 @@ settings:
|
||||
|
||||
这里**没有 OAuth 重定向 URL**,因为 BYO 不使用 OAuth。
|
||||
|
||||
<Callout type="warning">
|
||||
已经用旧版 manifest 创建过 app?在 **OAuth & Permissions → Bot Token Scopes** 里加上 **`reactions:write`** 权限,然后**把 app 重新安装到工作区**让新权限生效。在此之前智能体仍然正常回复——只是会跳过 👀「处理中」表情。
|
||||
</Callout>
|
||||
|
||||
<Callout type="info">
|
||||
想在 Slack 里用一个特定的名字?在创建之前改 `display_information.name` 和 `features.bot_user.display_name`(比如改成你智能体的名字),或者之后在 **App Home** 里编辑。Slack 是按 Bot 的**显示名(bot display name)**来展示它的,这个名字可以和 app 名不一样。
|
||||
</Callout>
|
||||
|
||||
@@ -452,7 +452,16 @@ func NewRouterWithOptions(pool *pgxpool.Pool, hub *realtime.Hub, bus *events.Bus
|
||||
AppURL: appURLFromEnv(),
|
||||
Logger: slog.Default(),
|
||||
})
|
||||
channelRouter.Register(slack.TypeSlack, slack.NewSlackResolverSet(queries, pool, slackReplier))
|
||||
// Typing indicator (MUL-3874): a 👀 reaction on the user's message
|
||||
// while the agent works, cleared when the run finishes or fails.
|
||||
// Best-effort; failures are logged only. Registered before the
|
||||
// outbound reply subscriber so, on EventChatDone, the reaction clears
|
||||
// ahead of the reply (bus delivery is synchronous, in subscription
|
||||
// order). Subscribing here is also the only path that clears the
|
||||
// reaction on a failed run, which the outbound replier does not handle.
|
||||
slackTyping := slack.NewTypingIndicatorManager(queries, box.Open, slog.Default())
|
||||
slackTyping.Register(bus)
|
||||
channelRouter.Register(slack.TypeSlack, slack.NewSlackResolverSet(queries, pool, slackReplier, slackTyping))
|
||||
slack.NewOutbound(queries, box.Open, slog.Default()).Register(bus)
|
||||
|
||||
// Per-installation inbound: the Supervisor builds + supervises one
|
||||
|
||||
@@ -176,10 +176,19 @@ type OutboundReplier interface {
|
||||
Reply(ctx context.Context, inst ResolvedInstallation, msg channel.InboundMessage, res Result)
|
||||
}
|
||||
|
||||
// TypingNotifier shows a "processing" indicator when a message is ingested.
|
||||
// Optional; nil disables it.
|
||||
// TypingNotifier shows a "processing" indicator when a message is ingested and
|
||||
// clears it once the message reaches a terminal outcome. Optional; nil disables
|
||||
// it.
|
||||
type TypingNotifier interface {
|
||||
// OnIngested shows the indicator for a successfully ingested message.
|
||||
OnIngested(ctx context.Context, inst ResolvedInstallation, msg channel.InboundMessage, sessionID pgtype.UUID)
|
||||
// OnSettled clears the indicator for a session whose run trigger produced no
|
||||
// task (agent offline / archived, or an enqueue failure). In that case no
|
||||
// task lifecycle event is ever published, so the platform's own bus-driven
|
||||
// clear (on chat-done / task-failed) would never fire and the indicator would
|
||||
// stick. The Router calls this from the debounced flush. Idempotent: a
|
||||
// session with no indicator is a no-op.
|
||||
OnSettled(ctx context.Context, sessionID pgtype.UUID)
|
||||
}
|
||||
|
||||
// ResolverSet is the per-platform bundle the Router runs the pipeline through.
|
||||
|
||||
@@ -348,9 +348,15 @@ func (r *Router) flushChatRun(set ResolverSet, inst ResolvedInstallation, msg ch
|
||||
if err != nil {
|
||||
r.logger.Error("channel router: flush reload chat session failed",
|
||||
"chat_session_id", uuidString(sessionID), "err", err.Error())
|
||||
r.clearTyping(ctx, set, sessionID)
|
||||
return
|
||||
}
|
||||
if _, err := r.tasks.EnqueueChatTask(ctx, session, initiatorUserID, forceFresh); err != nil {
|
||||
// No task was enqueued, so no task lifecycle event will ever publish and
|
||||
// the platform's bus-driven typing clear can never fire. Clear the
|
||||
// indicator here (before any notice) so the "processing" reaction does
|
||||
// not stick on the user's message.
|
||||
r.clearTyping(ctx, set, sessionID)
|
||||
switch {
|
||||
case errors.Is(err, service.ErrChatTaskAgentNoRuntime):
|
||||
r.emitFlushReply(ctx, set, inst, msg, sessionID, OutcomeAgentOffline)
|
||||
@@ -363,6 +369,15 @@ func (r *Router) flushChatRun(set ResolverSet, inst ResolvedInstallation, msg ch
|
||||
}
|
||||
}
|
||||
|
||||
// clearTyping asks the platform to drop the "processing" indicator for a session
|
||||
// whose flush produced no task run. A nil TypingNotifier (platform without the
|
||||
// feature) is a no-op.
|
||||
func (r *Router) clearTyping(ctx context.Context, set ResolverSet, sessionID pgtype.UUID) {
|
||||
if set.Typing != nil {
|
||||
set.Typing.OnSettled(ctx, sessionID)
|
||||
}
|
||||
}
|
||||
|
||||
func (r *Router) markPendingFresh(key string) {
|
||||
r.pendingFreshMu.Lock()
|
||||
defer r.pendingFreshMu.Unlock()
|
||||
|
||||
@@ -122,8 +122,9 @@ func (f *fakeReplier) calls() []Result {
|
||||
}
|
||||
|
||||
type fakeTyping struct {
|
||||
mu sync.Mutex
|
||||
count int
|
||||
mu sync.Mutex
|
||||
count int
|
||||
settled int
|
||||
}
|
||||
|
||||
func (f *fakeTyping) OnIngested(_ context.Context, _ ResolvedInstallation, _ channel.InboundMessage, _ pgtype.UUID) {
|
||||
@@ -131,7 +132,13 @@ func (f *fakeTyping) OnIngested(_ context.Context, _ ResolvedInstallation, _ cha
|
||||
defer f.mu.Unlock()
|
||||
f.count++
|
||||
}
|
||||
func (f *fakeTyping) calls() int { f.mu.Lock(); defer f.mu.Unlock(); return f.count }
|
||||
func (f *fakeTyping) OnSettled(_ context.Context, _ pgtype.UUID) {
|
||||
f.mu.Lock()
|
||||
defer f.mu.Unlock()
|
||||
f.settled++
|
||||
}
|
||||
func (f *fakeTyping) calls() int { f.mu.Lock(); defer f.mu.Unlock(); return f.count }
|
||||
func (f *fakeTyping) settledCalls() int { f.mu.Lock(); defer f.mu.Unlock(); return f.settled }
|
||||
|
||||
type fakeIssues struct {
|
||||
called bool
|
||||
@@ -458,6 +465,37 @@ func TestRouter_FlushOffline_RepliesAgentOffline(t *testing.T) {
|
||||
if !found {
|
||||
t.Fatalf("agent-no-runtime must emit an AgentOffline reply")
|
||||
}
|
||||
// The reaction was added on ingest but no task will run, so the bus-driven
|
||||
// clear never fires — the flush must clear the typing indicator itself.
|
||||
if h.typing.settledCalls() != 1 {
|
||||
t.Fatalf("offline flush must clear the typing indicator, got %d OnSettled calls", h.typing.settledCalls())
|
||||
}
|
||||
}
|
||||
|
||||
func TestRouter_FlushArchived_ClearsTyping(t *testing.T) {
|
||||
h := newHarness(t)
|
||||
h.tasks.err = service.ErrChatTaskAgentArchived
|
||||
if err := h.router.Handle(context.Background(), p2pMessage(t)); err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
if h.typing.settledCalls() != 1 {
|
||||
t.Fatalf("archived flush must clear the typing indicator, got %d OnSettled calls", h.typing.settledCalls())
|
||||
}
|
||||
}
|
||||
|
||||
func TestRouter_FlushSuccess_DoesNotClearTyping(t *testing.T) {
|
||||
h := newHarness(t)
|
||||
if err := h.router.Handle(context.Background(), p2pMessage(t)); err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
if !h.tasks.wasCalled() {
|
||||
t.Fatalf("a healthy session must enqueue a task")
|
||||
}
|
||||
// A successfully enqueued task is cleared by the platform's bus-driven
|
||||
// handler on chat-done / task-failed, NOT by the flush.
|
||||
if h.typing.settledCalls() != 0 {
|
||||
t.Fatalf("successful flush must not clear the typing indicator, got %d OnSettled calls", h.typing.settledCalls())
|
||||
}
|
||||
}
|
||||
|
||||
func TestRouter_ForceFresh_Propagates(t *testing.T) {
|
||||
|
||||
@@ -253,3 +253,11 @@ func (r *feishuTypingNotifier) OnIngested(ctx context.Context, inst engine.Resol
|
||||
lm, _ := larkMsgFromRaw(msg)
|
||||
r.mgr.Add(ctx, larkInst, sessionID, msg.MessageID, lm.CreateTime)
|
||||
}
|
||||
|
||||
// OnSettled clears the reaction when the run trigger enqueued no task (agent
|
||||
// offline / archived, or an enqueue failure) — the Patcher's bus-driven clear on
|
||||
// chat-done / task-failed never fires for those, so without this the Typing
|
||||
// reaction sticks.
|
||||
func (r *feishuTypingNotifier) OnSettled(ctx context.Context, sessionID pgtype.UUID) {
|
||||
r.mgr.Clear(ctx, sessionID)
|
||||
}
|
||||
|
||||
@@ -29,9 +29,10 @@ const originSlackChat = "slack_chat"
|
||||
// the outbound binding-prompt / status / issue-created notices; pass a nil
|
||||
// engine.OutboundReplier to disable them (the inbound pipeline — route,
|
||||
// identity, dedup, session, /issue, run trigger — is fully functional without
|
||||
// it). Typing is left nil. (MUL-3666 wired the replier; stage 3 had it nil.)
|
||||
func NewSlackResolverSet(q *db.Queries, tx engine.TxStarter, replier engine.OutboundReplier) engine.ResolverSet {
|
||||
return engine.ResolverSet{
|
||||
// it). typing shows the "processing" reaction on ingest; pass nil to disable it
|
||||
// (MUL-3874). (MUL-3666 wired the replier; stage 3 had both nil.)
|
||||
func NewSlackResolverSet(q *db.Queries, tx engine.TxStarter, replier engine.OutboundReplier, typing *TypingIndicatorManager) engine.ResolverSet {
|
||||
set := engine.ResolverSet{
|
||||
Installation: &installationResolver{q: q},
|
||||
Identity: &identityResolver{q: q},
|
||||
Dedup: &deduper{q: q},
|
||||
@@ -44,6 +45,12 @@ func NewSlackResolverSet(q *db.Queries, tx engine.TxStarter, replier engine.Outb
|
||||
Replier: replier,
|
||||
OriginType: originSlackChat,
|
||||
}
|
||||
// Guard against assigning a nil *TypingIndicatorManager into the interface
|
||||
// field (which would make set.Typing a non-nil typed-nil); mirrors Feishu.
|
||||
if typing != nil {
|
||||
set.Typing = &slackTypingNotifier{mgr: typing}
|
||||
}
|
||||
return set
|
||||
}
|
||||
|
||||
var (
|
||||
@@ -52,6 +59,7 @@ var (
|
||||
_ engine.Deduper = (*deduper)(nil)
|
||||
_ engine.SessionBinder = (*sessionBinder)(nil)
|
||||
_ engine.Auditor = (*auditor)(nil)
|
||||
_ engine.TypingNotifier = (*slackTypingNotifier)(nil)
|
||||
)
|
||||
|
||||
// slackBindingConfig is the opaque outbound routing persisted on the chat
|
||||
@@ -270,3 +278,27 @@ func (r *auditor) RecordDrop(ctx context.Context, instID pgtype.UUID, msg channe
|
||||
ChannelMessageID: nullText(msg.MessageID),
|
||||
})
|
||||
}
|
||||
|
||||
// ---- typing indicator ----
|
||||
|
||||
type slackTypingNotifier struct{ mgr *TypingIndicatorManager }
|
||||
|
||||
// OnIngested fires when a Slack message is successfully ingested. It reacts to
|
||||
// the user's message (channel = Source.ChatID, ts = MessageID) so the user sees
|
||||
// the bot is processing it. The resolved installation carries the bot token in
|
||||
// its Config blob — the InstallationResolver stashed the db.ChannelInstallation
|
||||
// row in Platform, the documented adapter boundary the core never reads.
|
||||
func (n *slackTypingNotifier) OnIngested(ctx context.Context, inst engine.ResolvedInstallation, msg channel.InboundMessage, sessionID pgtype.UUID) {
|
||||
ci, ok := inst.Platform.(db.ChannelInstallation)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
n.mgr.Add(ctx, ci, sessionID, msg.Source.ChatID, msg.MessageID)
|
||||
}
|
||||
|
||||
// OnSettled clears the reaction when the run trigger enqueued no task (agent
|
||||
// offline / archived, or an enqueue failure) — the bus-driven clear on
|
||||
// chat-done / task-failed never fires for those, so without this the 👀 sticks.
|
||||
func (n *slackTypingNotifier) OnSettled(ctx context.Context, sessionID pgtype.UUID) {
|
||||
n.mgr.Clear(ctx, sessionID)
|
||||
}
|
||||
|
||||
@@ -93,7 +93,7 @@ func TestSlackThreadIsolation(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestNewSlackResolverSet(t *testing.T) {
|
||||
set := NewSlackResolverSet(nil, nil, nil)
|
||||
set := NewSlackResolverSet(nil, nil, nil, nil)
|
||||
if set.Installation == nil || set.Identity == nil || set.Dedup == nil || set.Session == nil || set.Audit == nil {
|
||||
t.Error("resolver set must populate all required resolvers")
|
||||
}
|
||||
@@ -103,12 +103,18 @@ func TestNewSlackResolverSet(t *testing.T) {
|
||||
if set.Replier != nil {
|
||||
t.Error("a nil replier arg must leave Replier nil (not a typed-nil interface)")
|
||||
}
|
||||
if set.Typing != nil {
|
||||
t.Error("a nil typing arg must leave Typing nil (not a typed-nil interface)")
|
||||
}
|
||||
|
||||
// A real replier threads through.
|
||||
set = NewSlackResolverSet(nil, nil, NewOutboundReplier(OutboundReplierConfig{}))
|
||||
// A real replier + typing manager thread through.
|
||||
set = NewSlackResolverSet(nil, nil, NewOutboundReplier(OutboundReplierConfig{}), NewTypingIndicatorManager(nil, nil, nil))
|
||||
if set.Replier == nil {
|
||||
t.Error("a non-nil replier must populate ResolverSet.Replier")
|
||||
}
|
||||
if set.Typing == nil {
|
||||
t.Error("a non-nil typing manager must populate ResolverSet.Typing")
|
||||
}
|
||||
}
|
||||
|
||||
func TestInstallationServesTeam(t *testing.T) {
|
||||
|
||||
227
server/internal/integrations/slack/typing_indicator.go
Normal file
227
server/internal/integrations/slack/typing_indicator.go
Normal file
@@ -0,0 +1,227 @@
|
||||
package slack
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"log/slog"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/jackc/pgx/v5"
|
||||
"github.com/jackc/pgx/v5/pgtype"
|
||||
"github.com/slack-go/slack"
|
||||
|
||||
"github.com/multica-ai/multica/server/internal/events"
|
||||
"github.com/multica-ai/multica/server/internal/util"
|
||||
db "github.com/multica-ai/multica/server/pkg/db/generated"
|
||||
"github.com/multica-ai/multica/server/pkg/protocol"
|
||||
)
|
||||
|
||||
// typingEmoji is the Slack reaction name used as the "processing" indicator on
|
||||
// the user's message while the agent is working. Slack has no animated "typing"
|
||||
// reaction like Feishu's, so we use the universal 👀 ("seen, on it") convention
|
||||
// — a built-in emoji present in every workspace. Change this one constant to
|
||||
// swap the indicator. The installed Slack app needs the reactions:write scope
|
||||
// for the reaction to land; without it the add simply fails and is logged.
|
||||
const typingEmoji = "eyes"
|
||||
|
||||
// typingIndicatorMaxAge bounds how old an inbound message may be before we skip
|
||||
// the reaction, so a Socket Mode reconnect that replays old events does not
|
||||
// stamp "processing" badges onto long-finished conversations. Mirrors Feishu.
|
||||
const typingIndicatorMaxAge = 2 * time.Minute
|
||||
|
||||
// reactionAPI is the minimal Slack reaction surface the indicator needs.
|
||||
// *slack.Client satisfies it directly; tests inject a fake.
|
||||
type reactionAPI interface {
|
||||
AddReactionContext(ctx context.Context, name string, item slack.ItemRef) error
|
||||
RemoveReactionContext(ctx context.Context, name string, item slack.ItemRef) error
|
||||
}
|
||||
|
||||
// typingState is the (channel, message ts) pair needed to remove a reaction.
|
||||
// Slack removes by emoji name + item ref, so there is no reaction id to store.
|
||||
type typingState struct {
|
||||
ChannelID string
|
||||
MessageTS string
|
||||
}
|
||||
|
||||
// TypingIndicatorQueries is the narrow DB surface the manager needs to resolve
|
||||
// an installation's bot token when clearing a reaction. *db.Queries satisfies it
|
||||
// (the same two reads the outbound reply subscriber uses).
|
||||
type TypingIndicatorQueries interface {
|
||||
GetChannelChatSessionBindingBySession(ctx context.Context, arg db.GetChannelChatSessionBindingBySessionParams) (db.ChannelChatSessionBinding, error)
|
||||
GetChannelInstallation(ctx context.Context, arg db.GetChannelInstallationParams) (db.ChannelInstallation, error)
|
||||
}
|
||||
|
||||
// TypingIndicatorManager owns the "processing" reaction lifecycle for inbound
|
||||
// Slack messages: it adds a 👀 reaction when a message is ingested and removes
|
||||
// it when the agent's run finishes (EventChatDone) or fails (EventTaskFailed).
|
||||
//
|
||||
// It mirrors lark.TypingIndicatorManager: state is held in memory keyed by
|
||||
// chat_session_id, the bot token is re-resolved from the DB on clear (never held
|
||||
// in the map between add and clear), and every failure is logged and swallowed —
|
||||
// the indicator is best-effort and must never block or fail a real reply.
|
||||
type TypingIndicatorManager struct {
|
||||
q TypingIndicatorQueries
|
||||
decrypt Decrypter
|
||||
log *slog.Logger
|
||||
newAPI func(creds credentials) reactionAPI
|
||||
|
||||
mu sync.RWMutex
|
||||
states map[string][]typingState // key = chat_session_id string
|
||||
}
|
||||
|
||||
// NewTypingIndicatorManager builds a manager over the generated queries and the
|
||||
// bot-token decrypter. The Slack API client is constructed per call from the
|
||||
// installation's decrypted bot token (xoxb-), exactly like the outbound sender.
|
||||
func NewTypingIndicatorManager(q TypingIndicatorQueries, decrypt Decrypter, logger *slog.Logger) *TypingIndicatorManager {
|
||||
if logger == nil {
|
||||
logger = slog.Default()
|
||||
}
|
||||
return &TypingIndicatorManager{
|
||||
q: q,
|
||||
decrypt: decrypt,
|
||||
log: logger,
|
||||
newAPI: func(c credentials) reactionAPI { return slack.New(c.BotToken) },
|
||||
states: make(map[string][]typingState),
|
||||
}
|
||||
}
|
||||
|
||||
// Add reacts to the just-ingested message and records the state under the chat
|
||||
// session. inst is the resolved installation row whose Config blob carries the
|
||||
// encrypted bot token. It is synchronous — the Router calls it in a detached,
|
||||
// time-bounded goroutine. Errors are logged and swallowed.
|
||||
func (m *TypingIndicatorManager) Add(ctx context.Context, inst db.ChannelInstallation, sessionID pgtype.UUID, channelID, messageTS string) {
|
||||
if channelID == "" || messageTS == "" {
|
||||
return
|
||||
}
|
||||
if isMessageTooOld(messageTS) {
|
||||
m.log.Debug("slack typing indicator: message too old, skipping",
|
||||
"chat_session_id", util.UUIDToString(sessionID), "message_ts", messageTS)
|
||||
return
|
||||
}
|
||||
creds, err := decodeCredentials(inst.Config, m.decrypt)
|
||||
if err != nil {
|
||||
m.log.Warn("slack typing indicator: decode credentials failed",
|
||||
"chat_session_id", util.UUIDToString(sessionID), "err", err)
|
||||
return
|
||||
}
|
||||
if err := m.newAPI(creds).AddReactionContext(ctx, typingEmoji, slack.NewRefToMessage(channelID, messageTS)); err != nil {
|
||||
m.log.Warn("slack typing indicator: add reaction failed",
|
||||
"chat_session_id", util.UUIDToString(sessionID), "message_ts", messageTS, "err", err)
|
||||
return
|
||||
}
|
||||
key := util.UUIDToString(sessionID)
|
||||
m.mu.Lock()
|
||||
m.states[key] = append(m.states[key], typingState{ChannelID: channelID, MessageTS: messageTS})
|
||||
m.mu.Unlock()
|
||||
}
|
||||
|
||||
// Clear removes every tracked reaction for the chat session and drops the state.
|
||||
// It re-resolves the installation's bot token from the binding so no decrypted
|
||||
// token is held in memory between add and clear. Individual remove failures are
|
||||
// logged but do not abort the loop. Best-effort throughout.
|
||||
func (m *TypingIndicatorManager) Clear(ctx context.Context, sessionID pgtype.UUID) {
|
||||
key := util.UUIDToString(sessionID)
|
||||
m.mu.Lock()
|
||||
states := m.states[key]
|
||||
delete(m.states, key)
|
||||
m.mu.Unlock()
|
||||
if len(states) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
binding, err := m.q.GetChannelChatSessionBindingBySession(ctx, db.GetChannelChatSessionBindingBySessionParams{
|
||||
ChatSessionID: sessionID,
|
||||
ChannelType: string(TypeSlack),
|
||||
})
|
||||
if err != nil {
|
||||
// A missing binding means the session is not (or no longer) a Slack
|
||||
// target; nothing to clear, and not worth a warning.
|
||||
if !errors.Is(err, pgx.ErrNoRows) {
|
||||
m.log.Warn("slack typing indicator: lookup binding for clear failed",
|
||||
"chat_session_id", key, "err", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
inst, err := m.q.GetChannelInstallation(ctx, db.GetChannelInstallationParams{
|
||||
ID: binding.InstallationID,
|
||||
ChannelType: string(TypeSlack),
|
||||
})
|
||||
if err != nil {
|
||||
m.log.Warn("slack typing indicator: lookup installation for clear failed",
|
||||
"chat_session_id", key, "err", err)
|
||||
return
|
||||
}
|
||||
creds, err := decodeCredentials(inst.Config, m.decrypt)
|
||||
if err != nil {
|
||||
m.log.Warn("slack typing indicator: decode credentials for clear failed",
|
||||
"chat_session_id", key, "err", err)
|
||||
return
|
||||
}
|
||||
|
||||
api := m.newAPI(creds)
|
||||
for _, s := range states {
|
||||
if err := api.RemoveReactionContext(ctx, typingEmoji, slack.NewRefToMessage(s.ChannelID, s.MessageTS)); err != nil {
|
||||
m.log.Warn("slack typing indicator: remove reaction failed",
|
||||
"chat_session_id", key, "message_ts", s.MessageTS, "err", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Register subscribes the manager to the task-lifecycle events that end a run so
|
||||
// the reaction is cleared on both success and failure. The outbound reply
|
||||
// subscriber only handles EventChatDone, so this is the only path that removes
|
||||
// the reaction when a run fails. Call once at boot against a fresh bus; register
|
||||
// it before the outbound subscriber so the reaction clears ahead of the reply on
|
||||
// EventChatDone (bus delivery is synchronous, in subscription order).
|
||||
func (m *TypingIndicatorManager) Register(bus *events.Bus) {
|
||||
bus.Subscribe(protocol.EventChatDone, m.handleEvent)
|
||||
bus.Subscribe(protocol.EventTaskFailed, m.handleEvent)
|
||||
}
|
||||
|
||||
func (m *TypingIndicatorManager) handleEvent(e events.Event) {
|
||||
sessionID, ok := chatSessionIDFromEvent(e)
|
||||
if !ok {
|
||||
// Issue / autopilot tasks carry no chat_session — nothing to clear.
|
||||
return
|
||||
}
|
||||
// Bus delivery is synchronous; bound the reaction calls so a stuck Slack
|
||||
// HTTP request cannot wedge the publish call site.
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
defer cancel()
|
||||
m.Clear(ctx, sessionID)
|
||||
}
|
||||
|
||||
// chatSessionIDFromEvent recovers the chat session id from a task-lifecycle
|
||||
// event. EventChatDone sets it on the envelope; EventTaskFailed carries it only
|
||||
// in the broadcast payload map (chat tasks only), so both are checked.
|
||||
func chatSessionIDFromEvent(e events.Event) (pgtype.UUID, bool) {
|
||||
if e.ChatSessionID != "" {
|
||||
if id, err := util.ParseUUID(e.ChatSessionID); err == nil && id.Valid {
|
||||
return id, true
|
||||
}
|
||||
}
|
||||
if m, ok := e.Payload.(map[string]any); ok {
|
||||
if s, _ := m["chat_session_id"].(string); s != "" {
|
||||
if id, err := util.ParseUUID(s); err == nil && id.Valid {
|
||||
return id, true
|
||||
}
|
||||
}
|
||||
}
|
||||
return pgtype.UUID{}, false
|
||||
}
|
||||
|
||||
// isMessageTooOld reports whether a Slack message ts ("<seconds>.<micros>") is
|
||||
// older than typingIndicatorMaxAge. A malformed or empty ts is treated as fresh
|
||||
// (not skipped) — we would rather over-react than drop a real message.
|
||||
func isMessageTooOld(ts string) bool {
|
||||
if ts == "" {
|
||||
return false
|
||||
}
|
||||
secs, err := strconv.ParseFloat(ts, 64)
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
return time.Since(time.Unix(0, int64(secs*float64(time.Second)))) > typingIndicatorMaxAge
|
||||
}
|
||||
157
server/internal/integrations/slack/typing_indicator_test.go
Normal file
157
server/internal/integrations/slack/typing_indicator_test.go
Normal file
@@ -0,0 +1,157 @@
|
||||
package slack
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/slack-go/slack"
|
||||
|
||||
"github.com/multica-ai/multica/server/internal/events"
|
||||
"github.com/multica-ai/multica/server/internal/util"
|
||||
db "github.com/multica-ai/multica/server/pkg/db/generated"
|
||||
"github.com/multica-ai/multica/server/pkg/protocol"
|
||||
)
|
||||
|
||||
// fakeReactor records reaction add/remove calls so a test can assert the
|
||||
// indicator's lifecycle without a live Slack API client.
|
||||
type fakeReactor struct {
|
||||
addName string
|
||||
added []slack.ItemRef
|
||||
removed []slack.ItemRef
|
||||
addErr error
|
||||
}
|
||||
|
||||
func (f *fakeReactor) AddReactionContext(_ context.Context, name string, item slack.ItemRef) error {
|
||||
f.addName = name
|
||||
f.added = append(f.added, item)
|
||||
return f.addErr
|
||||
}
|
||||
|
||||
func (f *fakeReactor) RemoveReactionContext(_ context.Context, _ string, item slack.ItemRef) error {
|
||||
f.removed = append(f.removed, item)
|
||||
return nil
|
||||
}
|
||||
|
||||
func newTestTyping(q TypingIndicatorQueries, fr *fakeReactor) *TypingIndicatorManager {
|
||||
m := NewTypingIndicatorManager(q, nil, nil)
|
||||
m.newAPI = func(credentials) reactionAPI { return fr }
|
||||
return m
|
||||
}
|
||||
|
||||
// freshTS / staleTS build Slack ts strings ("<seconds>.<micros>") relative to
|
||||
// now so the max-age guard can be exercised deterministically.
|
||||
func freshTS() string { return fmt.Sprintf("%d.000100", time.Now().Unix()) }
|
||||
func staleTS() string {
|
||||
return fmt.Sprintf("%d.000100", time.Now().Add(-5*time.Minute).Unix())
|
||||
}
|
||||
|
||||
func TestTypingIndicator_AddThenClear(t *testing.T) {
|
||||
sessionID := uid(7)
|
||||
q := &fakeOutboundQueries{
|
||||
binding: db.ChannelChatSessionBinding{InstallationID: uid(1)},
|
||||
inst: db.ChannelInstallation{ID: uid(1), Status: "active", Config: slackInstallConfigJSON()},
|
||||
}
|
||||
fr := &fakeReactor{}
|
||||
m := newTestTyping(q, fr)
|
||||
|
||||
ts := freshTS()
|
||||
m.Add(context.Background(), db.ChannelInstallation{Config: slackInstallConfigJSON()}, sessionID, "C1", ts)
|
||||
|
||||
if len(fr.added) != 1 || fr.added[0].Channel != "C1" || fr.added[0].Timestamp != ts {
|
||||
t.Fatalf("add reaction = %+v, want one on C1/%s", fr.added, ts)
|
||||
}
|
||||
if fr.addName != typingEmoji {
|
||||
t.Errorf("emoji = %q, want %q", fr.addName, typingEmoji)
|
||||
}
|
||||
|
||||
m.Clear(context.Background(), sessionID)
|
||||
if len(fr.removed) != 1 || fr.removed[0].Channel != "C1" || fr.removed[0].Timestamp != ts {
|
||||
t.Fatalf("remove reaction = %+v, want one on C1/%s", fr.removed, ts)
|
||||
}
|
||||
|
||||
// State is dropped on clear, so a second clear is a no-op.
|
||||
m.Clear(context.Background(), sessionID)
|
||||
if len(fr.removed) != 1 {
|
||||
t.Errorf("second clear must be a no-op, removed %d times", len(fr.removed))
|
||||
}
|
||||
}
|
||||
|
||||
func TestTypingIndicator_SkipsStaleAndEmpty(t *testing.T) {
|
||||
cases := []struct {
|
||||
name string
|
||||
channelID string
|
||||
ts string
|
||||
}{
|
||||
{"stale message (replayed reconnect)", "C1", staleTS()},
|
||||
{"empty ts", "C1", ""},
|
||||
{"empty channel", "", freshTS()},
|
||||
}
|
||||
for _, tc := range cases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
fr := &fakeReactor{}
|
||||
m := newTestTyping(&fakeOutboundQueries{}, fr)
|
||||
m.Add(context.Background(), db.ChannelInstallation{Config: slackInstallConfigJSON()}, uid(7), tc.channelID, tc.ts)
|
||||
if len(fr.added) != 0 {
|
||||
t.Errorf("%s: must not add a reaction, added %d", tc.name, len(fr.added))
|
||||
}
|
||||
// Nothing recorded → a clear has nothing to remove.
|
||||
m.Clear(context.Background(), uid(7))
|
||||
if len(fr.removed) != 0 {
|
||||
t.Errorf("%s: clear must be a no-op, removed %d", tc.name, len(fr.removed))
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestTypingIndicator_ClearsOnTaskFailed(t *testing.T) {
|
||||
sessionID := uid(7)
|
||||
q := &fakeOutboundQueries{
|
||||
binding: db.ChannelChatSessionBinding{InstallationID: uid(1)},
|
||||
inst: db.ChannelInstallation{ID: uid(1), Status: "active", Config: slackInstallConfigJSON()},
|
||||
}
|
||||
fr := &fakeReactor{}
|
||||
m := newTestTyping(q, fr)
|
||||
m.Add(context.Background(), db.ChannelInstallation{Config: slackInstallConfigJSON()}, sessionID, "C1", freshTS())
|
||||
|
||||
// EventTaskFailed carries the session id only in the broadcast payload map,
|
||||
// not on the envelope — the clear handler must read it from there.
|
||||
m.handleEvent(events.Event{
|
||||
Type: protocol.EventTaskFailed,
|
||||
Payload: map[string]any{"chat_session_id": util.UUIDToString(sessionID)},
|
||||
})
|
||||
if len(fr.removed) != 1 {
|
||||
t.Fatalf("task-failed event must clear the reaction, removed %d", len(fr.removed))
|
||||
}
|
||||
}
|
||||
|
||||
func TestTypingIndicator_IgnoresNonChatEvent(t *testing.T) {
|
||||
fr := &fakeReactor{}
|
||||
m := newTestTyping(&fakeOutboundQueries{}, fr)
|
||||
// An issue/autopilot event with no chat session must not even reach a binding
|
||||
// lookup, let alone a reaction removal.
|
||||
m.handleEvent(events.Event{Type: protocol.EventTaskFailed, Payload: map[string]any{"task_id": "t1"}})
|
||||
if len(fr.removed) != 0 {
|
||||
t.Errorf("non-chat event must be ignored, removed %d", len(fr.removed))
|
||||
}
|
||||
}
|
||||
|
||||
// When the run trigger enqueues no task (agent offline / archived), no task
|
||||
// lifecycle event ever publishes, so the engine clears the indicator through the
|
||||
// notifier's OnSettled instead of the bus.
|
||||
func TestSlackTypingNotifier_OnSettledClears(t *testing.T) {
|
||||
sessionID := uid(7)
|
||||
q := &fakeOutboundQueries{
|
||||
binding: db.ChannelChatSessionBinding{InstallationID: uid(1)},
|
||||
inst: db.ChannelInstallation{ID: uid(1), Status: "active", Config: slackInstallConfigJSON()},
|
||||
}
|
||||
fr := &fakeReactor{}
|
||||
m := newTestTyping(q, fr)
|
||||
m.Add(context.Background(), db.ChannelInstallation{Config: slackInstallConfigJSON()}, sessionID, "C1", freshTS())
|
||||
|
||||
(&slackTypingNotifier{mgr: m}).OnSettled(context.Background(), sessionID)
|
||||
if len(fr.removed) != 1 || fr.removed[0].Channel != "C1" {
|
||||
t.Fatalf("OnSettled must clear the reaction, removed = %+v", fr.removed)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user