feat(lark): add typing indicator lifecycle for inbound messages (#3860)

When a message is successfully ingested, send a Typing reaction to
the user's message. When the agent replies (EventChatDone) or fails
(EventTaskFailed), clear the reaction before the reply is visible.

- Add AddMessageReaction / DeleteMessageReaction to APIClient
- Implement reaction HTTP calls in httpAPIClient
- Introduce TypingIndicatorManager for per-session state tracking
- Wire into Hub (add on ingest) and Patcher (clear before reply)
- Skip typing for messages older than 2 minutes (WS replay guard)

Co-authored-by: miaolong001 <miaolong@xd.com>
This commit is contained in:
chyax98
2026-06-08 19:27:08 +08:00
committed by GitHub
parent 139cd755e2
commit 26ca943d45
11 changed files with 851 additions and 9 deletions

View File

@@ -224,6 +224,12 @@ func NewRouterWithOptions(pool *pgxpool.Pool, hub *realtime.Hub, bus *events.Bus
patcher := lark.NewPatcher(queries, installSvc, larkClient, lark.PatcherConfig{})
patcher.Register(bus)
// Typing indicator: shows a "processing" reaction on the user's
// message while the agent is working, then removes it before the
// reply is sent. Best-effort; failures are logged only.
typingIndicator := lark.NewTypingIndicatorManager(larkClient, installSvc, queries, slog.Default())
patcher.SetTypingIndicatorManager(typingIndicator)
// Inbound pipeline: lark_inbound_audit logger,
// channel-aware ChatSessionService, and the
// Dispatcher that orders identity / dedup / append /
@@ -263,6 +269,7 @@ func NewRouterWithOptions(pool *pgxpool.Pool, hub *realtime.Hub, bus *events.Bus
// "noop" so operators can spot it.
connectorFactory, connectorLabel := buildLarkConnectorFactory(installSvc, larkClient)
h.LarkHub = lark.NewHub(queries, connectorFactory, dispatcher, lark.HubConfig{})
h.LarkHub.SetTypingIndicatorManager(typingIndicator)
// OutcomeReplier wires the outbound side of the
// EventEmitter contract: NeedsBinding / AgentOffline /

View File

@@ -106,6 +106,16 @@ type APIClient interface {
// positional label. openIDs beyond Lark's 50-per-call cap are dropped
// by the client.
BatchGetUsers(ctx context.Context, creds InstallationCredentials, openIDs []string) (map[string]string, error)
// AddMessageReaction adds an emoji reaction to an existing message.
// The standard use-case is the "Typing" indicator that signals the
// Bot is processing the user's message. Returns the reaction_id Lark
// assigns so it can be removed later.
AddMessageReaction(ctx context.Context, p AddReactionParams) (string, error)
// DeleteMessageReaction removes a previously-added reaction from a
// message. This is the cleanup half of the typing-indicator lifecycle.
DeleteMessageReaction(ctx context.Context, p DeleteReactionParams) error
}
// ListMessagesParams selects a bounded, recent window of messages in a
@@ -233,6 +243,22 @@ type BindingPromptParams struct {
BindURL string
}
// AddReactionParams is the input shape for adding an emoji reaction to
// a message.
type AddReactionParams struct {
InstallationID InstallationCredentials
MessageID string
EmojiType string
}
// DeleteReactionParams is the input shape for removing a previously-added
// reaction from a message.
type DeleteReactionParams struct {
InstallationID InstallationCredentials
MessageID string
ReactionID string
}
// InstallationCredentials is the per-installation transport context the
// client needs to authenticate against Lark on behalf of a workspace's
// bot. Passing these explicitly to each call (rather than constructing
@@ -333,3 +359,13 @@ func (s *stubAPIClient) BatchGetUsers(ctx context.Context, creds InstallationCre
s.log.Warn("lark stub client: BatchGetUsers called", "count", len(openIDs))
return nil, ErrAPIClientNotConfigured
}
func (s *stubAPIClient) AddMessageReaction(ctx context.Context, p AddReactionParams) (string, error) {
s.log.Warn("lark stub client: AddMessageReaction called", "message_id", p.MessageID, "emoji_type", p.EmojiType)
return "", ErrAPIClientNotConfigured
}
func (s *stubAPIClient) DeleteMessageReaction(ctx context.Context, p DeleteReactionParams) error {
s.log.Warn("lark stub client: DeleteMessageReaction called", "message_id", p.MessageID, "reaction_id", p.ReactionID)
return ErrAPIClientNotConfigured
}

View File

@@ -654,6 +654,73 @@ func (c *httpAPIClient) ListChatMessages(ctx context.Context, creds Installation
// a caller asking for more still gets the first 50 resolved.
const larkBatchGetUsersMaxIDs = 50
// AddMessageReaction adds an emoji reaction to a message via
// POST /open-apis/im/v1/messages/{message_id}/reactions.
// Returns the reaction_id so it can be deleted later.
func (c *httpAPIClient) AddMessageReaction(ctx context.Context, p AddReactionParams) (string, error) {
if p.MessageID == "" {
return "", errors.New("lark http client: missing message_id")
}
if p.EmojiType == "" {
return "", errors.New("lark http client: missing emoji_type")
}
token, err := c.tenantAccessToken(ctx, p.InstallationID)
if err != nil {
return "", err
}
body := map[string]any{
"reaction_type": map[string]string{"emoji_type": p.EmojiType},
}
path := "/open-apis/im/v1/messages/" + url.PathEscape(p.MessageID) + "/reactions"
var resp struct {
Code int `json:"code"`
Msg string `json:"msg"`
Data struct {
ReactionID string `json:"reaction_id"`
} `json:"data"`
}
if err := c.doJSON(ctx, c.resolveBaseURL(p.InstallationID), http.MethodPost, path, token, body, &resp); err != nil {
return "", fmt.Errorf("lark http client: add message reaction: %w", err)
}
if resp.Code != 0 || resp.Data.ReactionID == "" {
if isTokenError(resp.Code) {
c.invalidateToken(p.InstallationID.AppID)
}
return "", fmt.Errorf("lark http client: add message reaction: code=%d msg=%q", resp.Code, resp.Msg)
}
return resp.Data.ReactionID, nil
}
// DeleteMessageReaction removes a reaction from a message via
// DELETE /open-apis/im/v1/messages/{message_id}/reactions/{reaction_id}.
func (c *httpAPIClient) DeleteMessageReaction(ctx context.Context, p DeleteReactionParams) error {
if p.MessageID == "" {
return errors.New("lark http client: missing message_id")
}
if p.ReactionID == "" {
return errors.New("lark http client: missing reaction_id")
}
token, err := c.tenantAccessToken(ctx, p.InstallationID)
if err != nil {
return err
}
path := "/open-apis/im/v1/messages/" + url.PathEscape(p.MessageID) + "/reactions/" + url.PathEscape(p.ReactionID)
var resp struct {
Code int `json:"code"`
Msg string `json:"msg"`
}
if err := c.doJSON(ctx, c.resolveBaseURL(p.InstallationID), http.MethodDelete, path, token, nil, &resp); err != nil {
return fmt.Errorf("lark http client: delete message reaction: %w", err)
}
if resp.Code != 0 {
if isTokenError(resp.Code) {
c.invalidateToken(p.InstallationID.AppID)
}
return fmt.Errorf("lark http client: delete message reaction: code=%d msg=%q", resp.Code, resp.Msg)
}
return nil
}
// BatchGetUsers resolves user open_ids to display names via
// GET /open-apis/contact/v3/users/batch?user_ids=…&user_id_type=open_id.
// It mirrors fetchBotUnionID's single-user contact lookup, batched. Only

View File

@@ -27,6 +27,8 @@ type larkFakeServer struct {
sendN atomic.Int32
patchN atomic.Int32
bindN atomic.Int32
reactN atomic.Int32
delRN atomic.Int32
authObs atomic.Value // last Authorization header seen across all paths
}
@@ -130,6 +132,58 @@ func (f *larkFakeServer) stubPatch(resp map[string]any, verify func(r *http.Requ
})
}
// stubReaction installs the IM-reaction-create endpoint.
func (f *larkFakeServer) stubReaction(resp map[string]any, verify func(r *http.Request, id string, body map[string]any)) {
const suffix = "/reactions"
f.mux.HandleFunc("/open-apis/im/v1/messages/", func(w http.ResponseWriter, r *http.Request) {
if !strings.HasSuffix(r.URL.Path, suffix) {
return // let other handlers match
}
if r.Method != http.MethodPost {
f.t.Errorf("reaction: want POST, got %s", r.Method)
}
f.reactN.Add(1)
rawID := strings.TrimSuffix(strings.TrimPrefix(r.URL.Path, "/open-apis/im/v1/messages/"), suffix)
if rawID == "" {
f.t.Errorf("reaction: missing message id")
}
var body map[string]any
if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
f.t.Errorf("reaction: decode body: %v", err)
}
if verify != nil {
verify(r, rawID, body)
}
writeJSON(w, resp)
})
}
// stubReactionDelete installs the IM-reaction-delete endpoint.
func (f *larkFakeServer) stubReactionDelete(resp map[string]any, verify func(r *http.Request, msgID string, reactionID string)) {
const prefix = "/open-apis/im/v1/messages/"
f.mux.HandleFunc(prefix, func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodDelete {
return // let other handlers match
}
rest := strings.TrimPrefix(r.URL.Path, prefix)
parts := strings.Split(rest, "/reactions/")
if len(parts) != 2 {
return // not a delete path
}
f.delRN.Add(1)
if parts[0] == "" {
f.t.Errorf("reaction delete: missing message id")
}
if parts[1] == "" {
f.t.Errorf("reaction delete: missing reaction id")
}
if verify != nil {
verify(r, parts[0], parts[1])
}
writeJSON(w, resp)
})
}
func writeJSON(w http.ResponseWriter, body any) {
w.Header().Set("Content-Type", "application/json")
_ = json.NewEncoder(w).Encode(body)
@@ -722,6 +776,94 @@ func TestHTTPClient_TokenEndpointError(t *testing.T) {
}
}
func TestHTTPClient_AddMessageReaction_HappyPath(t *testing.T) {
fake := newLarkFake(t)
fake.stubToken("tok_react", 7200)
fake.stubReaction(map[string]any{"code": 0, "msg": "ok", "data": map[string]string{"reaction_id": "re_42"}}, func(r *http.Request, id string, body map[string]any) {
if id != "om_user_msg_1" {
t.Errorf("message id: got %q want om_user_msg_1", id)
}
if got := r.Header.Get("Authorization"); got != "Bearer tok_react" {
t.Errorf("Authorization=%q want Bearer tok_react", got)
}
reactionType, ok := body["reaction_type"].(map[string]any)
if !ok {
t.Fatalf("reaction_type missing or wrong shape: %v", body)
}
if got := reactionType["emoji_type"]; got != "Typing" {
t.Errorf("emoji_type=%v want Typing", got)
}
})
c := newTestClient(fake, time.Now)
reactionID, err := c.AddMessageReaction(context.Background(), AddReactionParams{
InstallationID: testCreds(),
MessageID: "om_user_msg_1",
EmojiType: "Typing",
})
if err != nil {
t.Fatalf("AddMessageReaction: %v", err)
}
if reactionID != "re_42" {
t.Errorf("reaction id: got %q want re_42", reactionID)
}
if got := fake.reactN.Load(); got != 1 {
t.Fatalf("reaction endpoint calls=%d want 1", got)
}
}
func TestHTTPClient_DeleteMessageReaction_HappyPath(t *testing.T) {
fake := newLarkFake(t)
fake.stubToken("tok_del", 7200)
fake.stubReactionDelete(map[string]any{"code": 0, "msg": "ok"}, func(r *http.Request, msgID string, reactionID string) {
if msgID != "om_user_msg_1" {
t.Errorf("message id: got %q want om_user_msg_1", msgID)
}
if reactionID != "re_42" {
t.Errorf("reaction id: got %q want re_42", reactionID)
}
if got := r.Header.Get("Authorization"); got != "Bearer tok_del" {
t.Errorf("Authorization=%q want Bearer tok_del", got)
}
})
c := newTestClient(fake, time.Now)
if err := c.DeleteMessageReaction(context.Background(), DeleteReactionParams{
InstallationID: testCreds(),
MessageID: "om_user_msg_1",
ReactionID: "re_42",
}); err != nil {
t.Fatalf("DeleteMessageReaction: %v", err)
}
if got := fake.delRN.Load(); got != 1 {
t.Fatalf("reaction delete endpoint calls=%d want 1", got)
}
}
func TestHTTPClient_AddMessageReaction_Validation(t *testing.T) {
c := NewHTTPAPIClient(HTTPClientConfig{}).(*httpAPIClient)
_, err := c.AddMessageReaction(context.Background(), AddReactionParams{MessageID: "m"})
if err == nil || !strings.Contains(err.Error(), "missing emoji_type") {
t.Errorf("want missing emoji_type error, got %v", err)
}
_, err = c.AddMessageReaction(context.Background(), AddReactionParams{EmojiType: "Typing"})
if err == nil || !strings.Contains(err.Error(), "missing message_id") {
t.Errorf("want missing message_id error, got %v", err)
}
}
func TestHTTPClient_DeleteMessageReaction_Validation(t *testing.T) {
c := NewHTTPAPIClient(HTTPClientConfig{}).(*httpAPIClient)
err := c.DeleteMessageReaction(context.Background(), DeleteReactionParams{ReactionID: "re"})
if err == nil || !strings.Contains(err.Error(), "missing message_id") {
t.Errorf("want missing message_id error, got %v", err)
}
err = c.DeleteMessageReaction(context.Background(), DeleteReactionParams{MessageID: "m"})
if err == nil || !strings.Contains(err.Error(), "missing reaction_id") {
t.Errorf("want missing reaction_id error, got %v", err)
}
}
func TestHTTPClient_MissingAppCredentials(t *testing.T) {
c := NewHTTPAPIClient(HTTPClientConfig{}).(*httpAPIClient)
_, err := c.tenantAccessToken(context.Background(), InstallationCredentials{AppSecret: "x"})

View File

@@ -203,11 +203,12 @@ func (c HubConfig) withDefaults() HubConfig {
// ... ctx cancellation triggers ...
// hub.Wait() // joins on every per-installation goroutine
type Hub struct {
queries HubQueries
factory ConnectorFactory
dispatcher *Dispatcher
replier OutcomeReplier
cfg HubConfig
queries HubQueries
factory ConnectorFactory
dispatcher *Dispatcher
replier OutcomeReplier
typingIndicator *TypingIndicatorManager
cfg HubConfig
// nodeID is the per-process lease ownership token. The CAS
// predicate on AcquireLarkWSLease treats matching tokens as
@@ -293,6 +294,13 @@ func (h *Hub) SetOutcomeReplier(r OutcomeReplier) {
h.replier = r
}
// SetTypingIndicatorManager installs the typing-indicator manager on
// the Hub. Must be called BEFORE Run. Nil is safe and disables the
// typing reaction lifecycle.
func (h *Hub) SetTypingIndicatorManager(m *TypingIndicatorManager) {
h.typingIndicator = m
}
// NodeID exposes the per-process lease token. Useful for tests and
// for observability (so operators can correlate DB lease rows to a
// running replica).
@@ -773,6 +781,16 @@ func (h *Hub) handleEvent(ctx context.Context, inst db.LarkInstallation, log *sl
"outcome", string(res.Outcome),
"drop_reason", string(res.DropReason),
)
if res.Outcome == OutcomeIngested && h.typingIndicator != nil {
// Detached: the typing reaction HTTP call must not block the
// connector's ACK path. A short timeout keeps the goroutine
// from hanging if Lark is slow.
go func() {
addCtx, cancel := context.WithTimeout(context.Background(), h.cfg.ReplyTimeout)
defer cancel()
h.typingIndicator.Add(addCtx, inst, res.ChatSessionID, msg.MessageID, msg.CreateTime)
}()
}
h.scheduleReply(inst, msg, res, log)
return res, nil
}

View File

@@ -87,6 +87,12 @@ func (f *enricherFakeClient) SendBindingPromptCard(context.Context, BindingPromp
func (f *enricherFakeClient) GetBotInfo(context.Context, InstallationCredentials) (BotInfo, error) {
return BotInfo{}, nil
}
func (f *enricherFakeClient) AddMessageReaction(context.Context, AddReactionParams) (string, error) {
return "", nil
}
func (f *enricherFakeClient) DeleteMessageReaction(context.Context, DeleteReactionParams) error {
return nil
}
func textMsg(id, sender, text, createTime string) LarkMessage {
return LarkMessage{

View File

@@ -205,10 +205,11 @@ func (c PatcherConfig) withDefaults() PatcherConfig {
// most one replica holds the installation lease at a time, the
// event bus is per-process, so exactly one Patcher reacts per run.
type Patcher struct {
queries PatcherQueries
credentials CredentialsResolver
client APIClient
cfg PatcherConfig
queries PatcherQueries
credentials CredentialsResolver
client APIClient
typingIndicator *TypingIndicatorManager
cfg PatcherConfig
}
// NewPatcher constructs a Patcher bound to its dependencies. The
@@ -223,6 +224,14 @@ func NewPatcher(queries PatcherQueries, credentials CredentialsResolver, client
}
}
// SetTypingIndicatorManager wires the typing-indicator manager into the
// patcher so that replies clear the "processing" reaction before they
// are sent. Call once at boot after both the patcher and manager are
// constructed. Nil disables the clear step.
func (p *Patcher) SetTypingIndicatorManager(m *TypingIndicatorManager) {
p.typingIndicator = m
}
// Register subscribes the patcher to the task-lifecycle events it
// cares about on the supplied bus. Idempotent only if you call it
// against a fresh bus; call sites should invoke it exactly once
@@ -307,6 +316,13 @@ func (p *Patcher) processEvent(ctx context.Context, e events.Event) error {
agentName = agent.Name
}
// Clear the "processing" reaction before the reply is visible so the
// user sees a clean transition. Best-effort: a failure here is logged
// but does not block the actual reply.
if p.typingIndicator != nil {
p.typingIndicator.Clear(ctx, chatSessionID)
}
switch e.Type {
case protocol.EventChatDone:
return p.sendChatReply(ctx, creds, binding, e.Payload)

View File

@@ -128,6 +128,12 @@ func (f *fakeAPIClient) ListChatMessages(ctx context.Context, creds Installation
func (f *fakeAPIClient) BatchGetUsers(ctx context.Context, creds InstallationCredentials, openIDs []string) (map[string]string, error) {
return nil, nil
}
func (f *fakeAPIClient) AddMessageReaction(ctx context.Context, p AddReactionParams) (string, error) {
return "fake-reaction-id", nil
}
func (f *fakeAPIClient) DeleteMessageReaction(ctx context.Context, p DeleteReactionParams) error {
return nil
}
func newTestPatcher(t *testing.T) (*Patcher, *fakePatcherQueries, *fakeAPIClient) {
t.Helper()

View File

@@ -81,6 +81,12 @@ func (s *stubAPIClientWithRecorder) ListChatMessages(ctx context.Context, creds
func (s *stubAPIClientWithRecorder) BatchGetUsers(ctx context.Context, creds InstallationCredentials, openIDs []string) (map[string]string, error) {
return nil, nil
}
func (s *stubAPIClientWithRecorder) AddMessageReaction(ctx context.Context, p AddReactionParams) (string, error) {
return "stub-reaction-id", nil
}
func (s *stubAPIClientWithRecorder) DeleteMessageReaction(ctx context.Context, p DeleteReactionParams) error {
return nil
}
// stubCredentialsResolver returns a fixed plaintext secret.
type stubCredentialsResolver struct{ secret string }

View File

@@ -0,0 +1,220 @@
package lark
import (
"context"
"log/slog"
"strconv"
"sync"
"time"
"github.com/jackc/pgx/v5/pgtype"
db "github.com/multica-ai/multica/server/pkg/db/generated"
)
// typingEmoji is the Lark emoji_type used for the "processing" indicator.
// It renders as a small typing-animation badge on the message.
const typingEmoji = "Typing"
// typingIndicatorMaxAge is how old a message can be before we skip the
// typing indicator. This prevents stale reactions when a WebSocket
// reconnect replays old events. Aligned with OpenClaw's 2-minute bound.
const typingIndicatorMaxAge = 2 * time.Minute
// TypingIndicatorState holds the identifiers needed to remove a reaction.
type TypingIndicatorState struct {
MessageID string
ReactionID string
}
// TypingIndicatorQueries is the narrow DB surface the manager needs.
type TypingIndicatorQueries interface {
GetLarkChatSessionBindingBySession(ctx context.Context, chatSessionID pgtype.UUID) (db.LarkChatSessionBinding, error)
GetLarkInstallation(ctx context.Context, id pgtype.UUID) (db.LarkInstallation, error)
}
// TypingIndicatorManager owns the "processing" reaction lifecycle for
// inbound Lark messages. When a message is successfully ingested it adds
// a Typing reaction; when the agent eventually replies (or fails) it
// clears the reaction(s) for that chat session.
//
// The manager is safe for concurrent use. It tolerates missing or
// stale state gracefully: adding a reaction to a message that already
// has one simply appends another state entry; clearing a session with
// no tracked state is a no-op.
type TypingIndicatorManager struct {
client APIClient
credentials CredentialsResolver
queries TypingIndicatorQueries
log *slog.Logger
mu sync.RWMutex
states map[string][]*TypingIndicatorState // key = chat_session_id string
}
// NewTypingIndicatorManager constructs a manager. All dependencies must
// be non-nil; the manager panics on nil client / credentials / queries.
func NewTypingIndicatorManager(client APIClient, credentials CredentialsResolver, queries TypingIndicatorQueries, log *slog.Logger) *TypingIndicatorManager {
if log == nil {
log = slog.Default()
}
return &TypingIndicatorManager{
client: client,
credentials: credentials,
queries: queries,
log: log,
states: make(map[string][]*TypingIndicatorState),
}
}
// Add sends a Typing reaction to the given message and records the state
// under the chat session. It is synchronous — the caller decides whether
// to run it in a detached goroutine. Errors are logged and swallowed.
//
// createTime is Lark's epoch-millisecond string (InboundMessage.CreateTime).
// Messages older than typingIndicatorMaxAge are silently skipped so that
// WebSocket replays and stale reconnects do not surface misleading "processing"
// badges on long-finished conversations.
func (m *TypingIndicatorManager) Add(ctx context.Context, inst db.LarkInstallation, chatSessionID pgtype.UUID, messageID string, createTime string) {
if messageID == "" {
return
}
if isMessageTooOld(createTime) {
m.log.Debug("lark typing indicator: message too old, skipping",
"chat_session_id", uuidString(chatSessionID),
"message_id", messageID,
"create_time", createTime,
)
return
}
creds, err := m.resolveCredentials(inst)
if err != nil {
m.log.Warn("lark typing indicator: failed to resolve credentials",
"chat_session_id", uuidString(chatSessionID),
"message_id", messageID,
"err", err,
)
return
}
reactionID, err := m.client.AddMessageReaction(ctx, AddReactionParams{
InstallationID: creds,
MessageID: messageID,
EmojiType: typingEmoji,
})
if err != nil {
m.log.Warn("lark typing indicator: add reaction failed",
"chat_session_id", uuidString(chatSessionID),
"message_id", messageID,
"err", err,
)
return
}
key := uuidString(chatSessionID)
m.mu.Lock()
m.states[key] = append(m.states[key], &TypingIndicatorState{
MessageID: messageID,
ReactionID: reactionID,
})
m.mu.Unlock()
m.log.Debug("lark typing indicator: reaction added",
"chat_session_id", key,
"message_id", messageID,
"reaction_id", reactionID,
)
}
// Clear removes every tracked Typing reaction for the chat session and
// drops the state entry. It is synchronous so the reaction is gone before
// the agent's reply is sent, giving the user a clean visual transition.
// Individual delete failures are logged but do not abort the loop.
func (m *TypingIndicatorManager) Clear(ctx context.Context, chatSessionID pgtype.UUID) {
key := uuidString(chatSessionID)
m.mu.Lock()
states := m.states[key]
delete(m.states, key)
m.mu.Unlock()
if len(states) == 0 {
return
}
binding, err := m.queries.GetLarkChatSessionBindingBySession(ctx, chatSessionID)
if err != nil {
m.log.Warn("lark typing indicator: failed to lookup binding for clear",
"chat_session_id", key,
"err", err,
)
return
}
inst, err := m.queries.GetLarkInstallation(ctx, binding.InstallationID)
if err != nil {
m.log.Warn("lark typing indicator: failed to lookup installation for clear",
"chat_session_id", key,
"err", err,
)
return
}
creds, err := m.resolveCredentials(inst)
if err != nil {
m.log.Warn("lark typing indicator: failed to resolve credentials for clear",
"chat_session_id", key,
"err", err,
)
return
}
for _, s := range states {
if s.ReactionID == "" {
continue
}
if err := m.client.DeleteMessageReaction(ctx, DeleteReactionParams{
InstallationID: creds,
MessageID: s.MessageID,
ReactionID: s.ReactionID,
}); err != nil {
m.log.Warn("lark typing indicator: delete reaction failed",
"chat_session_id", key,
"message_id", s.MessageID,
"reaction_id", s.ReactionID,
"err", err,
)
continue
}
m.log.Debug("lark typing indicator: reaction removed",
"chat_session_id", key,
"message_id", s.MessageID,
"reaction_id", s.ReactionID,
)
}
}
func isMessageTooOld(createTime string) bool {
if createTime == "" {
return false
}
ms, err := strconv.ParseInt(createTime, 10, 64)
if err != nil {
return false
}
return time.Since(time.UnixMilli(ms)) > typingIndicatorMaxAge
}
func (m *TypingIndicatorManager) resolveCredentials(inst db.LarkInstallation) (InstallationCredentials, error) {
secret, err := m.credentials.DecryptAppSecret(inst)
if err != nil {
return InstallationCredentials{}, err
}
creds := InstallationCredentials{
AppID: inst.AppID,
AppSecret: secret,
Region: RegionOrDefault(inst.Region),
}
if inst.TenantKey.Valid {
creds.TenantKey = inst.TenantKey.String
}
return creds, nil
}

View File

@@ -0,0 +1,318 @@
package lark
import (
"context"
"errors"
"strconv"
"testing"
"time"
"github.com/jackc/pgx/v5/pgtype"
db "github.com/multica-ai/multica/server/pkg/db/generated"
)
// fakeTypingAPIClient records reaction calls and can be programmed to fail.
type fakeTypingAPIClient struct {
addCalled []addReactionCall
deleteCalled []deleteReactionCall
addErr error
deleteErr error
addReturn string
}
type addReactionCall struct {
creds InstallationCredentials
messageID string
emojiType string
}
type deleteReactionCall struct {
creds InstallationCredentials
messageID string
reactionID string
}
func (f *fakeTypingAPIClient) IsConfigured() bool { return true }
func (f *fakeTypingAPIClient) SendInteractiveCard(context.Context, SendCardParams) (string, error) {
return "", nil
}
func (f *fakeTypingAPIClient) PatchInteractiveCard(context.Context, PatchCardParams) error {
return nil
}
func (f *fakeTypingAPIClient) SendTextMessage(context.Context, SendTextParams) (string, error) {
return "", nil
}
func (f *fakeTypingAPIClient) SendMarkdownCard(context.Context, SendMarkdownCardParams) (string, error) {
return "", nil
}
func (f *fakeTypingAPIClient) SendBindingPromptCard(context.Context, BindingPromptParams) error {
return nil
}
func (f *fakeTypingAPIClient) GetBotInfo(context.Context, InstallationCredentials) (BotInfo, error) {
return BotInfo{}, nil
}
func (f *fakeTypingAPIClient) GetMessage(context.Context, InstallationCredentials, string) ([]LarkMessage, error) {
return nil, nil
}
func (f *fakeTypingAPIClient) ListChatMessages(context.Context, InstallationCredentials, ListMessagesParams) ([]LarkMessage, error) {
return nil, nil
}
func (f *fakeTypingAPIClient) BatchGetUsers(context.Context, InstallationCredentials, []string) (map[string]string, error) {
return nil, nil
}
func (f *fakeTypingAPIClient) AddMessageReaction(_ context.Context, p AddReactionParams) (string, error) {
f.addCalled = append(f.addCalled, addReactionCall{p.InstallationID, p.MessageID, p.EmojiType})
return f.addReturn, f.addErr
}
func (f *fakeTypingAPIClient) DeleteMessageReaction(_ context.Context, p DeleteReactionParams) error {
f.deleteCalled = append(f.deleteCalled, deleteReactionCall{p.InstallationID, p.MessageID, p.ReactionID})
return f.deleteErr
}
type fakeTypingQueries struct {
binding db.LarkChatSessionBinding
installation db.LarkInstallation
bindingErr error
installErr error
}
func (f *fakeTypingQueries) GetLarkChatSessionBindingBySession(context.Context, pgtype.UUID) (db.LarkChatSessionBinding, error) {
return f.binding, f.bindingErr
}
func (f *fakeTypingQueries) GetLarkInstallation(context.Context, pgtype.UUID) (db.LarkInstallation, error) {
return f.installation, f.installErr
}
type fakeTypingCreds struct{ secret string }
func (f fakeTypingCreds) DecryptAppSecret(inst db.LarkInstallation) (string, error) {
return f.secret, nil
}
func TestTypingIndicatorAddRecordsState(t *testing.T) {
api := &fakeTypingAPIClient{addReturn: "reaction-123"}
queries := &fakeTypingQueries{}
mgr := NewTypingIndicatorManager(api, fakeTypingCreds{secret: "shh"}, queries, newDiscardLogger())
inst := db.LarkInstallation{AppID: "cli_test", Region: "feishu"}
session := pgtype.UUID{Bytes: [16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}, Valid: true}
mgr.Add(context.Background(), inst, session, "msg-1", "")
if len(api.addCalled) != 1 {
t.Fatalf("expected 1 add call, got %d", len(api.addCalled))
}
if api.addCalled[0].messageID != "msg-1" || api.addCalled[0].emojiType != typingEmoji {
t.Fatalf("unexpected add call params: %+v", api.addCalled[0])
}
key := uuidString(session)
mgr.mu.RLock()
states := mgr.states[key]
mgr.mu.RUnlock()
if len(states) != 1 || states[0].MessageID != "msg-1" || states[0].ReactionID != "reaction-123" {
t.Fatalf("unexpected state: %+v", states)
}
}
func TestTypingIndicatorAddSkipsOnEmptyMessageID(t *testing.T) {
api := &fakeTypingAPIClient{addReturn: "reaction-123"}
mgr := NewTypingIndicatorManager(api, fakeTypingCreds{secret: "shh"}, &fakeTypingQueries{}, newDiscardLogger())
inst := db.LarkInstallation{AppID: "cli_test", Region: "feishu"}
session := pgtype.UUID{Bytes: [16]byte{1}, Valid: true}
mgr.Add(context.Background(), inst, session, "", "")
if len(api.addCalled) != 0 {
t.Fatalf("expected 0 add calls, got %d", len(api.addCalled))
}
}
func TestTypingIndicatorAddSkipsOldMessages(t *testing.T) {
api := &fakeTypingAPIClient{addReturn: "reaction-123"}
mgr := NewTypingIndicatorManager(api, fakeTypingCreds{secret: "shh"}, &fakeTypingQueries{}, newDiscardLogger())
inst := db.LarkInstallation{AppID: "cli_test", Region: "feishu"}
session := pgtype.UUID{Bytes: [16]byte{1}, Valid: true}
oldTime := time.Now().Add(-3 * time.Minute).UnixMilli()
mgr.Add(context.Background(), inst, session, "msg-old", strconv.FormatInt(oldTime, 10))
if len(api.addCalled) != 0 {
t.Fatalf("expected 0 add calls for old message, got %d", len(api.addCalled))
}
}
func TestTypingIndicatorAddLogsOnAPIError(t *testing.T) {
api := &fakeTypingAPIClient{addErr: errors.New("lark down")}
mgr := NewTypingIndicatorManager(api, fakeTypingCreds{secret: "shh"}, &fakeTypingQueries{}, newDiscardLogger())
inst := db.LarkInstallation{AppID: "cli_test", Region: "feishu"}
session := pgtype.UUID{Bytes: [16]byte{1}, Valid: true}
mgr.Add(context.Background(), inst, session, "msg-1", "")
if len(api.addCalled) != 1 {
t.Fatalf("expected 1 add call, got %d", len(api.addCalled))
}
key := uuidString(session)
mgr.mu.RLock()
states := mgr.states[key]
mgr.mu.RUnlock()
if len(states) != 0 {
t.Fatalf("expected 0 states after error, got %d", len(states))
}
}
func TestTypingIndicatorClearDeletesReactions(t *testing.T) {
api := &fakeTypingAPIClient{addReturn: "reaction-abc"}
queries := &fakeTypingQueries{
binding: db.LarkChatSessionBinding{
InstallationID: pgtype.UUID{Bytes: [16]byte{9, 9, 9, 9}, Valid: true},
},
installation: db.LarkInstallation{
ID: pgtype.UUID{Bytes: [16]byte{9, 9, 9, 9}, Valid: true},
AppID: "cli_test",
Region: "feishu",
},
}
mgr := NewTypingIndicatorManager(api, fakeTypingCreds{secret: "shh"}, queries, newDiscardLogger())
inst := db.LarkInstallation{AppID: "cli_test", Region: "feishu"}
session := pgtype.UUID{Bytes: [16]byte{1, 2, 3, 4}, Valid: true}
mgr.Add(context.Background(), inst, session, "msg-1", "")
if len(api.addCalled) != 1 {
t.Fatal("add should have been called")
}
mgr.Clear(context.Background(), session)
if len(api.deleteCalled) != 1 {
t.Fatalf("expected 1 delete call, got %d", len(api.deleteCalled))
}
if api.deleteCalled[0].messageID != "msg-1" || api.deleteCalled[0].reactionID != "reaction-abc" {
t.Fatalf("unexpected delete params: %+v", api.deleteCalled[0])
}
key := uuidString(session)
mgr.mu.RLock()
states := mgr.states[key]
mgr.mu.RUnlock()
if len(states) != 0 {
t.Fatalf("expected 0 states after clear, got %d", len(states))
}
}
func TestTypingIndicatorClearNoOpWhenEmpty(t *testing.T) {
api := &fakeTypingAPIClient{}
queries := &fakeTypingQueries{
binding: db.LarkChatSessionBinding{
InstallationID: pgtype.UUID{Bytes: [16]byte{9}, Valid: true},
},
installation: db.LarkInstallation{
ID: pgtype.UUID{Bytes: [16]byte{9}, Valid: true},
AppID: "cli_test",
Region: "feishu",
},
}
mgr := NewTypingIndicatorManager(api, fakeTypingCreds{secret: "shh"}, queries, newDiscardLogger())
session := pgtype.UUID{Bytes: [16]byte{1, 2, 3, 4}, Valid: true}
mgr.Clear(context.Background(), session)
if len(api.deleteCalled) != 0 {
t.Fatalf("expected 0 delete calls when empty, got %d", len(api.deleteCalled))
}
}
func TestTypingIndicatorClearLogsOnDeleteError(t *testing.T) {
api := &fakeTypingAPIClient{addReturn: "reaction-xyz", deleteErr: errors.New("delete failed")}
queries := &fakeTypingQueries{
binding: db.LarkChatSessionBinding{
InstallationID: pgtype.UUID{Bytes: [16]byte{9}, Valid: true},
},
installation: db.LarkInstallation{
ID: pgtype.UUID{Bytes: [16]byte{9}, Valid: true},
AppID: "cli_test",
Region: "feishu",
},
}
mgr := NewTypingIndicatorManager(api, fakeTypingCreds{secret: "shh"}, queries, newDiscardLogger())
inst := db.LarkInstallation{AppID: "cli_test", Region: "feishu"}
session := pgtype.UUID{Bytes: [16]byte{1, 2, 3, 4}, Valid: true}
mgr.Add(context.Background(), inst, session, "msg-1", "")
mgr.Clear(context.Background(), session)
if len(api.deleteCalled) != 1 {
t.Fatalf("expected 1 delete call attempt, got %d", len(api.deleteCalled))
}
}
func TestTypingIndicatorMultipleMessagesPerSession(t *testing.T) {
api := &fakeTypingAPIClient{addReturn: "reaction-n"}
queries := &fakeTypingQueries{
binding: db.LarkChatSessionBinding{
InstallationID: pgtype.UUID{Bytes: [16]byte{9}, Valid: true},
},
installation: db.LarkInstallation{
ID: pgtype.UUID{Bytes: [16]byte{9}, Valid: true},
AppID: "cli_test",
Region: "feishu",
},
}
mgr := NewTypingIndicatorManager(api, fakeTypingCreds{secret: "shh"}, queries, newDiscardLogger())
inst := db.LarkInstallation{AppID: "cli_test", Region: "feishu"}
session := pgtype.UUID{Bytes: [16]byte{1, 2, 3, 4}, Valid: true}
mgr.Add(context.Background(), inst, session, "msg-a", "")
mgr.Add(context.Background(), inst, session, "msg-b", "")
if len(api.addCalled) != 2 {
t.Fatalf("expected 2 add calls, got %d", len(api.addCalled))
}
mgr.Clear(context.Background(), session)
if len(api.deleteCalled) != 2 {
t.Fatalf("expected 2 delete calls, got %d", len(api.deleteCalled))
}
}
func TestTypingIndicatorConcurrentAddAndClear(t *testing.T) {
api := &fakeTypingAPIClient{addReturn: "reaction-concurrent"}
queries := &fakeTypingQueries{
binding: db.LarkChatSessionBinding{
InstallationID: pgtype.UUID{Bytes: [16]byte{9}, Valid: true},
},
installation: db.LarkInstallation{
ID: pgtype.UUID{Bytes: [16]byte{9}, Valid: true},
AppID: "cli_test",
Region: "feishu",
},
}
mgr := NewTypingIndicatorManager(api, fakeTypingCreds{secret: "shh"}, queries, newDiscardLogger())
inst := db.LarkInstallation{AppID: "cli_test", Region: "feishu"}
session := pgtype.UUID{Bytes: [16]byte{1, 2, 3, 4}, Valid: true}
done := make(chan struct{})
go func() {
for i := 0; i < 50; i++ {
mgr.Add(context.Background(), inst, session, "msg", "")
}
close(done)
}()
go func() {
for i := 0; i < 50; i++ {
mgr.Clear(context.Background(), session)
}
}()
<-done
time.Sleep(10 * time.Millisecond)
}