diff --git a/server/cmd/server/router.go b/server/cmd/server/router.go index ce97c8acf..7b305b8cc 100644 --- a/server/cmd/server/router.go +++ b/server/cmd/server/router.go @@ -224,6 +224,12 @@ func NewRouterWithOptions(pool *pgxpool.Pool, hub *realtime.Hub, bus *events.Bus patcher := lark.NewPatcher(queries, installSvc, larkClient, lark.PatcherConfig{}) patcher.Register(bus) + // Typing indicator: shows a "processing" reaction on the user's + // message while the agent is working, then removes it before the + // reply is sent. Best-effort; failures are logged only. + typingIndicator := lark.NewTypingIndicatorManager(larkClient, installSvc, queries, slog.Default()) + patcher.SetTypingIndicatorManager(typingIndicator) + // Inbound pipeline: lark_inbound_audit logger, // channel-aware ChatSessionService, and the // Dispatcher that orders identity / dedup / append / @@ -263,6 +269,7 @@ func NewRouterWithOptions(pool *pgxpool.Pool, hub *realtime.Hub, bus *events.Bus // "noop" so operators can spot it. connectorFactory, connectorLabel := buildLarkConnectorFactory(installSvc, larkClient) h.LarkHub = lark.NewHub(queries, connectorFactory, dispatcher, lark.HubConfig{}) + h.LarkHub.SetTypingIndicatorManager(typingIndicator) // OutcomeReplier wires the outbound side of the // EventEmitter contract: NeedsBinding / AgentOffline / diff --git a/server/internal/integrations/lark/client.go b/server/internal/integrations/lark/client.go index 7ab5fe7e3..890d0bb75 100644 --- a/server/internal/integrations/lark/client.go +++ b/server/internal/integrations/lark/client.go @@ -106,6 +106,16 @@ type APIClient interface { // positional label. openIDs beyond Lark's 50-per-call cap are dropped // by the client. BatchGetUsers(ctx context.Context, creds InstallationCredentials, openIDs []string) (map[string]string, error) + + // AddMessageReaction adds an emoji reaction to an existing message. + // The standard use-case is the "Typing" indicator that signals the + // Bot is processing the user's message. Returns the reaction_id Lark + // assigns so it can be removed later. + AddMessageReaction(ctx context.Context, p AddReactionParams) (string, error) + + // DeleteMessageReaction removes a previously-added reaction from a + // message. This is the cleanup half of the typing-indicator lifecycle. + DeleteMessageReaction(ctx context.Context, p DeleteReactionParams) error } // ListMessagesParams selects a bounded, recent window of messages in a @@ -233,6 +243,22 @@ type BindingPromptParams struct { BindURL string } +// AddReactionParams is the input shape for adding an emoji reaction to +// a message. +type AddReactionParams struct { + InstallationID InstallationCredentials + MessageID string + EmojiType string +} + +// DeleteReactionParams is the input shape for removing a previously-added +// reaction from a message. +type DeleteReactionParams struct { + InstallationID InstallationCredentials + MessageID string + ReactionID string +} + // InstallationCredentials is the per-installation transport context the // client needs to authenticate against Lark on behalf of a workspace's // bot. Passing these explicitly to each call (rather than constructing @@ -333,3 +359,13 @@ func (s *stubAPIClient) BatchGetUsers(ctx context.Context, creds InstallationCre s.log.Warn("lark stub client: BatchGetUsers called", "count", len(openIDs)) return nil, ErrAPIClientNotConfigured } + +func (s *stubAPIClient) AddMessageReaction(ctx context.Context, p AddReactionParams) (string, error) { + s.log.Warn("lark stub client: AddMessageReaction called", "message_id", p.MessageID, "emoji_type", p.EmojiType) + return "", ErrAPIClientNotConfigured +} + +func (s *stubAPIClient) DeleteMessageReaction(ctx context.Context, p DeleteReactionParams) error { + s.log.Warn("lark stub client: DeleteMessageReaction called", "message_id", p.MessageID, "reaction_id", p.ReactionID) + return ErrAPIClientNotConfigured +} diff --git a/server/internal/integrations/lark/http_client.go b/server/internal/integrations/lark/http_client.go index 7087fe535..ca2a982ce 100644 --- a/server/internal/integrations/lark/http_client.go +++ b/server/internal/integrations/lark/http_client.go @@ -654,6 +654,73 @@ func (c *httpAPIClient) ListChatMessages(ctx context.Context, creds Installation // a caller asking for more still gets the first 50 resolved. const larkBatchGetUsersMaxIDs = 50 +// AddMessageReaction adds an emoji reaction to a message via +// POST /open-apis/im/v1/messages/{message_id}/reactions. +// Returns the reaction_id so it can be deleted later. +func (c *httpAPIClient) AddMessageReaction(ctx context.Context, p AddReactionParams) (string, error) { + if p.MessageID == "" { + return "", errors.New("lark http client: missing message_id") + } + if p.EmojiType == "" { + return "", errors.New("lark http client: missing emoji_type") + } + token, err := c.tenantAccessToken(ctx, p.InstallationID) + if err != nil { + return "", err + } + body := map[string]any{ + "reaction_type": map[string]string{"emoji_type": p.EmojiType}, + } + path := "/open-apis/im/v1/messages/" + url.PathEscape(p.MessageID) + "/reactions" + var resp struct { + Code int `json:"code"` + Msg string `json:"msg"` + Data struct { + ReactionID string `json:"reaction_id"` + } `json:"data"` + } + if err := c.doJSON(ctx, c.resolveBaseURL(p.InstallationID), http.MethodPost, path, token, body, &resp); err != nil { + return "", fmt.Errorf("lark http client: add message reaction: %w", err) + } + if resp.Code != 0 || resp.Data.ReactionID == "" { + if isTokenError(resp.Code) { + c.invalidateToken(p.InstallationID.AppID) + } + return "", fmt.Errorf("lark http client: add message reaction: code=%d msg=%q", resp.Code, resp.Msg) + } + return resp.Data.ReactionID, nil +} + +// DeleteMessageReaction removes a reaction from a message via +// DELETE /open-apis/im/v1/messages/{message_id}/reactions/{reaction_id}. +func (c *httpAPIClient) DeleteMessageReaction(ctx context.Context, p DeleteReactionParams) error { + if p.MessageID == "" { + return errors.New("lark http client: missing message_id") + } + if p.ReactionID == "" { + return errors.New("lark http client: missing reaction_id") + } + token, err := c.tenantAccessToken(ctx, p.InstallationID) + if err != nil { + return err + } + path := "/open-apis/im/v1/messages/" + url.PathEscape(p.MessageID) + "/reactions/" + url.PathEscape(p.ReactionID) + var resp struct { + Code int `json:"code"` + Msg string `json:"msg"` + } + if err := c.doJSON(ctx, c.resolveBaseURL(p.InstallationID), http.MethodDelete, path, token, nil, &resp); err != nil { + return fmt.Errorf("lark http client: delete message reaction: %w", err) + } + if resp.Code != 0 { + if isTokenError(resp.Code) { + c.invalidateToken(p.InstallationID.AppID) + } + return fmt.Errorf("lark http client: delete message reaction: code=%d msg=%q", resp.Code, resp.Msg) + } + return nil +} + // BatchGetUsers resolves user open_ids to display names via // GET /open-apis/contact/v3/users/batch?user_ids=…&user_id_type=open_id. // It mirrors fetchBotUnionID's single-user contact lookup, batched. Only diff --git a/server/internal/integrations/lark/http_client_test.go b/server/internal/integrations/lark/http_client_test.go index 139bec771..b79033982 100644 --- a/server/internal/integrations/lark/http_client_test.go +++ b/server/internal/integrations/lark/http_client_test.go @@ -27,6 +27,8 @@ type larkFakeServer struct { sendN atomic.Int32 patchN atomic.Int32 bindN atomic.Int32 + reactN atomic.Int32 + delRN atomic.Int32 authObs atomic.Value // last Authorization header seen across all paths } @@ -130,6 +132,58 @@ func (f *larkFakeServer) stubPatch(resp map[string]any, verify func(r *http.Requ }) } +// stubReaction installs the IM-reaction-create endpoint. +func (f *larkFakeServer) stubReaction(resp map[string]any, verify func(r *http.Request, id string, body map[string]any)) { + const suffix = "/reactions" + f.mux.HandleFunc("/open-apis/im/v1/messages/", func(w http.ResponseWriter, r *http.Request) { + if !strings.HasSuffix(r.URL.Path, suffix) { + return // let other handlers match + } + if r.Method != http.MethodPost { + f.t.Errorf("reaction: want POST, got %s", r.Method) + } + f.reactN.Add(1) + rawID := strings.TrimSuffix(strings.TrimPrefix(r.URL.Path, "/open-apis/im/v1/messages/"), suffix) + if rawID == "" { + f.t.Errorf("reaction: missing message id") + } + var body map[string]any + if err := json.NewDecoder(r.Body).Decode(&body); err != nil { + f.t.Errorf("reaction: decode body: %v", err) + } + if verify != nil { + verify(r, rawID, body) + } + writeJSON(w, resp) + }) +} + +// stubReactionDelete installs the IM-reaction-delete endpoint. +func (f *larkFakeServer) stubReactionDelete(resp map[string]any, verify func(r *http.Request, msgID string, reactionID string)) { + const prefix = "/open-apis/im/v1/messages/" + f.mux.HandleFunc(prefix, func(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodDelete { + return // let other handlers match + } + rest := strings.TrimPrefix(r.URL.Path, prefix) + parts := strings.Split(rest, "/reactions/") + if len(parts) != 2 { + return // not a delete path + } + f.delRN.Add(1) + if parts[0] == "" { + f.t.Errorf("reaction delete: missing message id") + } + if parts[1] == "" { + f.t.Errorf("reaction delete: missing reaction id") + } + if verify != nil { + verify(r, parts[0], parts[1]) + } + writeJSON(w, resp) + }) +} + func writeJSON(w http.ResponseWriter, body any) { w.Header().Set("Content-Type", "application/json") _ = json.NewEncoder(w).Encode(body) @@ -722,6 +776,94 @@ func TestHTTPClient_TokenEndpointError(t *testing.T) { } } +func TestHTTPClient_AddMessageReaction_HappyPath(t *testing.T) { + fake := newLarkFake(t) + fake.stubToken("tok_react", 7200) + fake.stubReaction(map[string]any{"code": 0, "msg": "ok", "data": map[string]string{"reaction_id": "re_42"}}, func(r *http.Request, id string, body map[string]any) { + if id != "om_user_msg_1" { + t.Errorf("message id: got %q want om_user_msg_1", id) + } + if got := r.Header.Get("Authorization"); got != "Bearer tok_react" { + t.Errorf("Authorization=%q want Bearer tok_react", got) + } + reactionType, ok := body["reaction_type"].(map[string]any) + if !ok { + t.Fatalf("reaction_type missing or wrong shape: %v", body) + } + if got := reactionType["emoji_type"]; got != "Typing" { + t.Errorf("emoji_type=%v want Typing", got) + } + }) + + c := newTestClient(fake, time.Now) + reactionID, err := c.AddMessageReaction(context.Background(), AddReactionParams{ + InstallationID: testCreds(), + MessageID: "om_user_msg_1", + EmojiType: "Typing", + }) + if err != nil { + t.Fatalf("AddMessageReaction: %v", err) + } + if reactionID != "re_42" { + t.Errorf("reaction id: got %q want re_42", reactionID) + } + if got := fake.reactN.Load(); got != 1 { + t.Fatalf("reaction endpoint calls=%d want 1", got) + } +} + +func TestHTTPClient_DeleteMessageReaction_HappyPath(t *testing.T) { + fake := newLarkFake(t) + fake.stubToken("tok_del", 7200) + fake.stubReactionDelete(map[string]any{"code": 0, "msg": "ok"}, func(r *http.Request, msgID string, reactionID string) { + if msgID != "om_user_msg_1" { + t.Errorf("message id: got %q want om_user_msg_1", msgID) + } + if reactionID != "re_42" { + t.Errorf("reaction id: got %q want re_42", reactionID) + } + if got := r.Header.Get("Authorization"); got != "Bearer tok_del" { + t.Errorf("Authorization=%q want Bearer tok_del", got) + } + }) + + c := newTestClient(fake, time.Now) + if err := c.DeleteMessageReaction(context.Background(), DeleteReactionParams{ + InstallationID: testCreds(), + MessageID: "om_user_msg_1", + ReactionID: "re_42", + }); err != nil { + t.Fatalf("DeleteMessageReaction: %v", err) + } + if got := fake.delRN.Load(); got != 1 { + t.Fatalf("reaction delete endpoint calls=%d want 1", got) + } +} + +func TestHTTPClient_AddMessageReaction_Validation(t *testing.T) { + c := NewHTTPAPIClient(HTTPClientConfig{}).(*httpAPIClient) + _, err := c.AddMessageReaction(context.Background(), AddReactionParams{MessageID: "m"}) + if err == nil || !strings.Contains(err.Error(), "missing emoji_type") { + t.Errorf("want missing emoji_type error, got %v", err) + } + _, err = c.AddMessageReaction(context.Background(), AddReactionParams{EmojiType: "Typing"}) + if err == nil || !strings.Contains(err.Error(), "missing message_id") { + t.Errorf("want missing message_id error, got %v", err) + } +} + +func TestHTTPClient_DeleteMessageReaction_Validation(t *testing.T) { + c := NewHTTPAPIClient(HTTPClientConfig{}).(*httpAPIClient) + err := c.DeleteMessageReaction(context.Background(), DeleteReactionParams{ReactionID: "re"}) + if err == nil || !strings.Contains(err.Error(), "missing message_id") { + t.Errorf("want missing message_id error, got %v", err) + } + err = c.DeleteMessageReaction(context.Background(), DeleteReactionParams{MessageID: "m"}) + if err == nil || !strings.Contains(err.Error(), "missing reaction_id") { + t.Errorf("want missing reaction_id error, got %v", err) + } +} + func TestHTTPClient_MissingAppCredentials(t *testing.T) { c := NewHTTPAPIClient(HTTPClientConfig{}).(*httpAPIClient) _, err := c.tenantAccessToken(context.Background(), InstallationCredentials{AppSecret: "x"}) diff --git a/server/internal/integrations/lark/hub.go b/server/internal/integrations/lark/hub.go index 5063ab798..b2d81a914 100644 --- a/server/internal/integrations/lark/hub.go +++ b/server/internal/integrations/lark/hub.go @@ -203,11 +203,12 @@ func (c HubConfig) withDefaults() HubConfig { // ... ctx cancellation triggers ... // hub.Wait() // joins on every per-installation goroutine type Hub struct { - queries HubQueries - factory ConnectorFactory - dispatcher *Dispatcher - replier OutcomeReplier - cfg HubConfig + queries HubQueries + factory ConnectorFactory + dispatcher *Dispatcher + replier OutcomeReplier + typingIndicator *TypingIndicatorManager + cfg HubConfig // nodeID is the per-process lease ownership token. The CAS // predicate on AcquireLarkWSLease treats matching tokens as @@ -293,6 +294,13 @@ func (h *Hub) SetOutcomeReplier(r OutcomeReplier) { h.replier = r } +// SetTypingIndicatorManager installs the typing-indicator manager on +// the Hub. Must be called BEFORE Run. Nil is safe and disables the +// typing reaction lifecycle. +func (h *Hub) SetTypingIndicatorManager(m *TypingIndicatorManager) { + h.typingIndicator = m +} + // NodeID exposes the per-process lease token. Useful for tests and // for observability (so operators can correlate DB lease rows to a // running replica). @@ -773,6 +781,16 @@ func (h *Hub) handleEvent(ctx context.Context, inst db.LarkInstallation, log *sl "outcome", string(res.Outcome), "drop_reason", string(res.DropReason), ) + if res.Outcome == OutcomeIngested && h.typingIndicator != nil { + // Detached: the typing reaction HTTP call must not block the + // connector's ACK path. A short timeout keeps the goroutine + // from hanging if Lark is slow. + go func() { + addCtx, cancel := context.WithTimeout(context.Background(), h.cfg.ReplyTimeout) + defer cancel() + h.typingIndicator.Add(addCtx, inst, res.ChatSessionID, msg.MessageID, msg.CreateTime) + }() + } h.scheduleReply(inst, msg, res, log) return res, nil } diff --git a/server/internal/integrations/lark/inbound_enricher_test.go b/server/internal/integrations/lark/inbound_enricher_test.go index eb9911de0..2897cd404 100644 --- a/server/internal/integrations/lark/inbound_enricher_test.go +++ b/server/internal/integrations/lark/inbound_enricher_test.go @@ -87,6 +87,12 @@ func (f *enricherFakeClient) SendBindingPromptCard(context.Context, BindingPromp func (f *enricherFakeClient) GetBotInfo(context.Context, InstallationCredentials) (BotInfo, error) { return BotInfo{}, nil } +func (f *enricherFakeClient) AddMessageReaction(context.Context, AddReactionParams) (string, error) { + return "", nil +} +func (f *enricherFakeClient) DeleteMessageReaction(context.Context, DeleteReactionParams) error { + return nil +} func textMsg(id, sender, text, createTime string) LarkMessage { return LarkMessage{ diff --git a/server/internal/integrations/lark/outbound.go b/server/internal/integrations/lark/outbound.go index 61fe6d799..737a9d274 100644 --- a/server/internal/integrations/lark/outbound.go +++ b/server/internal/integrations/lark/outbound.go @@ -205,10 +205,11 @@ func (c PatcherConfig) withDefaults() PatcherConfig { // most one replica holds the installation lease at a time, the // event bus is per-process, so exactly one Patcher reacts per run. type Patcher struct { - queries PatcherQueries - credentials CredentialsResolver - client APIClient - cfg PatcherConfig + queries PatcherQueries + credentials CredentialsResolver + client APIClient + typingIndicator *TypingIndicatorManager + cfg PatcherConfig } // NewPatcher constructs a Patcher bound to its dependencies. The @@ -223,6 +224,14 @@ func NewPatcher(queries PatcherQueries, credentials CredentialsResolver, client } } +// SetTypingIndicatorManager wires the typing-indicator manager into the +// patcher so that replies clear the "processing" reaction before they +// are sent. Call once at boot after both the patcher and manager are +// constructed. Nil disables the clear step. +func (p *Patcher) SetTypingIndicatorManager(m *TypingIndicatorManager) { + p.typingIndicator = m +} + // Register subscribes the patcher to the task-lifecycle events it // cares about on the supplied bus. Idempotent only if you call it // against a fresh bus; call sites should invoke it exactly once @@ -307,6 +316,13 @@ func (p *Patcher) processEvent(ctx context.Context, e events.Event) error { agentName = agent.Name } + // Clear the "processing" reaction before the reply is visible so the + // user sees a clean transition. Best-effort: a failure here is logged + // but does not block the actual reply. + if p.typingIndicator != nil { + p.typingIndicator.Clear(ctx, chatSessionID) + } + switch e.Type { case protocol.EventChatDone: return p.sendChatReply(ctx, creds, binding, e.Payload) diff --git a/server/internal/integrations/lark/outbound_test.go b/server/internal/integrations/lark/outbound_test.go index a5e195d1d..ec239f0b6 100644 --- a/server/internal/integrations/lark/outbound_test.go +++ b/server/internal/integrations/lark/outbound_test.go @@ -128,6 +128,12 @@ func (f *fakeAPIClient) ListChatMessages(ctx context.Context, creds Installation func (f *fakeAPIClient) BatchGetUsers(ctx context.Context, creds InstallationCredentials, openIDs []string) (map[string]string, error) { return nil, nil } +func (f *fakeAPIClient) AddMessageReaction(ctx context.Context, p AddReactionParams) (string, error) { + return "fake-reaction-id", nil +} +func (f *fakeAPIClient) DeleteMessageReaction(ctx context.Context, p DeleteReactionParams) error { + return nil +} func newTestPatcher(t *testing.T) (*Patcher, *fakePatcherQueries, *fakeAPIClient) { t.Helper() diff --git a/server/internal/integrations/lark/outcome_replier_test.go b/server/internal/integrations/lark/outcome_replier_test.go index 1dd3850cd..cb3581a0b 100644 --- a/server/internal/integrations/lark/outcome_replier_test.go +++ b/server/internal/integrations/lark/outcome_replier_test.go @@ -81,6 +81,12 @@ func (s *stubAPIClientWithRecorder) ListChatMessages(ctx context.Context, creds func (s *stubAPIClientWithRecorder) BatchGetUsers(ctx context.Context, creds InstallationCredentials, openIDs []string) (map[string]string, error) { return nil, nil } +func (s *stubAPIClientWithRecorder) AddMessageReaction(ctx context.Context, p AddReactionParams) (string, error) { + return "stub-reaction-id", nil +} +func (s *stubAPIClientWithRecorder) DeleteMessageReaction(ctx context.Context, p DeleteReactionParams) error { + return nil +} // stubCredentialsResolver returns a fixed plaintext secret. type stubCredentialsResolver struct{ secret string } diff --git a/server/internal/integrations/lark/typing_indicator.go b/server/internal/integrations/lark/typing_indicator.go new file mode 100644 index 000000000..8bf1d9e1f --- /dev/null +++ b/server/internal/integrations/lark/typing_indicator.go @@ -0,0 +1,220 @@ +package lark + +import ( + "context" + "log/slog" + "strconv" + "sync" + "time" + + "github.com/jackc/pgx/v5/pgtype" + db "github.com/multica-ai/multica/server/pkg/db/generated" +) + +// typingEmoji is the Lark emoji_type used for the "processing" indicator. +// It renders as a small typing-animation badge on the message. +const typingEmoji = "Typing" + +// typingIndicatorMaxAge is how old a message can be before we skip the +// typing indicator. This prevents stale reactions when a WebSocket +// reconnect replays old events. Aligned with OpenClaw's 2-minute bound. +const typingIndicatorMaxAge = 2 * time.Minute + +// TypingIndicatorState holds the identifiers needed to remove a reaction. +type TypingIndicatorState struct { + MessageID string + ReactionID string +} + +// TypingIndicatorQueries is the narrow DB surface the manager needs. +type TypingIndicatorQueries interface { + GetLarkChatSessionBindingBySession(ctx context.Context, chatSessionID pgtype.UUID) (db.LarkChatSessionBinding, error) + GetLarkInstallation(ctx context.Context, id pgtype.UUID) (db.LarkInstallation, error) +} + +// TypingIndicatorManager owns the "processing" reaction lifecycle for +// inbound Lark messages. When a message is successfully ingested it adds +// a Typing reaction; when the agent eventually replies (or fails) it +// clears the reaction(s) for that chat session. +// +// The manager is safe for concurrent use. It tolerates missing or +// stale state gracefully: adding a reaction to a message that already +// has one simply appends another state entry; clearing a session with +// no tracked state is a no-op. +type TypingIndicatorManager struct { + client APIClient + credentials CredentialsResolver + queries TypingIndicatorQueries + log *slog.Logger + + mu sync.RWMutex + states map[string][]*TypingIndicatorState // key = chat_session_id string +} + +// NewTypingIndicatorManager constructs a manager. All dependencies must +// be non-nil; the manager panics on nil client / credentials / queries. +func NewTypingIndicatorManager(client APIClient, credentials CredentialsResolver, queries TypingIndicatorQueries, log *slog.Logger) *TypingIndicatorManager { + if log == nil { + log = slog.Default() + } + return &TypingIndicatorManager{ + client: client, + credentials: credentials, + queries: queries, + log: log, + states: make(map[string][]*TypingIndicatorState), + } +} + +// Add sends a Typing reaction to the given message and records the state +// under the chat session. It is synchronous — the caller decides whether +// to run it in a detached goroutine. Errors are logged and swallowed. +// +// createTime is Lark's epoch-millisecond string (InboundMessage.CreateTime). +// Messages older than typingIndicatorMaxAge are silently skipped so that +// WebSocket replays and stale reconnects do not surface misleading "processing" +// badges on long-finished conversations. +func (m *TypingIndicatorManager) Add(ctx context.Context, inst db.LarkInstallation, chatSessionID pgtype.UUID, messageID string, createTime string) { + if messageID == "" { + return + } + if isMessageTooOld(createTime) { + m.log.Debug("lark typing indicator: message too old, skipping", + "chat_session_id", uuidString(chatSessionID), + "message_id", messageID, + "create_time", createTime, + ) + return + } + creds, err := m.resolveCredentials(inst) + if err != nil { + m.log.Warn("lark typing indicator: failed to resolve credentials", + "chat_session_id", uuidString(chatSessionID), + "message_id", messageID, + "err", err, + ) + return + } + + reactionID, err := m.client.AddMessageReaction(ctx, AddReactionParams{ + InstallationID: creds, + MessageID: messageID, + EmojiType: typingEmoji, + }) + if err != nil { + m.log.Warn("lark typing indicator: add reaction failed", + "chat_session_id", uuidString(chatSessionID), + "message_id", messageID, + "err", err, + ) + return + } + + key := uuidString(chatSessionID) + m.mu.Lock() + m.states[key] = append(m.states[key], &TypingIndicatorState{ + MessageID: messageID, + ReactionID: reactionID, + }) + m.mu.Unlock() + + m.log.Debug("lark typing indicator: reaction added", + "chat_session_id", key, + "message_id", messageID, + "reaction_id", reactionID, + ) +} + +// Clear removes every tracked Typing reaction for the chat session and +// drops the state entry. It is synchronous so the reaction is gone before +// the agent's reply is sent, giving the user a clean visual transition. +// Individual delete failures are logged but do not abort the loop. +func (m *TypingIndicatorManager) Clear(ctx context.Context, chatSessionID pgtype.UUID) { + key := uuidString(chatSessionID) + m.mu.Lock() + states := m.states[key] + delete(m.states, key) + m.mu.Unlock() + + if len(states) == 0 { + return + } + + binding, err := m.queries.GetLarkChatSessionBindingBySession(ctx, chatSessionID) + if err != nil { + m.log.Warn("lark typing indicator: failed to lookup binding for clear", + "chat_session_id", key, + "err", err, + ) + return + } + + inst, err := m.queries.GetLarkInstallation(ctx, binding.InstallationID) + if err != nil { + m.log.Warn("lark typing indicator: failed to lookup installation for clear", + "chat_session_id", key, + "err", err, + ) + return + } + + creds, err := m.resolveCredentials(inst) + if err != nil { + m.log.Warn("lark typing indicator: failed to resolve credentials for clear", + "chat_session_id", key, + "err", err, + ) + return + } + + for _, s := range states { + if s.ReactionID == "" { + continue + } + if err := m.client.DeleteMessageReaction(ctx, DeleteReactionParams{ + InstallationID: creds, + MessageID: s.MessageID, + ReactionID: s.ReactionID, + }); err != nil { + m.log.Warn("lark typing indicator: delete reaction failed", + "chat_session_id", key, + "message_id", s.MessageID, + "reaction_id", s.ReactionID, + "err", err, + ) + continue + } + m.log.Debug("lark typing indicator: reaction removed", + "chat_session_id", key, + "message_id", s.MessageID, + "reaction_id", s.ReactionID, + ) + } +} + +func isMessageTooOld(createTime string) bool { + if createTime == "" { + return false + } + ms, err := strconv.ParseInt(createTime, 10, 64) + if err != nil { + return false + } + return time.Since(time.UnixMilli(ms)) > typingIndicatorMaxAge +} + +func (m *TypingIndicatorManager) resolveCredentials(inst db.LarkInstallation) (InstallationCredentials, error) { + secret, err := m.credentials.DecryptAppSecret(inst) + if err != nil { + return InstallationCredentials{}, err + } + creds := InstallationCredentials{ + AppID: inst.AppID, + AppSecret: secret, + Region: RegionOrDefault(inst.Region), + } + if inst.TenantKey.Valid { + creds.TenantKey = inst.TenantKey.String + } + return creds, nil +} diff --git a/server/internal/integrations/lark/typing_indicator_test.go b/server/internal/integrations/lark/typing_indicator_test.go new file mode 100644 index 000000000..96152d702 --- /dev/null +++ b/server/internal/integrations/lark/typing_indicator_test.go @@ -0,0 +1,318 @@ +package lark + +import ( + "context" + "errors" + "strconv" + "testing" + "time" + + "github.com/jackc/pgx/v5/pgtype" + db "github.com/multica-ai/multica/server/pkg/db/generated" +) + +// fakeTypingAPIClient records reaction calls and can be programmed to fail. +type fakeTypingAPIClient struct { + addCalled []addReactionCall + deleteCalled []deleteReactionCall + addErr error + deleteErr error + addReturn string +} + +type addReactionCall struct { + creds InstallationCredentials + messageID string + emojiType string +} + +type deleteReactionCall struct { + creds InstallationCredentials + messageID string + reactionID string +} + +func (f *fakeTypingAPIClient) IsConfigured() bool { return true } +func (f *fakeTypingAPIClient) SendInteractiveCard(context.Context, SendCardParams) (string, error) { + return "", nil +} +func (f *fakeTypingAPIClient) PatchInteractiveCard(context.Context, PatchCardParams) error { + return nil +} +func (f *fakeTypingAPIClient) SendTextMessage(context.Context, SendTextParams) (string, error) { + return "", nil +} +func (f *fakeTypingAPIClient) SendMarkdownCard(context.Context, SendMarkdownCardParams) (string, error) { + return "", nil +} +func (f *fakeTypingAPIClient) SendBindingPromptCard(context.Context, BindingPromptParams) error { + return nil +} +func (f *fakeTypingAPIClient) GetBotInfo(context.Context, InstallationCredentials) (BotInfo, error) { + return BotInfo{}, nil +} +func (f *fakeTypingAPIClient) GetMessage(context.Context, InstallationCredentials, string) ([]LarkMessage, error) { + return nil, nil +} +func (f *fakeTypingAPIClient) ListChatMessages(context.Context, InstallationCredentials, ListMessagesParams) ([]LarkMessage, error) { + return nil, nil +} +func (f *fakeTypingAPIClient) BatchGetUsers(context.Context, InstallationCredentials, []string) (map[string]string, error) { + return nil, nil +} +func (f *fakeTypingAPIClient) AddMessageReaction(_ context.Context, p AddReactionParams) (string, error) { + f.addCalled = append(f.addCalled, addReactionCall{p.InstallationID, p.MessageID, p.EmojiType}) + return f.addReturn, f.addErr +} +func (f *fakeTypingAPIClient) DeleteMessageReaction(_ context.Context, p DeleteReactionParams) error { + f.deleteCalled = append(f.deleteCalled, deleteReactionCall{p.InstallationID, p.MessageID, p.ReactionID}) + return f.deleteErr +} + +type fakeTypingQueries struct { + binding db.LarkChatSessionBinding + installation db.LarkInstallation + bindingErr error + installErr error +} + +func (f *fakeTypingQueries) GetLarkChatSessionBindingBySession(context.Context, pgtype.UUID) (db.LarkChatSessionBinding, error) { + return f.binding, f.bindingErr +} +func (f *fakeTypingQueries) GetLarkInstallation(context.Context, pgtype.UUID) (db.LarkInstallation, error) { + return f.installation, f.installErr +} + +type fakeTypingCreds struct{ secret string } + +func (f fakeTypingCreds) DecryptAppSecret(inst db.LarkInstallation) (string, error) { + return f.secret, nil +} + +func TestTypingIndicatorAddRecordsState(t *testing.T) { + api := &fakeTypingAPIClient{addReturn: "reaction-123"} + queries := &fakeTypingQueries{} + mgr := NewTypingIndicatorManager(api, fakeTypingCreds{secret: "shh"}, queries, newDiscardLogger()) + + inst := db.LarkInstallation{AppID: "cli_test", Region: "feishu"} + session := pgtype.UUID{Bytes: [16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}, Valid: true} + + mgr.Add(context.Background(), inst, session, "msg-1", "") + + if len(api.addCalled) != 1 { + t.Fatalf("expected 1 add call, got %d", len(api.addCalled)) + } + if api.addCalled[0].messageID != "msg-1" || api.addCalled[0].emojiType != typingEmoji { + t.Fatalf("unexpected add call params: %+v", api.addCalled[0]) + } + + key := uuidString(session) + mgr.mu.RLock() + states := mgr.states[key] + mgr.mu.RUnlock() + if len(states) != 1 || states[0].MessageID != "msg-1" || states[0].ReactionID != "reaction-123" { + t.Fatalf("unexpected state: %+v", states) + } +} + +func TestTypingIndicatorAddSkipsOnEmptyMessageID(t *testing.T) { + api := &fakeTypingAPIClient{addReturn: "reaction-123"} + mgr := NewTypingIndicatorManager(api, fakeTypingCreds{secret: "shh"}, &fakeTypingQueries{}, newDiscardLogger()) + + inst := db.LarkInstallation{AppID: "cli_test", Region: "feishu"} + session := pgtype.UUID{Bytes: [16]byte{1}, Valid: true} + + mgr.Add(context.Background(), inst, session, "", "") + + if len(api.addCalled) != 0 { + t.Fatalf("expected 0 add calls, got %d", len(api.addCalled)) + } +} + +func TestTypingIndicatorAddSkipsOldMessages(t *testing.T) { + api := &fakeTypingAPIClient{addReturn: "reaction-123"} + mgr := NewTypingIndicatorManager(api, fakeTypingCreds{secret: "shh"}, &fakeTypingQueries{}, newDiscardLogger()) + + inst := db.LarkInstallation{AppID: "cli_test", Region: "feishu"} + session := pgtype.UUID{Bytes: [16]byte{1}, Valid: true} + + oldTime := time.Now().Add(-3 * time.Minute).UnixMilli() + mgr.Add(context.Background(), inst, session, "msg-old", strconv.FormatInt(oldTime, 10)) + + if len(api.addCalled) != 0 { + t.Fatalf("expected 0 add calls for old message, got %d", len(api.addCalled)) + } +} + +func TestTypingIndicatorAddLogsOnAPIError(t *testing.T) { + api := &fakeTypingAPIClient{addErr: errors.New("lark down")} + mgr := NewTypingIndicatorManager(api, fakeTypingCreds{secret: "shh"}, &fakeTypingQueries{}, newDiscardLogger()) + + inst := db.LarkInstallation{AppID: "cli_test", Region: "feishu"} + session := pgtype.UUID{Bytes: [16]byte{1}, Valid: true} + + mgr.Add(context.Background(), inst, session, "msg-1", "") + + if len(api.addCalled) != 1 { + t.Fatalf("expected 1 add call, got %d", len(api.addCalled)) + } + + key := uuidString(session) + mgr.mu.RLock() + states := mgr.states[key] + mgr.mu.RUnlock() + if len(states) != 0 { + t.Fatalf("expected 0 states after error, got %d", len(states)) + } +} + +func TestTypingIndicatorClearDeletesReactions(t *testing.T) { + api := &fakeTypingAPIClient{addReturn: "reaction-abc"} + queries := &fakeTypingQueries{ + binding: db.LarkChatSessionBinding{ + InstallationID: pgtype.UUID{Bytes: [16]byte{9, 9, 9, 9}, Valid: true}, + }, + installation: db.LarkInstallation{ + ID: pgtype.UUID{Bytes: [16]byte{9, 9, 9, 9}, Valid: true}, + AppID: "cli_test", + Region: "feishu", + }, + } + mgr := NewTypingIndicatorManager(api, fakeTypingCreds{secret: "shh"}, queries, newDiscardLogger()) + + inst := db.LarkInstallation{AppID: "cli_test", Region: "feishu"} + session := pgtype.UUID{Bytes: [16]byte{1, 2, 3, 4}, Valid: true} + + mgr.Add(context.Background(), inst, session, "msg-1", "") + if len(api.addCalled) != 1 { + t.Fatal("add should have been called") + } + + mgr.Clear(context.Background(), session) + + if len(api.deleteCalled) != 1 { + t.Fatalf("expected 1 delete call, got %d", len(api.deleteCalled)) + } + if api.deleteCalled[0].messageID != "msg-1" || api.deleteCalled[0].reactionID != "reaction-abc" { + t.Fatalf("unexpected delete params: %+v", api.deleteCalled[0]) + } + + key := uuidString(session) + mgr.mu.RLock() + states := mgr.states[key] + mgr.mu.RUnlock() + if len(states) != 0 { + t.Fatalf("expected 0 states after clear, got %d", len(states)) + } +} + +func TestTypingIndicatorClearNoOpWhenEmpty(t *testing.T) { + api := &fakeTypingAPIClient{} + queries := &fakeTypingQueries{ + binding: db.LarkChatSessionBinding{ + InstallationID: pgtype.UUID{Bytes: [16]byte{9}, Valid: true}, + }, + installation: db.LarkInstallation{ + ID: pgtype.UUID{Bytes: [16]byte{9}, Valid: true}, + AppID: "cli_test", + Region: "feishu", + }, + } + mgr := NewTypingIndicatorManager(api, fakeTypingCreds{secret: "shh"}, queries, newDiscardLogger()) + + session := pgtype.UUID{Bytes: [16]byte{1, 2, 3, 4}, Valid: true} + mgr.Clear(context.Background(), session) + + if len(api.deleteCalled) != 0 { + t.Fatalf("expected 0 delete calls when empty, got %d", len(api.deleteCalled)) + } +} + +func TestTypingIndicatorClearLogsOnDeleteError(t *testing.T) { + api := &fakeTypingAPIClient{addReturn: "reaction-xyz", deleteErr: errors.New("delete failed")} + queries := &fakeTypingQueries{ + binding: db.LarkChatSessionBinding{ + InstallationID: pgtype.UUID{Bytes: [16]byte{9}, Valid: true}, + }, + installation: db.LarkInstallation{ + ID: pgtype.UUID{Bytes: [16]byte{9}, Valid: true}, + AppID: "cli_test", + Region: "feishu", + }, + } + mgr := NewTypingIndicatorManager(api, fakeTypingCreds{secret: "shh"}, queries, newDiscardLogger()) + + inst := db.LarkInstallation{AppID: "cli_test", Region: "feishu"} + session := pgtype.UUID{Bytes: [16]byte{1, 2, 3, 4}, Valid: true} + + mgr.Add(context.Background(), inst, session, "msg-1", "") + mgr.Clear(context.Background(), session) + + if len(api.deleteCalled) != 1 { + t.Fatalf("expected 1 delete call attempt, got %d", len(api.deleteCalled)) + } +} + +func TestTypingIndicatorMultipleMessagesPerSession(t *testing.T) { + api := &fakeTypingAPIClient{addReturn: "reaction-n"} + queries := &fakeTypingQueries{ + binding: db.LarkChatSessionBinding{ + InstallationID: pgtype.UUID{Bytes: [16]byte{9}, Valid: true}, + }, + installation: db.LarkInstallation{ + ID: pgtype.UUID{Bytes: [16]byte{9}, Valid: true}, + AppID: "cli_test", + Region: "feishu", + }, + } + mgr := NewTypingIndicatorManager(api, fakeTypingCreds{secret: "shh"}, queries, newDiscardLogger()) + + inst := db.LarkInstallation{AppID: "cli_test", Region: "feishu"} + session := pgtype.UUID{Bytes: [16]byte{1, 2, 3, 4}, Valid: true} + + mgr.Add(context.Background(), inst, session, "msg-a", "") + mgr.Add(context.Background(), inst, session, "msg-b", "") + + if len(api.addCalled) != 2 { + t.Fatalf("expected 2 add calls, got %d", len(api.addCalled)) + } + + mgr.Clear(context.Background(), session) + + if len(api.deleteCalled) != 2 { + t.Fatalf("expected 2 delete calls, got %d", len(api.deleteCalled)) + } +} + +func TestTypingIndicatorConcurrentAddAndClear(t *testing.T) { + api := &fakeTypingAPIClient{addReturn: "reaction-concurrent"} + queries := &fakeTypingQueries{ + binding: db.LarkChatSessionBinding{ + InstallationID: pgtype.UUID{Bytes: [16]byte{9}, Valid: true}, + }, + installation: db.LarkInstallation{ + ID: pgtype.UUID{Bytes: [16]byte{9}, Valid: true}, + AppID: "cli_test", + Region: "feishu", + }, + } + mgr := NewTypingIndicatorManager(api, fakeTypingCreds{secret: "shh"}, queries, newDiscardLogger()) + + inst := db.LarkInstallation{AppID: "cli_test", Region: "feishu"} + session := pgtype.UUID{Bytes: [16]byte{1, 2, 3, 4}, Valid: true} + + done := make(chan struct{}) + go func() { + for i := 0; i < 50; i++ { + mgr.Add(context.Background(), inst, session, "msg", "") + } + close(done) + }() + go func() { + for i := 0; i < 50; i++ { + mgr.Clear(context.Background(), session) + } + }() + <-done + time.Sleep(10 * time.Millisecond) +}