Compare commits

...

1 Commits

Author SHA1 Message Date
J
9a39a309ae feat(lark): inbound context enrichment — post / merge_forward / quoted-reply (MUL-2951)
Expand an inbound Lark bot message's body before dispatch with the context
a user explicitly attached, so the agent sees a semantically complete
conversation instead of a bare "@bot 总结一下".

- post: flatten rich-text (title + paragraphs, links, @-mentions) to plain
  text synchronously in the decoder.
- merge_forward: inline the forwarded transcript via a single GetMessage —
  GET /open-apis/im/v1/messages/{id} returns the forward sentinel plus the
  bundled children. (The issue's container_id_type=merge_forward query is
  undocumented; this avoids it and also handles a forwarded quoted parent.)
- quoted reply: prepend the parent_id message as a <quoted_message> block;
  a parent that is itself a forward nests a <forwarded_messages> block.
- new InboundEnricher runs in the WS connector between decode and emit,
  bounded by EnrichTimeout and degrading to "[unable to fetch]" placeholders
  so it never blocks the ~3s long-conn ACK budget.

/issue stays parseable on a quote-reply by parsing the command from the
user's own text (CommandBody) rather than the enriched body.

Short-window debounce batching (issue item #4) is tracked as a follow-up.

Co-authored-by: multica-agent <github@multica.ai>
2026-06-03 23:52:15 +08:00
17 changed files with 1524 additions and 12 deletions

View File

@@ -250,7 +250,7 @@ func NewRouterWithOptions(pool *pgxpool.Pool, hub *realtime.Hub, bus *events.Bus
// inbound messages will be silently dropped until the
// config is fixed, with the boot log labelling the mode
// "noop" so operators can spot it.
connectorFactory, connectorLabel := buildLarkConnectorFactory(installSvc)
connectorFactory, connectorLabel := buildLarkConnectorFactory(installSvc, larkClient)
h.LarkHub = lark.NewHub(queries, connectorFactory, dispatcher, lark.HubConfig{})
// OutcomeReplier wires the outbound side of the
@@ -943,7 +943,7 @@ func NewRouterWithOptions(pool *pgxpool.Pool, hub *realtime.Hub, bus *events.Bus
//
// Returns the factory plus a short label for the boot log: "ws" in
// the healthy case, "noop" in the fallback case.
func buildLarkConnectorFactory(installSvc *lark.InstallationService) (lark.ConnectorFactory, string) {
func buildLarkConnectorFactory(installSvc *lark.InstallationService, apiClient lark.APIClient) (lark.ConnectorFactory, string) {
endpointFetcher, err := lark.NewHTTPConnectionTokenFetcher(lark.HTTPConnectionTokenConfig{
BaseURL: strings.TrimSpace(os.Getenv("MULTICA_LARK_CALLBACK_BASE_URL")),
Logger: slog.Default(),
@@ -968,10 +968,16 @@ func buildLarkConnectorFactory(installSvc *lark.InstallationService) (lark.Conne
}
return creds, nil
})
// Inbound enricher: expands quoted replies / forwarded bundles into
// the agent's body via the IM API before dispatch. It shares the
// connector's resolved credentials and runs under the connector's
// EnrichTimeout so it cannot overrun the Lark long-conn ACK budget.
enricher := lark.NewInboundEnricher(apiClient, lark.InboundEnricherConfig{Logger: slog.Default()})
conn, err := lark.NewWSLongConnConnector(lark.WSConnectorConfig{
Dialer: dialer,
EndpointFetcher: endpointFetcher,
FrameDecoder: decoder,
Enricher: enricher,
CredentialsProvider: credsProvider,
Logger: slog.Default(),
})

View File

@@ -71,9 +71,16 @@ type EnsureChatSessionParams struct {
// for callers that have already finalized dedup outside the
// transaction.
type AppendUserMessageParams struct {
ChatSessionID pgtype.UUID
Sender pgtype.UUID
Body string
ChatSessionID pgtype.UUID
Sender pgtype.UUID
// Body is the full text stored as the chat_message — including any
// quoted-reply / forwarded context the enricher inlined.
Body string
// CommandBody is the user's own typed text, used as the `/issue`
// command source. It is the un-enriched Body; when empty (callers
// that don't set it), `/issue` parsing falls back to Body so
// behavior is unchanged for the non-enriched path.
CommandBody string
InstallationID pgtype.UUID
LarkMessageID string
ClaimToken pgtype.UUID

View File

@@ -198,11 +198,22 @@ func (s *chatSessionService) AppendUserMessage(ctx context.Context, p AppendUser
defer tx.Rollback(ctx)
qtx := s.queries.WithTx(tx)
// Parse the command BEFORE the insert, so the "/issue alone → use
// previous user message" fallback queries from the message set
// that does NOT yet include the message currently being appended.
// Otherwise the previous-message lookup would self-reference.
cmd, _ := parseIssueCommand(p.Body)
// Parse the command from the user's OWN typed text (CommandBody),
// not the stored Body: the enricher prepends quoted / forwarded
// context to Body, which would push a `/issue …` off the first line
// and silently stop creating the issue (parseIssueCommand only
// inspects the first non-empty line). Fall back to Body when
// CommandBody is unset so non-enriched callers are unaffected.
//
// Parse BEFORE the insert so the "/issue alone → use previous user
// message" fallback queries from the message set that does NOT yet
// include the message currently being appended; otherwise the
// previous-message lookup would self-reference.
commandSource := p.CommandBody
if commandSource == "" {
commandSource = p.Body
}
cmd, _ := parseIssueCommand(commandSource)
if cmd != nil && cmd.Title == "" {
prev, err := qtx.GetMostRecentUserChatMessage(ctx, p.ChatSessionID)
if err == nil {

View File

@@ -69,6 +69,46 @@ type APIClient interface {
// is then frozen into lark_installation alongside the app_id /
// app_secret in the same transaction as the installer-bind.
GetBotInfo(ctx context.Context, creds InstallationCredentials) (BotInfo, error)
// GetMessage fetches a message by id via
// GET /open-apis/im/v1/messages/{message_id}. Lark always returns an
// ARRAY (data.items[]): for a normal message exactly one element;
// for a `merge_forward` message the first element is the forward
// sentinel and the remaining elements are the bundled child messages
// (each a normal typed message linked back by upper_message_id). The
// inbound enricher relies on both shapes: items[0] for a quoted-reply
// parent, items[1:] for a forwarded transcript. Returning the raw
// slice keeps this method a thin transport adapter — flattening and
// block assembly are the enricher's job.
GetMessage(ctx context.Context, creds InstallationCredentials, messageID string) ([]LarkMessage, error)
}
// LarkMessage is the normalized slice of an IM v1 message item the
// enricher needs. Body.content is passed through raw (still the
// JSON-encoded, msg_type-specific string Lark double-encodes) so the
// flattener — not the transport client — owns content interpretation.
type LarkMessage struct {
MessageID string
MessageType string // Lark `msg_type`: text / post / image / merge_forward / …
Content string // raw body.content (a JSON-encoded string)
SenderID string // sender.id (open_id for users, app_id for apps)
SenderType string // sender.sender_type: user / app / anonymous / …
CreateTime string // epoch milliseconds, as Lark returns it (a string)
ParentID string
RootID string
UpperMessageID string // the merge_forward parent a child hangs under
Deleted bool
Mentions []LarkMessageMention
}
// LarkMessageMention mirrors a mentions[] entry on the IM REST item
// shape. Note this differs from the WS receive event's mention shape:
// here `id` is a bare open_id string, not a nested {open_id, union_id,
// user_id} object.
type LarkMessageMention struct {
Key string // e.g. "@_user_1"
ID string // open_id
Name string // display name (may be empty)
}
// BotInfo is the slice of /open-apis/bot/v3/info (+ a follow-up
@@ -227,3 +267,8 @@ func (s *stubAPIClient) GetBotInfo(ctx context.Context, creds InstallationCreden
s.log.Warn("lark stub client: GetBotInfo called", "app_id", creds.AppID)
return BotInfo{}, ErrAPIClientNotConfigured
}
func (s *stubAPIClient) GetMessage(ctx context.Context, creds InstallationCredentials, messageID string) ([]LarkMessage, error) {
s.log.Warn("lark stub client: GetMessage called", "message_id", messageID)
return nil, ErrAPIClientNotConfigured
}

View File

@@ -0,0 +1,164 @@
package lark
import (
"encoding/json"
"strings"
)
// flattenContent renders a Lark message's body.content — the raw,
// JSON-encoded string Lark double-encodes — into plain text, dispatching
// on msg_type. It is the shared structural step used by BOTH ingress
// paths:
//
// - the inbound decoder, for the user's own text / post message, and
// - the enricher, for the quoted-reply parent and merge_forward child
// messages it pulls back over the IM REST API.
//
// Mention placeholders (@_user_N) are preserved verbatim; the caller is
// responsible for resolving them against the message's mentions[] array
// via resolveMentions. The two ingress shapes (WS receive event vs IM
// REST item) carry the mentions array differently — only the caller
// knows which one applies — so flattening stays mention-agnostic.
//
// Non-text media types render as a stable bracketed placeholder so the
// agent sees that *something* was attached without us downloading the
// binary. Attachment ingestion is explicitly out of scope (tracked as a
// separate attachment-pipeline issue), and merge_forward is intercepted
// by the enricher before it reaches here (expanding it needs an HTTP
// round-trip); the inline placeholder is only a fallback for a forward
// nested inside another forward.
func flattenContent(msgType, rawContent string) string {
switch msgType {
case "text":
return extractTextBody(rawContent)
case "post":
return flattenPostContent(rawContent)
case "image":
return "[Image]"
case "file":
return "[File]"
case "audio":
return "[Audio]"
case "media":
return "[Video]"
case "sticker":
return "[Sticker]"
case "interactive":
return "[interactive card]"
case "share_chat":
return "[Shared Chat]"
case "share_user":
return "[Shared User Card]"
case "system":
return "[System Message]"
case "merge_forward":
return "[forwarded messages]"
default:
return ""
}
}
// larkPostContent mirrors the RECEIVE-side shape of a `post` rich-text
// body.content. Crucially this is NOT the locale-wrapped form the SEND
// API takes ({"zh_cn": {...}}): an inbound post body.content unmarshals
// directly into {title, content}. content is a 2-D array — the outer
// array is the ordered list of paragraphs, each inner array the ordered
// spans of that paragraph; the newline between paragraphs is implicit in
// the array boundary, not a span.
type larkPostContent struct {
Title string `json:"title"`
Content [][]larkPostSpan `json:"content"`
}
// larkPostSpan is one node inside a post paragraph. Only the fields that
// carry renderable text are modelled; the tag set is extensible, so the
// flattener emits `text` for any unrecognized tag and skips it otherwise
// rather than failing.
type larkPostSpan struct {
Tag string `json:"tag"`
Text string `json:"text"`
Href string `json:"href"`
UserID string `json:"user_id"`
UserName string `json:"user_name"`
}
// flattenPostContent flattens a received `post` body.content into plain
// text: the title (when present) on its own first line, then one line
// per paragraph. Within a paragraph spans are joined with a single space
// — this matches Lark's own rendering, where logically separate chunks
// ("Lark 集成", then a link "PR #3277") read as space-separated words.
//
// A link span renders as "text (href)" so the URL survives into the
// agent's context; an `at` span renders as its @_user_N placeholder (or
// the inline user_name when Lark already resolved it) so a downstream
// resolveMentions pass can substitute the display name. Media spans
// degrade to the same bracketed placeholders flattenContent uses.
func flattenPostContent(raw string) string {
if raw == "" {
return ""
}
var doc larkPostContent
if err := json.Unmarshal([]byte(raw), &doc); err != nil {
return ""
}
var b strings.Builder
write := func(line string) {
if b.Len() > 0 {
b.WriteByte('\n')
}
b.WriteString(line)
}
if doc.Title != "" {
write(doc.Title)
}
for _, para := range doc.Content {
write(flattenPostParagraph(para))
}
return strings.TrimRight(b.String(), "\n")
}
func flattenPostParagraph(spans []larkPostSpan) string {
parts := make([]string, 0, len(spans))
for _, s := range spans {
switch s.Tag {
case "text", "code_block":
if s.Text != "" {
parts = append(parts, s.Text)
}
case "a":
switch {
case s.Text != "" && s.Href != "":
parts = append(parts, s.Text+" ("+s.Href+")")
case s.Text != "":
parts = append(parts, s.Text)
case s.Href != "":
parts = append(parts, s.Href)
}
case "at":
// Prefer an already-resolved display name; otherwise emit
// the user_id, which on the receive side is the @_user_N
// placeholder a later resolveMentions pass maps to a name.
switch {
case s.UserName != "":
parts = append(parts, "@"+s.UserName)
case s.UserID != "":
parts = append(parts, s.UserID)
}
case "img":
parts = append(parts, "[Image]")
case "media":
parts = append(parts, "[Video]")
case "emotion":
// emoji_type is an enum key (e.g. "SMILE"), not display
// text — skip it rather than leak the key.
case "hr":
parts = append(parts, "---")
default:
if s.Text != "" {
parts = append(parts, s.Text)
}
}
}
return strings.Join(parts, " ")
}

View File

@@ -0,0 +1,101 @@
package lark
import "testing"
// TestFlattenPostContent_IssueExample pins the exact rich-text `post`
// example from MUL-2951: a title line, a prose paragraph, and a
// paragraph mixing a text span with a hyperlink span. The link must
// render as "text (href)" so the URL survives into the agent's context.
func TestFlattenPostContent_IssueExample(t *testing.T) {
t.Parallel()
// Received-side post body.content (NOT locale-wrapped).
raw := `{
"title": "周报",
"content": [
[{ "tag": "text", "text": "本周完成:" }],
[
{ "tag": "text", "text": "Lark 集成" },
{ "tag": "a", "href": "https://github.com/multica-ai/multica/pull/3277", "text": "PR #3277" }
]
]
}`
want := "周报\n本周完成\nLark 集成 PR #3277 (https://github.com/multica-ai/multica/pull/3277)"
if got := flattenPostContent(raw); got != want {
t.Errorf("flattenPostContent()\n got = %q\nwant = %q", got, want)
}
}
func TestFlattenPostContent_NoTitle(t *testing.T) {
t.Parallel()
raw := `{"content":[[{"tag":"text","text":"line one"}],[{"tag":"text","text":"line two"}]]}`
want := "line one\nline two"
if got := flattenPostContent(raw); got != want {
t.Errorf("got %q want %q", got, want)
}
}
func TestFlattenPostContent_MediaAndMentionSpans(t *testing.T) {
t.Parallel()
// at span carries the @_user_N placeholder (resolved later by
// resolveMentions); media tags degrade to bracket placeholders;
// emotion is skipped entirely.
raw := `{"content":[[
{"tag":"at","user_id":"@_user_1","user_name":""},
{"tag":"text","text":"look"},
{"tag":"img","image_key":"img_x"},
{"tag":"emotion","emoji_type":"SMILE"}
]]}`
want := "@_user_1 look [Image]"
if got := flattenPostContent(raw); got != want {
t.Errorf("got %q want %q", got, want)
}
}
func TestFlattenPostContent_AtPrefersResolvedName(t *testing.T) {
t.Parallel()
raw := `{"content":[[{"tag":"at","user_id":"@_user_1","user_name":"Tom"},{"tag":"text","text":"hi"}]]}`
want := "@Tom hi"
if got := flattenPostContent(raw); got != want {
t.Errorf("got %q want %q", got, want)
}
}
func TestFlattenPostContent_Malformed(t *testing.T) {
t.Parallel()
if got := flattenPostContent("not json"); got != "" {
t.Errorf("malformed content should flatten to empty, got %q", got)
}
if got := flattenPostContent(""); got != "" {
t.Errorf("empty content should flatten to empty, got %q", got)
}
}
func TestFlattenContent_DispatchByType(t *testing.T) {
t.Parallel()
cases := []struct {
name string
msgType string
content string
want string
}{
{"text", "text", `{"text":"hello"}`, "hello"},
{"image", "image", `{"image_key":"img_x"}`, "[Image]"},
{"file", "file", `{"file_key":"f"}`, "[File]"},
{"audio", "audio", `{"file_key":"f"}`, "[Audio]"},
{"media", "media", `{"file_key":"f"}`, "[Video]"},
{"sticker", "sticker", `{"file_key":"f"}`, "[Sticker]"},
{"interactive", "interactive", `{"title":"t"}`, "[interactive card]"},
{"share_chat", "share_chat", `{"chat_id":"oc"}`, "[Shared Chat]"},
{"merge_forward", "merge_forward", `{"content":"Merged and Forwarded Message"}`, "[forwarded messages]"},
{"unknown", "totally_new_type", `{}`, ""},
}
for _, tc := range cases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
if got := flattenContent(tc.msgType, tc.content); got != tc.want {
t.Errorf("flattenContent(%q) = %q want %q", tc.msgType, got, tc.want)
}
})
}
}

View File

@@ -30,6 +30,32 @@ type InboundMessage struct {
SenderOpenID OpenID
Body string
AddressedToBot bool
// MessageType is the raw Lark msg_type ("text", "post",
// "merge_forward", "image", "interactive", …). The decoder
// populates it so the inbound enricher can decide whether a
// message needs an HTTP round-trip to expand (merge_forward) while
// the dispatcher itself stays msg_type-agnostic and only reads Body.
MessageType string
// ParentID is the message_id of the message this one quote-replies
// to, taken verbatim from the receive event's `parent_id`. Empty
// when the message is not a reply. The enricher fetches it and
// prepends a <quoted_message> block. RootID is the thread/root
// anchor Lark also reports; we keep it for completeness but the
// quoted-reply expansion keys off ParentID (the immediate parent),
// not the root.
ParentID string
RootID string
// CommandBody is the user's OWN typed text (the decoded Body before
// the enricher prepends any <quoted_message> / <forwarded_messages>
// context). The `/issue` command is parsed from THIS, not from the
// enriched Body: enrichment prepends context blocks, which would
// otherwise push the user's `/issue …` off the first line and
// silently stop creating the issue. The enricher leaves CommandBody
// untouched while it rewrites Body.
CommandBody string
}
// Outcome categorizes what the Dispatcher decided to do with an
@@ -383,6 +409,7 @@ func (d *Dispatcher) processClaimed(ctx context.Context, msg InboundMessage, ins
ChatSessionID: sessionID,
Sender: binding.MulticaUserID,
Body: msg.Body,
CommandBody: msg.CommandBody,
InstallationID: inst.ID,
LarkMessageID: msg.MessageID,
ClaimToken: claimToken,

View File

@@ -505,6 +505,102 @@ func (c *httpAPIClient) GetBotInfo(ctx context.Context, creds InstallationCreden
return BotInfo{OpenID: OpenID(botResp.Bot.OpenID), UnionID: unionID}, nil
}
// GetMessage retrieves a message by id via
// GET /open-apis/im/v1/messages/{message_id}. The endpoint always wraps
// the result in data.items[] — one element for a normal message, and a
// forward sentinel followed by the bundled child messages for a
// `merge_forward`. We pass user_id_type=open_id so sender.id and
// mentions[].id come back as open_ids, matching the identifiers the
// rest of the package keys on.
//
// body.content is forwarded verbatim (the raw, JSON-encoded, msg_type-
// specific string Lark double-encodes); the enricher's flattener owns
// interpreting it. A deleted / out-of-scope message surfaces as a Lark
// error code, which we turn into a normal Go error so the enricher can
// degrade to its "[unable to fetch]" placeholder without aborting the
// inbound pipeline.
func (c *httpAPIClient) GetMessage(ctx context.Context, creds InstallationCredentials, messageID string) ([]LarkMessage, error) {
if messageID == "" {
return nil, errors.New("lark http client: missing message_id")
}
token, err := c.tenantAccessToken(ctx, creds)
if err != nil {
return nil, err
}
q := url.Values{}
q.Set("user_id_type", "open_id")
path := "/open-apis/im/v1/messages/" + url.PathEscape(messageID) + "?" + q.Encode()
var resp struct {
Code int `json:"code"`
Msg string `json:"msg"`
Data struct {
Items []larkRESTMessageItem `json:"items"`
} `json:"data"`
}
if err := c.doJSON(ctx, http.MethodGet, path, token, nil, &resp); err != nil {
return nil, fmt.Errorf("lark http client: get message: %w", err)
}
if resp.Code != 0 {
if isTokenError(resp.Code) {
c.invalidateToken(creds.AppID)
}
return nil, fmt.Errorf("lark http client: get message: code=%d msg=%q", resp.Code, resp.Msg)
}
out := make([]LarkMessage, 0, len(resp.Data.Items))
for _, it := range resp.Data.Items {
out = append(out, it.normalize())
}
return out, nil
}
// larkRESTMessageItem is the IM v1 message item shape returned by the
// get / list endpoints. It differs from the WS receive event in two
// ways the enricher cares about: msg_type (not message_type), and a
// flat `sender.id` / `mentions[].id` string (not a nested id object).
type larkRESTMessageItem struct {
MessageID string `json:"message_id"`
RootID string `json:"root_id"`
ParentID string `json:"parent_id"`
UpperMessageID string `json:"upper_message_id"`
MsgType string `json:"msg_type"`
CreateTime string `json:"create_time"`
Deleted bool `json:"deleted"`
Sender struct {
ID string `json:"id"`
IDType string `json:"id_type"`
SenderType string `json:"sender_type"`
} `json:"sender"`
Body struct {
Content string `json:"content"`
} `json:"body"`
Mentions []struct {
Key string `json:"key"`
ID string `json:"id"`
Name string `json:"name"`
} `json:"mentions"`
}
func (it larkRESTMessageItem) normalize() LarkMessage {
m := LarkMessage{
MessageID: it.MessageID,
MessageType: it.MsgType,
Content: it.Body.Content,
SenderID: it.Sender.ID,
SenderType: it.Sender.SenderType,
CreateTime: it.CreateTime,
ParentID: it.ParentID,
RootID: it.RootID,
UpperMessageID: it.UpperMessageID,
Deleted: it.Deleted,
}
for _, mn := range it.Mentions {
m.Mentions = append(m.Mentions, LarkMessageMention{Key: mn.Key, ID: mn.ID, Name: mn.Name})
}
return m
}
// fetchBotUnionID resolves a Bot's `union_id` from its `open_id` via
// /open-apis/contact/v3/users/{open_id}?user_id_type=open_id. Split
// out from GetBotInfo so the failure mode is explicit and the call

View File

@@ -0,0 +1,124 @@
package lark
import (
"context"
"net/http"
"strings"
"testing"
"time"
)
const messageGetPrefix = "/open-apis/im/v1/messages/"
// TestHTTPClient_GetMessageSingle exercises the happy path: a normal
// message comes back as a one-element items[] and is normalized with
// raw body.content, sender, and REST-shaped mentions intact.
func TestHTTPClient_GetMessageSingle(t *testing.T) {
fake := newLarkFake(t)
fake.stubToken("tok", 7200)
fake.mux.HandleFunc(messageGetPrefix, func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
t.Errorf("want GET, got %s", r.Method)
}
if id := strings.TrimPrefix(r.URL.Path, messageGetPrefix); id != "om_parent" {
t.Errorf("path id = %q", id)
}
if r.URL.Query().Get("user_id_type") != "open_id" {
t.Errorf("missing user_id_type=open_id: %q", r.URL.RawQuery)
}
writeJSON(w, map[string]any{
"code": 0, "msg": "ok",
"data": map[string]any{
"items": []any{
map[string]any{
"message_id": "om_parent",
"msg_type": "text",
"create_time": "1000",
"sender": map[string]any{"id": "ou_a", "id_type": "open_id", "sender_type": "user"},
"body": map[string]any{"content": `{"text":"hi"}`},
"mentions": []any{map[string]any{"key": "@_user_1", "id": "ou_b", "name": "Bob"}},
},
},
},
})
})
c := newTestClient(fake, time.Now)
items, err := c.GetMessage(context.Background(), testCreds(), "om_parent")
if err != nil {
t.Fatalf("GetMessage: %v", err)
}
if len(items) != 1 {
t.Fatalf("items = %d, want 1", len(items))
}
m := items[0]
if m.MessageID != "om_parent" || m.MessageType != "text" || m.Content != `{"text":"hi"}` {
t.Errorf("normalized = %+v", m)
}
if m.SenderID != "ou_a" || m.SenderType != "user" {
t.Errorf("sender = id:%q type:%q", m.SenderID, m.SenderType)
}
if len(m.Mentions) != 1 || m.Mentions[0].Key != "@_user_1" || m.Mentions[0].ID != "ou_b" || m.Mentions[0].Name != "Bob" {
t.Errorf("mentions = %+v", m.Mentions)
}
if a := fake.lastAuth(); a != "Bearer tok" {
t.Errorf("auth header = %q", a)
}
}
// TestHTTPClient_GetMessageMergeForward pins the merge_forward contract:
// GetMessage(forward_id) returns the sentinel parent followed by the
// bundled child messages, all in one items[] array.
func TestHTTPClient_GetMessageMergeForward(t *testing.T) {
fake := newLarkFake(t)
fake.stubToken("tok", 7200)
fake.mux.HandleFunc(messageGetPrefix, func(w http.ResponseWriter, r *http.Request) {
writeJSON(w, map[string]any{
"code": 0, "msg": "ok",
"data": map[string]any{
"items": []any{
map[string]any{"message_id": "om_fwd", "msg_type": "merge_forward", "body": map[string]any{"content": `{"content":"Merged and Forwarded Message"}`}},
map[string]any{"message_id": "c1", "msg_type": "text", "upper_message_id": "om_fwd", "create_time": "1000", "sender": map[string]any{"id": "ou_a", "sender_type": "user"}, "body": map[string]any{"content": `{"text":"one"}`}},
map[string]any{"message_id": "c2", "msg_type": "text", "upper_message_id": "om_fwd", "create_time": "2000", "sender": map[string]any{"id": "ou_b", "sender_type": "user"}, "body": map[string]any{"content": `{"text":"two"}`}},
},
},
})
})
c := newTestClient(fake, time.Now)
items, err := c.GetMessage(context.Background(), testCreds(), "om_fwd")
if err != nil {
t.Fatalf("GetMessage: %v", err)
}
if len(items) != 3 {
t.Fatalf("items = %d, want 3 (sentinel + 2 children)", len(items))
}
if items[0].MessageType != "merge_forward" {
t.Errorf("items[0] should be the forward sentinel, got %q", items[0].MessageType)
}
if items[1].UpperMessageID != "om_fwd" || items[2].UpperMessageID != "om_fwd" {
t.Errorf("children should link to the forward via upper_message_id")
}
}
// TestHTTPClient_GetMessageErrorCode maps a Lark business error (e.g.
// deleted / not visible) to a Go error so the enricher can degrade.
func TestHTTPClient_GetMessageErrorCode(t *testing.T) {
fake := newLarkFake(t)
fake.stubToken("tok", 7200)
fake.mux.HandleFunc(messageGetPrefix, func(w http.ResponseWriter, r *http.Request) {
writeJSON(w, map[string]any{"code": 230110, "msg": "message has been deleted"})
})
c := newTestClient(fake, time.Now)
if _, err := c.GetMessage(context.Background(), testCreds(), "om_gone"); err == nil {
t.Fatal("expected error for non-zero Lark code")
}
}
func TestHTTPClient_GetMessageEmptyID(t *testing.T) {
fake := newLarkFake(t)
c := newTestClient(fake, time.Now)
if _, err := c.GetMessage(context.Background(), testCreds(), ""); err == nil {
t.Fatal("expected error for empty message id")
}
}

View File

@@ -0,0 +1,300 @@
package lark
import (
"context"
"fmt"
"log/slog"
"sort"
"strconv"
"strings"
)
// larkMsgTypeMergeForward is the msg_type of a "merged & forwarded"
// message — a bundle of other messages a user forwarded as one unit.
// Its own body.content is a fixed sentinel string; the actual forwarded
// messages come back as the extra items[] of a GetMessage call.
const larkMsgTypeMergeForward = "merge_forward"
// defaultMaxForwardChildren caps how many child messages we inline from
// a single forward. Lark itself bounds a merge_forward at 100 messages;
// we mirror that as a safety valve so a pathological bundle can't blow
// up the agent's context. Anything beyond the cap is dropped with a
// visible "... (N more truncated)" marker.
const defaultMaxForwardChildren = 100
// Enricher expands an inbound message's body with context the user
// EXPLICITLY attached — a quoted reply or a merged-and-forwarded bundle
// — by calling back into Lark's IM API. It runs after the (fast,
// HTTP-free) decoder and before the dispatcher, turning a bare
// "@bot 总结一下" into a body that already carries the referenced
// conversation inline.
//
// It is best-effort by contract: every fetch failure degrades to a
// visible placeholder block and Enrich NEVER returns an error or blocks
// ingestion. A message with nothing to expand (no parent_id, not a
// merge_forward) is returned untouched without any network call.
type Enricher interface {
Enrich(ctx context.Context, msg InboundMessage, creds InstallationCredentials) InboundMessage
}
// InboundEnricherConfig tunes the enricher. Both fields default.
type InboundEnricherConfig struct {
// MaxForwardChildren caps inlined forward children. <=0 uses
// defaultMaxForwardChildren.
MaxForwardChildren int
// Logger receives best-effort warnings about fetch failures. Nil
// uses slog.Default().
Logger *slog.Logger
}
type inboundEnricher struct {
client APIClient
maxForwardChildren int
logger *slog.Logger
}
// NewInboundEnricher builds an Enricher backed by the given Lark API
// client. The client supplies GetMessage; everything else (flattening,
// block assembly, speaker labelling) is local.
func NewInboundEnricher(client APIClient, cfg InboundEnricherConfig) Enricher {
if cfg.MaxForwardChildren <= 0 {
cfg.MaxForwardChildren = defaultMaxForwardChildren
}
if cfg.Logger == nil {
cfg.Logger = slog.Default()
}
return &inboundEnricher{
client: client,
maxForwardChildren: cfg.MaxForwardChildren,
logger: cfg.Logger,
}
}
// Enrich rewrites msg.Body to inline any quoted-reply parent and/or
// forwarded bundle. Composition order matches the product spec: the
// quoted block is prepended, then the message's own content (or, for a
// forward, the rendered transcript) follows.
//
// <quoted_message …>…</quoted_message>
//
// <the user's own message, or the forwarded transcript>
func (e *inboundEnricher) Enrich(ctx context.Context, msg InboundMessage, creds InstallationCredentials) InboundMessage {
isForward := msg.MessageType == larkMsgTypeMergeForward
if msg.ParentID == "" && !isForward {
// Nothing the user explicitly attached — no network call.
return msg
}
// If the transport isn't wired (stub client on a deployment without
// a Lark app), skip rather than stamp every reply with a fetch
// error. Body stays whatever the decoder produced.
if e.client == nil || !e.client.IsConfigured() {
return msg
}
var b strings.Builder
if msg.ParentID != "" {
b.WriteString(e.renderQuoted(ctx, creds, msg.ParentID))
}
var core string
if isForward {
core = e.renderForwarded(ctx, creds, msg.MessageID)
} else {
core = msg.Body
}
if b.Len() > 0 && core != "" {
b.WriteString("\n\n")
}
b.WriteString(core)
msg.Body = b.String()
return msg
}
// renderQuoted fetches the directly-quoted parent and renders a
// <quoted_message> block. A parent that is itself a merge_forward nests
// a <forwarded_messages> transcript inside the quoted block (the
// GetMessage response already carries both the forward sentinel and its
// children, so no extra round-trip is needed). Any failure degrades to
// the documented error block.
func (e *inboundEnricher) renderQuoted(ctx context.Context, creds InstallationCredentials, parentID string) string {
items, err := e.client.GetMessage(ctx, creds, parentID)
if err != nil || len(items) == 0 {
e.logger.Warn("lark enricher: quoted parent fetch failed",
"parent_id", parentID, "items", len(items), "err", err)
return quotedErrorBlock(parentID)
}
parent := items[0]
if parent.Deleted {
return quotedErrorBlock(parentID)
}
labeler := newSpeakerLabeler()
sender := labeler.label(parent)
if parent.MessageType == larkMsgTypeMergeForward {
inner := e.renderForwardedItems(items, parentID)
return wrapQuoted(parentID, sender, larkMsgTypeMergeForward, inner)
}
text := e.flattenMessage(parent)
if text == "" {
text = "[empty message]"
}
return wrapQuoted(parentID, sender, parent.MessageType, text)
}
// renderForwarded fetches a merge_forward by id and renders its bundled
// children. The GetMessage response is [sentinel, child…]; we filter the
// forward's own record out by id (robust to whether Lark returns the
// sentinel first or not) and render the rest.
func (e *inboundEnricher) renderForwarded(ctx context.Context, creds InstallationCredentials, forwardID string) string {
items, err := e.client.GetMessage(ctx, creds, forwardID)
if err != nil {
e.logger.Warn("lark enricher: forward fetch failed", "message_id", forwardID, "err", err)
return forwardedErrorBlock()
}
return e.renderForwardedItems(items, forwardID)
}
// renderForwardedItems renders the children of a forward whose own
// record id is forwardID. Children are time-ordered, capped, and each
// rendered as "[<speaker>]: <text>"; a child that is itself a forward is
// not recursed into (it gets a manual-expand placeholder) so the HTTP
// fan-out on the ACK-latency-sensitive inbound path stays bounded.
func (e *inboundEnricher) renderForwardedItems(items []LarkMessage, forwardID string) string {
// The verified contract is that GetMessage(forward_id) returns one
// level of bundling: [sentinel, direct-children…]. We therefore
// treat every non-sentinel item as a direct child. We filter by id
// (not by upper_message_id == forwardID) on purpose: a strict
// upper_message_id match would silently DROP a real child if Lark
// ever returned one with that field unpopulated. A child that is
// itself a forward is rendered as a manual-expand placeholder below
// rather than recursed into, so grandchildren are never inlined.
children := make([]LarkMessage, 0, len(items))
for _, it := range items {
if it.MessageID == forwardID {
continue // the forward sentinel itself
}
children = append(children, it)
}
total := len(children)
if total == 0 {
return "<forwarded_messages count=\"0\">\n[no forwarded content available]\n</forwarded_messages>"
}
sort.SliceStable(children, func(i, j int) bool {
return parseLarkMillis(children[i].CreateTime) < parseLarkMillis(children[j].CreateTime)
})
truncated := 0
if total > e.maxForwardChildren {
truncated = total - e.maxForwardChildren
children = children[:e.maxForwardChildren]
}
labeler := newSpeakerLabeler()
lines := make([]string, 0, len(children))
for _, c := range children {
label := labeler.label(c)
var text string
switch {
case c.MessageType == larkMsgTypeMergeForward:
text = "[nested merge_forward, expand manually]"
default:
text = e.flattenMessage(c)
if text == "" {
text = "[empty message]"
}
}
lines = append(lines, fmt.Sprintf("[%s]: %s", label, text))
}
body := strings.Join(lines, "\n")
if truncated > 0 {
body += fmt.Sprintf("\n... (%d more truncated)", truncated)
}
return fmt.Sprintf("<forwarded_messages count=\"%d\">\n%s\n</forwarded_messages>", total, body)
}
// flattenMessage turns one fetched message into plain text: structural
// flatten by msg_type, then @_user_N placeholder resolution against the
// message's own mentions. The bot mention is NOT stripped here (unlike
// the inbound decoder) — a quoted / forwarded message is historical
// context, not a fresh trigger, so passing empty bot identifiers leaves
// every @-mention rendered as a readable @name.
func (e *inboundEnricher) flattenMessage(m LarkMessage) string {
if m.Deleted {
return "[deleted message]"
}
raw := flattenContent(m.MessageType, m.Content)
if raw == "" {
return ""
}
return resolveMentions(raw, restMentionsToEvent(m.Mentions), "", "")
}
// restMentionsToEvent adapts the IM REST mention shape (flat string id)
// to the WS-event larkMention shape resolveMentions consumes, so a
// single mention-resolution implementation serves both ingress paths.
func restMentionsToEvent(ms []LarkMessageMention) []larkMention {
if len(ms) == 0 {
return nil
}
out := make([]larkMention, 0, len(ms))
for _, m := range ms {
lm := larkMention{Key: m.Key, Name: m.Name}
lm.ID.OpenID = m.ID
out = append(out, lm)
}
return out
}
func wrapQuoted(messageID, sender, msgType, inner string) string {
return fmt.Sprintf("<quoted_message message_id=%q sender=%q type=%q>\n%s\n</quoted_message>",
messageID, sender, msgType, inner)
}
func quotedErrorBlock(messageID string) string {
return fmt.Sprintf("<quoted_message message_id=%q type=\"error\">[unable to fetch]</quoted_message>", messageID)
}
func forwardedErrorBlock() string {
return "<forwarded_messages type=\"error\">[unable to fetch]</forwarded_messages>"
}
func parseLarkMillis(s string) int64 {
n, _ := strconv.ParseInt(s, 10, 64)
return n
}
// speakerLabeler assigns stable, human-readable labels to the senders
// within one rendered block. Lark message items carry only a sender id
// (no display name — resolving real names needs a separate Contact API
// lookup, tracked as a follow-up), so we map each distinct user id to
// "User 1", "User 2", … in first-appearance order, and app senders to
// "Bot". This keeps the conversation's turn-taking structure legible
// without a per-sender network round-trip.
type speakerLabeler struct {
seen map[string]string
n int
}
func newSpeakerLabeler() *speakerLabeler {
return &speakerLabeler{seen: make(map[string]string)}
}
func (l *speakerLabeler) label(m LarkMessage) string {
if m.SenderType == "app" {
return "Bot"
}
key := m.SenderID
if key == "" {
key = "unknown"
}
if lbl, ok := l.seen[key]; ok {
return lbl
}
l.n++
lbl := fmt.Sprintf("User %d", l.n)
l.seen[key] = lbl
return lbl
}

View File

@@ -0,0 +1,343 @@
package lark
import (
"context"
"errors"
"strings"
"testing"
)
// enricherFakeClient is a programmable APIClient for enricher tests. It
// returns canned GetMessage results keyed by message_id and records the
// ids it was asked for, so tests can assert both the rendered body and
// the network fan-out (e.g. "no call when nothing to enrich").
type enricherFakeClient struct {
configured bool
byID map[string][]LarkMessage
errByID map[string]error
calls []string
}
func newEnricherFake() *enricherFakeClient {
return &enricherFakeClient{
configured: true,
byID: map[string][]LarkMessage{},
errByID: map[string]error{},
}
}
func (f *enricherFakeClient) IsConfigured() bool { return f.configured }
func (f *enricherFakeClient) GetMessage(ctx context.Context, creds InstallationCredentials, id string) ([]LarkMessage, error) {
f.calls = append(f.calls, id)
if e, ok := f.errByID[id]; ok {
return nil, e
}
return f.byID[id], nil
}
// Unused-by-enricher methods — present only to satisfy APIClient.
func (f *enricherFakeClient) SendInteractiveCard(context.Context, SendCardParams) (string, error) {
return "", nil
}
func (f *enricherFakeClient) PatchInteractiveCard(context.Context, PatchCardParams) error { return nil }
func (f *enricherFakeClient) SendTextMessage(context.Context, SendTextParams) (string, error) {
return "", nil
}
func (f *enricherFakeClient) SendMarkdownCard(context.Context, SendMarkdownCardParams) (string, error) {
return "", nil
}
func (f *enricherFakeClient) SendBindingPromptCard(context.Context, BindingPromptParams) error {
return nil
}
func (f *enricherFakeClient) GetBotInfo(context.Context, InstallationCredentials) (BotInfo, error) {
return BotInfo{}, nil
}
func textMsg(id, sender, text, createTime string) LarkMessage {
return LarkMessage{
MessageID: id,
MessageType: "text",
Content: `{"text":"` + text + `"}`,
SenderID: sender,
SenderType: "user",
CreateTime: createTime,
}
}
func enrich(t *testing.T, fake *enricherFakeClient, msg InboundMessage, cfg InboundEnricherConfig) InboundMessage {
t.Helper()
e := NewInboundEnricher(fake, cfg)
return e.Enrich(context.Background(), msg, InstallationCredentials{AppID: "a", AppSecret: "s"})
}
// TestEnrichQuotedReply covers the MUL-2951 quoted-reply example: a text
// reply to a prior text message gets the parent inlined as a
// <quoted_message> block ahead of the user's own prose.
func TestEnrichQuotedReply(t *testing.T) {
t.Parallel()
fake := newEnricherFake()
fake.byID["om_parent"] = []LarkMessage{
textMsg("om_parent", "ou_jiayuan", "做一个删除 issue 的按钮吧", "1000"),
}
in := InboundMessage{MessageType: "text", MessageID: "om_child", Body: "去实现", ParentID: "om_parent"}
out := enrich(t, fake, in, InboundEnricherConfig{})
want := `<quoted_message message_id="om_parent" sender="User 1" type="text">
做一个删除 issue 的按钮吧
</quoted_message>
去实现`
if out.Body != want {
t.Errorf("body\n got = %q\nwant = %q", out.Body, want)
}
if len(fake.calls) != 1 || fake.calls[0] != "om_parent" {
t.Errorf("expected one GetMessage(om_parent), got %v", fake.calls)
}
}
// TestEnrichMergeForward covers the merge_forward example: the forwarded
// transcript is fetched via GetMessage(forward_id) — whose items[] are
// [sentinel, child…] — and inlined as a <forwarded_messages> block with
// per-speaker labels. The four original lines must all be present.
func TestEnrichMergeForward(t *testing.T) {
t.Parallel()
fake := newEnricherFake()
fake.byID["om_forward"] = []LarkMessage{
{MessageID: "om_forward", MessageType: "merge_forward", SenderID: "ou_bohan", SenderType: "user", Content: `{"content":"Merged and Forwarded Message"}`},
textMsg("c1", "ou_jiayuan", "你们线上的 Multica 能用吗", "1000"),
textMsg("c2", "ou_jiayuan", "我这边无法登录", "2000"),
textMsg("c3", "ou_bohan", "我这边 web 和 desktop 都能登陆", "3000"),
{MessageID: "c4", MessageType: "image", SenderID: "ou_jiayuan", SenderType: "user", Content: `{"image_key":"img_x"}`, CreateTime: "4000"},
}
in := InboundMessage{MessageType: "merge_forward", MessageID: "om_forward"}
out := enrich(t, fake, in, InboundEnricherConfig{})
want := `<forwarded_messages count="4">
[User 1]: 你们线上的 Multica 能用吗
[User 1]: 我这边无法登录
[User 2]: 我这边 web 和 desktop 都能登陆
[User 1]: [Image]
</forwarded_messages>`
if out.Body != want {
t.Errorf("body\n got = %q\nwant = %q", out.Body, want)
}
}
// TestEnrichMergeForwardSortsByCreateTime ensures children are ordered
// chronologically regardless of the order Lark returns them.
func TestEnrichMergeForwardSortsByCreateTime(t *testing.T) {
t.Parallel()
fake := newEnricherFake()
fake.byID["om_f"] = []LarkMessage{
{MessageID: "om_f", MessageType: "merge_forward", SenderID: "ou_x", SenderType: "user"},
textMsg("c2", "ou_a", "second", "2000"),
textMsg("c1", "ou_a", "first", "1000"),
textMsg("c3", "ou_a", "third", "3000"),
}
out := enrich(t, fake, InboundMessage{MessageType: "merge_forward", MessageID: "om_f"}, InboundEnricherConfig{})
first := strings.Index(out.Body, "first")
second := strings.Index(out.Body, "second")
third := strings.Index(out.Body, "third")
if !(first < second && second < third) {
t.Errorf("children not chronologically ordered: %q", out.Body)
}
}
// TestEnrichMergeForwardCap truncates beyond the configured cap and
// flags how many were dropped.
func TestEnrichMergeForwardCap(t *testing.T) {
t.Parallel()
fake := newEnricherFake()
fake.byID["om_f"] = []LarkMessage{
{MessageID: "om_f", MessageType: "merge_forward", SenderID: "ou_x", SenderType: "user"},
textMsg("c1", "ou_a", "one", "1000"),
textMsg("c2", "ou_a", "two", "2000"),
textMsg("c3", "ou_a", "three", "3000"),
textMsg("c4", "ou_a", "four", "4000"),
}
out := enrich(t, fake, InboundMessage{MessageType: "merge_forward", MessageID: "om_f"}, InboundEnricherConfig{MaxForwardChildren: 2})
if !strings.Contains(out.Body, "... (2 more truncated)") {
t.Errorf("expected truncation marker, got %q", out.Body)
}
if strings.Contains(out.Body, "three") || strings.Contains(out.Body, "four") {
t.Errorf("over-cap children should be dropped, got %q", out.Body)
}
if !strings.Contains(out.Body, `count="4"`) {
t.Errorf("count should reflect the true total, got %q", out.Body)
}
}
// TestEnrichQuotedMergeForwardNests covers a quote-reply whose parent is
// itself a merge_forward: the forwarded transcript renders INSIDE the
// quoted block, using the same single GetMessage response.
func TestEnrichQuotedMergeForwardNests(t *testing.T) {
t.Parallel()
fake := newEnricherFake()
fake.byID["om_fwd"] = []LarkMessage{
{MessageID: "om_fwd", MessageType: "merge_forward", SenderID: "ou_bohan", SenderType: "user"},
textMsg("c1", "ou_a", "line A", "1000"),
textMsg("c2", "ou_b", "line B", "2000"),
}
in := InboundMessage{MessageType: "text", MessageID: "om_child", Body: "see above", ParentID: "om_fwd"}
out := enrich(t, fake, in, InboundEnricherConfig{})
if !strings.Contains(out.Body, `<quoted_message message_id="om_fwd" sender="User 1" type="merge_forward">`) {
t.Errorf("missing quoted wrapper for merge_forward parent: %q", out.Body)
}
if !strings.Contains(out.Body, "<forwarded_messages count=\"2\">") {
t.Errorf("forwarded block should nest inside quoted: %q", out.Body)
}
if !strings.Contains(out.Body, "line A") || !strings.Contains(out.Body, "line B") {
t.Errorf("nested children missing: %q", out.Body)
}
if !strings.HasSuffix(out.Body, "see above") {
t.Errorf("user prose should follow the quoted block: %q", out.Body)
}
}
// TestEnrichNestedForwardChildIsPlaceholder bounds HTTP fan-out: a child
// that is itself a forward is NOT recursed into.
func TestEnrichNestedForwardChildIsPlaceholder(t *testing.T) {
t.Parallel()
fake := newEnricherFake()
fake.byID["om_f"] = []LarkMessage{
{MessageID: "om_f", MessageType: "merge_forward", SenderID: "ou_x", SenderType: "user"},
textMsg("c1", "ou_a", "hello", "1000"),
{MessageID: "c2", MessageType: "merge_forward", SenderID: "ou_a", SenderType: "user", CreateTime: "2000"},
}
out := enrich(t, fake, InboundMessage{MessageType: "merge_forward", MessageID: "om_f"}, InboundEnricherConfig{})
if !strings.Contains(out.Body, "[nested merge_forward, expand manually]") {
t.Errorf("nested forward child should be a placeholder: %q", out.Body)
}
// Only the top forward should have been fetched.
if len(fake.calls) != 1 {
t.Errorf("expected exactly one GetMessage, got %v", fake.calls)
}
}
// TestEnrichQuotedFetchFailureDegrades verifies a parent fetch failure
// degrades to the documented error block and still keeps the user's
// message — it must not block ingestion.
func TestEnrichQuotedFetchFailureDegrades(t *testing.T) {
t.Parallel()
fake := newEnricherFake()
fake.errByID["om_gone"] = errors.New("not found")
in := InboundMessage{MessageType: "text", MessageID: "om_child", Body: "ping", ParentID: "om_gone"}
out := enrich(t, fake, in, InboundEnricherConfig{})
want := `<quoted_message message_id="om_gone" type="error">[unable to fetch]</quoted_message>
ping`
if out.Body != want {
t.Errorf("body\n got = %q\nwant = %q", out.Body, want)
}
}
func TestEnrichQuotedDeletedParentDegrades(t *testing.T) {
t.Parallel()
fake := newEnricherFake()
fake.byID["om_del"] = []LarkMessage{{MessageID: "om_del", MessageType: "text", Deleted: true, SenderID: "ou_a", SenderType: "user"}}
out := enrich(t, fake, InboundMessage{MessageType: "text", Body: "x", ParentID: "om_del"}, InboundEnricherConfig{})
if !strings.Contains(out.Body, `type="error"`) {
t.Errorf("deleted parent should degrade to error block: %q", out.Body)
}
}
func TestEnrichForwardFetchFailureDegrades(t *testing.T) {
t.Parallel()
fake := newEnricherFake()
fake.errByID["om_f"] = errors.New("boom")
out := enrich(t, fake, InboundMessage{MessageType: "merge_forward", MessageID: "om_f"}, InboundEnricherConfig{})
if out.Body != `<forwarded_messages type="error">[unable to fetch]</forwarded_messages>` {
t.Errorf("forward fetch failure should degrade: %q", out.Body)
}
}
// TestEnrichNoopWhenNothingAttached: a plain message (no parent, not a
// forward) is returned untouched WITHOUT any network call.
func TestEnrichNoopWhenNothingAttached(t *testing.T) {
t.Parallel()
fake := newEnricherFake()
in := InboundMessage{MessageType: "text", MessageID: "om", Body: "hello"}
out := enrich(t, fake, in, InboundEnricherConfig{})
if out.Body != "hello" {
t.Errorf("body should be unchanged, got %q", out.Body)
}
if len(fake.calls) != 0 {
t.Errorf("no GetMessage should be issued, got %v", fake.calls)
}
}
// TestEnrichSkipsWhenClientUnconfigured: with the stub/unconfigured
// client we must not stamp a fetch error on every reply — skip silently.
func TestEnrichSkipsWhenClientUnconfigured(t *testing.T) {
t.Parallel()
fake := newEnricherFake()
fake.configured = false
in := InboundMessage{MessageType: "text", MessageID: "om", Body: "hi", ParentID: "om_parent"}
out := enrich(t, fake, in, InboundEnricherConfig{})
if out.Body != "hi" {
t.Errorf("body should be unchanged when client unconfigured, got %q", out.Body)
}
if len(fake.calls) != 0 {
t.Errorf("no GetMessage when unconfigured, got %v", fake.calls)
}
}
// TestEnrichPreservesCommandBodyForIssueParsing is the regression guard
// for the quote-reply + /issue interaction: enrichment prepends a
// <quoted_message> block (so the enriched Body no longer parses as a
// command), but CommandBody is left untouched and still parses, so
// `/issue` keeps working when typed as a quote-reply.
func TestEnrichPreservesCommandBodyForIssueParsing(t *testing.T) {
t.Parallel()
fake := newEnricherFake()
fake.byID["om_parent"] = []LarkMessage{textMsg("om_parent", "ou_j", "做个删除按钮", "1000")}
in := InboundMessage{
MessageType: "text",
MessageID: "om_child",
Body: "/issue 删除 issue 按钮",
CommandBody: "/issue 删除 issue 按钮",
ParentID: "om_parent",
}
out := enrich(t, fake, in, InboundEnricherConfig{})
// Enriched Body now starts with the quoted block → no longer a command.
if _, ok := parseIssueCommand(out.Body); ok {
t.Errorf("enriched Body should not parse as /issue (it is prefixed): %q", out.Body)
}
// CommandBody is untouched and still parses with the right title.
cmd, ok := parseIssueCommand(out.CommandBody)
if !ok || cmd.Title != "删除 issue 按钮" {
t.Errorf("CommandBody should still parse /issue: cmd=%+v ok=%v", cmd, ok)
}
// And the quoted context did land in the stored Body.
if !strings.Contains(out.Body, "做个删除按钮") {
t.Errorf("quoted context missing from Body: %q", out.Body)
}
}
// TestEnrichResolvesMentionsInChildren: @_user_N placeholders in a
// forwarded child resolve to @name via that child's own mentions array.
func TestEnrichResolvesMentionsInChildren(t *testing.T) {
t.Parallel()
fake := newEnricherFake()
fake.byID["om_f"] = []LarkMessage{
{MessageID: "om_f", MessageType: "merge_forward", SenderID: "ou_x", SenderType: "user"},
{
MessageID: "c1",
MessageType: "text",
Content: `{"text":"@_user_1 看一下"}`,
SenderID: "ou_a",
SenderType: "user",
CreateTime: "1000",
Mentions: []LarkMessageMention{{Key: "@_user_1", ID: "ou_alice", Name: "Alice"}},
},
}
out := enrich(t, fake, InboundMessage{MessageType: "merge_forward", MessageID: "om_f"}, InboundEnricherConfig{})
if !strings.Contains(out.Body, "@Alice 看一下") {
t.Errorf("child mention not resolved: %q", out.Body)
}
}

View File

@@ -119,6 +119,9 @@ func (f *fakeAPIClient) SendBindingPromptCard(ctx context.Context, p BindingProm
func (f *fakeAPIClient) GetBotInfo(ctx context.Context, creds InstallationCredentials) (BotInfo, error) {
return BotInfo{}, nil
}
func (f *fakeAPIClient) GetMessage(ctx context.Context, creds InstallationCredentials, messageID string) ([]LarkMessage, error) {
return nil, nil
}
func newTestPatcher(t *testing.T) (*Patcher, *fakePatcherQueries, *fakeAPIClient) {
t.Helper()

View File

@@ -72,6 +72,10 @@ func (s *stubAPIClientWithRecorder) GetBotInfo(ctx context.Context, creds Instal
return BotInfo{}, nil
}
func (s *stubAPIClientWithRecorder) GetMessage(ctx context.Context, creds InstallationCredentials, messageID string) ([]LarkMessage, error) {
return nil, nil
}
// stubCredentialsResolver returns a fixed plaintext secret.
type stubCredentialsResolver struct{ secret string }

View File

@@ -88,6 +88,21 @@ type WSConnectorConfig struct {
// should not tear down the entire connection.
FrameDecoder FrameDecoder
// Enricher optionally expands a decoded message's body with the
// context the user explicitly attached (quoted reply / forwarded
// bundle) before it is emitted to the dispatcher. It runs on the
// inbound read loop, so it is bounded by EnrichTimeout to protect
// the Lark long-conn ACK budget; on timeout / fetch failure the
// enricher degrades to a placeholder rather than blocking. Nil
// disables enrichment (the decoded body is emitted as-is).
Enricher Enricher
// EnrichTimeout caps a single message's enrichment (at most two
// GetMessage calls). It MUST stay well under Lark's ~3s long-conn
// ACK window, since enrichment runs before the frame is ACKed.
// Zero defaults to 2 seconds.
EnrichTimeout time.Duration
// CredentialsProvider returns the InstallationCredentials the
// EndpointFetcher needs. Typically wraps
// InstallationService.DecryptAppSecret so the plaintext secret
@@ -138,6 +153,9 @@ func (c WSConnectorConfig) withDefaults() WSConnectorConfig {
if c.ChunkTTL == 0 {
c.ChunkTTL = 5 * time.Second
}
if c.EnrichTimeout == 0 {
c.EnrichTimeout = 2 * time.Second
}
if c.Now == nil {
c.Now = time.Now
}
@@ -363,6 +381,18 @@ func (c *WSLongConnConnector) Run(ctx context.Context, inst db.LarkInstallation,
continue
}
// Enrich the decoded body with explicitly-attached context
// (quoted reply / forwarded bundle) before emitting. This runs
// before the frame ACK, so it is bounded by EnrichTimeout and
// degrades to a placeholder on failure rather than blocking the
// pipeline. Most messages need no enrichment and return
// immediately without any network call.
if c.cfg.Enricher != nil {
enrichCtx, cancelEnrich := context.WithTimeout(ctx, c.cfg.EnrichTimeout)
msg = c.cfg.Enricher.Enrich(enrichCtx, msg, creds)
cancelEnrich()
}
_, emitErr := emit(ctx, msg)
if emitErr != nil {
// Infra failure from Dispatcher (DB down, etc.). NACK so

View File

@@ -0,0 +1,118 @@
package lark
import (
"context"
"io"
"log/slog"
"sync"
"testing"
"time"
db "github.com/multica-ai/multica/server/pkg/db/generated"
)
// recordingEnricher captures what the connector hands it and rewrites
// the body so the test can prove enrichment ran between decode and emit.
type recordingEnricher struct {
mu sync.Mutex
msgs []InboundMessage
creds []InstallationCredentials
}
func (e *recordingEnricher) Enrich(ctx context.Context, msg InboundMessage, creds InstallationCredentials) InboundMessage {
e.mu.Lock()
defer e.mu.Unlock()
e.msgs = append(e.msgs, msg)
e.creds = append(e.creds, creds)
msg.Body = "ENRICHED:" + msg.Body
return msg
}
// TestWSConnectorEnrichesBeforeEmit verifies the connector runs the
// Enricher on a decoded message — with the connection's resolved
// credentials — before emitting it to the dispatcher.
func TestWSConnectorEnrichesBeforeEmit(t *testing.T) {
t.Parallel()
conn := newFakeWSConn()
decoder := FrameDecoderFunc(func(payload []byte, _ db.LarkInstallation) (InboundMessage, bool, error) {
return InboundMessage{
EventID: string(payload),
AppID: "test_app",
MessageID: "msg-" + string(payload),
Body: "raw-" + string(payload),
}, true, nil
})
enr := &recordingEnricher{}
c, err := NewWSLongConnConnector(WSConnectorConfig{
Dialer: &fakeWSDialer{conn: conn},
EndpointFetcher: EndpointFetcherFunc(func(context.Context, InstallationCredentials) (WSEndpoint, error) {
return WSEndpoint{URL: "wss://test/ignored", ServiceID: 7, PingInterval: time.Hour}, nil
}),
FrameDecoder: decoder,
Enricher: enr,
CredentialsProvider: CredentialsProviderFunc(func(context.Context, db.LarkInstallation) (InstallationCredentials, error) {
return InstallationCredentials{AppID: "test_app", AppSecret: "secret"}, nil
}),
PingInterval: time.Hour,
ReadDeadline: time.Second,
WriteTimeout: time.Second,
EnrichTimeout: time.Second,
Logger: slog.New(slog.NewTextHandler(io.Discard, nil)),
})
if err != nil {
t.Fatalf("NewWSLongConnConnector: %v", err)
}
var emitted []InboundMessage
var emitMu sync.Mutex
emit := func(_ context.Context, msg InboundMessage) (DispatchResult, error) {
emitMu.Lock()
emitted = append(emitted, msg)
emitMu.Unlock()
return DispatchResult{Outcome: OutcomeIngested}, nil
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
done := make(chan error, 1)
go func() { done <- c.Run(ctx, db.LarkInstallation{AppID: "test_app"}, emit) }()
pushDataFrame(conn, []byte("evt-1"), "m1")
deadline := time.After(2 * time.Second)
for {
emitMu.Lock()
n := len(emitted)
emitMu.Unlock()
if n >= 1 {
break
}
select {
case <-deadline:
t.Fatal("no emit within 2s")
case <-time.After(5 * time.Millisecond):
}
}
cancel()
select {
case <-done:
case <-time.After(2 * time.Second):
t.Fatal("Run did not return after cancel")
}
emitMu.Lock()
defer emitMu.Unlock()
if emitted[0].Body != "ENRICHED:raw-evt-1" {
t.Errorf("emit body = %q; enricher did not run before emit", emitted[0].Body)
}
enr.mu.Lock()
defer enr.mu.Unlock()
if len(enr.msgs) != 1 || enr.msgs[0].Body != "raw-evt-1" {
t.Errorf("enricher received %+v", enr.msgs)
}
if len(enr.creds) != 1 || enr.creds[0].AppID != "test_app" || enr.creds[0].AppSecret != "secret" {
t.Errorf("enricher got wrong creds: %+v", enr.creds)
}
}

View File

@@ -74,6 +74,13 @@ func (d *LarkJSONFrameDecoder) Decode(payload []byte, inst db.LarkInstallation)
ChatType: normalizeChatType(evt.Message.ChatType),
MessageID: evt.Message.MessageID,
SenderOpenID: OpenID(evt.Sender.SenderID.OpenID),
MessageType: evt.Message.MessageType,
// parent_id / root_id are populated by Lark only in reply
// scenarios. The enricher keys quoted-reply expansion off
// ParentID (the directly quoted message); RootID is carried for
// completeness / future thread handling.
ParentID: evt.Message.ParentID,
RootID: evt.Message.RootID,
}
botUnionID := ""
@@ -81,12 +88,24 @@ func (d *LarkJSONFrameDecoder) Decode(payload []byte, inst db.LarkInstallation)
botUnionID = inst.BotUnionID.String
}
// text + post are flattened synchronously here (no external calls —
// the decoder must stay fast and dependency-free). merge_forward
// leaves Body empty: it needs an HTTP round-trip to expand and is
// handled downstream by the enricher, which keys off MessageType.
// Other types (image, file, …) also leave Body empty in this MVP;
// attachment ingestion is a separate issue.
switch evt.Message.MessageType {
case "text":
msg.Body = resolveMentions(extractTextBody(evt.Message.Content),
case "text", "post":
msg.Body = resolveMentions(flattenContent(evt.Message.MessageType, evt.Message.Content),
evt.Message.Mentions, inst.BotOpenID, botUnionID)
}
// Snapshot the user's own text as the command source BEFORE any
// enrichment runs. The enricher rewrites Body (prepending quoted /
// forwarded context) but never touches CommandBody, so `/issue …`
// is still parsed against what the user actually typed.
msg.CommandBody = msg.Body
if msg.ChatType == ChatTypeGroup {
msg.AddressedToBot = containsMention(evt.Message.Mentions, inst.BotOpenID, botUnionID)
}
@@ -130,6 +149,11 @@ type larkMessageReceiveEvent struct {
Content string `json:"content"`
Mentions []larkMention `json:"mentions"`
CreateTime string `json:"create_time"`
// ParentID / RootID are only present when the message is a
// reply / quote. ParentID is the directly quoted message;
// RootID is the root of the reply tree.
ParentID string `json:"parent_id"`
RootID string `json:"root_id"`
} `json:"message"`
}

View File

@@ -451,3 +451,112 @@ func TestLarkJSONFrameDecoderNonTextMessageHasEmptyBody(t *testing.T) {
t.Error("MessageID should still be populated for non-text events")
}
}
// TestLarkJSONFrameDecoderPostMessageFlattened verifies that a rich-text
// `post` message is flattened to plain text end-to-end through Decode —
// the MUL-2951 example. Body.content is the JSON-encoded post object; we
// marshal a Go string to get the correctly-escaped content field.
func TestLarkJSONFrameDecoderPostMessageFlattened(t *testing.T) {
t.Parallel()
postContent := `{"title":"周报","content":[[{"tag":"text","text":"本周完成:"}],[{"tag":"text","text":"Lark 集成"},{"tag":"a","href":"https://github.com/multica-ai/multica/pull/3277","text":"PR #3277"}]]}`
escaped, err := json.Marshal(postContent)
if err != nil {
t.Fatalf("marshal: %v", err)
}
raw := []byte(`{
"type":"event_callback",
"header":{"event_id":"e","event_type":"im.message.receive_v1","app_id":"a"},
"event":{
"sender":{"sender_id":{"open_id":"ou_user"}},
"message":{"message_id":"m","chat_id":"c","chat_type":"p2p","message_type":"post","content":` + string(escaped) + `}
}
}`)
msg, ok, err := NewLarkJSONFrameDecoder().Decode(raw, db.LarkInstallation{BotOpenID: "ou_bot"})
if err != nil || !ok {
t.Fatalf("Decode ok=%v err=%v", ok, err)
}
want := "周报\n本周完成\nLark 集成 PR #3277 (https://github.com/multica-ai/multica/pull/3277)"
if msg.Body != want {
t.Errorf("post Body\n got = %q\nwant = %q", msg.Body, want)
}
if msg.MessageType != "post" {
t.Errorf("MessageType = %q want post", msg.MessageType)
}
}
// TestLarkJSONFrameDecoderPostResolvesMentions checks that @-mentions in
// a post (carried as `at` spans with @_user_N placeholders) are resolved
// through the same mention pipeline as text, including stripping the
// bot's own mention.
func TestLarkJSONFrameDecoderPostResolvesMentions(t *testing.T) {
t.Parallel()
postContent := `{"content":[[{"tag":"at","user_id":"@_user_1","user_name":""},{"tag":"text","text":"please review"},{"tag":"at","user_id":"@_user_2","user_name":""}]]}`
escaped, err := json.Marshal(postContent)
if err != nil {
t.Fatalf("marshal: %v", err)
}
raw := []byte(`{
"type":"event_callback",
"header":{"event_id":"e","event_type":"im.message.receive_v1","app_id":"a"},
"event":{
"sender":{"sender_id":{"open_id":"ou_user"}},
"message":{
"message_id":"m","chat_id":"c","chat_type":"group","message_type":"post",
"content":` + string(escaped) + `,
"mentions":[
{"key":"@_user_1","id":{"open_id":"ou_bot"},"name":"Bot"},
{"key":"@_user_2","id":{"open_id":"ou_alice"},"name":"Alice"}
]
}
}
}`)
msg, ok, err := NewLarkJSONFrameDecoder().Decode(raw, db.LarkInstallation{BotOpenID: "ou_bot"})
if err != nil || !ok {
t.Fatalf("Decode ok=%v err=%v", ok, err)
}
// @_user_1 is the bot → stripped; @_user_2 → @Alice.
want := "please review @Alice"
if msg.Body != want {
t.Errorf("post Body\n got = %q\nwant = %q", msg.Body, want)
}
if !msg.AddressedToBot {
t.Error("AddressedToBot should be true (bot was @-mentioned)")
}
}
// TestLarkJSONFrameDecoderCapturesReplyLinkage verifies parent_id /
// root_id from a quote-reply event land on the InboundMessage so the
// enricher can expand them.
func TestLarkJSONFrameDecoderCapturesReplyLinkage(t *testing.T) {
t.Parallel()
raw := []byte(`{
"type":"event_callback",
"header":{"event_id":"e","event_type":"im.message.receive_v1","app_id":"a"},
"event":{
"sender":{"sender_id":{"open_id":"ou_user"}},
"message":{
"message_id":"om_child","chat_id":"c","chat_type":"group","message_type":"text",
"content":"{\"text\":\"去实现\"}",
"parent_id":"om_parent","root_id":"om_root"
}
}
}`)
msg, ok, err := NewLarkJSONFrameDecoder().Decode(raw, db.LarkInstallation{BotOpenID: "ou_bot"})
if err != nil || !ok {
t.Fatalf("Decode ok=%v err=%v", ok, err)
}
if msg.ParentID != "om_parent" {
t.Errorf("ParentID = %q want om_parent", msg.ParentID)
}
if msg.RootID != "om_root" {
t.Errorf("RootID = %q want om_root", msg.RootID)
}
if msg.MessageType != "text" {
t.Errorf("MessageType = %q want text", msg.MessageType)
}
// CommandBody snapshots the user's own text (pre-enrichment) so
// /issue parsing survives the enricher's prepended context blocks.
if msg.CommandBody != "去实现" {
t.Errorf("CommandBody = %q want 去实现", msg.CommandBody)
}
}