Compare commits

...

1 Commits

Author SHA1 Message Date
J
a58bfdd5c1 feat(lark): debounce inbound run trigger per chat session (MUL-2968)
A forwarded transcript plus a follow-up note arrive as two separate Lark
messages, each of which synchronously called EnqueueChatTask — so the bot
ran twice (once on the bare forward, before the note arrived). The chat
task already reads the whole session history at run time, so the messages
never needed stitching; only the run TRIGGER did.

Introduce pendingBatcher: a per-chat_session debouncer that collapses a
burst into one agent run on a 3s silence window. Each message is still
appended, deduped, and ACKed synchronously and individually; step 8 of the
dispatcher now schedules a debounced flush instead of enqueuing inline.

Because EnqueueChatTask's agent-offline / agent-archived verdict is now
only known at flush, the dispatcher emits that notice itself via an
injected FlushReply (wired to OutcomeReplier.Reply) rather than returning
it synchronously to the hub. Infra failures are logged, not surfaced — the
inbound frame was ACKed long ago. The hub drains the batcher on graceful
shutdown so a normal restart does not drop a pending window.

Out of scope (owner-aligned): group-chat multi-speaker batching, restart
recovery for the in-process window, and forwarded-sender real-name
resolution.

Co-authored-by: multica-agent <github@multica.ai>
2026-06-04 12:39:32 +08:00
6 changed files with 689 additions and 58 deletions

View File

@@ -235,7 +235,13 @@ func NewRouterWithOptions(pool *pgxpool.Pool, hub *realtime.Hub, bus *events.Bus
Audit: auditLogger,
IssueService: h.IssueService,
TaskService: h.TaskService,
Logger: slog.Default(),
}
// Debounce the per-session run trigger so a burst of
// messages (e.g. "forward a transcript, then type a note")
// collapses into one agent run instead of one per message.
// MUL-2968.
dispatcher.EnableRunBatching(lark.DefaultChatRunBatchWindow)
// WS Hub: lease + supervisor goroutines per installation.
// The WSLongConnConnector talks Lark's long-conn protocol
@@ -271,6 +277,11 @@ func NewRouterWithOptions(pool *pgxpool.Pool, hub *realtime.Hub, bus *events.Bus
Logger: slog.Default(),
})
h.LarkHub.SetOutcomeReplier(replier)
// The agent-offline / agent-archived notice is now decided
// at debounce-flush time rather than synchronously from
// Handle, so the dispatcher drives that reply itself through
// the same replier. MUL-2968.
dispatcher.FlushReply = replier.Reply
slog.Info("lark inbound pipeline wired", "connector", connectorLabel)
// One-shot union_id backfill for installations created

View File

