mirror of
https://github.com/multica-ai/multica.git
synced 2026-06-29 02:19:19 +02:00
Compare commits
1 Commits
agent/lamb
...
agent/j/a4
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9a39a309ae |
@@ -250,7 +250,7 @@ func NewRouterWithOptions(pool *pgxpool.Pool, hub *realtime.Hub, bus *events.Bus
|
||||
// inbound messages will be silently dropped until the
|
||||
// config is fixed, with the boot log labelling the mode
|
||||
// "noop" so operators can spot it.
|
||||
connectorFactory, connectorLabel := buildLarkConnectorFactory(installSvc)
|
||||
connectorFactory, connectorLabel := buildLarkConnectorFactory(installSvc, larkClient)
|
||||
h.LarkHub = lark.NewHub(queries, connectorFactory, dispatcher, lark.HubConfig{})
|
||||
|
||||
// OutcomeReplier wires the outbound side of the
|
||||
@@ -943,7 +943,7 @@ func NewRouterWithOptions(pool *pgxpool.Pool, hub *realtime.Hub, bus *events.Bus
|
||||
//
|
||||
// Returns the factory plus a short label for the boot log: "ws" in
|
||||
// the healthy case, "noop" in the fallback case.
|
||||
func buildLarkConnectorFactory(installSvc *lark.InstallationService) (lark.ConnectorFactory, string) {
|
||||
func buildLarkConnectorFactory(installSvc *lark.InstallationService, apiClient lark.APIClient) (lark.ConnectorFactory, string) {
|
||||
endpointFetcher, err := lark.NewHTTPConnectionTokenFetcher(lark.HTTPConnectionTokenConfig{
|
||||
BaseURL: strings.TrimSpace(os.Getenv("MULTICA_LARK_CALLBACK_BASE_URL")),
|
||||
Logger: slog.Default(),
|
||||
@@ -968,10 +968,16 @@ func buildLarkConnectorFactory(installSvc *lark.InstallationService) (lark.Conne
|
||||
}
|
||||
return creds, nil
|
||||
})
|
||||
// Inbound enricher: expands quoted replies / forwarded bundles into
|
||||
// the agent's body via the IM API before dispatch. It shares the
|
||||
// connector's resolved credentials and runs under the connector's
|
||||
// EnrichTimeout so it cannot overrun the Lark long-conn ACK budget.
|
||||
enricher := lark.NewInboundEnricher(apiClient, lark.InboundEnricherConfig{Logger: slog.Default()})
|
||||
conn, err := lark.NewWSLongConnConnector(lark.WSConnectorConfig{
|
||||
Dialer: dialer,
|
||||
EndpointFetcher: endpointFetcher,
|
||||
FrameDecoder: decoder,
|
||||
Enricher: enricher,
|
||||
CredentialsProvider: credsProvider,
|
||||
Logger: slog.Default(),
|
||||
})
|
||||
|
||||
@@ -71,9 +71,16 @@ type EnsureChatSessionParams struct {
|
||||
// for callers that have already finalized dedup outside the
|
||||
// transaction.
|
||||
type AppendUserMessageParams struct {
|
||||
ChatSessionID pgtype.UUID
|
||||
Sender pgtype.UUID
|
||||
Body string
|
||||
ChatSessionID pgtype.UUID
|
||||
Sender pgtype.UUID
|
||||
// Body is the full text stored as the chat_message — including any
|
||||
// quoted-reply / forwarded context the enricher inlined.
|
||||
Body string
|
||||
// CommandBody is the user's own typed text, used as the `/issue`
|
||||
// command source. It is the un-enriched Body; when empty (callers
|
||||
// that don't set it), `/issue` parsing falls back to Body so
|
||||
// behavior is unchanged for the non-enriched path.
|
||||
CommandBody string
|
||||
InstallationID pgtype.UUID
|
||||
LarkMessageID string
|
||||
ClaimToken pgtype.UUID
|
||||
|
||||
@@ -198,11 +198,22 @@ func (s *chatSessionService) AppendUserMessage(ctx context.Context, p AppendUser
|
||||
defer tx.Rollback(ctx)
|
||||
qtx := s.queries.WithTx(tx)
|
||||
|
||||
// Parse the command BEFORE the insert, so the "/issue alone → use
|
||||
// previous user message" fallback queries from the message set
|
||||
// that does NOT yet include the message currently being appended.
|
||||
// Otherwise the previous-message lookup would self-reference.
|
||||
cmd, _ := parseIssueCommand(p.Body)
|
||||
// Parse the command from the user's OWN typed text (CommandBody),
|
||||
// not the stored Body: the enricher prepends quoted / forwarded
|
||||
// context to Body, which would push a `/issue …` off the first line
|
||||
// and silently stop creating the issue (parseIssueCommand only
|
||||
// inspects the first non-empty line). Fall back to Body when
|
||||
// CommandBody is unset so non-enriched callers are unaffected.
|
||||
//
|
||||
// Parse BEFORE the insert so the "/issue alone → use previous user
|
||||
// message" fallback queries from the message set that does NOT yet
|
||||
// include the message currently being appended; otherwise the
|
||||
// previous-message lookup would self-reference.
|
||||
commandSource := p.CommandBody
|
||||
if commandSource == "" {
|
||||
commandSource = p.Body
|
||||
}
|
||||
cmd, _ := parseIssueCommand(commandSource)
|
||||
if cmd != nil && cmd.Title == "" {
|
||||
prev, err := qtx.GetMostRecentUserChatMessage(ctx, p.ChatSessionID)
|
||||
if err == nil {
|
||||
|
||||
@@ -69,6 +69,46 @@ type APIClient interface {
|
||||
// is then frozen into lark_installation alongside the app_id /
|
||||
// app_secret in the same transaction as the installer-bind.
|
||||
GetBotInfo(ctx context.Context, creds InstallationCredentials) (BotInfo, error)
|
||||
|
||||
// GetMessage fetches a message by id via
|
||||
// GET /open-apis/im/v1/messages/{message_id}. Lark always returns an
|
||||
// ARRAY (data.items[]): for a normal message exactly one element;
|
||||
// for a `merge_forward` message the first element is the forward
|
||||
// sentinel and the remaining elements are the bundled child messages
|
||||
// (each a normal typed message linked back by upper_message_id). The
|
||||
// inbound enricher relies on both shapes: items[0] for a quoted-reply
|
||||
// parent, items[1:] for a forwarded transcript. Returning the raw
|
||||
// slice keeps this method a thin transport adapter — flattening and
|
||||
// block assembly are the enricher's job.
|
||||
GetMessage(ctx context.Context, creds InstallationCredentials, messageID string) ([]LarkMessage, error)
|
||||
}
|
||||
|
||||
// LarkMessage is the normalized slice of an IM v1 message item the
|
||||
// enricher needs. Body.content is passed through raw (still the
|
||||
// JSON-encoded, msg_type-specific string Lark double-encodes) so the
|
||||
// flattener — not the transport client — owns content interpretation.
|
||||
type LarkMessage struct {
|
||||
MessageID string
|
||||
MessageType string // Lark `msg_type`: text / post / image / merge_forward / …
|
||||
Content string // raw body.content (a JSON-encoded string)
|
||||
SenderID string // sender.id (open_id for users, app_id for apps)
|
||||
SenderType string // sender.sender_type: user / app / anonymous / …
|
||||
CreateTime string // epoch milliseconds, as Lark returns it (a string)
|
||||
ParentID string
|
||||
RootID string
|
||||
UpperMessageID string // the merge_forward parent a child hangs under
|
||||
Deleted bool
|
||||
Mentions []LarkMessageMention
|
||||
}
|
||||
|
||||
// LarkMessageMention mirrors a mentions[] entry on the IM REST item
|
||||
// shape. Note this differs from the WS receive event's mention shape:
|
||||
// here `id` is a bare open_id string, not a nested {open_id, union_id,
|
||||
// user_id} object.
|
||||
type LarkMessageMention struct {
|
||||
Key string // e.g. "@_user_1"
|
||||
ID string // open_id
|
||||
Name string // display name (may be empty)
|
||||
}
|
||||
|
||||
// BotInfo is the slice of /open-apis/bot/v3/info (+ a follow-up
|
||||
@@ -227,3 +267,8 @@ func (s *stubAPIClient) GetBotInfo(ctx context.Context, creds InstallationCreden
|
||||
s.log.Warn("lark stub client: GetBotInfo called", "app_id", creds.AppID)
|
||||
return BotInfo{}, ErrAPIClientNotConfigured
|
||||
}
|
||||
|
||||
func (s *stubAPIClient) GetMessage(ctx context.Context, creds InstallationCredentials, messageID string) ([]LarkMessage, error) {
|
||||
s.log.Warn("lark stub client: GetMessage called", "message_id", messageID)
|
||||
return nil, ErrAPIClientNotConfigured
|
||||
}
|
||||
|
||||
164
server/internal/integrations/lark/content_flatten.go
Normal file
164
server/internal/integrations/lark/content_flatten.go
Normal file
@@ -0,0 +1,164 @@
|
||||
package lark
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"strings"
|
||||
)
|
||||
|
||||
// flattenContent renders a Lark message's body.content — the raw,
|
||||
// JSON-encoded string Lark double-encodes — into plain text, dispatching
|
||||
// on msg_type. It is the shared structural step used by BOTH ingress
|
||||
// paths:
|
||||
//
|
||||
// - the inbound decoder, for the user's own text / post message, and
|
||||
// - the enricher, for the quoted-reply parent and merge_forward child
|
||||
// messages it pulls back over the IM REST API.
|
||||
//
|
||||
// Mention placeholders (@_user_N) are preserved verbatim; the caller is
|
||||
// responsible for resolving them against the message's mentions[] array
|
||||
// via resolveMentions. The two ingress shapes (WS receive event vs IM
|
||||
// REST item) carry the mentions array differently — only the caller
|
||||
// knows which one applies — so flattening stays mention-agnostic.
|
||||
//
|
||||
// Non-text media types render as a stable bracketed placeholder so the
|
||||
// agent sees that *something* was attached without us downloading the
|
||||
// binary. Attachment ingestion is explicitly out of scope (tracked as a
|
||||
// separate attachment-pipeline issue), and merge_forward is intercepted
|
||||
// by the enricher before it reaches here (expanding it needs an HTTP
|
||||
// round-trip); the inline placeholder is only a fallback for a forward
|
||||
// nested inside another forward.
|
||||
func flattenContent(msgType, rawContent string) string {
|
||||
switch msgType {
|
||||
case "text":
|
||||
return extractTextBody(rawContent)
|
||||
case "post":
|
||||
return flattenPostContent(rawContent)
|
||||
case "image":
|
||||
return "[Image]"
|
||||
case "file":
|
||||
return "[File]"
|
||||
case "audio":
|
||||
return "[Audio]"
|
||||
case "media":
|
||||
return "[Video]"
|
||||
case "sticker":
|
||||
return "[Sticker]"
|
||||
case "interactive":
|
||||
return "[interactive card]"
|
||||
case "share_chat":
|
||||
return "[Shared Chat]"
|
||||
case "share_user":
|
||||
return "[Shared User Card]"
|
||||
case "system":
|
||||
return "[System Message]"
|
||||
case "merge_forward":
|
||||
return "[forwarded messages]"
|
||||
default:
|
||||
return ""
|
||||
}
|
||||
}
|
||||
|
||||
// larkPostContent mirrors the RECEIVE-side shape of a `post` rich-text
|
||||
// body.content. Crucially this is NOT the locale-wrapped form the SEND
|
||||
// API takes ({"zh_cn": {...}}): an inbound post body.content unmarshals
|
||||
// directly into {title, content}. content is a 2-D array — the outer
|
||||
// array is the ordered list of paragraphs, each inner array the ordered
|
||||
// spans of that paragraph; the newline between paragraphs is implicit in
|
||||
// the array boundary, not a span.
|
||||
type larkPostContent struct {
|
||||
Title string `json:"title"`
|
||||
Content [][]larkPostSpan `json:"content"`
|
||||
}
|
||||
|
||||
// larkPostSpan is one node inside a post paragraph. Only the fields that
|
||||
// carry renderable text are modelled; the tag set is extensible, so the
|
||||
// flattener emits `text` for any unrecognized tag and skips it otherwise
|
||||
// rather than failing.
|
||||
type larkPostSpan struct {
|
||||
Tag string `json:"tag"`
|
||||
Text string `json:"text"`
|
||||
Href string `json:"href"`
|
||||
UserID string `json:"user_id"`
|
||||
UserName string `json:"user_name"`
|
||||
}
|
||||
|
||||
// flattenPostContent flattens a received `post` body.content into plain
|
||||
// text: the title (when present) on its own first line, then one line
|
||||
// per paragraph. Within a paragraph spans are joined with a single space
|
||||
// — this matches Lark's own rendering, where logically separate chunks
|
||||
// ("Lark 集成", then a link "PR #3277") read as space-separated words.
|
||||
//
|
||||
// A link span renders as "text (href)" so the URL survives into the
|
||||
// agent's context; an `at` span renders as its @_user_N placeholder (or
|
||||
// the inline user_name when Lark already resolved it) so a downstream
|
||||
// resolveMentions pass can substitute the display name. Media spans
|
||||
// degrade to the same bracketed placeholders flattenContent uses.
|
||||
func flattenPostContent(raw string) string {
|
||||
if raw == "" {
|
||||
return ""
|
||||
}
|
||||
var doc larkPostContent
|
||||
if err := json.Unmarshal([]byte(raw), &doc); err != nil {
|
||||
return ""
|
||||
}
|
||||
|
||||
var b strings.Builder
|
||||
write := func(line string) {
|
||||
if b.Len() > 0 {
|
||||
b.WriteByte('\n')
|
||||
}
|
||||
b.WriteString(line)
|
||||
}
|
||||
if doc.Title != "" {
|
||||
write(doc.Title)
|
||||
}
|
||||
for _, para := range doc.Content {
|
||||
write(flattenPostParagraph(para))
|
||||
}
|
||||
return strings.TrimRight(b.String(), "\n")
|
||||
}
|
||||
|
||||
func flattenPostParagraph(spans []larkPostSpan) string {
|
||||
parts := make([]string, 0, len(spans))
|
||||
for _, s := range spans {
|
||||
switch s.Tag {
|
||||
case "text", "code_block":
|
||||
if s.Text != "" {
|
||||
parts = append(parts, s.Text)
|
||||
}
|
||||
case "a":
|
||||
switch {
|
||||
case s.Text != "" && s.Href != "":
|
||||
parts = append(parts, s.Text+" ("+s.Href+")")
|
||||
case s.Text != "":
|
||||
parts = append(parts, s.Text)
|
||||
case s.Href != "":
|
||||
parts = append(parts, s.Href)
|
||||
}
|
||||
case "at":
|
||||
// Prefer an already-resolved display name; otherwise emit
|
||||
// the user_id, which on the receive side is the @_user_N
|
||||
// placeholder a later resolveMentions pass maps to a name.
|
||||
switch {
|
||||
case s.UserName != "":
|
||||
parts = append(parts, "@"+s.UserName)
|
||||
case s.UserID != "":
|
||||
parts = append(parts, s.UserID)
|
||||
}
|
||||
case "img":
|
||||
parts = append(parts, "[Image]")
|
||||
case "media":
|
||||
parts = append(parts, "[Video]")
|
||||
case "emotion":
|
||||
// emoji_type is an enum key (e.g. "SMILE"), not display
|
||||
// text — skip it rather than leak the key.
|
||||
case "hr":
|
||||
parts = append(parts, "---")
|
||||
default:
|
||||
if s.Text != "" {
|
||||
parts = append(parts, s.Text)
|
||||
}
|
||||
}
|
||||
}
|
||||
return strings.Join(parts, " ")
|
||||
}
|
||||
101
server/internal/integrations/lark/content_flatten_test.go
Normal file
101
server/internal/integrations/lark/content_flatten_test.go
Normal file
@@ -0,0 +1,101 @@
|
||||
package lark
|
||||
|
||||
import "testing"
|
||||
|
||||
// TestFlattenPostContent_IssueExample pins the exact rich-text `post`
|
||||
// example from MUL-2951: a title line, a prose paragraph, and a
|
||||
// paragraph mixing a text span with a hyperlink span. The link must
|
||||
// render as "text (href)" so the URL survives into the agent's context.
|
||||
func TestFlattenPostContent_IssueExample(t *testing.T) {
|
||||
t.Parallel()
|
||||
// Received-side post body.content (NOT locale-wrapped).
|
||||
raw := `{
|
||||
"title": "周报",
|
||||
"content": [
|
||||
[{ "tag": "text", "text": "本周完成:" }],
|
||||
[
|
||||
{ "tag": "text", "text": "Lark 集成" },
|
||||
{ "tag": "a", "href": "https://github.com/multica-ai/multica/pull/3277", "text": "PR #3277" }
|
||||
]
|
||||
]
|
||||
}`
|
||||
want := "周报\n本周完成:\nLark 集成 PR #3277 (https://github.com/multica-ai/multica/pull/3277)"
|
||||
if got := flattenPostContent(raw); got != want {
|
||||
t.Errorf("flattenPostContent()\n got = %q\nwant = %q", got, want)
|
||||
}
|
||||
}
|
||||
|
||||
func TestFlattenPostContent_NoTitle(t *testing.T) {
|
||||
t.Parallel()
|
||||
raw := `{"content":[[{"tag":"text","text":"line one"}],[{"tag":"text","text":"line two"}]]}`
|
||||
want := "line one\nline two"
|
||||
if got := flattenPostContent(raw); got != want {
|
||||
t.Errorf("got %q want %q", got, want)
|
||||
}
|
||||
}
|
||||
|
||||
func TestFlattenPostContent_MediaAndMentionSpans(t *testing.T) {
|
||||
t.Parallel()
|
||||
// at span carries the @_user_N placeholder (resolved later by
|
||||
// resolveMentions); media tags degrade to bracket placeholders;
|
||||
// emotion is skipped entirely.
|
||||
raw := `{"content":[[
|
||||
{"tag":"at","user_id":"@_user_1","user_name":""},
|
||||
{"tag":"text","text":"look"},
|
||||
{"tag":"img","image_key":"img_x"},
|
||||
{"tag":"emotion","emoji_type":"SMILE"}
|
||||
]]}`
|
||||
want := "@_user_1 look [Image]"
|
||||
if got := flattenPostContent(raw); got != want {
|
||||
t.Errorf("got %q want %q", got, want)
|
||||
}
|
||||
}
|
||||
|
||||
func TestFlattenPostContent_AtPrefersResolvedName(t *testing.T) {
|
||||
t.Parallel()
|
||||
raw := `{"content":[[{"tag":"at","user_id":"@_user_1","user_name":"Tom"},{"tag":"text","text":"hi"}]]}`
|
||||
want := "@Tom hi"
|
||||
if got := flattenPostContent(raw); got != want {
|
||||
t.Errorf("got %q want %q", got, want)
|
||||
}
|
||||
}
|
||||
|
||||
func TestFlattenPostContent_Malformed(t *testing.T) {
|
||||
t.Parallel()
|
||||
if got := flattenPostContent("not json"); got != "" {
|
||||
t.Errorf("malformed content should flatten to empty, got %q", got)
|
||||
}
|
||||
if got := flattenPostContent(""); got != "" {
|
||||
t.Errorf("empty content should flatten to empty, got %q", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestFlattenContent_DispatchByType(t *testing.T) {
|
||||
t.Parallel()
|
||||
cases := []struct {
|
||||
name string
|
||||
msgType string
|
||||
content string
|
||||
want string
|
||||
}{
|
||||
{"text", "text", `{"text":"hello"}`, "hello"},
|
||||
{"image", "image", `{"image_key":"img_x"}`, "[Image]"},
|
||||
{"file", "file", `{"file_key":"f"}`, "[File]"},
|
||||
{"audio", "audio", `{"file_key":"f"}`, "[Audio]"},
|
||||
{"media", "media", `{"file_key":"f"}`, "[Video]"},
|
||||
{"sticker", "sticker", `{"file_key":"f"}`, "[Sticker]"},
|
||||
{"interactive", "interactive", `{"title":"t"}`, "[interactive card]"},
|
||||
{"share_chat", "share_chat", `{"chat_id":"oc"}`, "[Shared Chat]"},
|
||||
{"merge_forward", "merge_forward", `{"content":"Merged and Forwarded Message"}`, "[forwarded messages]"},
|
||||
{"unknown", "totally_new_type", `{}`, ""},
|
||||
}
|
||||
for _, tc := range cases {
|
||||
tc := tc
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
if got := flattenContent(tc.msgType, tc.content); got != tc.want {
|
||||
t.Errorf("flattenContent(%q) = %q want %q", tc.msgType, got, tc.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -30,6 +30,32 @@ type InboundMessage struct {
|
||||
SenderOpenID OpenID
|
||||
Body string
|
||||
AddressedToBot bool
|
||||
|
||||
// MessageType is the raw Lark msg_type ("text", "post",
|
||||
// "merge_forward", "image", "interactive", …). The decoder
|
||||
// populates it so the inbound enricher can decide whether a
|
||||
// message needs an HTTP round-trip to expand (merge_forward) while
|
||||
// the dispatcher itself stays msg_type-agnostic and only reads Body.
|
||||
MessageType string
|
||||
|
||||
// ParentID is the message_id of the message this one quote-replies
|
||||
// to, taken verbatim from the receive event's `parent_id`. Empty
|
||||
// when the message is not a reply. The enricher fetches it and
|
||||
// prepends a <quoted_message> block. RootID is the thread/root
|
||||
// anchor Lark also reports; we keep it for completeness but the
|
||||
// quoted-reply expansion keys off ParentID (the immediate parent),
|
||||
// not the root.
|
||||
ParentID string
|
||||
RootID string
|
||||
|
||||
// CommandBody is the user's OWN typed text (the decoded Body before
|
||||
// the enricher prepends any <quoted_message> / <forwarded_messages>
|
||||
// context). The `/issue` command is parsed from THIS, not from the
|
||||
// enriched Body: enrichment prepends context blocks, which would
|
||||
// otherwise push the user's `/issue …` off the first line and
|
||||
// silently stop creating the issue. The enricher leaves CommandBody
|
||||
// untouched while it rewrites Body.
|
||||
CommandBody string
|
||||
}
|
||||
|
||||
// Outcome categorizes what the Dispatcher decided to do with an
|
||||
@@ -383,6 +409,7 @@ func (d *Dispatcher) processClaimed(ctx context.Context, msg InboundMessage, ins
|
||||
ChatSessionID: sessionID,
|
||||
Sender: binding.MulticaUserID,
|
||||
Body: msg.Body,
|
||||
CommandBody: msg.CommandBody,
|
||||
InstallationID: inst.ID,
|
||||
LarkMessageID: msg.MessageID,
|
||||
ClaimToken: claimToken,
|
||||
|
||||
@@ -505,6 +505,102 @@ func (c *httpAPIClient) GetBotInfo(ctx context.Context, creds InstallationCreden
|
||||
return BotInfo{OpenID: OpenID(botResp.Bot.OpenID), UnionID: unionID}, nil
|
||||
}
|
||||
|
||||
// GetMessage retrieves a message by id via
|
||||
// GET /open-apis/im/v1/messages/{message_id}. The endpoint always wraps
|
||||
// the result in data.items[] — one element for a normal message, and a
|
||||
// forward sentinel followed by the bundled child messages for a
|
||||
// `merge_forward`. We pass user_id_type=open_id so sender.id and
|
||||
// mentions[].id come back as open_ids, matching the identifiers the
|
||||
// rest of the package keys on.
|
||||
//
|
||||
// body.content is forwarded verbatim (the raw, JSON-encoded, msg_type-
|
||||
// specific string Lark double-encodes); the enricher's flattener owns
|
||||
// interpreting it. A deleted / out-of-scope message surfaces as a Lark
|
||||
// error code, which we turn into a normal Go error so the enricher can
|
||||
// degrade to its "[unable to fetch]" placeholder without aborting the
|
||||
// inbound pipeline.
|
||||
func (c *httpAPIClient) GetMessage(ctx context.Context, creds InstallationCredentials, messageID string) ([]LarkMessage, error) {
|
||||
if messageID == "" {
|
||||
return nil, errors.New("lark http client: missing message_id")
|
||||
}
|
||||
token, err := c.tenantAccessToken(ctx, creds)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
q := url.Values{}
|
||||
q.Set("user_id_type", "open_id")
|
||||
path := "/open-apis/im/v1/messages/" + url.PathEscape(messageID) + "?" + q.Encode()
|
||||
|
||||
var resp struct {
|
||||
Code int `json:"code"`
|
||||
Msg string `json:"msg"`
|
||||
Data struct {
|
||||
Items []larkRESTMessageItem `json:"items"`
|
||||
} `json:"data"`
|
||||
}
|
||||
if err := c.doJSON(ctx, http.MethodGet, path, token, nil, &resp); err != nil {
|
||||
return nil, fmt.Errorf("lark http client: get message: %w", err)
|
||||
}
|
||||
if resp.Code != 0 {
|
||||
if isTokenError(resp.Code) {
|
||||
c.invalidateToken(creds.AppID)
|
||||
}
|
||||
return nil, fmt.Errorf("lark http client: get message: code=%d msg=%q", resp.Code, resp.Msg)
|
||||
}
|
||||
|
||||
out := make([]LarkMessage, 0, len(resp.Data.Items))
|
||||
for _, it := range resp.Data.Items {
|
||||
out = append(out, it.normalize())
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
// larkRESTMessageItem is the IM v1 message item shape returned by the
|
||||
// get / list endpoints. It differs from the WS receive event in two
|
||||
// ways the enricher cares about: msg_type (not message_type), and a
|
||||
// flat `sender.id` / `mentions[].id` string (not a nested id object).
|
||||
type larkRESTMessageItem struct {
|
||||
MessageID string `json:"message_id"`
|
||||
RootID string `json:"root_id"`
|
||||
ParentID string `json:"parent_id"`
|
||||
UpperMessageID string `json:"upper_message_id"`
|
||||
MsgType string `json:"msg_type"`
|
||||
CreateTime string `json:"create_time"`
|
||||
Deleted bool `json:"deleted"`
|
||||
Sender struct {
|
||||
ID string `json:"id"`
|
||||
IDType string `json:"id_type"`
|
||||
SenderType string `json:"sender_type"`
|
||||
} `json:"sender"`
|
||||
Body struct {
|
||||
Content string `json:"content"`
|
||||
} `json:"body"`
|
||||
Mentions []struct {
|
||||
Key string `json:"key"`
|
||||
ID string `json:"id"`
|
||||
Name string `json:"name"`
|
||||
} `json:"mentions"`
|
||||
}
|
||||
|
||||
func (it larkRESTMessageItem) normalize() LarkMessage {
|
||||
m := LarkMessage{
|
||||
MessageID: it.MessageID,
|
||||
MessageType: it.MsgType,
|
||||
Content: it.Body.Content,
|
||||
SenderID: it.Sender.ID,
|
||||
SenderType: it.Sender.SenderType,
|
||||
CreateTime: it.CreateTime,
|
||||
ParentID: it.ParentID,
|
||||
RootID: it.RootID,
|
||||
UpperMessageID: it.UpperMessageID,
|
||||
Deleted: it.Deleted,
|
||||
}
|
||||
for _, mn := range it.Mentions {
|
||||
m.Mentions = append(m.Mentions, LarkMessageMention{Key: mn.Key, ID: mn.ID, Name: mn.Name})
|
||||
}
|
||||
return m
|
||||
}
|
||||
|
||||
// fetchBotUnionID resolves a Bot's `union_id` from its `open_id` via
|
||||
// /open-apis/contact/v3/users/{open_id}?user_id_type=open_id. Split
|
||||
// out from GetBotInfo so the failure mode is explicit and the call
|
||||
|
||||
124
server/internal/integrations/lark/http_client_getmessage_test.go
Normal file
124
server/internal/integrations/lark/http_client_getmessage_test.go
Normal file
@@ -0,0 +1,124 @@
|
||||
package lark
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/http"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
const messageGetPrefix = "/open-apis/im/v1/messages/"
|
||||
|
||||
// TestHTTPClient_GetMessageSingle exercises the happy path: a normal
|
||||
// message comes back as a one-element items[] and is normalized with
|
||||
// raw body.content, sender, and REST-shaped mentions intact.
|
||||
func TestHTTPClient_GetMessageSingle(t *testing.T) {
|
||||
fake := newLarkFake(t)
|
||||
fake.stubToken("tok", 7200)
|
||||
fake.mux.HandleFunc(messageGetPrefix, func(w http.ResponseWriter, r *http.Request) {
|
||||
if r.Method != http.MethodGet {
|
||||
t.Errorf("want GET, got %s", r.Method)
|
||||
}
|
||||
if id := strings.TrimPrefix(r.URL.Path, messageGetPrefix); id != "om_parent" {
|
||||
t.Errorf("path id = %q", id)
|
||||
}
|
||||
if r.URL.Query().Get("user_id_type") != "open_id" {
|
||||
t.Errorf("missing user_id_type=open_id: %q", r.URL.RawQuery)
|
||||
}
|
||||
writeJSON(w, map[string]any{
|
||||
"code": 0, "msg": "ok",
|
||||
"data": map[string]any{
|
||||
"items": []any{
|
||||
map[string]any{
|
||||
"message_id": "om_parent",
|
||||
"msg_type": "text",
|
||||
"create_time": "1000",
|
||||
"sender": map[string]any{"id": "ou_a", "id_type": "open_id", "sender_type": "user"},
|
||||
"body": map[string]any{"content": `{"text":"hi"}`},
|
||||
"mentions": []any{map[string]any{"key": "@_user_1", "id": "ou_b", "name": "Bob"}},
|
||||
},
|
||||
},
|
||||
},
|
||||
})
|
||||
})
|
||||
|
||||
c := newTestClient(fake, time.Now)
|
||||
items, err := c.GetMessage(context.Background(), testCreds(), "om_parent")
|
||||
if err != nil {
|
||||
t.Fatalf("GetMessage: %v", err)
|
||||
}
|
||||
if len(items) != 1 {
|
||||
t.Fatalf("items = %d, want 1", len(items))
|
||||
}
|
||||
m := items[0]
|
||||
if m.MessageID != "om_parent" || m.MessageType != "text" || m.Content != `{"text":"hi"}` {
|
||||
t.Errorf("normalized = %+v", m)
|
||||
}
|
||||
if m.SenderID != "ou_a" || m.SenderType != "user" {
|
||||
t.Errorf("sender = id:%q type:%q", m.SenderID, m.SenderType)
|
||||
}
|
||||
if len(m.Mentions) != 1 || m.Mentions[0].Key != "@_user_1" || m.Mentions[0].ID != "ou_b" || m.Mentions[0].Name != "Bob" {
|
||||
t.Errorf("mentions = %+v", m.Mentions)
|
||||
}
|
||||
if a := fake.lastAuth(); a != "Bearer tok" {
|
||||
t.Errorf("auth header = %q", a)
|
||||
}
|
||||
}
|
||||
|
||||
// TestHTTPClient_GetMessageMergeForward pins the merge_forward contract:
|
||||
// GetMessage(forward_id) returns the sentinel parent followed by the
|
||||
// bundled child messages, all in one items[] array.
|
||||
func TestHTTPClient_GetMessageMergeForward(t *testing.T) {
|
||||
fake := newLarkFake(t)
|
||||
fake.stubToken("tok", 7200)
|
||||
fake.mux.HandleFunc(messageGetPrefix, func(w http.ResponseWriter, r *http.Request) {
|
||||
writeJSON(w, map[string]any{
|
||||
"code": 0, "msg": "ok",
|
||||
"data": map[string]any{
|
||||
"items": []any{
|
||||
map[string]any{"message_id": "om_fwd", "msg_type": "merge_forward", "body": map[string]any{"content": `{"content":"Merged and Forwarded Message"}`}},
|
||||
map[string]any{"message_id": "c1", "msg_type": "text", "upper_message_id": "om_fwd", "create_time": "1000", "sender": map[string]any{"id": "ou_a", "sender_type": "user"}, "body": map[string]any{"content": `{"text":"one"}`}},
|
||||
map[string]any{"message_id": "c2", "msg_type": "text", "upper_message_id": "om_fwd", "create_time": "2000", "sender": map[string]any{"id": "ou_b", "sender_type": "user"}, "body": map[string]any{"content": `{"text":"two"}`}},
|
||||
},
|
||||
},
|
||||
})
|
||||
})
|
||||
|
||||
c := newTestClient(fake, time.Now)
|
||||
items, err := c.GetMessage(context.Background(), testCreds(), "om_fwd")
|
||||
if err != nil {
|
||||
t.Fatalf("GetMessage: %v", err)
|
||||
}
|
||||
if len(items) != 3 {
|
||||
t.Fatalf("items = %d, want 3 (sentinel + 2 children)", len(items))
|
||||
}
|
||||
if items[0].MessageType != "merge_forward" {
|
||||
t.Errorf("items[0] should be the forward sentinel, got %q", items[0].MessageType)
|
||||
}
|
||||
if items[1].UpperMessageID != "om_fwd" || items[2].UpperMessageID != "om_fwd" {
|
||||
t.Errorf("children should link to the forward via upper_message_id")
|
||||
}
|
||||
}
|
||||
|
||||
// TestHTTPClient_GetMessageErrorCode maps a Lark business error (e.g.
|
||||
// deleted / not visible) to a Go error so the enricher can degrade.
|
||||
func TestHTTPClient_GetMessageErrorCode(t *testing.T) {
|
||||
fake := newLarkFake(t)
|
||||
fake.stubToken("tok", 7200)
|
||||
fake.mux.HandleFunc(messageGetPrefix, func(w http.ResponseWriter, r *http.Request) {
|
||||
writeJSON(w, map[string]any{"code": 230110, "msg": "message has been deleted"})
|
||||
})
|
||||
c := newTestClient(fake, time.Now)
|
||||
if _, err := c.GetMessage(context.Background(), testCreds(), "om_gone"); err == nil {
|
||||
t.Fatal("expected error for non-zero Lark code")
|
||||
}
|
||||
}
|
||||
|
||||
func TestHTTPClient_GetMessageEmptyID(t *testing.T) {
|
||||
fake := newLarkFake(t)
|
||||
c := newTestClient(fake, time.Now)
|
||||
if _, err := c.GetMessage(context.Background(), testCreds(), ""); err == nil {
|
||||
t.Fatal("expected error for empty message id")
|
||||
}
|
||||
}
|
||||
300
server/internal/integrations/lark/inbound_enricher.go
Normal file
300
server/internal/integrations/lark/inbound_enricher.go
Normal file
@@ -0,0 +1,300 @@
|
||||
package lark
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
)
|
||||
|
||||
// larkMsgTypeMergeForward is the msg_type of a "merged & forwarded"
|
||||
// message — a bundle of other messages a user forwarded as one unit.
|
||||
// Its own body.content is a fixed sentinel string; the actual forwarded
|
||||
// messages come back as the extra items[] of a GetMessage call.
|
||||
const larkMsgTypeMergeForward = "merge_forward"
|
||||
|
||||
// defaultMaxForwardChildren caps how many child messages we inline from
|
||||
// a single forward. Lark itself bounds a merge_forward at 100 messages;
|
||||
// we mirror that as a safety valve so a pathological bundle can't blow
|
||||
// up the agent's context. Anything beyond the cap is dropped with a
|
||||
// visible "... (N more truncated)" marker.
|
||||
const defaultMaxForwardChildren = 100
|
||||
|
||||
// Enricher expands an inbound message's body with context the user
|
||||
// EXPLICITLY attached — a quoted reply or a merged-and-forwarded bundle
|
||||
// — by calling back into Lark's IM API. It runs after the (fast,
|
||||
// HTTP-free) decoder and before the dispatcher, turning a bare
|
||||
// "@bot 总结一下" into a body that already carries the referenced
|
||||
// conversation inline.
|
||||
//
|
||||
// It is best-effort by contract: every fetch failure degrades to a
|
||||
// visible placeholder block and Enrich NEVER returns an error or blocks
|
||||
// ingestion. A message with nothing to expand (no parent_id, not a
|
||||
// merge_forward) is returned untouched without any network call.
|
||||
type Enricher interface {
|
||||
Enrich(ctx context.Context, msg InboundMessage, creds InstallationCredentials) InboundMessage
|
||||
}
|
||||
|
||||
// InboundEnricherConfig tunes the enricher. Both fields default.
|
||||
type InboundEnricherConfig struct {
|
||||
// MaxForwardChildren caps inlined forward children. <=0 uses
|
||||
// defaultMaxForwardChildren.
|
||||
MaxForwardChildren int
|
||||
// Logger receives best-effort warnings about fetch failures. Nil
|
||||
// uses slog.Default().
|
||||
Logger *slog.Logger
|
||||
}
|
||||
|
||||
type inboundEnricher struct {
|
||||
client APIClient
|
||||
maxForwardChildren int
|
||||
logger *slog.Logger
|
||||
}
|
||||
|
||||
// NewInboundEnricher builds an Enricher backed by the given Lark API
|
||||
// client. The client supplies GetMessage; everything else (flattening,
|
||||
// block assembly, speaker labelling) is local.
|
||||
func NewInboundEnricher(client APIClient, cfg InboundEnricherConfig) Enricher {
|
||||
if cfg.MaxForwardChildren <= 0 {
|
||||
cfg.MaxForwardChildren = defaultMaxForwardChildren
|
||||
}
|
||||
if cfg.Logger == nil {
|
||||
cfg.Logger = slog.Default()
|
||||
}
|
||||
return &inboundEnricher{
|
||||
client: client,
|
||||
maxForwardChildren: cfg.MaxForwardChildren,
|
||||
logger: cfg.Logger,
|
||||
}
|
||||
}
|
||||
|
||||
// Enrich rewrites msg.Body to inline any quoted-reply parent and/or
|
||||
// forwarded bundle. Composition order matches the product spec: the
|
||||
// quoted block is prepended, then the message's own content (or, for a
|
||||
// forward, the rendered transcript) follows.
|
||||
//
|
||||
// <quoted_message …>…</quoted_message>
|
||||
//
|
||||
// <the user's own message, or the forwarded transcript>
|
||||
func (e *inboundEnricher) Enrich(ctx context.Context, msg InboundMessage, creds InstallationCredentials) InboundMessage {
|
||||
isForward := msg.MessageType == larkMsgTypeMergeForward
|
||||
if msg.ParentID == "" && !isForward {
|
||||
// Nothing the user explicitly attached — no network call.
|
||||
return msg
|
||||
}
|
||||
// If the transport isn't wired (stub client on a deployment without
|
||||
// a Lark app), skip rather than stamp every reply with a fetch
|
||||
// error. Body stays whatever the decoder produced.
|
||||
if e.client == nil || !e.client.IsConfigured() {
|
||||
return msg
|
||||
}
|
||||
|
||||
var b strings.Builder
|
||||
if msg.ParentID != "" {
|
||||
b.WriteString(e.renderQuoted(ctx, creds, msg.ParentID))
|
||||
}
|
||||
|
||||
var core string
|
||||
if isForward {
|
||||
core = e.renderForwarded(ctx, creds, msg.MessageID)
|
||||
} else {
|
||||
core = msg.Body
|
||||
}
|
||||
if b.Len() > 0 && core != "" {
|
||||
b.WriteString("\n\n")
|
||||
}
|
||||
b.WriteString(core)
|
||||
|
||||
msg.Body = b.String()
|
||||
return msg
|
||||
}
|
||||
|
||||
// renderQuoted fetches the directly-quoted parent and renders a
|
||||
// <quoted_message> block. A parent that is itself a merge_forward nests
|
||||
// a <forwarded_messages> transcript inside the quoted block (the
|
||||
// GetMessage response already carries both the forward sentinel and its
|
||||
// children, so no extra round-trip is needed). Any failure degrades to
|
||||
// the documented error block.
|
||||
func (e *inboundEnricher) renderQuoted(ctx context.Context, creds InstallationCredentials, parentID string) string {
|
||||
items, err := e.client.GetMessage(ctx, creds, parentID)
|
||||
if err != nil || len(items) == 0 {
|
||||
e.logger.Warn("lark enricher: quoted parent fetch failed",
|
||||
"parent_id", parentID, "items", len(items), "err", err)
|
||||
return quotedErrorBlock(parentID)
|
||||
}
|
||||
parent := items[0]
|
||||
if parent.Deleted {
|
||||
return quotedErrorBlock(parentID)
|
||||
}
|
||||
|
||||
labeler := newSpeakerLabeler()
|
||||
sender := labeler.label(parent)
|
||||
|
||||
if parent.MessageType == larkMsgTypeMergeForward {
|
||||
inner := e.renderForwardedItems(items, parentID)
|
||||
return wrapQuoted(parentID, sender, larkMsgTypeMergeForward, inner)
|
||||
}
|
||||
text := e.flattenMessage(parent)
|
||||
if text == "" {
|
||||
text = "[empty message]"
|
||||
}
|
||||
return wrapQuoted(parentID, sender, parent.MessageType, text)
|
||||
}
|
||||
|
||||
// renderForwarded fetches a merge_forward by id and renders its bundled
|
||||
// children. The GetMessage response is [sentinel, child…]; we filter the
|
||||
// forward's own record out by id (robust to whether Lark returns the
|
||||
// sentinel first or not) and render the rest.
|
||||
func (e *inboundEnricher) renderForwarded(ctx context.Context, creds InstallationCredentials, forwardID string) string {
|
||||
items, err := e.client.GetMessage(ctx, creds, forwardID)
|
||||
if err != nil {
|
||||
e.logger.Warn("lark enricher: forward fetch failed", "message_id", forwardID, "err", err)
|
||||
return forwardedErrorBlock()
|
||||
}
|
||||
return e.renderForwardedItems(items, forwardID)
|
||||
}
|
||||
|
||||
// renderForwardedItems renders the children of a forward whose own
|
||||
// record id is forwardID. Children are time-ordered, capped, and each
|
||||
// rendered as "[<speaker>]: <text>"; a child that is itself a forward is
|
||||
// not recursed into (it gets a manual-expand placeholder) so the HTTP
|
||||
// fan-out on the ACK-latency-sensitive inbound path stays bounded.
|
||||
func (e *inboundEnricher) renderForwardedItems(items []LarkMessage, forwardID string) string {
|
||||
// The verified contract is that GetMessage(forward_id) returns one
|
||||
// level of bundling: [sentinel, direct-children…]. We therefore
|
||||
// treat every non-sentinel item as a direct child. We filter by id
|
||||
// (not by upper_message_id == forwardID) on purpose: a strict
|
||||
// upper_message_id match would silently DROP a real child if Lark
|
||||
// ever returned one with that field unpopulated. A child that is
|
||||
// itself a forward is rendered as a manual-expand placeholder below
|
||||
// rather than recursed into, so grandchildren are never inlined.
|
||||
children := make([]LarkMessage, 0, len(items))
|
||||
for _, it := range items {
|
||||
if it.MessageID == forwardID {
|
||||
continue // the forward sentinel itself
|
||||
}
|
||||
children = append(children, it)
|
||||
}
|
||||
total := len(children)
|
||||
if total == 0 {
|
||||
return "<forwarded_messages count=\"0\">\n[no forwarded content available]\n</forwarded_messages>"
|
||||
}
|
||||
|
||||
sort.SliceStable(children, func(i, j int) bool {
|
||||
return parseLarkMillis(children[i].CreateTime) < parseLarkMillis(children[j].CreateTime)
|
||||
})
|
||||
|
||||
truncated := 0
|
||||
if total > e.maxForwardChildren {
|
||||
truncated = total - e.maxForwardChildren
|
||||
children = children[:e.maxForwardChildren]
|
||||
}
|
||||
|
||||
labeler := newSpeakerLabeler()
|
||||
lines := make([]string, 0, len(children))
|
||||
for _, c := range children {
|
||||
label := labeler.label(c)
|
||||
var text string
|
||||
switch {
|
||||
case c.MessageType == larkMsgTypeMergeForward:
|
||||
text = "[nested merge_forward, expand manually]"
|
||||
default:
|
||||
text = e.flattenMessage(c)
|
||||
if text == "" {
|
||||
text = "[empty message]"
|
||||
}
|
||||
}
|
||||
lines = append(lines, fmt.Sprintf("[%s]: %s", label, text))
|
||||
}
|
||||
body := strings.Join(lines, "\n")
|
||||
if truncated > 0 {
|
||||
body += fmt.Sprintf("\n... (%d more truncated)", truncated)
|
||||
}
|
||||
return fmt.Sprintf("<forwarded_messages count=\"%d\">\n%s\n</forwarded_messages>", total, body)
|
||||
}
|
||||
|
||||
// flattenMessage turns one fetched message into plain text: structural
|
||||
// flatten by msg_type, then @_user_N placeholder resolution against the
|
||||
// message's own mentions. The bot mention is NOT stripped here (unlike
|
||||
// the inbound decoder) — a quoted / forwarded message is historical
|
||||
// context, not a fresh trigger, so passing empty bot identifiers leaves
|
||||
// every @-mention rendered as a readable @name.
|
||||
func (e *inboundEnricher) flattenMessage(m LarkMessage) string {
|
||||
if m.Deleted {
|
||||
return "[deleted message]"
|
||||
}
|
||||
raw := flattenContent(m.MessageType, m.Content)
|
||||
if raw == "" {
|
||||
return ""
|
||||
}
|
||||
return resolveMentions(raw, restMentionsToEvent(m.Mentions), "", "")
|
||||
}
|
||||
|
||||
// restMentionsToEvent adapts the IM REST mention shape (flat string id)
|
||||
// to the WS-event larkMention shape resolveMentions consumes, so a
|
||||
// single mention-resolution implementation serves both ingress paths.
|
||||
func restMentionsToEvent(ms []LarkMessageMention) []larkMention {
|
||||
if len(ms) == 0 {
|
||||
return nil
|
||||
}
|
||||
out := make([]larkMention, 0, len(ms))
|
||||
for _, m := range ms {
|
||||
lm := larkMention{Key: m.Key, Name: m.Name}
|
||||
lm.ID.OpenID = m.ID
|
||||
out = append(out, lm)
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
func wrapQuoted(messageID, sender, msgType, inner string) string {
|
||||
return fmt.Sprintf("<quoted_message message_id=%q sender=%q type=%q>\n%s\n</quoted_message>",
|
||||
messageID, sender, msgType, inner)
|
||||
}
|
||||
|
||||
func quotedErrorBlock(messageID string) string {
|
||||
return fmt.Sprintf("<quoted_message message_id=%q type=\"error\">[unable to fetch]</quoted_message>", messageID)
|
||||
}
|
||||
|
||||
func forwardedErrorBlock() string {
|
||||
return "<forwarded_messages type=\"error\">[unable to fetch]</forwarded_messages>"
|
||||
}
|
||||
|
||||
func parseLarkMillis(s string) int64 {
|
||||
n, _ := strconv.ParseInt(s, 10, 64)
|
||||
return n
|
||||
}
|
||||
|
||||
// speakerLabeler assigns stable, human-readable labels to the senders
|
||||
// within one rendered block. Lark message items carry only a sender id
|
||||
// (no display name — resolving real names needs a separate Contact API
|
||||
// lookup, tracked as a follow-up), so we map each distinct user id to
|
||||
// "User 1", "User 2", … in first-appearance order, and app senders to
|
||||
// "Bot". This keeps the conversation's turn-taking structure legible
|
||||
// without a per-sender network round-trip.
|
||||
type speakerLabeler struct {
|
||||
seen map[string]string
|
||||
n int
|
||||
}
|
||||
|
||||
func newSpeakerLabeler() *speakerLabeler {
|
||||
return &speakerLabeler{seen: make(map[string]string)}
|
||||
}
|
||||
|
||||
func (l *speakerLabeler) label(m LarkMessage) string {
|
||||
if m.SenderType == "app" {
|
||||
return "Bot"
|
||||
}
|
||||
key := m.SenderID
|
||||
if key == "" {
|
||||
key = "unknown"
|
||||
}
|
||||
if lbl, ok := l.seen[key]; ok {
|
||||
return lbl
|
||||
}
|
||||
l.n++
|
||||
lbl := fmt.Sprintf("User %d", l.n)
|
||||
l.seen[key] = lbl
|
||||
return lbl
|
||||
}
|
||||
343
server/internal/integrations/lark/inbound_enricher_test.go
Normal file
343
server/internal/integrations/lark/inbound_enricher_test.go
Normal file
@@ -0,0 +1,343 @@
|
||||
package lark
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"strings"
|
||||
"testing"
|
||||
)
|
||||
|
||||
// enricherFakeClient is a programmable APIClient for enricher tests. It
|
||||
// returns canned GetMessage results keyed by message_id and records the
|
||||
// ids it was asked for, so tests can assert both the rendered body and
|
||||
// the network fan-out (e.g. "no call when nothing to enrich").
|
||||
type enricherFakeClient struct {
|
||||
configured bool
|
||||
byID map[string][]LarkMessage
|
||||
errByID map[string]error
|
||||
calls []string
|
||||
}
|
||||
|
||||
func newEnricherFake() *enricherFakeClient {
|
||||
return &enricherFakeClient{
|
||||
configured: true,
|
||||
byID: map[string][]LarkMessage{},
|
||||
errByID: map[string]error{},
|
||||
}
|
||||
}
|
||||
|
||||
func (f *enricherFakeClient) IsConfigured() bool { return f.configured }
|
||||
func (f *enricherFakeClient) GetMessage(ctx context.Context, creds InstallationCredentials, id string) ([]LarkMessage, error) {
|
||||
f.calls = append(f.calls, id)
|
||||
if e, ok := f.errByID[id]; ok {
|
||||
return nil, e
|
||||
}
|
||||
return f.byID[id], nil
|
||||
}
|
||||
|
||||
// Unused-by-enricher methods — present only to satisfy APIClient.
|
||||
func (f *enricherFakeClient) SendInteractiveCard(context.Context, SendCardParams) (string, error) {
|
||||
return "", nil
|
||||
}
|
||||
func (f *enricherFakeClient) PatchInteractiveCard(context.Context, PatchCardParams) error { return nil }
|
||||
func (f *enricherFakeClient) SendTextMessage(context.Context, SendTextParams) (string, error) {
|
||||
return "", nil
|
||||
}
|
||||
func (f *enricherFakeClient) SendMarkdownCard(context.Context, SendMarkdownCardParams) (string, error) {
|
||||
return "", nil
|
||||
}
|
||||
func (f *enricherFakeClient) SendBindingPromptCard(context.Context, BindingPromptParams) error {
|
||||
return nil
|
||||
}
|
||||
func (f *enricherFakeClient) GetBotInfo(context.Context, InstallationCredentials) (BotInfo, error) {
|
||||
return BotInfo{}, nil
|
||||
}
|
||||
|
||||
func textMsg(id, sender, text, createTime string) LarkMessage {
|
||||
return LarkMessage{
|
||||
MessageID: id,
|
||||
MessageType: "text",
|
||||
Content: `{"text":"` + text + `"}`,
|
||||
SenderID: sender,
|
||||
SenderType: "user",
|
||||
CreateTime: createTime,
|
||||
}
|
||||
}
|
||||
|
||||
func enrich(t *testing.T, fake *enricherFakeClient, msg InboundMessage, cfg InboundEnricherConfig) InboundMessage {
|
||||
t.Helper()
|
||||
e := NewInboundEnricher(fake, cfg)
|
||||
return e.Enrich(context.Background(), msg, InstallationCredentials{AppID: "a", AppSecret: "s"})
|
||||
}
|
||||
|
||||
// TestEnrichQuotedReply covers the MUL-2951 quoted-reply example: a text
|
||||
// reply to a prior text message gets the parent inlined as a
|
||||
// <quoted_message> block ahead of the user's own prose.
|
||||
func TestEnrichQuotedReply(t *testing.T) {
|
||||
t.Parallel()
|
||||
fake := newEnricherFake()
|
||||
fake.byID["om_parent"] = []LarkMessage{
|
||||
textMsg("om_parent", "ou_jiayuan", "做一个删除 issue 的按钮吧", "1000"),
|
||||
}
|
||||
in := InboundMessage{MessageType: "text", MessageID: "om_child", Body: "去实现", ParentID: "om_parent"}
|
||||
|
||||
out := enrich(t, fake, in, InboundEnricherConfig{})
|
||||
|
||||
want := `<quoted_message message_id="om_parent" sender="User 1" type="text">
|
||||
做一个删除 issue 的按钮吧
|
||||
</quoted_message>
|
||||
|
||||
去实现`
|
||||
if out.Body != want {
|
||||
t.Errorf("body\n got = %q\nwant = %q", out.Body, want)
|
||||
}
|
||||
if len(fake.calls) != 1 || fake.calls[0] != "om_parent" {
|
||||
t.Errorf("expected one GetMessage(om_parent), got %v", fake.calls)
|
||||
}
|
||||
}
|
||||
|
||||
// TestEnrichMergeForward covers the merge_forward example: the forwarded
|
||||
// transcript is fetched via GetMessage(forward_id) — whose items[] are
|
||||
// [sentinel, child…] — and inlined as a <forwarded_messages> block with
|
||||
// per-speaker labels. The four original lines must all be present.
|
||||
func TestEnrichMergeForward(t *testing.T) {
|
||||
t.Parallel()
|
||||
fake := newEnricherFake()
|
||||
fake.byID["om_forward"] = []LarkMessage{
|
||||
{MessageID: "om_forward", MessageType: "merge_forward", SenderID: "ou_bohan", SenderType: "user", Content: `{"content":"Merged and Forwarded Message"}`},
|
||||
textMsg("c1", "ou_jiayuan", "你们线上的 Multica 能用吗", "1000"),
|
||||
textMsg("c2", "ou_jiayuan", "我这边无法登录", "2000"),
|
||||
textMsg("c3", "ou_bohan", "我这边 web 和 desktop 都能登陆", "3000"),
|
||||
{MessageID: "c4", MessageType: "image", SenderID: "ou_jiayuan", SenderType: "user", Content: `{"image_key":"img_x"}`, CreateTime: "4000"},
|
||||
}
|
||||
in := InboundMessage{MessageType: "merge_forward", MessageID: "om_forward"}
|
||||
|
||||
out := enrich(t, fake, in, InboundEnricherConfig{})
|
||||
|
||||
want := `<forwarded_messages count="4">
|
||||
[User 1]: 你们线上的 Multica 能用吗
|
||||
[User 1]: 我这边无法登录
|
||||
[User 2]: 我这边 web 和 desktop 都能登陆
|
||||
[User 1]: [Image]
|
||||
</forwarded_messages>`
|
||||
if out.Body != want {
|
||||
t.Errorf("body\n got = %q\nwant = %q", out.Body, want)
|
||||
}
|
||||
}
|
||||
|
||||
// TestEnrichMergeForwardSortsByCreateTime ensures children are ordered
|
||||
// chronologically regardless of the order Lark returns them.
|
||||
func TestEnrichMergeForwardSortsByCreateTime(t *testing.T) {
|
||||
t.Parallel()
|
||||
fake := newEnricherFake()
|
||||
fake.byID["om_f"] = []LarkMessage{
|
||||
{MessageID: "om_f", MessageType: "merge_forward", SenderID: "ou_x", SenderType: "user"},
|
||||
textMsg("c2", "ou_a", "second", "2000"),
|
||||
textMsg("c1", "ou_a", "first", "1000"),
|
||||
textMsg("c3", "ou_a", "third", "3000"),
|
||||
}
|
||||
out := enrich(t, fake, InboundMessage{MessageType: "merge_forward", MessageID: "om_f"}, InboundEnricherConfig{})
|
||||
first := strings.Index(out.Body, "first")
|
||||
second := strings.Index(out.Body, "second")
|
||||
third := strings.Index(out.Body, "third")
|
||||
if !(first < second && second < third) {
|
||||
t.Errorf("children not chronologically ordered: %q", out.Body)
|
||||
}
|
||||
}
|
||||
|
||||
// TestEnrichMergeForwardCap truncates beyond the configured cap and
|
||||
// flags how many were dropped.
|
||||
func TestEnrichMergeForwardCap(t *testing.T) {
|
||||
t.Parallel()
|
||||
fake := newEnricherFake()
|
||||
fake.byID["om_f"] = []LarkMessage{
|
||||
{MessageID: "om_f", MessageType: "merge_forward", SenderID: "ou_x", SenderType: "user"},
|
||||
textMsg("c1", "ou_a", "one", "1000"),
|
||||
textMsg("c2", "ou_a", "two", "2000"),
|
||||
textMsg("c3", "ou_a", "three", "3000"),
|
||||
textMsg("c4", "ou_a", "four", "4000"),
|
||||
}
|
||||
out := enrich(t, fake, InboundMessage{MessageType: "merge_forward", MessageID: "om_f"}, InboundEnricherConfig{MaxForwardChildren: 2})
|
||||
if !strings.Contains(out.Body, "... (2 more truncated)") {
|
||||
t.Errorf("expected truncation marker, got %q", out.Body)
|
||||
}
|
||||
if strings.Contains(out.Body, "three") || strings.Contains(out.Body, "four") {
|
||||
t.Errorf("over-cap children should be dropped, got %q", out.Body)
|
||||
}
|
||||
if !strings.Contains(out.Body, `count="4"`) {
|
||||
t.Errorf("count should reflect the true total, got %q", out.Body)
|
||||
}
|
||||
}
|
||||
|
||||
// TestEnrichQuotedMergeForwardNests covers a quote-reply whose parent is
|
||||
// itself a merge_forward: the forwarded transcript renders INSIDE the
|
||||
// quoted block, using the same single GetMessage response.
|
||||
func TestEnrichQuotedMergeForwardNests(t *testing.T) {
|
||||
t.Parallel()
|
||||
fake := newEnricherFake()
|
||||
fake.byID["om_fwd"] = []LarkMessage{
|
||||
{MessageID: "om_fwd", MessageType: "merge_forward", SenderID: "ou_bohan", SenderType: "user"},
|
||||
textMsg("c1", "ou_a", "line A", "1000"),
|
||||
textMsg("c2", "ou_b", "line B", "2000"),
|
||||
}
|
||||
in := InboundMessage{MessageType: "text", MessageID: "om_child", Body: "see above", ParentID: "om_fwd"}
|
||||
out := enrich(t, fake, in, InboundEnricherConfig{})
|
||||
|
||||
if !strings.Contains(out.Body, `<quoted_message message_id="om_fwd" sender="User 1" type="merge_forward">`) {
|
||||
t.Errorf("missing quoted wrapper for merge_forward parent: %q", out.Body)
|
||||
}
|
||||
if !strings.Contains(out.Body, "<forwarded_messages count=\"2\">") {
|
||||
t.Errorf("forwarded block should nest inside quoted: %q", out.Body)
|
||||
}
|
||||
if !strings.Contains(out.Body, "line A") || !strings.Contains(out.Body, "line B") {
|
||||
t.Errorf("nested children missing: %q", out.Body)
|
||||
}
|
||||
if !strings.HasSuffix(out.Body, "see above") {
|
||||
t.Errorf("user prose should follow the quoted block: %q", out.Body)
|
||||
}
|
||||
}
|
||||
|
||||
// TestEnrichNestedForwardChildIsPlaceholder bounds HTTP fan-out: a child
|
||||
// that is itself a forward is NOT recursed into.
|
||||
func TestEnrichNestedForwardChildIsPlaceholder(t *testing.T) {
|
||||
t.Parallel()
|
||||
fake := newEnricherFake()
|
||||
fake.byID["om_f"] = []LarkMessage{
|
||||
{MessageID: "om_f", MessageType: "merge_forward", SenderID: "ou_x", SenderType: "user"},
|
||||
textMsg("c1", "ou_a", "hello", "1000"),
|
||||
{MessageID: "c2", MessageType: "merge_forward", SenderID: "ou_a", SenderType: "user", CreateTime: "2000"},
|
||||
}
|
||||
out := enrich(t, fake, InboundMessage{MessageType: "merge_forward", MessageID: "om_f"}, InboundEnricherConfig{})
|
||||
if !strings.Contains(out.Body, "[nested merge_forward, expand manually]") {
|
||||
t.Errorf("nested forward child should be a placeholder: %q", out.Body)
|
||||
}
|
||||
// Only the top forward should have been fetched.
|
||||
if len(fake.calls) != 1 {
|
||||
t.Errorf("expected exactly one GetMessage, got %v", fake.calls)
|
||||
}
|
||||
}
|
||||
|
||||
// TestEnrichQuotedFetchFailureDegrades verifies a parent fetch failure
|
||||
// degrades to the documented error block and still keeps the user's
|
||||
// message — it must not block ingestion.
|
||||
func TestEnrichQuotedFetchFailureDegrades(t *testing.T) {
|
||||
t.Parallel()
|
||||
fake := newEnricherFake()
|
||||
fake.errByID["om_gone"] = errors.New("not found")
|
||||
in := InboundMessage{MessageType: "text", MessageID: "om_child", Body: "ping", ParentID: "om_gone"}
|
||||
out := enrich(t, fake, in, InboundEnricherConfig{})
|
||||
|
||||
want := `<quoted_message message_id="om_gone" type="error">[unable to fetch]</quoted_message>
|
||||
|
||||
ping`
|
||||
if out.Body != want {
|
||||
t.Errorf("body\n got = %q\nwant = %q", out.Body, want)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEnrichQuotedDeletedParentDegrades(t *testing.T) {
|
||||
t.Parallel()
|
||||
fake := newEnricherFake()
|
||||
fake.byID["om_del"] = []LarkMessage{{MessageID: "om_del", MessageType: "text", Deleted: true, SenderID: "ou_a", SenderType: "user"}}
|
||||
out := enrich(t, fake, InboundMessage{MessageType: "text", Body: "x", ParentID: "om_del"}, InboundEnricherConfig{})
|
||||
if !strings.Contains(out.Body, `type="error"`) {
|
||||
t.Errorf("deleted parent should degrade to error block: %q", out.Body)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEnrichForwardFetchFailureDegrades(t *testing.T) {
|
||||
t.Parallel()
|
||||
fake := newEnricherFake()
|
||||
fake.errByID["om_f"] = errors.New("boom")
|
||||
out := enrich(t, fake, InboundMessage{MessageType: "merge_forward", MessageID: "om_f"}, InboundEnricherConfig{})
|
||||
if out.Body != `<forwarded_messages type="error">[unable to fetch]</forwarded_messages>` {
|
||||
t.Errorf("forward fetch failure should degrade: %q", out.Body)
|
||||
}
|
||||
}
|
||||
|
||||
// TestEnrichNoopWhenNothingAttached: a plain message (no parent, not a
|
||||
// forward) is returned untouched WITHOUT any network call.
|
||||
func TestEnrichNoopWhenNothingAttached(t *testing.T) {
|
||||
t.Parallel()
|
||||
fake := newEnricherFake()
|
||||
in := InboundMessage{MessageType: "text", MessageID: "om", Body: "hello"}
|
||||
out := enrich(t, fake, in, InboundEnricherConfig{})
|
||||
if out.Body != "hello" {
|
||||
t.Errorf("body should be unchanged, got %q", out.Body)
|
||||
}
|
||||
if len(fake.calls) != 0 {
|
||||
t.Errorf("no GetMessage should be issued, got %v", fake.calls)
|
||||
}
|
||||
}
|
||||
|
||||
// TestEnrichSkipsWhenClientUnconfigured: with the stub/unconfigured
|
||||
// client we must not stamp a fetch error on every reply — skip silently.
|
||||
func TestEnrichSkipsWhenClientUnconfigured(t *testing.T) {
|
||||
t.Parallel()
|
||||
fake := newEnricherFake()
|
||||
fake.configured = false
|
||||
in := InboundMessage{MessageType: "text", MessageID: "om", Body: "hi", ParentID: "om_parent"}
|
||||
out := enrich(t, fake, in, InboundEnricherConfig{})
|
||||
if out.Body != "hi" {
|
||||
t.Errorf("body should be unchanged when client unconfigured, got %q", out.Body)
|
||||
}
|
||||
if len(fake.calls) != 0 {
|
||||
t.Errorf("no GetMessage when unconfigured, got %v", fake.calls)
|
||||
}
|
||||
}
|
||||
|
||||
// TestEnrichPreservesCommandBodyForIssueParsing is the regression guard
|
||||
// for the quote-reply + /issue interaction: enrichment prepends a
|
||||
// <quoted_message> block (so the enriched Body no longer parses as a
|
||||
// command), but CommandBody is left untouched and still parses, so
|
||||
// `/issue` keeps working when typed as a quote-reply.
|
||||
func TestEnrichPreservesCommandBodyForIssueParsing(t *testing.T) {
|
||||
t.Parallel()
|
||||
fake := newEnricherFake()
|
||||
fake.byID["om_parent"] = []LarkMessage{textMsg("om_parent", "ou_j", "做个删除按钮", "1000")}
|
||||
in := InboundMessage{
|
||||
MessageType: "text",
|
||||
MessageID: "om_child",
|
||||
Body: "/issue 删除 issue 按钮",
|
||||
CommandBody: "/issue 删除 issue 按钮",
|
||||
ParentID: "om_parent",
|
||||
}
|
||||
out := enrich(t, fake, in, InboundEnricherConfig{})
|
||||
|
||||
// Enriched Body now starts with the quoted block → no longer a command.
|
||||
if _, ok := parseIssueCommand(out.Body); ok {
|
||||
t.Errorf("enriched Body should not parse as /issue (it is prefixed): %q", out.Body)
|
||||
}
|
||||
// CommandBody is untouched and still parses with the right title.
|
||||
cmd, ok := parseIssueCommand(out.CommandBody)
|
||||
if !ok || cmd.Title != "删除 issue 按钮" {
|
||||
t.Errorf("CommandBody should still parse /issue: cmd=%+v ok=%v", cmd, ok)
|
||||
}
|
||||
// And the quoted context did land in the stored Body.
|
||||
if !strings.Contains(out.Body, "做个删除按钮") {
|
||||
t.Errorf("quoted context missing from Body: %q", out.Body)
|
||||
}
|
||||
}
|
||||
|
||||
// TestEnrichResolvesMentionsInChildren: @_user_N placeholders in a
|
||||
// forwarded child resolve to @name via that child's own mentions array.
|
||||
func TestEnrichResolvesMentionsInChildren(t *testing.T) {
|
||||
t.Parallel()
|
||||
fake := newEnricherFake()
|
||||
fake.byID["om_f"] = []LarkMessage{
|
||||
{MessageID: "om_f", MessageType: "merge_forward", SenderID: "ou_x", SenderType: "user"},
|
||||
{
|
||||
MessageID: "c1",
|
||||
MessageType: "text",
|
||||
Content: `{"text":"@_user_1 看一下"}`,
|
||||
SenderID: "ou_a",
|
||||
SenderType: "user",
|
||||
CreateTime: "1000",
|
||||
Mentions: []LarkMessageMention{{Key: "@_user_1", ID: "ou_alice", Name: "Alice"}},
|
||||
},
|
||||
}
|
||||
out := enrich(t, fake, InboundMessage{MessageType: "merge_forward", MessageID: "om_f"}, InboundEnricherConfig{})
|
||||
if !strings.Contains(out.Body, "@Alice 看一下") {
|
||||
t.Errorf("child mention not resolved: %q", out.Body)
|
||||
}
|
||||
}
|
||||
@@ -119,6 +119,9 @@ func (f *fakeAPIClient) SendBindingPromptCard(ctx context.Context, p BindingProm
|
||||
func (f *fakeAPIClient) GetBotInfo(ctx context.Context, creds InstallationCredentials) (BotInfo, error) {
|
||||
return BotInfo{}, nil
|
||||
}
|
||||
func (f *fakeAPIClient) GetMessage(ctx context.Context, creds InstallationCredentials, messageID string) ([]LarkMessage, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func newTestPatcher(t *testing.T) (*Patcher, *fakePatcherQueries, *fakeAPIClient) {
|
||||
t.Helper()
|
||||
|
||||
@@ -72,6 +72,10 @@ func (s *stubAPIClientWithRecorder) GetBotInfo(ctx context.Context, creds Instal
|
||||
return BotInfo{}, nil
|
||||
}
|
||||
|
||||
func (s *stubAPIClientWithRecorder) GetMessage(ctx context.Context, creds InstallationCredentials, messageID string) ([]LarkMessage, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// stubCredentialsResolver returns a fixed plaintext secret.
|
||||
type stubCredentialsResolver struct{ secret string }
|
||||
|
||||
|
||||
@@ -88,6 +88,21 @@ type WSConnectorConfig struct {
|
||||
// should not tear down the entire connection.
|
||||
FrameDecoder FrameDecoder
|
||||
|
||||
// Enricher optionally expands a decoded message's body with the
|
||||
// context the user explicitly attached (quoted reply / forwarded
|
||||
// bundle) before it is emitted to the dispatcher. It runs on the
|
||||
// inbound read loop, so it is bounded by EnrichTimeout to protect
|
||||
// the Lark long-conn ACK budget; on timeout / fetch failure the
|
||||
// enricher degrades to a placeholder rather than blocking. Nil
|
||||
// disables enrichment (the decoded body is emitted as-is).
|
||||
Enricher Enricher
|
||||
|
||||
// EnrichTimeout caps a single message's enrichment (at most two
|
||||
// GetMessage calls). It MUST stay well under Lark's ~3s long-conn
|
||||
// ACK window, since enrichment runs before the frame is ACKed.
|
||||
// Zero defaults to 2 seconds.
|
||||
EnrichTimeout time.Duration
|
||||
|
||||
// CredentialsProvider returns the InstallationCredentials the
|
||||
// EndpointFetcher needs. Typically wraps
|
||||
// InstallationService.DecryptAppSecret so the plaintext secret
|
||||
@@ -138,6 +153,9 @@ func (c WSConnectorConfig) withDefaults() WSConnectorConfig {
|
||||
if c.ChunkTTL == 0 {
|
||||
c.ChunkTTL = 5 * time.Second
|
||||
}
|
||||
if c.EnrichTimeout == 0 {
|
||||
c.EnrichTimeout = 2 * time.Second
|
||||
}
|
||||
if c.Now == nil {
|
||||
c.Now = time.Now
|
||||
}
|
||||
@@ -363,6 +381,18 @@ func (c *WSLongConnConnector) Run(ctx context.Context, inst db.LarkInstallation,
|
||||
continue
|
||||
}
|
||||
|
||||
// Enrich the decoded body with explicitly-attached context
|
||||
// (quoted reply / forwarded bundle) before emitting. This runs
|
||||
// before the frame ACK, so it is bounded by EnrichTimeout and
|
||||
// degrades to a placeholder on failure rather than blocking the
|
||||
// pipeline. Most messages need no enrichment and return
|
||||
// immediately without any network call.
|
||||
if c.cfg.Enricher != nil {
|
||||
enrichCtx, cancelEnrich := context.WithTimeout(ctx, c.cfg.EnrichTimeout)
|
||||
msg = c.cfg.Enricher.Enrich(enrichCtx, msg, creds)
|
||||
cancelEnrich()
|
||||
}
|
||||
|
||||
_, emitErr := emit(ctx, msg)
|
||||
if emitErr != nil {
|
||||
// Infra failure from Dispatcher (DB down, etc.). NACK so
|
||||
|
||||
118
server/internal/integrations/lark/ws_connector_enrich_test.go
Normal file
118
server/internal/integrations/lark/ws_connector_enrich_test.go
Normal file
@@ -0,0 +1,118 @@
|
||||
package lark
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"log/slog"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
db "github.com/multica-ai/multica/server/pkg/db/generated"
|
||||
)
|
||||
|
||||
// recordingEnricher captures what the connector hands it and rewrites
|
||||
// the body so the test can prove enrichment ran between decode and emit.
|
||||
type recordingEnricher struct {
|
||||
mu sync.Mutex
|
||||
msgs []InboundMessage
|
||||
creds []InstallationCredentials
|
||||
}
|
||||
|
||||
func (e *recordingEnricher) Enrich(ctx context.Context, msg InboundMessage, creds InstallationCredentials) InboundMessage {
|
||||
e.mu.Lock()
|
||||
defer e.mu.Unlock()
|
||||
e.msgs = append(e.msgs, msg)
|
||||
e.creds = append(e.creds, creds)
|
||||
msg.Body = "ENRICHED:" + msg.Body
|
||||
return msg
|
||||
}
|
||||
|
||||
// TestWSConnectorEnrichesBeforeEmit verifies the connector runs the
|
||||
// Enricher on a decoded message — with the connection's resolved
|
||||
// credentials — before emitting it to the dispatcher.
|
||||
func TestWSConnectorEnrichesBeforeEmit(t *testing.T) {
|
||||
t.Parallel()
|
||||
conn := newFakeWSConn()
|
||||
decoder := FrameDecoderFunc(func(payload []byte, _ db.LarkInstallation) (InboundMessage, bool, error) {
|
||||
return InboundMessage{
|
||||
EventID: string(payload),
|
||||
AppID: "test_app",
|
||||
MessageID: "msg-" + string(payload),
|
||||
Body: "raw-" + string(payload),
|
||||
}, true, nil
|
||||
})
|
||||
enr := &recordingEnricher{}
|
||||
|
||||
c, err := NewWSLongConnConnector(WSConnectorConfig{
|
||||
Dialer: &fakeWSDialer{conn: conn},
|
||||
EndpointFetcher: EndpointFetcherFunc(func(context.Context, InstallationCredentials) (WSEndpoint, error) {
|
||||
return WSEndpoint{URL: "wss://test/ignored", ServiceID: 7, PingInterval: time.Hour}, nil
|
||||
}),
|
||||
FrameDecoder: decoder,
|
||||
Enricher: enr,
|
||||
CredentialsProvider: CredentialsProviderFunc(func(context.Context, db.LarkInstallation) (InstallationCredentials, error) {
|
||||
return InstallationCredentials{AppID: "test_app", AppSecret: "secret"}, nil
|
||||
}),
|
||||
PingInterval: time.Hour,
|
||||
ReadDeadline: time.Second,
|
||||
WriteTimeout: time.Second,
|
||||
EnrichTimeout: time.Second,
|
||||
Logger: slog.New(slog.NewTextHandler(io.Discard, nil)),
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("NewWSLongConnConnector: %v", err)
|
||||
}
|
||||
|
||||
var emitted []InboundMessage
|
||||
var emitMu sync.Mutex
|
||||
emit := func(_ context.Context, msg InboundMessage) (DispatchResult, error) {
|
||||
emitMu.Lock()
|
||||
emitted = append(emitted, msg)
|
||||
emitMu.Unlock()
|
||||
return DispatchResult{Outcome: OutcomeIngested}, nil
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
done := make(chan error, 1)
|
||||
go func() { done <- c.Run(ctx, db.LarkInstallation{AppID: "test_app"}, emit) }()
|
||||
|
||||
pushDataFrame(conn, []byte("evt-1"), "m1")
|
||||
|
||||
deadline := time.After(2 * time.Second)
|
||||
for {
|
||||
emitMu.Lock()
|
||||
n := len(emitted)
|
||||
emitMu.Unlock()
|
||||
if n >= 1 {
|
||||
break
|
||||
}
|
||||
select {
|
||||
case <-deadline:
|
||||
t.Fatal("no emit within 2s")
|
||||
case <-time.After(5 * time.Millisecond):
|
||||
}
|
||||
}
|
||||
cancel()
|
||||
select {
|
||||
case <-done:
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("Run did not return after cancel")
|
||||
}
|
||||
|
||||
emitMu.Lock()
|
||||
defer emitMu.Unlock()
|
||||
if emitted[0].Body != "ENRICHED:raw-evt-1" {
|
||||
t.Errorf("emit body = %q; enricher did not run before emit", emitted[0].Body)
|
||||
}
|
||||
|
||||
enr.mu.Lock()
|
||||
defer enr.mu.Unlock()
|
||||
if len(enr.msgs) != 1 || enr.msgs[0].Body != "raw-evt-1" {
|
||||
t.Errorf("enricher received %+v", enr.msgs)
|
||||
}
|
||||
if len(enr.creds) != 1 || enr.creds[0].AppID != "test_app" || enr.creds[0].AppSecret != "secret" {
|
||||
t.Errorf("enricher got wrong creds: %+v", enr.creds)
|
||||
}
|
||||
}
|
||||
@@ -74,6 +74,13 @@ func (d *LarkJSONFrameDecoder) Decode(payload []byte, inst db.LarkInstallation)
|
||||
ChatType: normalizeChatType(evt.Message.ChatType),
|
||||
MessageID: evt.Message.MessageID,
|
||||
SenderOpenID: OpenID(evt.Sender.SenderID.OpenID),
|
||||
MessageType: evt.Message.MessageType,
|
||||
// parent_id / root_id are populated by Lark only in reply
|
||||
// scenarios. The enricher keys quoted-reply expansion off
|
||||
// ParentID (the directly quoted message); RootID is carried for
|
||||
// completeness / future thread handling.
|
||||
ParentID: evt.Message.ParentID,
|
||||
RootID: evt.Message.RootID,
|
||||
}
|
||||
|
||||
botUnionID := ""
|
||||
@@ -81,12 +88,24 @@ func (d *LarkJSONFrameDecoder) Decode(payload []byte, inst db.LarkInstallation)
|
||||
botUnionID = inst.BotUnionID.String
|
||||
}
|
||||
|
||||
// text + post are flattened synchronously here (no external calls —
|
||||
// the decoder must stay fast and dependency-free). merge_forward
|
||||
// leaves Body empty: it needs an HTTP round-trip to expand and is
|
||||
// handled downstream by the enricher, which keys off MessageType.
|
||||
// Other types (image, file, …) also leave Body empty in this MVP;
|
||||
// attachment ingestion is a separate issue.
|
||||
switch evt.Message.MessageType {
|
||||
case "text":
|
||||
msg.Body = resolveMentions(extractTextBody(evt.Message.Content),
|
||||
case "text", "post":
|
||||
msg.Body = resolveMentions(flattenContent(evt.Message.MessageType, evt.Message.Content),
|
||||
evt.Message.Mentions, inst.BotOpenID, botUnionID)
|
||||
}
|
||||
|
||||
// Snapshot the user's own text as the command source BEFORE any
|
||||
// enrichment runs. The enricher rewrites Body (prepending quoted /
|
||||
// forwarded context) but never touches CommandBody, so `/issue …`
|
||||
// is still parsed against what the user actually typed.
|
||||
msg.CommandBody = msg.Body
|
||||
|
||||
if msg.ChatType == ChatTypeGroup {
|
||||
msg.AddressedToBot = containsMention(evt.Message.Mentions, inst.BotOpenID, botUnionID)
|
||||
}
|
||||
@@ -130,6 +149,11 @@ type larkMessageReceiveEvent struct {
|
||||
Content string `json:"content"`
|
||||
Mentions []larkMention `json:"mentions"`
|
||||
CreateTime string `json:"create_time"`
|
||||
// ParentID / RootID are only present when the message is a
|
||||
// reply / quote. ParentID is the directly quoted message;
|
||||
// RootID is the root of the reply tree.
|
||||
ParentID string `json:"parent_id"`
|
||||
RootID string `json:"root_id"`
|
||||
} `json:"message"`
|
||||
}
|
||||
|
||||
|
||||
@@ -451,3 +451,112 @@ func TestLarkJSONFrameDecoderNonTextMessageHasEmptyBody(t *testing.T) {
|
||||
t.Error("MessageID should still be populated for non-text events")
|
||||
}
|
||||
}
|
||||
|
||||
// TestLarkJSONFrameDecoderPostMessageFlattened verifies that a rich-text
|
||||
// `post` message is flattened to plain text end-to-end through Decode —
|
||||
// the MUL-2951 example. Body.content is the JSON-encoded post object; we
|
||||
// marshal a Go string to get the correctly-escaped content field.
|
||||
func TestLarkJSONFrameDecoderPostMessageFlattened(t *testing.T) {
|
||||
t.Parallel()
|
||||
postContent := `{"title":"周报","content":[[{"tag":"text","text":"本周完成:"}],[{"tag":"text","text":"Lark 集成"},{"tag":"a","href":"https://github.com/multica-ai/multica/pull/3277","text":"PR #3277"}]]}`
|
||||
escaped, err := json.Marshal(postContent)
|
||||
if err != nil {
|
||||
t.Fatalf("marshal: %v", err)
|
||||
}
|
||||
raw := []byte(`{
|
||||
"type":"event_callback",
|
||||
"header":{"event_id":"e","event_type":"im.message.receive_v1","app_id":"a"},
|
||||
"event":{
|
||||
"sender":{"sender_id":{"open_id":"ou_user"}},
|
||||
"message":{"message_id":"m","chat_id":"c","chat_type":"p2p","message_type":"post","content":` + string(escaped) + `}
|
||||
}
|
||||
}`)
|
||||
msg, ok, err := NewLarkJSONFrameDecoder().Decode(raw, db.LarkInstallation{BotOpenID: "ou_bot"})
|
||||
if err != nil || !ok {
|
||||
t.Fatalf("Decode ok=%v err=%v", ok, err)
|
||||
}
|
||||
want := "周报\n本周完成:\nLark 集成 PR #3277 (https://github.com/multica-ai/multica/pull/3277)"
|
||||
if msg.Body != want {
|
||||
t.Errorf("post Body\n got = %q\nwant = %q", msg.Body, want)
|
||||
}
|
||||
if msg.MessageType != "post" {
|
||||
t.Errorf("MessageType = %q want post", msg.MessageType)
|
||||
}
|
||||
}
|
||||
|
||||
// TestLarkJSONFrameDecoderPostResolvesMentions checks that @-mentions in
|
||||
// a post (carried as `at` spans with @_user_N placeholders) are resolved
|
||||
// through the same mention pipeline as text, including stripping the
|
||||
// bot's own mention.
|
||||
func TestLarkJSONFrameDecoderPostResolvesMentions(t *testing.T) {
|
||||
t.Parallel()
|
||||
postContent := `{"content":[[{"tag":"at","user_id":"@_user_1","user_name":""},{"tag":"text","text":"please review"},{"tag":"at","user_id":"@_user_2","user_name":""}]]}`
|
||||
escaped, err := json.Marshal(postContent)
|
||||
if err != nil {
|
||||
t.Fatalf("marshal: %v", err)
|
||||
}
|
||||
raw := []byte(`{
|
||||
"type":"event_callback",
|
||||
"header":{"event_id":"e","event_type":"im.message.receive_v1","app_id":"a"},
|
||||
"event":{
|
||||
"sender":{"sender_id":{"open_id":"ou_user"}},
|
||||
"message":{
|
||||
"message_id":"m","chat_id":"c","chat_type":"group","message_type":"post",
|
||||
"content":` + string(escaped) + `,
|
||||
"mentions":[
|
||||
{"key":"@_user_1","id":{"open_id":"ou_bot"},"name":"Bot"},
|
||||
{"key":"@_user_2","id":{"open_id":"ou_alice"},"name":"Alice"}
|
||||
]
|
||||
}
|
||||
}
|
||||
}`)
|
||||
msg, ok, err := NewLarkJSONFrameDecoder().Decode(raw, db.LarkInstallation{BotOpenID: "ou_bot"})
|
||||
if err != nil || !ok {
|
||||
t.Fatalf("Decode ok=%v err=%v", ok, err)
|
||||
}
|
||||
// @_user_1 is the bot → stripped; @_user_2 → @Alice.
|
||||
want := "please review @Alice"
|
||||
if msg.Body != want {
|
||||
t.Errorf("post Body\n got = %q\nwant = %q", msg.Body, want)
|
||||
}
|
||||
if !msg.AddressedToBot {
|
||||
t.Error("AddressedToBot should be true (bot was @-mentioned)")
|
||||
}
|
||||
}
|
||||
|
||||
// TestLarkJSONFrameDecoderCapturesReplyLinkage verifies parent_id /
|
||||
// root_id from a quote-reply event land on the InboundMessage so the
|
||||
// enricher can expand them.
|
||||
func TestLarkJSONFrameDecoderCapturesReplyLinkage(t *testing.T) {
|
||||
t.Parallel()
|
||||
raw := []byte(`{
|
||||
"type":"event_callback",
|
||||
"header":{"event_id":"e","event_type":"im.message.receive_v1","app_id":"a"},
|
||||
"event":{
|
||||
"sender":{"sender_id":{"open_id":"ou_user"}},
|
||||
"message":{
|
||||
"message_id":"om_child","chat_id":"c","chat_type":"group","message_type":"text",
|
||||
"content":"{\"text\":\"去实现\"}",
|
||||
"parent_id":"om_parent","root_id":"om_root"
|
||||
}
|
||||
}
|
||||
}`)
|
||||
msg, ok, err := NewLarkJSONFrameDecoder().Decode(raw, db.LarkInstallation{BotOpenID: "ou_bot"})
|
||||
if err != nil || !ok {
|
||||
t.Fatalf("Decode ok=%v err=%v", ok, err)
|
||||
}
|
||||
if msg.ParentID != "om_parent" {
|
||||
t.Errorf("ParentID = %q want om_parent", msg.ParentID)
|
||||
}
|
||||
if msg.RootID != "om_root" {
|
||||
t.Errorf("RootID = %q want om_root", msg.RootID)
|
||||
}
|
||||
if msg.MessageType != "text" {
|
||||
t.Errorf("MessageType = %q want text", msg.MessageType)
|
||||
}
|
||||
// CommandBody snapshots the user's own text (pre-enrichment) so
|
||||
// /issue parsing survives the enricher's prepended context blocks.
|
||||
if msg.CommandBody != "去实现" {
|
||||
t.Errorf("CommandBody = %q want 去实现", msg.CommandBody)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user