mirror of
https://github.com/multica-ai/multica.git
synced 2026-06-17 03:38:32 +02:00
When a message is successfully ingested, send a Typing reaction to the user's message. When the agent replies (EventChatDone) or fails (EventTaskFailed), clear the reaction before the reply is visible. - Add AddMessageReaction / DeleteMessageReaction to APIClient - Implement reaction HTTP calls in httpAPIClient - Introduce TypingIndicatorManager for per-session state tracking - Wire into Hub (add on ingest) and Patcher (clear before reply) - Skip typing for messages older than 2 minutes (WS replay guard) Co-authored-by: miaolong001 <miaolong@xd.com>
491 lines
17 KiB
Go
491 lines
17 KiB
Go
package lark
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"log/slog"
|
|
"time"
|
|
|
|
"github.com/jackc/pgx/v5"
|
|
"github.com/jackc/pgx/v5/pgtype"
|
|
"github.com/multica-ai/multica/server/internal/events"
|
|
db "github.com/multica-ai/multica/server/pkg/db/generated"
|
|
"github.com/multica-ai/multica/server/pkg/protocol"
|
|
)
|
|
|
|
// CardStatus mirrors lark_outbound_card_message.status. Kept as a typed
|
|
// alias so callers can't pass arbitrary strings into the status column.
|
|
type CardStatus string
|
|
|
|
const (
|
|
CardStatusPending CardStatus = "pending"
|
|
CardStatusStreaming CardStatus = "streaming"
|
|
CardStatusFinal CardStatus = "final"
|
|
CardStatusError CardStatus = "error"
|
|
)
|
|
|
|
// CardKind enumerates the small set of card variants the patcher
|
|
// renders. The Renderer is plug-replaceable so the on-wire card
|
|
// template can evolve without touching the patcher's transport / DB
|
|
// logic.
|
|
type CardKind string
|
|
|
|
const (
|
|
CardKindThinking CardKind = "thinking"
|
|
CardKindRunning CardKind = "running"
|
|
CardKindFinal CardKind = "final"
|
|
CardKindError CardKind = "error"
|
|
)
|
|
|
|
// CardRender is the rendered card body the Renderer produces. The
|
|
// patcher serializes the JSON before handing it to APIClient.
|
|
type CardRender struct {
|
|
JSON string
|
|
}
|
|
|
|
// RenderInput is the (typed) snapshot the Renderer sees when building
|
|
// or patching a card. Fields are populated as they become available
|
|
// during a task lifecycle — IssueNumber is set for `/issue` flows,
|
|
// Content is set for completed chat tasks, ErrorMessage for failed.
|
|
type RenderInput struct {
|
|
Kind CardKind
|
|
AgentName string
|
|
IssueNumber int32
|
|
IssueID pgtype.UUID
|
|
TaskID pgtype.UUID
|
|
Content string
|
|
ErrorMessage string
|
|
}
|
|
|
|
// Renderer turns a typed RenderInput into the actual Lark card JSON.
|
|
// Centralizing this lets us swap card templates (or A/B them) without
|
|
// touching event subscription or persistence code.
|
|
type Renderer interface {
|
|
Render(in RenderInput) (CardRender, error)
|
|
}
|
|
|
|
// defaultRenderer produces minimal text-only cards that work against
|
|
// Lark's generic interactive-card schema. The exact JSON layout will
|
|
// be refined when the real product card design lands; this default
|
|
// keeps the wiring real (the JSON deserializes against Lark's schema)
|
|
// without committing the product to a particular template.
|
|
type defaultRenderer struct{}
|
|
|
|
// NewDefaultRenderer returns the production-default Renderer. Override
|
|
// via PatcherConfig.Renderer when a custom template is needed.
|
|
func NewDefaultRenderer() Renderer { return &defaultRenderer{} }
|
|
|
|
func (defaultRenderer) Render(in RenderInput) (CardRender, error) {
|
|
header := "Multica"
|
|
if in.AgentName != "" {
|
|
header = in.AgentName
|
|
}
|
|
var body string
|
|
switch in.Kind {
|
|
case CardKindThinking:
|
|
body = "Thinking…"
|
|
case CardKindRunning:
|
|
body = "Working on it…"
|
|
case CardKindFinal:
|
|
body = in.Content
|
|
if body == "" {
|
|
body = "Done."
|
|
}
|
|
case CardKindError:
|
|
body = "Run failed."
|
|
if in.ErrorMessage != "" {
|
|
body = "Run failed: " + in.ErrorMessage
|
|
}
|
|
default:
|
|
return CardRender{}, fmt.Errorf("unknown card kind %q", in.Kind)
|
|
}
|
|
// update_multi MUST be true on every render: Lark refuses to apply
|
|
// PatchInteractiveCard to a card whose config does not declare it
|
|
// a "shared, updatable" card. Since this renderer drives the
|
|
// thinking → streaming → final/error lifecycle (the card is sent
|
|
// once and patched multiple times), an absent update_multi causes
|
|
// every patch after the first send to silently no-op on the
|
|
// Lark side while the local outbound status row still flips to
|
|
// streaming/final. Keep this on every kind — including thinking
|
|
// and error — because that initial JSON IS the body Lark stores
|
|
// and consults for subsequent patches.
|
|
doc := map[string]any{
|
|
"config": map[string]any{
|
|
"wide_screen_mode": true,
|
|
"update_multi": true,
|
|
},
|
|
"header": map[string]any{
|
|
"template": "blue",
|
|
"title": map[string]any{"tag": "plain_text", "content": header},
|
|
},
|
|
"elements": []any{
|
|
map[string]any{
|
|
"tag": "div",
|
|
"text": map[string]any{
|
|
"tag": "plain_text",
|
|
"content": body,
|
|
},
|
|
},
|
|
},
|
|
}
|
|
raw, err := json.Marshal(doc)
|
|
if err != nil {
|
|
return CardRender{}, err
|
|
}
|
|
return CardRender{JSON: string(raw)}, nil
|
|
}
|
|
|
|
// PatcherQueries is the narrow subset of *db.Queries the Patcher
|
|
// needs. Declared as an interface so the patcher is unit-testable
|
|
// without a real Postgres connection.
|
|
type PatcherQueries interface {
|
|
GetAgentTask(ctx context.Context, id pgtype.UUID) (db.AgentTaskQueue, error)
|
|
GetChatSession(ctx context.Context, id pgtype.UUID) (db.ChatSession, error)
|
|
GetAgent(ctx context.Context, id pgtype.UUID) (db.Agent, error)
|
|
GetLarkInstallation(ctx context.Context, id pgtype.UUID) (db.LarkInstallation, error)
|
|
GetLarkChatSessionBindingBySession(ctx context.Context, chatSessionID pgtype.UUID) (db.LarkChatSessionBinding, error)
|
|
GetLarkOutboundCardByTask(ctx context.Context, taskID pgtype.UUID) (db.LarkOutboundCardMessage, error)
|
|
CreateLarkOutboundCardMessage(ctx context.Context, arg db.CreateLarkOutboundCardMessageParams) (db.LarkOutboundCardMessage, error)
|
|
UpdateLarkOutboundCardStatus(ctx context.Context, arg db.UpdateLarkOutboundCardStatusParams) error
|
|
}
|
|
|
|
// CredentialsResolver decrypts an installation's app_secret for the
|
|
// transport layer. *InstallationService satisfies it directly; tests
|
|
// substitute a fake.
|
|
type CredentialsResolver interface {
|
|
DecryptAppSecret(inst db.LarkInstallation) (string, error)
|
|
}
|
|
|
|
// PatcherConfig tunes the outbound Patcher. Defaults via withDefaults;
|
|
// tests typically override Renderer / Now / Logger.
|
|
type PatcherConfig struct {
|
|
// Renderer drives the error card template used on the EventTaskFailed
|
|
// path. The success path (EventChatDone) bypasses the renderer
|
|
// entirely — it sends the raw assistant reply as a plain text IM
|
|
// message — so this only matters for the failure branch.
|
|
Renderer Renderer
|
|
Now func() time.Time
|
|
Logger *slog.Logger
|
|
}
|
|
|
|
func (c PatcherConfig) withDefaults() PatcherConfig {
|
|
if c.Renderer == nil {
|
|
c.Renderer = NewDefaultRenderer()
|
|
}
|
|
if c.Now == nil {
|
|
c.Now = time.Now
|
|
}
|
|
if c.Logger == nil {
|
|
c.Logger = slog.Default()
|
|
}
|
|
return c
|
|
}
|
|
|
|
// Patcher reacts to task-lifecycle events on the event bus and forwards
|
|
// chat replies to Lark as plain text IM messages. It is the outbound
|
|
// side of §4.5 — but the original "thinking → streaming → final card"
|
|
// lifecycle was reduced to a single plain-text reply on EventChatDone
|
|
// after Bohan reported the card chrome made replies feel like system
|
|
// notifications. The error path is the one survivor of card rendering:
|
|
// failed runs surface as a short error card on EventTaskFailed because
|
|
// the visual distinction from a normal reply is genuinely useful.
|
|
//
|
|
// Scope:
|
|
//
|
|
// - Only tasks whose chat_session has a lark_chat_session_binding
|
|
// produce outbound. Tasks born from the web UI or autopilot pass
|
|
// through unchanged.
|
|
//
|
|
// - Each EventChatDone yields one Lark text message; there is no
|
|
// streaming, no throttling, no DB row to track card-state.
|
|
//
|
|
// - Multi-replica safety is inherited from the inbound WS lease: at
|
|
// most one replica holds the installation lease at a time, the
|
|
// event bus is per-process, so exactly one Patcher reacts per run.
|
|
type Patcher struct {
|
|
queries PatcherQueries
|
|
credentials CredentialsResolver
|
|
client APIClient
|
|
typingIndicator *TypingIndicatorManager
|
|
cfg PatcherConfig
|
|
}
|
|
|
|
// NewPatcher constructs a Patcher bound to its dependencies. The
|
|
// patcher does not subscribe to the bus until Register is called.
|
|
func NewPatcher(queries PatcherQueries, credentials CredentialsResolver, client APIClient, cfg PatcherConfig) *Patcher {
|
|
cfg = cfg.withDefaults()
|
|
return &Patcher{
|
|
queries: queries,
|
|
credentials: credentials,
|
|
client: client,
|
|
cfg: cfg,
|
|
}
|
|
}
|
|
|
|
// SetTypingIndicatorManager wires the typing-indicator manager into the
|
|
// patcher so that replies clear the "processing" reaction before they
|
|
// are sent. Call once at boot after both the patcher and manager are
|
|
// constructed. Nil disables the clear step.
|
|
func (p *Patcher) SetTypingIndicatorManager(m *TypingIndicatorManager) {
|
|
p.typingIndicator = m
|
|
}
|
|
|
|
// Register subscribes the patcher to the task-lifecycle events it
|
|
// cares about on the supplied bus. Idempotent only if you call it
|
|
// against a fresh bus; call sites should invoke it exactly once
|
|
// during server boot (after the bus + patcher are constructed and
|
|
// before HTTP traffic starts).
|
|
//
|
|
// Subscriptions are deliberately minimal:
|
|
//
|
|
// - EventChatDone — the agent finished replying. The Patcher sends
|
|
// the reply as a plain text IM message (Lark's `msg_type=text`),
|
|
// not as an interactive card. The earlier card-based design (with
|
|
// thinking → running → final patches) made every reply look like
|
|
// a system notification nested in card chrome; flipping to plain
|
|
// text makes free-form chat feel native.
|
|
//
|
|
// - EventTaskFailed — the run failed; surface a short error card
|
|
// so the failure is visually distinct from a successful reply.
|
|
//
|
|
// We deliberately do NOT subscribe to EventTaskQueued / EventTaskRunning
|
|
// (no thinking-card lifecycle anymore — adds noise without value) or to
|
|
// EventTaskCompleted (chat tasks always emit EventChatDone first, which
|
|
// is what we care about; non-chat tasks have no Lark binding anyway and
|
|
// would early-return). Leaving EventTaskCompleted unsubscribed also
|
|
// avoids the prior "Done." overwrite regression where the no-content
|
|
// EventTaskCompleted payload would wipe the real reply.
|
|
func (p *Patcher) Register(bus *events.Bus) {
|
|
bus.Subscribe(protocol.EventTaskFailed, p.handleEvent)
|
|
bus.Subscribe(protocol.EventChatDone, p.handleEvent)
|
|
}
|
|
|
|
func (p *Patcher) handleEvent(e events.Event) {
|
|
// Use a fresh background ctx with a tight timeout: bus delivery is
|
|
// synchronous so a stuck Lark HTTP call would otherwise wedge the
|
|
// whole publish call site.
|
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
|
defer cancel()
|
|
if err := p.processEvent(ctx, e); err != nil {
|
|
p.cfg.Logger.Warn("lark patcher: event handling failed",
|
|
"event_type", e.Type,
|
|
"task_id", e.TaskID,
|
|
"chat_session_id", e.ChatSessionID,
|
|
"error", err,
|
|
)
|
|
}
|
|
}
|
|
|
|
func (p *Patcher) processEvent(ctx context.Context, e events.Event) error {
|
|
taskID, chatSessionID, ok := taskAndSessionFromEvent(e)
|
|
if !ok {
|
|
return nil
|
|
}
|
|
if !chatSessionID.Valid {
|
|
// Issue / autopilot tasks have no chat_session.
|
|
return nil
|
|
}
|
|
|
|
binding, err := p.queries.GetLarkChatSessionBindingBySession(ctx, chatSessionID)
|
|
if err != nil {
|
|
if errors.Is(err, pgx.ErrNoRows) {
|
|
// Web-only chat session — not a Lark target.
|
|
return nil
|
|
}
|
|
return fmt.Errorf("lookup chat session binding: %w", err)
|
|
}
|
|
|
|
inst, err := p.queries.GetLarkInstallation(ctx, binding.InstallationID)
|
|
if err != nil {
|
|
return fmt.Errorf("load installation: %w", err)
|
|
}
|
|
if InstallationStatus(inst.Status) != InstallationActive {
|
|
// Revoked between trigger and event; nothing to patch.
|
|
return nil
|
|
}
|
|
creds, err := p.installationCredentials(inst)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
agent, agentErr := p.queries.GetAgent(ctx, inst.AgentID)
|
|
agentName := ""
|
|
if agentErr == nil {
|
|
agentName = agent.Name
|
|
}
|
|
|
|
// Clear the "processing" reaction before the reply is visible so the
|
|
// user sees a clean transition. Best-effort: a failure here is logged
|
|
// but does not block the actual reply.
|
|
if p.typingIndicator != nil {
|
|
p.typingIndicator.Clear(ctx, chatSessionID)
|
|
}
|
|
|
|
switch e.Type {
|
|
case protocol.EventChatDone:
|
|
return p.sendChatReply(ctx, creds, binding, e.Payload)
|
|
case protocol.EventTaskFailed:
|
|
return p.fail(ctx, creds, binding, taskID, agentName, e.Payload)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// sendChatReply turns ChatDonePayload.Content into a Lark message.
|
|
// The wire shape is chosen per-reply based on whether the body
|
|
// contains any markdown syntax:
|
|
//
|
|
// - Plain prose (no markdown) → `msg_type=text`. A one-line "Hi!"
|
|
// reply should feel like a normal IM message, not a notification
|
|
// card with chrome around it.
|
|
//
|
|
// - Anything with markdown (headings, lists, code blocks, tables,
|
|
// bold/italic, links) → schema-2.0 interactive card with a
|
|
// `tag: "markdown"` body element so Lark's client renders the
|
|
// formatting instead of leaving raw `**bold**` characters in
|
|
// the transcript. The card is visually subtler than the legacy
|
|
// binding-prompt template — just a single markdown block, no
|
|
// header / icon / CTA buttons.
|
|
//
|
|
// Empty content is silently dropped: we'd rather show nothing than
|
|
// "Done." (the prior card fallback that confused Bohan in the live
|
|
// dev env). In practice an empty Content means the daemon completed
|
|
// the task without producing visible output, which only happens for
|
|
// edge cases like a chat task that just acknowledged a system event;
|
|
// not emitting a message there is the right product call.
|
|
func (p *Patcher) sendChatReply(ctx context.Context, creds InstallationCredentials, binding db.LarkChatSessionBinding, payload any) error {
|
|
content := chatDoneContent(payload)
|
|
if content == "" {
|
|
return nil
|
|
}
|
|
if containsMarkdown(content) {
|
|
if _, err := p.client.SendMarkdownCard(ctx, SendMarkdownCardParams{
|
|
InstallationID: creds,
|
|
ChatID: ChatID(binding.LarkChatID),
|
|
Markdown: content,
|
|
}); err != nil {
|
|
return fmt.Errorf("send markdown card: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
if _, err := p.client.SendTextMessage(ctx, SendTextParams{
|
|
InstallationID: creds,
|
|
ChatID: ChatID(binding.LarkChatID),
|
|
Text: content,
|
|
}); err != nil {
|
|
return fmt.Errorf("send text message: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (p *Patcher) installationCredentials(inst db.LarkInstallation) (InstallationCredentials, error) {
|
|
if p.credentials == nil {
|
|
return InstallationCredentials{}, errors.New("lark patcher: credentials resolver missing")
|
|
}
|
|
secret, err := p.credentials.DecryptAppSecret(inst)
|
|
if err != nil {
|
|
return InstallationCredentials{}, fmt.Errorf("decrypt app_secret: %w", err)
|
|
}
|
|
creds := InstallationCredentials{
|
|
AppID: inst.AppID,
|
|
AppSecret: secret,
|
|
Region: RegionOrDefault(inst.Region),
|
|
}
|
|
if inst.TenantKey.Valid {
|
|
creds.TenantKey = inst.TenantKey.String
|
|
}
|
|
return creds, nil
|
|
}
|
|
|
|
// fail surfaces a short error card on task failure. Unlike the
|
|
// success path (plain text via sendChatReply), failures stay as cards
|
|
// because the user benefits from the visual distinction — a red /
|
|
// header-styled card is much harder to miss than a regular bubble,
|
|
// and these are rare enough that the card chrome isn't noisy.
|
|
//
|
|
// One-shot send (no patching, no DB row): if the task fails a second
|
|
// time we'd just send a second card, which is fine — failure is
|
|
// usually a single terminal event.
|
|
func (p *Patcher) fail(ctx context.Context, creds InstallationCredentials, binding db.LarkChatSessionBinding, taskID pgtype.UUID, agentName string, payload any) error {
|
|
render, err := p.cfg.Renderer.Render(RenderInput{
|
|
Kind: CardKindError,
|
|
AgentName: agentName,
|
|
TaskID: taskID,
|
|
ErrorMessage: errorMessageFromPayload(payload),
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("render error card: %w", err)
|
|
}
|
|
if _, err := p.client.SendInteractiveCard(ctx, SendCardParams{
|
|
InstallationID: creds,
|
|
ChatID: ChatID(binding.LarkChatID),
|
|
CardJSON: render.JSON,
|
|
}); err != nil {
|
|
return fmt.Errorf("send error card: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// taskAndSessionFromEvent parses the typed-ish payload broadcastTaskEvent
|
|
// publishes — a map[string]any with `task_id` (always) and
|
|
// `chat_session_id` (chat tasks only). EventChatDone carries a
|
|
// ChatDonePayload struct instead.
|
|
func taskAndSessionFromEvent(e events.Event) (taskID, chatSessionID pgtype.UUID, ok bool) {
|
|
if e.TaskID != "" {
|
|
if err := taskID.Scan(e.TaskID); err != nil {
|
|
taskID = pgtype.UUID{}
|
|
}
|
|
}
|
|
if e.ChatSessionID != "" {
|
|
if err := chatSessionID.Scan(e.ChatSessionID); err != nil {
|
|
chatSessionID = pgtype.UUID{}
|
|
}
|
|
}
|
|
switch p := e.Payload.(type) {
|
|
case map[string]any:
|
|
if !taskID.Valid {
|
|
if s, _ := p["task_id"].(string); s != "" {
|
|
_ = taskID.Scan(s)
|
|
}
|
|
}
|
|
if !chatSessionID.Valid {
|
|
if s, _ := p["chat_session_id"].(string); s != "" {
|
|
_ = chatSessionID.Scan(s)
|
|
}
|
|
}
|
|
case protocol.ChatDonePayload:
|
|
if !taskID.Valid {
|
|
_ = taskID.Scan(p.TaskID)
|
|
}
|
|
if !chatSessionID.Valid {
|
|
_ = chatSessionID.Scan(p.ChatSessionID)
|
|
}
|
|
}
|
|
return taskID, chatSessionID, taskID.Valid
|
|
}
|
|
|
|
func chatDoneContent(payload any) string {
|
|
switch p := payload.(type) {
|
|
case protocol.ChatDonePayload:
|
|
return p.Content
|
|
case map[string]any:
|
|
if s, ok := p["content"].(string); ok {
|
|
return s
|
|
}
|
|
}
|
|
return ""
|
|
}
|
|
|
|
func errorMessageFromPayload(payload any) string {
|
|
if m, ok := payload.(map[string]any); ok {
|
|
if s, ok := m["error"].(string); ok {
|
|
return s
|
|
}
|
|
if s, ok := m["error_message"].(string); ok {
|
|
return s
|
|
}
|
|
}
|
|
return ""
|
|
}
|