@@ -4,6 +4,8 @@ import (
"context"
"errors"
"fmt"
"log/slog"
"time"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgtype"
@@ -111,9 +113,15 @@ type DispatchResult struct {
InstallationID pgtype.UUID
ChatSessionID pgtype.UUID
SenderOpenID OpenID
TaskID pgtype.UUID
IssueID pgtype.UUID
IssueNumber int32
// TaskID was populated when the dispatcher enqueued the chat task
// synchronously. With the short-window debounce (MUL-2968) the run is
// triggered asynchronously at flush time, so Handle no longer knows a
// task id — this field is left zero for the chat path. Kept on the
// struct because the emit contract still carries it for any future
// synchronous enqueue (e.g. /issue follow-ups).
TaskID pgtype.UUID
IssueID pgtype.UUID
IssueNumber int32
// IssueIdentifier is the workspace-qualified human key for the
// created issue ("MUL-42"). Populated only when /issue produced a
// new row. The OutcomeReplier uses this verbatim in the "Created
@@ -191,6 +199,63 @@ type Dispatcher struct {
Audit AuditLogger
IssueService IssueCreator
TaskService ChatTaskEnqueuer
// FlushReply emits the offline/archived notice that EnqueueChatTask
// now produces only at debounce-flush time. Before MUL-2968 those
// outcomes were returned synchronously from Handle and the hub's
// OutcomeReplier sent the card; with the run trigger debounced, the
// verdict is not known until the window closes, so the dispatcher
// drives the reply itself via this callback. Wired to
// OutcomeReplier.Reply in production; nil disables the notice (the
// message is still durable, only the card is skipped).
FlushReply FlushReplyFunc
// Logger is used by the detached flush path, which cannot return
// errors to a caller and must log them. Defaults to slog.Default().
Logger *slog.Logger
// batcher debounces the per-session run trigger. Installed via
// EnableRunBatching in production; when nil (unit tests / degenerate
// config) the run fires inline with no debounce — a zero-length
// window, not a separate code path.
batcher *pendingBatcher
}
// FlushReplyFunc matches OutcomeReplier.Reply so the production replier can
// be injected directly. It is invoked from the debounced flush goroutine
// to deliver the agent-offline / agent-archived notice.
type FlushReplyFunc func(ctx context.Context, inst db.LarkInstallation, msg InboundMessage, res DispatchResult)
// chatRunFlushTimeout bounds the detached flush (session reload +
// EnqueueChatTask + offline/archived notice). The flush runs on its own
// fresh context because the inbound request ctx is long cancelled by the
// time the window closes.
const chatRunFlushTimeout = 10 * time.Second
// EnableRunBatching installs the in-memory debouncer in front of the
// per-session run trigger. Call once at boot. A non-positive window uses
// DefaultChatRunBatchWindow. Without this, the dispatcher triggers runs
// inline (used by unit tests that assert the immediate effect).
func (d *Dispatcher) EnableRunBatching(window time.Duration) {
d.batcher = newPendingBatcher(window)
}
// FlushPendingRuns drains every still-pending run trigger immediately and
// blocks until in-flight flushes finish. The hub calls this on graceful
// shutdown, after inbound delivery has stopped, so a normal restart does
// not silently drop a window's worth of messages. No-op when batching is
// disabled.
func (d *Dispatcher) FlushPendingRuns() {
if d.batcher != nil {
d.batcher.FlushAll()
}
}
func (d *Dispatcher) logger() *slog.Logger {
if d.Logger != nil {
return d.Logger
}
return slog.Default()
}
// Handle processes one inbound Lark message end-to-end. It never
@@ -467,35 +532,96 @@ func (d *Dispatcher) processClaimed(ctx context.Context, msg InboundMessage, ins
}
}
// 8. Enqueue the chat task that triggers the agent run. Only the
// productizable rejections from EnqueueChatTask (agent
// archived, agent has no runtime configured) are mapped to a
// user-visible Outcome; real infra failures bubble up as
// errors so the WS adapter can retry or page.
// 8. Debounce the run trigger. The chat_message + dedup Mark are
// already durable; the agent run reads the WHOLE session at
// execution time, so a burst of messages in this session is
// collapsed into ONE run by deferring EnqueueChatTask behind a
// short silence window (MUL-2968). The synchronous outcome is
// OutcomeIngested with NO TaskID — the task row is created later,
// at flush. EnqueueChatTask's productizable verdicts (agent
// offline / archived) and infra errors are now handled inside the
// flush (see flushChatRun), not returned here.
//
// Note: a daemon that's merely disconnected is NOT an error
// here. As long as agent.runtime_id is set, the chat task is
// enqueued and waits for the daemon to claim it on next
// online; this path returns OutcomeIngested with a TaskID.
// Note: a daemon that's merely disconnected is NOT an error. As
// long as agent.runtime_id is set, the chat task is enqueued at
// flush and waits for the daemon to claim it on next online.
d.scheduleRun(inst, msg, sessionID)
return res, postAppendFinalize, nil
}
// scheduleRun hands the per-session run trigger to the debouncer (or fires
// it inline when batching is disabled). The flush closure captures this
// message's installation + InboundMessage so the offline/archived notice,
// if any, targets the right chat; the latest message in a window wins.
func (d *Dispatcher) scheduleRun(inst db.LarkInstallation, msg InboundMessage, sessionID pgtype.UUID) {
flush := func() { d.flushChatRun(inst, msg, sessionID) }
if d.batcher == nil {
// Batching disabled (unit tests / degenerate config): trigger the
// run immediately. Production always installs a batcher via
// EnableRunBatching, so this branch does not run in prod.
flush()
return
}
d.batcher.Schedule(keyForSession(sessionID), flush)
}
// flushChatRun is the debounced run-trigger. It runs once per silence
// window per chat session, detached from the inbound path (on its own
// goroutine and fresh context). It reloads the session, enqueues exactly
// one chat task for the whole window's worth of messages, and — because
// EnqueueChatTask's offline/archived verdict is only known here now —
// emits the corresponding notice itself via FlushReply. Errors cannot be
// returned to a caller (the message is already ACKed and durable), so they
// are logged: a failed enqueue leaves the message in the session to be
// picked up by the next message's run.
func (d *Dispatcher) flushChatRun(inst db.LarkInstallation, msg InboundMessage, sessionID pgtype.UUID) {
ctx, cancel := context.WithTimeout(context.Background(), chatRunFlushTimeout)
defer cancel()
session, err := d.Queries.GetChatSession(ctx, sessionID)
if err != nil {
return DispatchResult{}, postAppendFinalize, fmt.Errorf("reload chat session: %w", err)
d.logger().Error("lark dispatcher: flush reload chat session failed",
"chat_session_id", uuidString(sessionID),
"err", err.Error(),
)
return
}
task, err := d.TaskService.EnqueueChatTask(ctx, session)
if err != nil {
if _, err := d.TaskService.EnqueueChatTask(ctx, session); err != nil {
switch {
case errors.Is(err, service.ErrChatTaskAgentNoRuntime):
res.Outcome = OutcomeAgentOffline
return res, postAppendFinalize, nil
d.emitFlushReply(ctx, inst, msg, sessionID, OutcomeAgentOffline)
case errors.Is(err, service.ErrChatTaskAgentArchived):
res.Outcome = OutcomeAgentArchived
return res, postAppendFinalize, nil
d.emitFlushReply(ctx, inst, msg, sessionID, OutcomeAgentArchived)
default:
return DispatchResult{}, postAppendFinalize, fmt.Errorf("enqueue chat task: %w", err)
// Infra failure (DB down, etc.). Nothing to retry against —
// the inbound frame was ACKed long ago. Log so the gap is
// visible; the next message in this session re-triggers a run
// that will read this message too.
d.logger().Error("lark dispatcher: flush enqueue chat task failed",
"chat_session_id", uuidString(sessionID),
"err", err.Error(),
)
}
}
res.TaskID = task.ID
return res, postAppendFinalize, nil
}
// emitFlushReply delivers an offline/archived notice for a flushed run.
func (d *Dispatcher) emitFlushReply(ctx context.Context, inst db.LarkInstallation, msg InboundMessage, sessionID pgtype.UUID, outcome Outcome) {
if d.FlushReply == nil {
return
}
d.FlushReply(ctx, inst, msg, DispatchResult{
Outcome: outcome,
InstallationID: inst.ID,
ChatSessionID: sessionID,
SenderOpenID: msg.SenderOpenID,
})
}
// keyForSession is the batcher key. chat_session_id is a globally-unique
// UUID, so it alone disambiguates sessions across installations.
func keyForSession(sessionID pgtype.UUID) string {
return string(sessionID.Bytes[:])
}
// applyFinalize flips the in-flight claim row to its terminal state,

View File

@@ -210,7 +210,7 @@ type fakeChat struct {
ensureErr error
appendResult AppendResult
appendErr error
queries *fakeQueries // when set, runs the in-tx Mark
queries *fakeQueries // when set, runs the in-tx Mark
beforeAppend func(AppendUserMessageParams) // race-injection hook
calledEnsure int
calledAppend int
@@ -445,8 +445,14 @@ func TestDispatcher_PlainMessageEnqueuesTask(t *testing.T) {
if res.Outcome != OutcomeIngested {
t.Fatalf("expected ingested, got %q", res.Outcome)
}
if !res.TaskID.Valid || res.TaskID != enq.task.ID {
t.Fatalf("task id propagation broken: %+v", res.TaskID)
// The run trigger is debounced (MUL-2968): no TaskID is surfaced
// synchronously anymore, and with batching disabled the flush fires
// inline so the enqueue is observable right here.
if res.TaskID.Valid {
t.Fatalf("TaskID must not be set synchronously after debounce; got %+v", res.TaskID)
}
if enq.called != 1 {
t.Fatalf("expected exactly one EnqueueChatTask at flush; called=%d", enq.called)
}
// For p2p the session creator should be the bound user, not the
// installer — verifies the chat-type branch in Handle.
@@ -492,7 +498,7 @@ func TestDispatcher_DedupHitDoesNotEnqueue(t *testing.T) {
queries := &fakeQueries{
installationByApp: activeInstallation(),
userBinding: boundUser(),
dedup: map[string]*fakeDedupRow{seedDedupKey("msg-dup"): {processed: true, token: validUUID(0xAB)}},
dedup: map[string]*fakeDedupRow{seedDedupKey("msg-dup"): {processed: true, token: validUUID(0xAB)}},
}
chat := &fakeChat{
ensureID: validUUID(0x66),
@@ -542,7 +548,7 @@ func TestDispatcher_DedupHitDoesNotEnqueue(t *testing.T) {
func TestDispatcher_DedupBeforeGroupFilter(t *testing.T) {
queries := &fakeQueries{
installationByApp: activeInstallation(),
dedup: map[string]*fakeDedupRow{seedDedupKey("msg-replay"): {processed: true, token: validUUID(0xAB)}},
dedup: map[string]*fakeDedupRow{seedDedupKey("msg-replay"): {processed: true, token: validUUID(0xAB)}},
}
audit := &fakeAudit{}
d := &Dispatcher{Queries: queries, Audit: audit}
@@ -610,7 +616,7 @@ func TestDispatcher_DedupBeforeIdentityCheck(t *testing.T) {
queries := &fakeQueries{
installationByApp: activeInstallation(),
userBindingErr: pgx.ErrNoRows, // unbound — would normally trigger OutcomeNeedsBinding
dedup: map[string]*fakeDedupRow{seedDedupKey("msg-replay"): {processed: true, token: validUUID(0xAB)}},
dedup: map[string]*fakeDedupRow{seedDedupKey("msg-replay"): {processed: true, token: validUUID(0xAB)}},
}
audit := &fakeAudit{}
d := &Dispatcher{Queries: queries, Audit: audit}
@@ -782,7 +788,24 @@ func TestDispatcher_EmptyTitleSurfacesError(t *testing.T) {
}
}
func TestDispatcher_AgentOfflineFallsThroughCleanly(t *testing.T) {
// captureReply is a FlushReply seam: it records every offline/archived
// notice the dispatcher emits at flush time so tests can assert what the
// user-facing card would say.
type captureReply struct {
count int
results []DispatchResult
}
func (c *captureReply) reply(_ context.Context, _ db.LarkInstallation, _ InboundMessage, res DispatchResult) {
c.count++
c.results = append(c.results, res)
}
func TestDispatcher_AgentOfflineRepliesAtFlush(t *testing.T) {
// With the run trigger debounced (MUL-2968), the agent-offline verdict
// is only known at flush time. Handle itself returns OutcomeIngested
// (the message is durable + ACKed); the offline notice is delivered
// through FlushReply. With batching disabled the flush fires inline.
sessionID := validUUID(0x66)
queries := &fakeQueries{
installationByApp: activeInstallation(),
@@ -791,11 +814,13 @@ func TestDispatcher_AgentOfflineFallsThroughCleanly(t *testing.T) {
}
chat := &fakeChat{ensureID: sessionID, appendResult: AppendResult{}}
enq := &fakeEnqueuer{err: service.ErrChatTaskAgentNoRuntime}
cap := &captureReply{}
d := &Dispatcher{
Queries: queries,
Chat: chat,
Audit: &fakeAudit{},
TaskService: enq,
FlushReply: cap.reply,
}
res, err := d.Handle(context.Background(), InboundMessage{
@@ -808,15 +833,24 @@ func TestDispatcher_AgentOfflineFallsThroughCleanly(t *testing.T) {
if err != nil {
t.Fatalf("offline path should not return error, got %v", err)
}
if res.Outcome != OutcomeAgentOffline {
t.Fatalf("expected OutcomeAgentOffline, got %q", res.Outcome)
if res.Outcome != OutcomeIngested {
t.Fatalf("synchronous outcome must be ingested, got %q", res.Outcome)
}
if res.ChatSessionID != sessionID {
t.Fatalf("session id not propagated: %+v", res.ChatSessionID)
if enq.called != 1 {
t.Fatalf("flush must call EnqueueChatTask exactly once; called=%d", enq.called)
}
if cap.count != 1 {
t.Fatalf("expected exactly one flush reply; got %d", cap.count)
}
if cap.results[0].Outcome != OutcomeAgentOffline {
t.Fatalf("expected OutcomeAgentOffline at flush, got %q", cap.results[0].Outcome)
}
if cap.results[0].ChatSessionID != sessionID {
t.Fatalf("session id not propagated to flush reply: %+v", cap.results[0].ChatSessionID)
}
}
func TestDispatcher_AgentArchivedSurfacesDistinctOutcome(t *testing.T) {
func TestDispatcher_AgentArchivedRepliesAtFlush(t *testing.T) {
sessionID := validUUID(0x66)
queries := &fakeQueries{
installationByApp: activeInstallation(),
@@ -825,11 +859,13 @@ func TestDispatcher_AgentArchivedSurfacesDistinctOutcome(t *testing.T) {
}
chat := &fakeChat{ensureID: sessionID, appendResult: AppendResult{}}
enq := &fakeEnqueuer{err: service.ErrChatTaskAgentArchived}
cap := &captureReply{}
d := &Dispatcher{
Queries: queries,
Chat: chat,
Audit: &fakeAudit{},
TaskService: enq,
FlushReply: cap.reply,
}
res, err := d.Handle(context.Background(), InboundMessage{
@@ -842,16 +878,21 @@ func TestDispatcher_AgentArchivedSurfacesDistinctOutcome(t *testing.T) {
if err != nil {
t.Fatalf("archived path should not return error, got %v", err)
}
if res.Outcome != OutcomeAgentArchived {
t.Fatalf("expected OutcomeAgentArchived, got %q", res.Outcome)
if res.Outcome != OutcomeIngested {
t.Fatalf("synchronous outcome must be ingested, got %q", res.Outcome)
}
if cap.count != 1 || cap.results[0].Outcome != OutcomeAgentArchived {
t.Fatalf("expected OutcomeAgentArchived at flush, got count=%d results=%+v", cap.count, cap.results)
}
}
func TestDispatcher_InfraFailureSurfacesError(t *testing.T) {
// A DB / load / create failure from TaskService.EnqueueChatTask is
// NOT a productizable state — the WS adapter must see a real
// error so it can retry or page, not an "offline" card that
// silently hides the outage.
func TestDispatcher_FlushInfraFailureIsNotReplied(t *testing.T) {
// A DB / load / create failure from EnqueueChatTask is NOT a
// productizable state. The inbound frame was ACKed and the message is
// already durable, so Handle returns no error (nothing for the
// connector to retry against), the failure is logged, and NO
// offline/archived card is sent — a bogus "offline" card would
// silently hide the outage.
sessionID := validUUID(0x66)
queries := &fakeQueries{
installationByApp: activeInstallation(),
@@ -861,25 +902,91 @@ func TestDispatcher_InfraFailureSurfacesError(t *testing.T) {
chat := &fakeChat{ensureID: sessionID, appendResult: AppendResult{}}
infraErr := errors.New("create chat task: connection refused")
enq := &fakeEnqueuer{err: infraErr}
cap := &captureReply{}
d := &Dispatcher{
Queries: queries,
Chat: chat,
Audit: &fakeAudit{},
TaskService: enq,
FlushReply: cap.reply,
}
_, err := d.Handle(context.Background(), InboundMessage{
res, err := d.Handle(context.Background(), InboundMessage{
AppID: "ok",
ChatType: ChatTypeP2P,
SenderOpenID: "ou_user_a",
Body: "hi",
MessageID: "msg-infra",
})
if err == nil {
t.Fatalf("infra failure should surface as error, got nil")
if err != nil {
t.Fatalf("flush infra failure must not surface from Handle, got %v", err)
}
if !errors.Is(err, infraErr) {
t.Fatalf("infra error should propagate (errors.Is), got %v", err)
if res.Outcome != OutcomeIngested {
t.Fatalf("synchronous outcome must be ingested, got %q", res.Outcome)
}
if enq.called != 1 {
t.Fatalf("flush must attempt EnqueueChatTask once; called=%d", enq.called)
}
if cap.count != 0 {
t.Fatalf("infra failure must not emit any offline/archived card; replies=%d", cap.count)
}
}
func TestDispatcher_DebounceCoalescesRunTrigger(t *testing.T) {
// Two messages in the same chat_session within the silence window must
// produce exactly ONE EnqueueChatTask (one agent run). The run reads
// the whole session history, so both messages are covered by it. This
// is the core MUL-2968 behaviour: "forward a transcript, then type a
// note" stops triggering two runs.
sessionID := validUUID(0x66)
queries := &fakeQueries{
installationByApp: activeInstallation(),
userBinding: boundUser(),
chatSession: db.ChatSession{ID: sessionID, AgentID: validUUID(0x33)},
}
chat := &fakeChat{ensureID: sessionID, appendResult: AppendResult{}}
enq := &fakeEnqueuer{task: db.AgentTaskQueue{ID: validUUID(0x77)}}
f := &fakeTimerFactory{}
d := &Dispatcher{Queries: queries, Chat: chat, Audit: &fakeAudit{}, TaskService: enq}
d.batcher = newTestBatcher(f)
send := func(id string) {
res, err := d.Handle(context.Background(), InboundMessage{
AppID: "ok",
ChatType: ChatTypeP2P,
SenderOpenID: "ou_user_a",
Body: "hi",
MessageID: id,
})
if err != nil {
t.Fatalf("unexpected error for %s: %v", id, err)
}
if res.Outcome != OutcomeIngested {
t.Fatalf("expected ingested for %s, got %q", id, res.Outcome)
}
}
send("m1")
send("m2")
// Both messages are durable + ACKed, but the run is still pending.
if enq.called != 0 {
t.Fatalf("run trigger must be debounced; enqueue called=%d before window closed", enq.called)
}
if got := d.batcher.pendingCount(); got != 1 {
t.Fatalf("both messages share one session window; pending=%d", got)
}
f.fireArmed() // window closes
if enq.called != 1 {
t.Fatalf("a coalesced burst must enqueue exactly once; called=%d", enq.called)
}
// A message arriving after the window fired is a new burst → new run.
send("m3")
f.fireArmed()
if enq.called != 2 {
t.Fatalf("a message after the window must start a new run; called=%d", enq.called)
}
}
@@ -1051,34 +1158,41 @@ func TestDispatcher_AppendUserMessageFailureReleasesClaim(t *testing.T) {
// committed) but a downstream step returns an error, the dispatcher
// MUST mark the claim processed. Otherwise a replay would re-process
// the message and write a second chat_message row.
//
// The run-trigger enqueue is now debounced and cannot fail synchronously,
// so the synchronous downstream error this pins is the /issue create
// path — the remaining step that runs after the chat_message is durable
// and can still surface an error to the caller.
func TestDispatcher_DurableErrorMarksClaim(t *testing.T) {
sessionID := validUUID(0x66)
inst := activeInstallation()
queries := &fakeQueries{
installationByApp: activeInstallation(),
installationByApp: inst,
userBinding: boundUser(),
chatSession: db.ChatSession{ID: sessionID, AgentID: validUUID(0x33)},
chatSession: db.ChatSession{ID: sessionID, AgentID: inst.AgentID},
}
chat := &fakeChat{
ensureID: sessionID,
appendResult: AppendResult{},
appendResult: AppendResult{IssueCommand: &IssueCommand{Title: "boom"}},
}
infraErr := errors.New("create chat task: connection refused")
issueErr := errors.New("create issue: connection refused")
d := &Dispatcher{
Queries: queries,
Chat: chat,
Audit: &fakeAudit{},
TaskService: &fakeEnqueuer{err: infraErr},
Queries: queries,
Chat: chat,
Audit: &fakeAudit{},
IssueService: &fakeIssueCreator{err: issueErr},
TaskService: &fakeEnqueuer{},
}
_, err := d.Handle(context.Background(), InboundMessage{
AppID: "ok",
ChatType: ChatTypeP2P,
SenderOpenID: "ou_user_a",
Body: "hi",
Body: "/issue boom",
MessageID: "msg-durable-err",
})
if !errors.Is(err, infraErr) {
t.Fatalf("expected infra error to propagate, got %v", err)
if !errors.Is(err, issueErr) {
t.Fatalf("expected post-append durable error to propagate, got %v", err)
}
if queries.calledRelease != 0 {
t.Fatalf("must not release: chat_message already committed; calledRelease=%d", queries.calledRelease)
@@ -1190,7 +1304,7 @@ func TestDispatcher_StaleInFlightClaimReclaimable(t *testing.T) {
installationByApp: activeInstallation(),
userBinding: boundUser(),
chatSession: db.ChatSession{ID: sessionID, AgentID: validUUID(0x33)},
dedup: map[string]*fakeDedupRow{seedDedupKey("msg-stale"): {processed: false, token: validUUID(0xAB)}},
dedup: map[string]*fakeDedupRow{seedDedupKey("msg-stale"): {processed: false, token: validUUID(0xAB)}},
dedupReclaim: true, // simulates received_at < now() - 60s
}
chat := &fakeChat{ensureID: sessionID, appendResult: AppendResult{}}

View File

@@ -336,6 +336,13 @@ func (h *Hub) Run(ctx context.Context) {
// returns once those deadlines elapse.
func (h *Hub) Wait() {
h.wg.Wait()
// Supervisors (and thus inbound delivery) have stopped, so no new
// run triggers can be scheduled. Drain the debounced pending triggers
// before joining replies: the flush may itself emit an offline/archived
// notice, and FlushPendingRuns blocks until those finish.
if h.dispatcher != nil {
h.dispatcher.FlushPendingRuns()
}
h.replyWg.Wait()
}

View File

@@ -0,0 +1,176 @@
package lark
import (
"sync"
"time"
)
// DefaultChatRunBatchWindow is the silence window the inbound debouncer
// waits before triggering an agent run for a chat session. Owner-aligned
// at 3s on MUL-2968: long enough to absorb a "forward a transcript, then
// type a note" burst into one run, short enough that the bot's first
// reply is not perceptibly late.
const DefaultChatRunBatchWindow = 3 * time.Second
// stoppableTimer is the slice of *time.Timer the batcher depends on.
// Pinned to an interface so unit tests inject a manually-fired fake
// instead of sleeping real wall-clock seconds. *time.Timer satisfies it
// directly (Stop() bool).
type stoppableTimer interface {
Stop() bool
}
// pendingBatcher debounces the per-chat_session run trigger. Each inbound
// Lark message that lands in a session calls Schedule, which (re)arms a
// single timer for that session; when the session goes quiet for the
// configured window the latest-registered flush runs exactly once. This
// collapses a burst of messages into ONE agent run instead of one run per
// message — safe because the chat task reads the WHOLE session history at
// run time, so the individually-persisted messages are all visible to the
// single run. Only the run TRIGGER is debounced; the chat_message rows,
// per-message dedup, and frame ACK already happened synchronously upstream.
//
// State is in-process only, keyed by chat_session_id (a globally-unique
// UUID, so no installation qualifier is needed). The WS lease guarantees a
// single active owner per installation, so a session is only ever debounced
// by one process. A hard crash inside the window drops the pending trigger
// (the messages are already durable in chat_session; they simply do not
// fire a run until the next message arrives) — an accepted low-frequency
// boundary per MUL-2968 decision 5. Graceful shutdown calls FlushAll so
// that boundary is not hit on a normal restart.
//
// The batcher is goroutine-safe: a single instance is shared across all
// supervisor goroutines.
type pendingBatcher struct {
window time.Duration
// afterFunc builds a timer that invokes fn after d. Defaults to
// time.AfterFunc; tests substitute a fake so flushes are deterministic.
afterFunc func(d time.Duration, fn func()) stoppableTimer
mu sync.Mutex
pending map[string]*pendingEntry
// seq mints a monotonic generation per (re)schedule. onFire carries
// the generation it was armed with and bails if a newer schedule has
// superseded it — this fences the classic AfterFunc race where a timer
// fires concurrently with the Stop() that was meant to cancel it.
seq uint64
stopped bool
// inflight tracks flush callbacks that are currently executing (timer
// already fired) so FlushAll can join them before a graceful shutdown
// proceeds.
inflight sync.WaitGroup
}
type pendingEntry struct {
timer stoppableTimer
flush func()
gen uint64
}
// newPendingBatcher returns a batcher with the given silence window. A
// non-positive window falls back to DefaultChatRunBatchWindow.
func newPendingBatcher(window time.Duration) *pendingBatcher {
if window <= 0 {
window = DefaultChatRunBatchWindow
}
return &pendingBatcher{
window: window,
afterFunc: realAfterFunc,
pending: make(map[string]*pendingEntry),
}
}
// realAfterFunc adapts time.AfterFunc to the stoppableTimer seam.
func realAfterFunc(d time.Duration, fn func()) stoppableTimer {
return time.AfterFunc(d, fn)
}
// Schedule (re)arms the silence window for key. The most recent flush wins:
// only session-level information is needed to fire a run, so keeping the
// latest closure (which captures the latest installation/message context
// for the offline/archived notice) is sufficient. Calling Schedule after
// FlushAll runs the flush inline best-effort rather than silently dropping
// it; this only happens on the shutdown race where a message arrives after
// the drain has begun.
func (b *pendingBatcher) Schedule(key string, flush func()) {
b.mu.Lock()
if b.stopped {
b.mu.Unlock()
flush()
return
}
b.seq++
gen := b.seq
fire := func() { b.onFire(key, gen) }
if e, ok := b.pending[key]; ok {
// Reset the window: cancel the prior timer and arm a fresh one.
// The gen bump means a stale fire from the cancelled timer is a
// no-op even if Stop loses the race.
e.timer.Stop()
e.flush = flush
e.gen = gen
e.timer = b.afterFunc(b.window, fire)
b.mu.Unlock()
return
}
b.pending[key] = &pendingEntry{
flush: flush,
gen: gen,
timer: b.afterFunc(b.window, fire),
}
b.mu.Unlock()
}
// onFire runs the flush for key if it is still the live, armed generation.
// It is the timer callback; in production it runs on time.AfterFunc's own
// goroutine, so the flush is naturally detached from the inbound path.
func (b *pendingBatcher) onFire(key string, gen uint64) {
b.mu.Lock()
e, ok := b.pending[key]
if !ok || b.stopped || e.gen != gen {
// Superseded by a newer Schedule, already flushed, or draining via
// FlushAll (which owns this entry now). Do nothing.
b.mu.Unlock()
return
}
delete(b.pending, key)
flush := e.flush
b.inflight.Add(1)
b.mu.Unlock()
defer b.inflight.Done()
flush()
}
// FlushAll stops the batcher and runs every still-pending flush exactly
// once, then waits for any concurrently-firing flushes to finish. Intended
// to be called once from the graceful-shutdown path AFTER inbound delivery
// has stopped, so a normal restart does not drop a window's worth of
// triggers. After FlushAll the batcher is terminal: later Schedule calls
// run inline.
func (b *pendingBatcher) FlushAll() {
b.mu.Lock()
b.stopped = true
entries := make([]*pendingEntry, 0, len(b.pending))
for _, e := range b.pending {
e.timer.Stop()
entries = append(entries, e)
}
b.pending = make(map[string]*pendingEntry)
b.mu.Unlock()
for _, e := range entries {
e.flush()
}
// Join flushes whose timer had already fired before we set stopped.
b.inflight.Wait()
}
// pendingCount reports how many sessions currently have an armed window.
// Used by tests and useful for ops visibility.
func (b *pendingBatcher) pendingCount() int {
b.mu.Lock()
defer b.mu.Unlock()
return len(b.pending)
}

View File

@@ -0,0 +1,197 @@
package lark
import (
"sync"
"testing"
"time"
)
// fakeBatchTimer is a manually-fired stand-in for *time.Timer so debounce
// behaviour can be asserted without sleeping real wall-clock seconds.
type fakeBatchTimer struct {
fn func()
stopped bool
fired bool
}
// Stop mirrors *time.Timer.Stop: returns true only if the timer was still
// armed (not already stopped or fired).
func (t *fakeBatchTimer) Stop() bool {
if t.stopped || t.fired {
return false
}
t.stopped = true
return true
}
// fakeTimerFactory hands out fakeBatchTimers and lets a test fire whichever
// ones are currently armed — modelling the wall clock advancing past the
// window for every pending session at once.
type fakeTimerFactory struct {
mu sync.Mutex
all []*fakeBatchTimer
}
func (f *fakeTimerFactory) after(_ time.Duration, fn func()) stoppableTimer {
f.mu.Lock()
defer f.mu.Unlock()
t := &fakeBatchTimer{fn: fn}
f.all = append(f.all, t)
return t
}
// fireArmed invokes every timer that is armed (not stopped, not already
// fired) right now.
func (f *fakeTimerFactory) fireArmed() {
f.mu.Lock()
armed := make([]*fakeBatchTimer, 0, len(f.all))
for _, t := range f.all {
if !t.stopped && !t.fired {
t.fired = true
armed = append(armed, t)
}
}
f.mu.Unlock()
for _, t := range armed {
t.fn()
}
}
func (f *fakeTimerFactory) armedCount() int {
f.mu.Lock()
defer f.mu.Unlock()
n := 0
for _, t := range f.all {
if !t.stopped && !t.fired {
n++
}
}
return n
}
// newTestBatcher builds a batcher whose timers are driven by f. Shared with
// dispatcher_test.go (same package) to drive the debounce coalescing test.
func newTestBatcher(f *fakeTimerFactory) *pendingBatcher {
return &pendingBatcher{
window: DefaultChatRunBatchWindow,
afterFunc: f.after,
pending: make(map[string]*pendingEntry),
}
}
func TestPendingBatcher_DebounceCoalesces(t *testing.T) {
f := &fakeTimerFactory{}
b := newTestBatcher(f)
calls := 0
flush := func() { calls++ }
b.Schedule("s", flush)
b.Schedule("s", flush)
b.Schedule("s", flush)
if got := b.pendingCount(); got != 1 {
t.Fatalf("three Schedules on one session must keep a single pending entry; got %d", got)
}
if got := f.armedCount(); got != 1 {
t.Fatalf("each reschedule must cancel the prior timer, leaving one armed; got %d", got)
}
f.fireArmed()
if calls != 1 {
t.Fatalf("a debounced burst must flush exactly once; got %d", calls)
}
if got := b.pendingCount(); got != 0 {
t.Fatalf("the session entry must be cleaned up after flush; pending=%d", got)
}
}
func TestPendingBatcher_MultiSessionIndependent(t *testing.T) {
f := &fakeTimerFactory{}
b := newTestBatcher(f)
var a, c int
b.Schedule("a", func() { a++ })
b.Schedule("c", func() { c++ })
if got := b.pendingCount(); got != 2 {
t.Fatalf("two distinct sessions must hold two windows; got %d", got)
}
f.fireArmed()
if a != 1 || c != 1 {
t.Fatalf("each session must flush once and not cross-talk; a=%d c=%d", a, c)
}
}
func TestPendingBatcher_StaleTimerFireIsNoop(t *testing.T) {
// Reproduces the AfterFunc race: a timer fires concurrently with the
// Stop() that was meant to cancel it after a reschedule. The
// generation guard must make the stale fire a no-op so the burst still
// flushes exactly once.
f := &fakeTimerFactory{}
b := newTestBatcher(f)
calls := 0
b.Schedule("s", func() { calls++ })
first := f.all[0]
b.Schedule("s", func() { calls++ }) // resets: cancels first, arms a new timer
// First timer fires anyway despite having been Stop()ed.
first.fired = true
first.fn()
if calls != 0 {
t.Fatalf("a superseded timer firing must not flush; got %d", calls)
}
f.fireArmed()
if calls != 1 {
t.Fatalf("the live timer must still flush exactly once; got %d", calls)
}
}
func TestPendingBatcher_FlushAllDrainsPending(t *testing.T) {
f := &fakeTimerFactory{}
b := newTestBatcher(f)
var a, c int
b.Schedule("a", func() { a++ })
b.Schedule("c", func() { c++ })
b.FlushAll()
if a != 1 || c != 1 {
t.Fatalf("FlushAll must flush every pending session once; a=%d c=%d", a, c)
}
if got := b.pendingCount(); got != 0 {
t.Fatalf("FlushAll must clear pending state; got %d", got)
}
// After FlushAll the batcher is terminal: a later Schedule runs inline
// rather than silently dropping (the shutdown-race best-effort path).
ran := false
b.Schedule("d", func() { ran = true })
if !ran {
t.Fatalf("Schedule after FlushAll must run inline")
}
}
func TestNewPendingBatcher_DefaultsWindow(t *testing.T) {
if b := newPendingBatcher(0); b.window != DefaultChatRunBatchWindow {
t.Fatalf("non-positive window must default to %v; got %v", DefaultChatRunBatchWindow, b.window)
}
if b := newPendingBatcher(500 * time.Millisecond); b.window != 500*time.Millisecond {
t.Fatalf("explicit window must be honoured; got %v", b.window)
}
}
func TestPendingBatcher_RealTimerFlushes(t *testing.T) {
// Exercises the production afterFunc (time.AfterFunc) path with a short
// real window so a mis-wired default would be caught.
b := newPendingBatcher(15 * time.Millisecond)
done := make(chan struct{})
b.Schedule("s", func() { close(done) })
select {
case <-done:
case <-time.After(2 * time.Second):
t.Fatal("real-timer flush did not fire within 2s")
}
}