mirror of
https://github.com/multica-ai/multica.git
synced 2026-06-24 16:09:19 +02:00
Compare commits
1 Commits
fix/skill-
...
agent/j/0b
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a58bfdd5c1 |
@@ -235,7 +235,13 @@ func NewRouterWithOptions(pool *pgxpool.Pool, hub *realtime.Hub, bus *events.Bus
|
||||
Audit: auditLogger,
|
||||
IssueService: h.IssueService,
|
||||
TaskService: h.TaskService,
|
||||
Logger: slog.Default(),
|
||||
}
|
||||
// Debounce the per-session run trigger so a burst of
|
||||
// messages (e.g. "forward a transcript, then type a note")
|
||||
// collapses into one agent run instead of one per message.
|
||||
// MUL-2968.
|
||||
dispatcher.EnableRunBatching(lark.DefaultChatRunBatchWindow)
|
||||
|
||||
// WS Hub: lease + supervisor goroutines per installation.
|
||||
// The WSLongConnConnector talks Lark's long-conn protocol
|
||||
@@ -271,6 +277,11 @@ func NewRouterWithOptions(pool *pgxpool.Pool, hub *realtime.Hub, bus *events.Bus
|
||||
Logger: slog.Default(),
|
||||
})
|
||||
h.LarkHub.SetOutcomeReplier(replier)
|
||||
// The agent-offline / agent-archived notice is now decided
|
||||
// at debounce-flush time rather than synchronously from
|
||||
// Handle, so the dispatcher drives that reply itself through
|
||||
// the same replier. MUL-2968.
|
||||
dispatcher.FlushReply = replier.Reply
|
||||
slog.Info("lark inbound pipeline wired", "connector", connectorLabel)
|
||||
|
||||
// One-shot union_id backfill for installations created
|
||||
|
||||
@@ -4,6 +4,8 @@ import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"time"
|
||||
|
||||
"github.com/jackc/pgx/v5"
|
||||
"github.com/jackc/pgx/v5/pgtype"
|
||||
@@ -111,9 +113,15 @@ type DispatchResult struct {
|
||||
InstallationID pgtype.UUID
|
||||
ChatSessionID pgtype.UUID
|
||||
SenderOpenID OpenID
|
||||
TaskID pgtype.UUID
|
||||
IssueID pgtype.UUID
|
||||
IssueNumber int32
|
||||
// TaskID was populated when the dispatcher enqueued the chat task
|
||||
// synchronously. With the short-window debounce (MUL-2968) the run is
|
||||
// triggered asynchronously at flush time, so Handle no longer knows a
|
||||
// task id — this field is left zero for the chat path. Kept on the
|
||||
// struct because the emit contract still carries it for any future
|
||||
// synchronous enqueue (e.g. /issue follow-ups).
|
||||
TaskID pgtype.UUID
|
||||
IssueID pgtype.UUID
|
||||
IssueNumber int32
|
||||
// IssueIdentifier is the workspace-qualified human key for the
|
||||
// created issue ("MUL-42"). Populated only when /issue produced a
|
||||
// new row. The OutcomeReplier uses this verbatim in the "Created
|
||||
@@ -191,6 +199,63 @@ type Dispatcher struct {
|
||||
Audit AuditLogger
|
||||
IssueService IssueCreator
|
||||
TaskService ChatTaskEnqueuer
|
||||
|
||||
// FlushReply emits the offline/archived notice that EnqueueChatTask
|
||||
// now produces only at debounce-flush time. Before MUL-2968 those
|
||||
// outcomes were returned synchronously from Handle and the hub's
|
||||
// OutcomeReplier sent the card; with the run trigger debounced, the
|
||||
// verdict is not known until the window closes, so the dispatcher
|
||||
// drives the reply itself via this callback. Wired to
|
||||
// OutcomeReplier.Reply in production; nil disables the notice (the
|
||||
// message is still durable, only the card is skipped).
|
||||
FlushReply FlushReplyFunc
|
||||
|
||||
// Logger is used by the detached flush path, which cannot return
|
||||
// errors to a caller and must log them. Defaults to slog.Default().
|
||||
Logger *slog.Logger
|
||||
|
||||
// batcher debounces the per-session run trigger. Installed via
|
||||
// EnableRunBatching in production; when nil (unit tests / degenerate
|
||||
// config) the run fires inline with no debounce — a zero-length
|
||||
// window, not a separate code path.
|
||||
batcher *pendingBatcher
|
||||
}
|
||||
|
||||
// FlushReplyFunc matches OutcomeReplier.Reply so the production replier can
|
||||
// be injected directly. It is invoked from the debounced flush goroutine
|
||||
// to deliver the agent-offline / agent-archived notice.
|
||||
type FlushReplyFunc func(ctx context.Context, inst db.LarkInstallation, msg InboundMessage, res DispatchResult)
|
||||
|
||||
// chatRunFlushTimeout bounds the detached flush (session reload +
|
||||
// EnqueueChatTask + offline/archived notice). The flush runs on its own
|
||||
// fresh context because the inbound request ctx is long cancelled by the
|
||||
// time the window closes.
|
||||
const chatRunFlushTimeout = 10 * time.Second
|
||||
|
||||
// EnableRunBatching installs the in-memory debouncer in front of the
|
||||
// per-session run trigger. Call once at boot. A non-positive window uses
|
||||
// DefaultChatRunBatchWindow. Without this, the dispatcher triggers runs
|
||||
// inline (used by unit tests that assert the immediate effect).
|
||||
func (d *Dispatcher) EnableRunBatching(window time.Duration) {
|
||||
d.batcher = newPendingBatcher(window)
|
||||
}
|
||||
|
||||
// FlushPendingRuns drains every still-pending run trigger immediately and
|
||||
// blocks until in-flight flushes finish. The hub calls this on graceful
|
||||
// shutdown, after inbound delivery has stopped, so a normal restart does
|
||||
// not silently drop a window's worth of messages. No-op when batching is
|
||||
// disabled.
|
||||
func (d *Dispatcher) FlushPendingRuns() {
|
||||
if d.batcher != nil {
|
||||
d.batcher.FlushAll()
|
||||
}
|
||||
}
|
||||
|
||||
func (d *Dispatcher) logger() *slog.Logger {
|
||||
if d.Logger != nil {
|
||||
return d.Logger
|
||||
}
|
||||
return slog.Default()
|
||||
}
|
||||
|
||||
// Handle processes one inbound Lark message end-to-end. It never
|
||||
@@ -467,35 +532,96 @@ func (d *Dispatcher) processClaimed(ctx context.Context, msg InboundMessage, ins
|
||||
}
|
||||
}
|
||||
|
||||
// 8. Enqueue the chat task that triggers the agent run. Only the
|
||||
// productizable rejections from EnqueueChatTask (agent
|
||||
// archived, agent has no runtime configured) are mapped to a
|
||||
// user-visible Outcome; real infra failures bubble up as
|
||||
// errors so the WS adapter can retry or page.
|
||||
// 8. Debounce the run trigger. The chat_message + dedup Mark are
|
||||
// already durable; the agent run reads the WHOLE session at
|
||||
// execution time, so a burst of messages in this session is
|
||||
// collapsed into ONE run by deferring EnqueueChatTask behind a
|
||||
// short silence window (MUL-2968). The synchronous outcome is
|
||||
// OutcomeIngested with NO TaskID — the task row is created later,
|
||||
// at flush. EnqueueChatTask's productizable verdicts (agent
|
||||
// offline / archived) and infra errors are now handled inside the
|
||||
// flush (see flushChatRun), not returned here.
|
||||
//
|
||||
// Note: a daemon that's merely disconnected is NOT an error
|
||||
// here. As long as agent.runtime_id is set, the chat task is
|
||||
// enqueued and waits for the daemon to claim it on next
|
||||
// online; this path returns OutcomeIngested with a TaskID.
|
||||
// Note: a daemon that's merely disconnected is NOT an error. As
|
||||
// long as agent.runtime_id is set, the chat task is enqueued at
|
||||
// flush and waits for the daemon to claim it on next online.
|
||||
d.scheduleRun(inst, msg, sessionID)
|
||||
return res, postAppendFinalize, nil
|
||||
}
|
||||
|
||||
// scheduleRun hands the per-session run trigger to the debouncer (or fires
|
||||
// it inline when batching is disabled). The flush closure captures this
|
||||
// message's installation + InboundMessage so the offline/archived notice,
|
||||
// if any, targets the right chat; the latest message in a window wins.
|
||||
func (d *Dispatcher) scheduleRun(inst db.LarkInstallation, msg InboundMessage, sessionID pgtype.UUID) {
|
||||
flush := func() { d.flushChatRun(inst, msg, sessionID) }
|
||||
if d.batcher == nil {
|
||||
// Batching disabled (unit tests / degenerate config): trigger the
|
||||
// run immediately. Production always installs a batcher via
|
||||
// EnableRunBatching, so this branch does not run in prod.
|
||||
flush()
|
||||
return
|
||||
}
|
||||
d.batcher.Schedule(keyForSession(sessionID), flush)
|
||||
}
|
||||
|
||||
// flushChatRun is the debounced run-trigger. It runs once per silence
|
||||
// window per chat session, detached from the inbound path (on its own
|
||||
// goroutine and fresh context). It reloads the session, enqueues exactly
|
||||
// one chat task for the whole window's worth of messages, and — because
|
||||
// EnqueueChatTask's offline/archived verdict is only known here now —
|
||||
// emits the corresponding notice itself via FlushReply. Errors cannot be
|
||||
// returned to a caller (the message is already ACKed and durable), so they
|
||||
// are logged: a failed enqueue leaves the message in the session to be
|
||||
// picked up by the next message's run.
|
||||
func (d *Dispatcher) flushChatRun(inst db.LarkInstallation, msg InboundMessage, sessionID pgtype.UUID) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), chatRunFlushTimeout)
|
||||
defer cancel()
|
||||
|
||||
session, err := d.Queries.GetChatSession(ctx, sessionID)
|
||||
if err != nil {
|
||||
return DispatchResult{}, postAppendFinalize, fmt.Errorf("reload chat session: %w", err)
|
||||
d.logger().Error("lark dispatcher: flush reload chat session failed",
|
||||
"chat_session_id", uuidString(sessionID),
|
||||
"err", err.Error(),
|
||||
)
|
||||
return
|
||||
}
|
||||
task, err := d.TaskService.EnqueueChatTask(ctx, session)
|
||||
if err != nil {
|
||||
if _, err := d.TaskService.EnqueueChatTask(ctx, session); err != nil {
|
||||
switch {
|
||||
case errors.Is(err, service.ErrChatTaskAgentNoRuntime):
|
||||
res.Outcome = OutcomeAgentOffline
|
||||
return res, postAppendFinalize, nil
|
||||
d.emitFlushReply(ctx, inst, msg, sessionID, OutcomeAgentOffline)
|
||||
case errors.Is(err, service.ErrChatTaskAgentArchived):
|
||||
res.Outcome = OutcomeAgentArchived
|
||||
return res, postAppendFinalize, nil
|
||||
d.emitFlushReply(ctx, inst, msg, sessionID, OutcomeAgentArchived)
|
||||
default:
|
||||
return DispatchResult{}, postAppendFinalize, fmt.Errorf("enqueue chat task: %w", err)
|
||||
// Infra failure (DB down, etc.). Nothing to retry against —
|
||||
// the inbound frame was ACKed long ago. Log so the gap is
|
||||
// visible; the next message in this session re-triggers a run
|
||||
// that will read this message too.
|
||||
d.logger().Error("lark dispatcher: flush enqueue chat task failed",
|
||||
"chat_session_id", uuidString(sessionID),
|
||||
"err", err.Error(),
|
||||
)
|
||||
}
|
||||
}
|
||||
res.TaskID = task.ID
|
||||
return res, postAppendFinalize, nil
|
||||
}
|
||||
|
||||
// emitFlushReply delivers an offline/archived notice for a flushed run.
|
||||
func (d *Dispatcher) emitFlushReply(ctx context.Context, inst db.LarkInstallation, msg InboundMessage, sessionID pgtype.UUID, outcome Outcome) {
|
||||
if d.FlushReply == nil {
|
||||
return
|
||||
}
|
||||
d.FlushReply(ctx, inst, msg, DispatchResult{
|
||||
Outcome: outcome,
|
||||
InstallationID: inst.ID,
|
||||
ChatSessionID: sessionID,
|
||||
SenderOpenID: msg.SenderOpenID,
|
||||
})
|
||||
}
|
||||
|
||||
// keyForSession is the batcher key. chat_session_id is a globally-unique
|
||||
// UUID, so it alone disambiguates sessions across installations.
|
||||
func keyForSession(sessionID pgtype.UUID) string {
|
||||
return string(sessionID.Bytes[:])
|
||||
}
|
||||
|
||||
// applyFinalize flips the in-flight claim row to its terminal state,
|
||||
|
||||
@@ -210,7 +210,7 @@ type fakeChat struct {
|
||||
ensureErr error
|
||||
appendResult AppendResult
|
||||
appendErr error
|
||||
queries *fakeQueries // when set, runs the in-tx Mark
|
||||
queries *fakeQueries // when set, runs the in-tx Mark
|
||||
beforeAppend func(AppendUserMessageParams) // race-injection hook
|
||||
calledEnsure int
|
||||
calledAppend int
|
||||
@@ -445,8 +445,14 @@ func TestDispatcher_PlainMessageEnqueuesTask(t *testing.T) {
|
||||
if res.Outcome != OutcomeIngested {
|
||||
t.Fatalf("expected ingested, got %q", res.Outcome)
|
||||
}
|
||||
if !res.TaskID.Valid || res.TaskID != enq.task.ID {
|
||||
t.Fatalf("task id propagation broken: %+v", res.TaskID)
|
||||
// The run trigger is debounced (MUL-2968): no TaskID is surfaced
|
||||
// synchronously anymore, and with batching disabled the flush fires
|
||||
// inline so the enqueue is observable right here.
|
||||
if res.TaskID.Valid {
|
||||
t.Fatalf("TaskID must not be set synchronously after debounce; got %+v", res.TaskID)
|
||||
}
|
||||
if enq.called != 1 {
|
||||
t.Fatalf("expected exactly one EnqueueChatTask at flush; called=%d", enq.called)
|
||||
}
|
||||
// For p2p the session creator should be the bound user, not the
|
||||
// installer — verifies the chat-type branch in Handle.
|
||||
@@ -492,7 +498,7 @@ func TestDispatcher_DedupHitDoesNotEnqueue(t *testing.T) {
|
||||
queries := &fakeQueries{
|
||||
installationByApp: activeInstallation(),
|
||||
userBinding: boundUser(),
|
||||
dedup: map[string]*fakeDedupRow{seedDedupKey("msg-dup"): {processed: true, token: validUUID(0xAB)}},
|
||||
dedup: map[string]*fakeDedupRow{seedDedupKey("msg-dup"): {processed: true, token: validUUID(0xAB)}},
|
||||
}
|
||||
chat := &fakeChat{
|
||||
ensureID: validUUID(0x66),
|
||||
@@ -542,7 +548,7 @@ func TestDispatcher_DedupHitDoesNotEnqueue(t *testing.T) {
|
||||
func TestDispatcher_DedupBeforeGroupFilter(t *testing.T) {
|
||||
queries := &fakeQueries{
|
||||
installationByApp: activeInstallation(),
|
||||
dedup: map[string]*fakeDedupRow{seedDedupKey("msg-replay"): {processed: true, token: validUUID(0xAB)}},
|
||||
dedup: map[string]*fakeDedupRow{seedDedupKey("msg-replay"): {processed: true, token: validUUID(0xAB)}},
|
||||
}
|
||||
audit := &fakeAudit{}
|
||||
d := &Dispatcher{Queries: queries, Audit: audit}
|
||||
@@ -610,7 +616,7 @@ func TestDispatcher_DedupBeforeIdentityCheck(t *testing.T) {
|
||||
queries := &fakeQueries{
|
||||
installationByApp: activeInstallation(),
|
||||
userBindingErr: pgx.ErrNoRows, // unbound — would normally trigger OutcomeNeedsBinding
|
||||
dedup: map[string]*fakeDedupRow{seedDedupKey("msg-replay"): {processed: true, token: validUUID(0xAB)}},
|
||||
dedup: map[string]*fakeDedupRow{seedDedupKey("msg-replay"): {processed: true, token: validUUID(0xAB)}},
|
||||
}
|
||||
audit := &fakeAudit{}
|
||||
d := &Dispatcher{Queries: queries, Audit: audit}
|
||||
@@ -782,7 +788,24 @@ func TestDispatcher_EmptyTitleSurfacesError(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestDispatcher_AgentOfflineFallsThroughCleanly(t *testing.T) {
|
||||
// captureReply is a FlushReply seam: it records every offline/archived
|
||||
// notice the dispatcher emits at flush time so tests can assert what the
|
||||
// user-facing card would say.
|
||||
type captureReply struct {
|
||||
count int
|
||||
results []DispatchResult
|
||||
}
|
||||
|
||||
func (c *captureReply) reply(_ context.Context, _ db.LarkInstallation, _ InboundMessage, res DispatchResult) {
|
||||
c.count++
|
||||
c.results = append(c.results, res)
|
||||
}
|
||||
|
||||
func TestDispatcher_AgentOfflineRepliesAtFlush(t *testing.T) {
|
||||
// With the run trigger debounced (MUL-2968), the agent-offline verdict
|
||||
// is only known at flush time. Handle itself returns OutcomeIngested
|
||||
// (the message is durable + ACKed); the offline notice is delivered
|
||||
// through FlushReply. With batching disabled the flush fires inline.
|
||||
sessionID := validUUID(0x66)
|
||||
queries := &fakeQueries{
|
||||
installationByApp: activeInstallation(),
|
||||
@@ -791,11 +814,13 @@ func TestDispatcher_AgentOfflineFallsThroughCleanly(t *testing.T) {
|
||||
}
|
||||
chat := &fakeChat{ensureID: sessionID, appendResult: AppendResult{}}
|
||||
enq := &fakeEnqueuer{err: service.ErrChatTaskAgentNoRuntime}
|
||||
cap := &captureReply{}
|
||||
d := &Dispatcher{
|
||||
Queries: queries,
|
||||
Chat: chat,
|
||||
Audit: &fakeAudit{},
|
||||
TaskService: enq,
|
||||
FlushReply: cap.reply,
|
||||
}
|
||||
|
||||
res, err := d.Handle(context.Background(), InboundMessage{
|
||||
@@ -808,15 +833,24 @@ func TestDispatcher_AgentOfflineFallsThroughCleanly(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatalf("offline path should not return error, got %v", err)
|
||||
}
|
||||
if res.Outcome != OutcomeAgentOffline {
|
||||
t.Fatalf("expected OutcomeAgentOffline, got %q", res.Outcome)
|
||||
if res.Outcome != OutcomeIngested {
|
||||
t.Fatalf("synchronous outcome must be ingested, got %q", res.Outcome)
|
||||
}
|
||||
if res.ChatSessionID != sessionID {
|
||||
t.Fatalf("session id not propagated: %+v", res.ChatSessionID)
|
||||
if enq.called != 1 {
|
||||
t.Fatalf("flush must call EnqueueChatTask exactly once; called=%d", enq.called)
|
||||
}
|
||||
if cap.count != 1 {
|
||||
t.Fatalf("expected exactly one flush reply; got %d", cap.count)
|
||||
}
|
||||
if cap.results[0].Outcome != OutcomeAgentOffline {
|
||||
t.Fatalf("expected OutcomeAgentOffline at flush, got %q", cap.results[0].Outcome)
|
||||
}
|
||||
if cap.results[0].ChatSessionID != sessionID {
|
||||
t.Fatalf("session id not propagated to flush reply: %+v", cap.results[0].ChatSessionID)
|
||||
}
|
||||
}
|
||||
|
||||
func TestDispatcher_AgentArchivedSurfacesDistinctOutcome(t *testing.T) {
|
||||
func TestDispatcher_AgentArchivedRepliesAtFlush(t *testing.T) {
|
||||
sessionID := validUUID(0x66)
|
||||
queries := &fakeQueries{
|
||||
installationByApp: activeInstallation(),
|
||||
@@ -825,11 +859,13 @@ func TestDispatcher_AgentArchivedSurfacesDistinctOutcome(t *testing.T) {
|
||||
}
|
||||
chat := &fakeChat{ensureID: sessionID, appendResult: AppendResult{}}
|
||||
enq := &fakeEnqueuer{err: service.ErrChatTaskAgentArchived}
|
||||
cap := &captureReply{}
|
||||
d := &Dispatcher{
|
||||
Queries: queries,
|
||||
Chat: chat,
|
||||
Audit: &fakeAudit{},
|
||||
TaskService: enq,
|
||||
FlushReply: cap.reply,
|
||||
}
|
||||
|
||||
res, err := d.Handle(context.Background(), InboundMessage{
|
||||
@@ -842,16 +878,21 @@ func TestDispatcher_AgentArchivedSurfacesDistinctOutcome(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatalf("archived path should not return error, got %v", err)
|
||||
}
|
||||
if res.Outcome != OutcomeAgentArchived {
|
||||
t.Fatalf("expected OutcomeAgentArchived, got %q", res.Outcome)
|
||||
if res.Outcome != OutcomeIngested {
|
||||
t.Fatalf("synchronous outcome must be ingested, got %q", res.Outcome)
|
||||
}
|
||||
if cap.count != 1 || cap.results[0].Outcome != OutcomeAgentArchived {
|
||||
t.Fatalf("expected OutcomeAgentArchived at flush, got count=%d results=%+v", cap.count, cap.results)
|
||||
}
|
||||
}
|
||||
|
||||
func TestDispatcher_InfraFailureSurfacesError(t *testing.T) {
|
||||
// A DB / load / create failure from TaskService.EnqueueChatTask is
|
||||
// NOT a productizable state — the WS adapter must see a real
|
||||
// error so it can retry or page, not an "offline" card that
|
||||
// silently hides the outage.
|
||||
func TestDispatcher_FlushInfraFailureIsNotReplied(t *testing.T) {
|
||||
// A DB / load / create failure from EnqueueChatTask is NOT a
|
||||
// productizable state. The inbound frame was ACKed and the message is
|
||||
// already durable, so Handle returns no error (nothing for the
|
||||
// connector to retry against), the failure is logged, and NO
|
||||
// offline/archived card is sent — a bogus "offline" card would
|
||||
// silently hide the outage.
|
||||
sessionID := validUUID(0x66)
|
||||
queries := &fakeQueries{
|
||||
installationByApp: activeInstallation(),
|
||||
@@ -861,25 +902,91 @@ func TestDispatcher_InfraFailureSurfacesError(t *testing.T) {
|
||||
chat := &fakeChat{ensureID: sessionID, appendResult: AppendResult{}}
|
||||
infraErr := errors.New("create chat task: connection refused")
|
||||
enq := &fakeEnqueuer{err: infraErr}
|
||||
cap := &captureReply{}
|
||||
d := &Dispatcher{
|
||||
Queries: queries,
|
||||
Chat: chat,
|
||||
Audit: &fakeAudit{},
|
||||
TaskService: enq,
|
||||
FlushReply: cap.reply,
|
||||
}
|
||||
|
||||
_, err := d.Handle(context.Background(), InboundMessage{
|
||||
res, err := d.Handle(context.Background(), InboundMessage{
|
||||
AppID: "ok",
|
||||
ChatType: ChatTypeP2P,
|
||||
SenderOpenID: "ou_user_a",
|
||||
Body: "hi",
|
||||
MessageID: "msg-infra",
|
||||
})
|
||||
if err == nil {
|
||||
t.Fatalf("infra failure should surface as error, got nil")
|
||||
if err != nil {
|
||||
t.Fatalf("flush infra failure must not surface from Handle, got %v", err)
|
||||
}
|
||||
if !errors.Is(err, infraErr) {
|
||||
t.Fatalf("infra error should propagate (errors.Is), got %v", err)
|
||||
if res.Outcome != OutcomeIngested {
|
||||
t.Fatalf("synchronous outcome must be ingested, got %q", res.Outcome)
|
||||
}
|
||||
if enq.called != 1 {
|
||||
t.Fatalf("flush must attempt EnqueueChatTask once; called=%d", enq.called)
|
||||
}
|
||||
if cap.count != 0 {
|
||||
t.Fatalf("infra failure must not emit any offline/archived card; replies=%d", cap.count)
|
||||
}
|
||||
}
|
||||
|
||||
func TestDispatcher_DebounceCoalescesRunTrigger(t *testing.T) {
|
||||
// Two messages in the same chat_session within the silence window must
|
||||
// produce exactly ONE EnqueueChatTask (one agent run). The run reads
|
||||
// the whole session history, so both messages are covered by it. This
|
||||
// is the core MUL-2968 behaviour: "forward a transcript, then type a
|
||||
// note" stops triggering two runs.
|
||||
sessionID := validUUID(0x66)
|
||||
queries := &fakeQueries{
|
||||
installationByApp: activeInstallation(),
|
||||
userBinding: boundUser(),
|
||||
chatSession: db.ChatSession{ID: sessionID, AgentID: validUUID(0x33)},
|
||||
}
|
||||
chat := &fakeChat{ensureID: sessionID, appendResult: AppendResult{}}
|
||||
enq := &fakeEnqueuer{task: db.AgentTaskQueue{ID: validUUID(0x77)}}
|
||||
f := &fakeTimerFactory{}
|
||||
d := &Dispatcher{Queries: queries, Chat: chat, Audit: &fakeAudit{}, TaskService: enq}
|
||||
d.batcher = newTestBatcher(f)
|
||||
|
||||
send := func(id string) {
|
||||
res, err := d.Handle(context.Background(), InboundMessage{
|
||||
AppID: "ok",
|
||||
ChatType: ChatTypeP2P,
|
||||
SenderOpenID: "ou_user_a",
|
||||
Body: "hi",
|
||||
MessageID: id,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error for %s: %v", id, err)
|
||||
}
|
||||
if res.Outcome != OutcomeIngested {
|
||||
t.Fatalf("expected ingested for %s, got %q", id, res.Outcome)
|
||||
}
|
||||
}
|
||||
|
||||
send("m1")
|
||||
send("m2")
|
||||
|
||||
// Both messages are durable + ACKed, but the run is still pending.
|
||||
if enq.called != 0 {
|
||||
t.Fatalf("run trigger must be debounced; enqueue called=%d before window closed", enq.called)
|
||||
}
|
||||
if got := d.batcher.pendingCount(); got != 1 {
|
||||
t.Fatalf("both messages share one session window; pending=%d", got)
|
||||
}
|
||||
|
||||
f.fireArmed() // window closes
|
||||
if enq.called != 1 {
|
||||
t.Fatalf("a coalesced burst must enqueue exactly once; called=%d", enq.called)
|
||||
}
|
||||
|
||||
// A message arriving after the window fired is a new burst → new run.
|
||||
send("m3")
|
||||
f.fireArmed()
|
||||
if enq.called != 2 {
|
||||
t.Fatalf("a message after the window must start a new run; called=%d", enq.called)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1051,34 +1158,41 @@ func TestDispatcher_AppendUserMessageFailureReleasesClaim(t *testing.T) {
|
||||
// committed) but a downstream step returns an error, the dispatcher
|
||||
// MUST mark the claim processed. Otherwise a replay would re-process
|
||||
// the message and write a second chat_message row.
|
||||
//
|
||||
// The run-trigger enqueue is now debounced and cannot fail synchronously,
|
||||
// so the synchronous downstream error this pins is the /issue create
|
||||
// path — the remaining step that runs after the chat_message is durable
|
||||
// and can still surface an error to the caller.
|
||||
func TestDispatcher_DurableErrorMarksClaim(t *testing.T) {
|
||||
sessionID := validUUID(0x66)
|
||||
inst := activeInstallation()
|
||||
queries := &fakeQueries{
|
||||
installationByApp: activeInstallation(),
|
||||
installationByApp: inst,
|
||||
userBinding: boundUser(),
|
||||
chatSession: db.ChatSession{ID: sessionID, AgentID: validUUID(0x33)},
|
||||
chatSession: db.ChatSession{ID: sessionID, AgentID: inst.AgentID},
|
||||
}
|
||||
chat := &fakeChat{
|
||||
ensureID: sessionID,
|
||||
appendResult: AppendResult{},
|
||||
appendResult: AppendResult{IssueCommand: &IssueCommand{Title: "boom"}},
|
||||
}
|
||||
infraErr := errors.New("create chat task: connection refused")
|
||||
issueErr := errors.New("create issue: connection refused")
|
||||
d := &Dispatcher{
|
||||
Queries: queries,
|
||||
Chat: chat,
|
||||
Audit: &fakeAudit{},
|
||||
TaskService: &fakeEnqueuer{err: infraErr},
|
||||
Queries: queries,
|
||||
Chat: chat,
|
||||
Audit: &fakeAudit{},
|
||||
IssueService: &fakeIssueCreator{err: issueErr},
|
||||
TaskService: &fakeEnqueuer{},
|
||||
}
|
||||
|
||||
_, err := d.Handle(context.Background(), InboundMessage{
|
||||
AppID: "ok",
|
||||
ChatType: ChatTypeP2P,
|
||||
SenderOpenID: "ou_user_a",
|
||||
Body: "hi",
|
||||
Body: "/issue boom",
|
||||
MessageID: "msg-durable-err",
|
||||
})
|
||||
if !errors.Is(err, infraErr) {
|
||||
t.Fatalf("expected infra error to propagate, got %v", err)
|
||||
if !errors.Is(err, issueErr) {
|
||||
t.Fatalf("expected post-append durable error to propagate, got %v", err)
|
||||
}
|
||||
if queries.calledRelease != 0 {
|
||||
t.Fatalf("must not release: chat_message already committed; calledRelease=%d", queries.calledRelease)
|
||||
@@ -1190,7 +1304,7 @@ func TestDispatcher_StaleInFlightClaimReclaimable(t *testing.T) {
|
||||
installationByApp: activeInstallation(),
|
||||
userBinding: boundUser(),
|
||||
chatSession: db.ChatSession{ID: sessionID, AgentID: validUUID(0x33)},
|
||||
dedup: map[string]*fakeDedupRow{seedDedupKey("msg-stale"): {processed: false, token: validUUID(0xAB)}},
|
||||
dedup: map[string]*fakeDedupRow{seedDedupKey("msg-stale"): {processed: false, token: validUUID(0xAB)}},
|
||||
dedupReclaim: true, // simulates received_at < now() - 60s
|
||||
}
|
||||
chat := &fakeChat{ensureID: sessionID, appendResult: AppendResult{}}
|
||||
|
||||
@@ -336,6 +336,13 @@ func (h *Hub) Run(ctx context.Context) {
|
||||
// returns once those deadlines elapse.
|
||||
func (h *Hub) Wait() {
|
||||
h.wg.Wait()
|
||||
// Supervisors (and thus inbound delivery) have stopped, so no new
|
||||
// run triggers can be scheduled. Drain the debounced pending triggers
|
||||
// before joining replies: the flush may itself emit an offline/archived
|
||||
// notice, and FlushPendingRuns blocks until those finish.
|
||||
if h.dispatcher != nil {
|
||||
h.dispatcher.FlushPendingRuns()
|
||||
}
|
||||
h.replyWg.Wait()
|
||||
}
|
||||
|
||||
|
||||
176
server/internal/integrations/lark/pending_batcher.go
Normal file
176
server/internal/integrations/lark/pending_batcher.go
Normal file
@@ -0,0 +1,176 @@
|
||||
package lark
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
// DefaultChatRunBatchWindow is the silence window the inbound debouncer
|
||||
// waits before triggering an agent run for a chat session. Owner-aligned
|
||||
// at 3s on MUL-2968: long enough to absorb a "forward a transcript, then
|
||||
// type a note" burst into one run, short enough that the bot's first
|
||||
// reply is not perceptibly late.
|
||||
const DefaultChatRunBatchWindow = 3 * time.Second
|
||||
|
||||
// stoppableTimer is the slice of *time.Timer the batcher depends on.
|
||||
// Pinned to an interface so unit tests inject a manually-fired fake
|
||||
// instead of sleeping real wall-clock seconds. *time.Timer satisfies it
|
||||
// directly (Stop() bool).
|
||||
type stoppableTimer interface {
|
||||
Stop() bool
|
||||
}
|
||||
|
||||
// pendingBatcher debounces the per-chat_session run trigger. Each inbound
|
||||
// Lark message that lands in a session calls Schedule, which (re)arms a
|
||||
// single timer for that session; when the session goes quiet for the
|
||||
// configured window the latest-registered flush runs exactly once. This
|
||||
// collapses a burst of messages into ONE agent run instead of one run per
|
||||
// message — safe because the chat task reads the WHOLE session history at
|
||||
// run time, so the individually-persisted messages are all visible to the
|
||||
// single run. Only the run TRIGGER is debounced; the chat_message rows,
|
||||
// per-message dedup, and frame ACK already happened synchronously upstream.
|
||||
//
|
||||
// State is in-process only, keyed by chat_session_id (a globally-unique
|
||||
// UUID, so no installation qualifier is needed). The WS lease guarantees a
|
||||
// single active owner per installation, so a session is only ever debounced
|
||||
// by one process. A hard crash inside the window drops the pending trigger
|
||||
// (the messages are already durable in chat_session; they simply do not
|
||||
// fire a run until the next message arrives) — an accepted low-frequency
|
||||
// boundary per MUL-2968 decision 5. Graceful shutdown calls FlushAll so
|
||||
// that boundary is not hit on a normal restart.
|
||||
//
|
||||
// The batcher is goroutine-safe: a single instance is shared across all
|
||||
// supervisor goroutines.
|
||||
type pendingBatcher struct {
|
||||
window time.Duration
|
||||
|
||||
// afterFunc builds a timer that invokes fn after d. Defaults to
|
||||
// time.AfterFunc; tests substitute a fake so flushes are deterministic.
|
||||
afterFunc func(d time.Duration, fn func()) stoppableTimer
|
||||
|
||||
mu sync.Mutex
|
||||
pending map[string]*pendingEntry
|
||||
// seq mints a monotonic generation per (re)schedule. onFire carries
|
||||
// the generation it was armed with and bails if a newer schedule has
|
||||
// superseded it — this fences the classic AfterFunc race where a timer
|
||||
// fires concurrently with the Stop() that was meant to cancel it.
|
||||
seq uint64
|
||||
stopped bool
|
||||
// inflight tracks flush callbacks that are currently executing (timer
|
||||
// already fired) so FlushAll can join them before a graceful shutdown
|
||||
// proceeds.
|
||||
inflight sync.WaitGroup
|
||||
}
|
||||
|
||||
type pendingEntry struct {
|
||||
timer stoppableTimer
|
||||
flush func()
|
||||
gen uint64
|
||||
}
|
||||
|
||||
// newPendingBatcher returns a batcher with the given silence window. A
|
||||
// non-positive window falls back to DefaultChatRunBatchWindow.
|
||||
func newPendingBatcher(window time.Duration) *pendingBatcher {
|
||||
if window <= 0 {
|
||||
window = DefaultChatRunBatchWindow
|
||||
}
|
||||
return &pendingBatcher{
|
||||
window: window,
|
||||
afterFunc: realAfterFunc,
|
||||
pending: make(map[string]*pendingEntry),
|
||||
}
|
||||
}
|
||||
|
||||
// realAfterFunc adapts time.AfterFunc to the stoppableTimer seam.
|
||||
func realAfterFunc(d time.Duration, fn func()) stoppableTimer {
|
||||
return time.AfterFunc(d, fn)
|
||||
}
|
||||
|
||||
// Schedule (re)arms the silence window for key. The most recent flush wins:
|
||||
// only session-level information is needed to fire a run, so keeping the
|
||||
// latest closure (which captures the latest installation/message context
|
||||
// for the offline/archived notice) is sufficient. Calling Schedule after
|
||||
// FlushAll runs the flush inline best-effort rather than silently dropping
|
||||
// it; this only happens on the shutdown race where a message arrives after
|
||||
// the drain has begun.
|
||||
func (b *pendingBatcher) Schedule(key string, flush func()) {
|
||||
b.mu.Lock()
|
||||
if b.stopped {
|
||||
b.mu.Unlock()
|
||||
flush()
|
||||
return
|
||||
}
|
||||
b.seq++
|
||||
gen := b.seq
|
||||
fire := func() { b.onFire(key, gen) }
|
||||
if e, ok := b.pending[key]; ok {
|
||||
// Reset the window: cancel the prior timer and arm a fresh one.
|
||||
// The gen bump means a stale fire from the cancelled timer is a
|
||||
// no-op even if Stop loses the race.
|
||||
e.timer.Stop()
|
||||
e.flush = flush
|
||||
e.gen = gen
|
||||
e.timer = b.afterFunc(b.window, fire)
|
||||
b.mu.Unlock()
|
||||
return
|
||||
}
|
||||
b.pending[key] = &pendingEntry{
|
||||
flush: flush,
|
||||
gen: gen,
|
||||
timer: b.afterFunc(b.window, fire),
|
||||
}
|
||||
b.mu.Unlock()
|
||||
}
|
||||
|
||||
// onFire runs the flush for key if it is still the live, armed generation.
|
||||
// It is the timer callback; in production it runs on time.AfterFunc's own
|
||||
// goroutine, so the flush is naturally detached from the inbound path.
|
||||
func (b *pendingBatcher) onFire(key string, gen uint64) {
|
||||
b.mu.Lock()
|
||||
e, ok := b.pending[key]
|
||||
if !ok || b.stopped || e.gen != gen {
|
||||
// Superseded by a newer Schedule, already flushed, or draining via
|
||||
// FlushAll (which owns this entry now). Do nothing.
|
||||
b.mu.Unlock()
|
||||
return
|
||||
}
|
||||
delete(b.pending, key)
|
||||
flush := e.flush
|
||||
b.inflight.Add(1)
|
||||
b.mu.Unlock()
|
||||
|
||||
defer b.inflight.Done()
|
||||
flush()
|
||||
}
|
||||
|
||||
// FlushAll stops the batcher and runs every still-pending flush exactly
|
||||
// once, then waits for any concurrently-firing flushes to finish. Intended
|
||||
// to be called once from the graceful-shutdown path AFTER inbound delivery
|
||||
// has stopped, so a normal restart does not drop a window's worth of
|
||||
// triggers. After FlushAll the batcher is terminal: later Schedule calls
|
||||
// run inline.
|
||||
func (b *pendingBatcher) FlushAll() {
|
||||
b.mu.Lock()
|
||||
b.stopped = true
|
||||
entries := make([]*pendingEntry, 0, len(b.pending))
|
||||
for _, e := range b.pending {
|
||||
e.timer.Stop()
|
||||
entries = append(entries, e)
|
||||
}
|
||||
b.pending = make(map[string]*pendingEntry)
|
||||
b.mu.Unlock()
|
||||
|
||||
for _, e := range entries {
|
||||
e.flush()
|
||||
}
|
||||
// Join flushes whose timer had already fired before we set stopped.
|
||||
b.inflight.Wait()
|
||||
}
|
||||
|
||||
// pendingCount reports how many sessions currently have an armed window.
|
||||
// Used by tests and useful for ops visibility.
|
||||
func (b *pendingBatcher) pendingCount() int {
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
return len(b.pending)
|
||||
}
|
||||
197
server/internal/integrations/lark/pending_batcher_test.go
Normal file
197
server/internal/integrations/lark/pending_batcher_test.go
Normal file
@@ -0,0 +1,197 @@
|
||||
package lark
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
// fakeBatchTimer is a manually-fired stand-in for *time.Timer so debounce
|
||||
// behaviour can be asserted without sleeping real wall-clock seconds.
|
||||
type fakeBatchTimer struct {
|
||||
fn func()
|
||||
stopped bool
|
||||
fired bool
|
||||
}
|
||||
|
||||
// Stop mirrors *time.Timer.Stop: returns true only if the timer was still
|
||||
// armed (not already stopped or fired).
|
||||
func (t *fakeBatchTimer) Stop() bool {
|
||||
if t.stopped || t.fired {
|
||||
return false
|
||||
}
|
||||
t.stopped = true
|
||||
return true
|
||||
}
|
||||
|
||||
// fakeTimerFactory hands out fakeBatchTimers and lets a test fire whichever
|
||||
// ones are currently armed — modelling the wall clock advancing past the
|
||||
// window for every pending session at once.
|
||||
type fakeTimerFactory struct {
|
||||
mu sync.Mutex
|
||||
all []*fakeBatchTimer
|
||||
}
|
||||
|
||||
func (f *fakeTimerFactory) after(_ time.Duration, fn func()) stoppableTimer {
|
||||
f.mu.Lock()
|
||||
defer f.mu.Unlock()
|
||||
t := &fakeBatchTimer{fn: fn}
|
||||
f.all = append(f.all, t)
|
||||
return t
|
||||
}
|
||||
|
||||
// fireArmed invokes every timer that is armed (not stopped, not already
|
||||
// fired) right now.
|
||||
func (f *fakeTimerFactory) fireArmed() {
|
||||
f.mu.Lock()
|
||||
armed := make([]*fakeBatchTimer, 0, len(f.all))
|
||||
for _, t := range f.all {
|
||||
if !t.stopped && !t.fired {
|
||||
t.fired = true
|
||||
armed = append(armed, t)
|
||||
}
|
||||
}
|
||||
f.mu.Unlock()
|
||||
for _, t := range armed {
|
||||
t.fn()
|
||||
}
|
||||
}
|
||||
|
||||
func (f *fakeTimerFactory) armedCount() int {
|
||||
f.mu.Lock()
|
||||
defer f.mu.Unlock()
|
||||
n := 0
|
||||
for _, t := range f.all {
|
||||
if !t.stopped && !t.fired {
|
||||
n++
|
||||
}
|
||||
}
|
||||
return n
|
||||
}
|
||||
|
||||
// newTestBatcher builds a batcher whose timers are driven by f. Shared with
|
||||
// dispatcher_test.go (same package) to drive the debounce coalescing test.
|
||||
func newTestBatcher(f *fakeTimerFactory) *pendingBatcher {
|
||||
return &pendingBatcher{
|
||||
window: DefaultChatRunBatchWindow,
|
||||
afterFunc: f.after,
|
||||
pending: make(map[string]*pendingEntry),
|
||||
}
|
||||
}
|
||||
|
||||
func TestPendingBatcher_DebounceCoalesces(t *testing.T) {
|
||||
f := &fakeTimerFactory{}
|
||||
b := newTestBatcher(f)
|
||||
calls := 0
|
||||
flush := func() { calls++ }
|
||||
|
||||
b.Schedule("s", flush)
|
||||
b.Schedule("s", flush)
|
||||
b.Schedule("s", flush)
|
||||
|
||||
if got := b.pendingCount(); got != 1 {
|
||||
t.Fatalf("three Schedules on one session must keep a single pending entry; got %d", got)
|
||||
}
|
||||
if got := f.armedCount(); got != 1 {
|
||||
t.Fatalf("each reschedule must cancel the prior timer, leaving one armed; got %d", got)
|
||||
}
|
||||
|
||||
f.fireArmed()
|
||||
if calls != 1 {
|
||||
t.Fatalf("a debounced burst must flush exactly once; got %d", calls)
|
||||
}
|
||||
if got := b.pendingCount(); got != 0 {
|
||||
t.Fatalf("the session entry must be cleaned up after flush; pending=%d", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestPendingBatcher_MultiSessionIndependent(t *testing.T) {
|
||||
f := &fakeTimerFactory{}
|
||||
b := newTestBatcher(f)
|
||||
var a, c int
|
||||
|
||||
b.Schedule("a", func() { a++ })
|
||||
b.Schedule("c", func() { c++ })
|
||||
|
||||
if got := b.pendingCount(); got != 2 {
|
||||
t.Fatalf("two distinct sessions must hold two windows; got %d", got)
|
||||
}
|
||||
f.fireArmed()
|
||||
if a != 1 || c != 1 {
|
||||
t.Fatalf("each session must flush once and not cross-talk; a=%d c=%d", a, c)
|
||||
}
|
||||
}
|
||||
|
||||
func TestPendingBatcher_StaleTimerFireIsNoop(t *testing.T) {
|
||||
// Reproduces the AfterFunc race: a timer fires concurrently with the
|
||||
// Stop() that was meant to cancel it after a reschedule. The
|
||||
// generation guard must make the stale fire a no-op so the burst still
|
||||
// flushes exactly once.
|
||||
f := &fakeTimerFactory{}
|
||||
b := newTestBatcher(f)
|
||||
calls := 0
|
||||
|
||||
b.Schedule("s", func() { calls++ })
|
||||
first := f.all[0]
|
||||
b.Schedule("s", func() { calls++ }) // resets: cancels first, arms a new timer
|
||||
|
||||
// First timer fires anyway despite having been Stop()ed.
|
||||
first.fired = true
|
||||
first.fn()
|
||||
if calls != 0 {
|
||||
t.Fatalf("a superseded timer firing must not flush; got %d", calls)
|
||||
}
|
||||
|
||||
f.fireArmed()
|
||||
if calls != 1 {
|
||||
t.Fatalf("the live timer must still flush exactly once; got %d", calls)
|
||||
}
|
||||
}
|
||||
|
||||
func TestPendingBatcher_FlushAllDrainsPending(t *testing.T) {
|
||||
f := &fakeTimerFactory{}
|
||||
b := newTestBatcher(f)
|
||||
var a, c int
|
||||
|
||||
b.Schedule("a", func() { a++ })
|
||||
b.Schedule("c", func() { c++ })
|
||||
|
||||
b.FlushAll()
|
||||
if a != 1 || c != 1 {
|
||||
t.Fatalf("FlushAll must flush every pending session once; a=%d c=%d", a, c)
|
||||
}
|
||||
if got := b.pendingCount(); got != 0 {
|
||||
t.Fatalf("FlushAll must clear pending state; got %d", got)
|
||||
}
|
||||
|
||||
// After FlushAll the batcher is terminal: a later Schedule runs inline
|
||||
// rather than silently dropping (the shutdown-race best-effort path).
|
||||
ran := false
|
||||
b.Schedule("d", func() { ran = true })
|
||||
if !ran {
|
||||
t.Fatalf("Schedule after FlushAll must run inline")
|
||||
}
|
||||
}
|
||||
|
||||
func TestNewPendingBatcher_DefaultsWindow(t *testing.T) {
|
||||
if b := newPendingBatcher(0); b.window != DefaultChatRunBatchWindow {
|
||||
t.Fatalf("non-positive window must default to %v; got %v", DefaultChatRunBatchWindow, b.window)
|
||||
}
|
||||
if b := newPendingBatcher(500 * time.Millisecond); b.window != 500*time.Millisecond {
|
||||
t.Fatalf("explicit window must be honoured; got %v", b.window)
|
||||
}
|
||||
}
|
||||
|
||||
func TestPendingBatcher_RealTimerFlushes(t *testing.T) {
|
||||
// Exercises the production afterFunc (time.AfterFunc) path with a short
|
||||
// real window so a mis-wired default would be caught.
|
||||
b := newPendingBatcher(15 * time.Millisecond)
|
||||
done := make(chan struct{})
|
||||
b.Schedule("s", func() { close(done) })
|
||||
|
||||
select {
|
||||
case <-done:
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("real-timer flush did not fire within 2s")
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user