multica/server/internal/integrations/lark/hub_test.go
Bohan Jiang 8c98940b79 Lark Bot integration MVP: migration + service boundary (MUL-2671) (#3277)
* feat(db): add Lark integration migration (MUL-2671)

Introduces seven tables for the Feishu (Lark) Bot integration MVP: per-agent
PersonalAgent installations, user/chat bindings, inbound dedup +
non-content drop audit, outbound card mapping, and short-lived
single-use member binding tokens.

Schema notes:
- chat_session schema unchanged; Lark routes through a separate
  binding table rather than adding a metadata JSONB column.
- Outbound card mapping is task/message scoped so multiple runs on
  the same session can't stomp each other's cards.
- lark_inbound_audit stores routing / identity / drop_reason ONLY,
  never message body — the audit channel for unbound users and group
  messages that don't address the Bot.
- app_secret stores ciphertext (encryption helper lands in a follow-up
  commit on this branch); DB never sees plaintext.

Co-authored-by: multica-agent <github@multica.ai>

* feat(util): add secretbox AES-256-GCM helper for at-rest secrets

First consumer is lark_installation.app_secret (MUL-2671 §4.4), but
the helper is intentionally generic — future per-tenant secrets that
must not appear in a DB dump can reuse it.

Construction: AES-256-GCM with a per-message random nonce, providing
authenticated encryption. Open returns an error on tampered ciphertext
instead of silently decrypting to garbage. The master key is loaded
from a base64 env var via LoadKey; key rotation is not in scope yet.
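
A minimal sketch of the construction described above, not the helper
itself. The Box type, the nonce-prefixed ciphertext layout, and the
demoKey function are assumptions made for illustration; only the names
LoadKey and Open come from this commit.

```go
package main

import (
	"crypto/aes"
	"crypto/cipher"
	"crypto/rand"
	"encoding/base64"
	"errors"
	"fmt"
)

// Box is a hypothetical wrapper around an AES-256-GCM AEAD.
type Box struct{ aead cipher.AEAD }

// LoadKey decodes a base64 master key; it must decode to 32 bytes (AES-256).
func LoadKey(b64 string) (*Box, error) {
	key, err := base64.StdEncoding.DecodeString(b64)
	if err != nil {
		return nil, fmt.Errorf("decode key: %w", err)
	}
	if len(key) != 32 {
		return nil, errors.New("key must be 32 bytes")
	}
	block, err := aes.NewCipher(key)
	if err != nil {
		return nil, err
	}
	aead, err := cipher.NewGCM(block)
	if err != nil {
		return nil, err
	}
	return &Box{aead: aead}, nil
}

// Seal encrypts with a fresh random nonce and returns nonce || ciphertext || tag.
func (b *Box) Seal(plaintext []byte) ([]byte, error) {
	nonce := make([]byte, b.aead.NonceSize())
	if _, err := rand.Read(nonce); err != nil {
		return nil, err
	}
	return b.aead.Seal(nonce, nonce, plaintext, nil), nil
}

// Open authenticates and decrypts; tampered input yields an error, never garbage.
func (b *Box) Open(sealed []byte) ([]byte, error) {
	n := b.aead.NonceSize()
	if len(sealed) < n {
		return nil, errors.New("ciphertext too short")
	}
	return b.aead.Open(nil, sealed[:n], sealed[n:], nil)
}

// demoKey returns an all-zero key for the demo only; never use it for real data.
func demoKey() string {
	return base64.StdEncoding.EncodeToString(make([]byte, 32))
}

func main() {
	box, err := LoadKey(demoKey())
	if err != nil {
		panic(err)
	}
	sealed, _ := box.Seal([]byte("app-secret"))
	plain, err := box.Open(sealed)
	fmt.Println(string(plain), err)

	sealed[len(sealed)-1] ^= 0x01 // flip one bit of the auth tag
	_, err = box.Open(sealed)
	fmt.Println("tampered open failed:", err != nil)
}
```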

Co-authored-by: multica-agent <github@multica.ai>

* refactor(issues): extract IssueService.Create as single create entry (MUL-2671)

Establishes the service-layer boundary mandated by Elon's second review of
MUL-2671 §4.8: issue creation no longer lives inside the HTTP
handler. Both the HTTP POST /issues handler and the future Lark
/issue command call into service.IssueService.Create, so duplicate
guard, issue numbering, attachment linking, broadcast, analytics,
and agent/squad enqueue stay aligned.

Handler responsibilities shrink to parsing the HTTP request, doing
actor resolution / validation (transport-specific), and converting
service results into the IssueResponse + 201. The transaction-wrapped
core, attachment link, event publish, analytics capture, and
agent/squad enqueue all move into service.IssueService.Create.

A BroadcastPayload callback on the service keeps the WS broadcast
shape (the full IssueResponse) without forcing the service to
depend on handler-layer response types.

Co-authored-by: multica-agent <github@multica.ai>

* feat(integrations): add Lark package skeleton (MUL-2671)

Establishes the architectural boundaries Elon's second review mandated as
first-PR blockers without dragging in OAuth, WebSocket, or
card-patching code (those land in follow-up PRs):

- ChatSessionService interface — channel-aware chat-session entry
  point for Lark, deliberately separate from the HTTP SendChatMessage
  handler. The HTTP handler's single-creator guard (creator_id ==
  request user_id) is correct for the browser client but rejects
  group chat_sessions by construction; Lark needs its own service.
- AuditLogger interface — the only path for recording dropped events.
  Its signature deliberately omits message body, enforcing the
  drop-audit policy (MUL-2671 §4.7) at the type level: unbound users
  and non-addressed group messages can't accidentally end up in
  chat_session.
- Typed IDs (OpenID, ChatID) prevent UUIDs from being conflated with
  Lark-side identifiers at compile time.
- DropReason constants align dashboard/audit queries across callers.
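
The type-level guarantees in the list above could look roughly like
this. Only OpenID, ChatID, AuditLogger, and DropReason are names from
this commit; the DropEvent struct, the RecordDrop method, the constant
values, and the in-memory logger are hypothetical.

```go
package main

import (
	"context"
	"fmt"
)

// Distinct named types: passing a ChatID where an OpenID is expected
// does not compile, even though both are strings underneath.
type (
	OpenID string
	ChatID string
)

// DropReason values here are illustrative, not the package's real list.
type DropReason string

const (
	DropReasonUnboundUser         DropReason = "unbound_user"
	DropReasonNotAddressedInGroup DropReason = "not_addressed_in_group"
)

// DropEvent carries routing and identity only. There is no field that
// could hold message text, so a caller cannot log content by accident.
type DropEvent struct {
	MessageID string
	Sender    OpenID
	Chat      ChatID
	Reason    DropReason
}

// AuditLogger is the single path for recording dropped events.
type AuditLogger interface {
	RecordDrop(ctx context.Context, ev DropEvent) error
}

// memAudit is an in-memory AuditLogger used only for this demo.
type memAudit struct{ events []DropEvent }

func (m *memAudit) RecordDrop(_ context.Context, ev DropEvent) error {
	m.events = append(m.events, ev)
	return nil
}

// demo records one drop and returns what the logger stored.
func demo() []DropEvent {
	audit := &memAudit{}
	var logger AuditLogger = audit
	_ = logger.RecordDrop(context.Background(), DropEvent{
		MessageID: "om_123",
		Sender:    OpenID("ou_abc"),
		Chat:      ChatID("oc_xyz"),
		Reason:    DropReasonNotAddressedInGroup,
	})
	return audit.events
}

func main() {
	for _, ev := range demo() {
		fmt.Printf("dropped %s from %s: %s\n", ev.MessageID, ev.Sender, ev.Reason)
	}
}
```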

Co-authored-by: multica-agent <github@multica.ai>

* refactor(issues): move parent/project workspace check into IssueService (MUL-2671)

Parent existence and project workspace membership now live inside
IssueService.Create, inside the same transaction as the duplicate guard
and counter increment. The HTTP handler stops re-implementing the
lookup; every future create entry (Lark /issue, MCP, API keys) inherits
the same boundary without copy-pasting the SQL.

Adds two error sentinels (ErrParentIssueNotFound, ErrProjectNotFound)
so transports can translate to their own error shapes. Handler-level
cross-workspace tests guard the boundary against future regressions.

Co-authored-by: multica-agent <github@multica.ai>

* fix(db): harden Lark migration safety foundation: TTL cap + workspace FK (MUL-2671)

Two storage-layer hardenings that move the must-fix line off "the app
layer enforces it" and onto the schema itself, so future write paths or
hand-inserted rows cannot regress the invariants.

1) lark_binding_token TTL cap. The DB CHECK was 1 hour as
   defense-in-depth while the app constant was 15 minutes; the CHECK
   now matches the product cap (15 minutes). Application constant
   docstring updated to reflect that storage enforces the same bound.

2) lark_user_binding workspace membership. The table previously only
   FK'd to workspace / user / installation independently, so a binding
   could exist for a user no longer in the workspace, or claim a
   workspace different from its installation's. Two composite FKs
   close the gap structurally:

   * (installation_id, workspace_id) → lark_installation(id, workspace_id)
     — guarantees a binding's workspace_id always matches its
     installation's workspace_id. A new UNIQUE (id, workspace_id) on
     lark_installation is added as the FK target.

   * (workspace_id, multica_user_id) → member(workspace_id, user_id)
     with ON DELETE CASCADE — when a user is removed from the
     workspace, the binding cascades away in the same transaction.
     There is no longer a path where lark_user_binding outlives
     workspace membership.

These two FKs are the schema-level proof for §4.3's "unbound or
non-workspace members cannot leak content into chat_session" invariant.

Co-authored-by: multica-agent <github@multica.ai>

* feat(integrations/lark): inbound services + /issue dispatcher (MUL-2671)

Lands the inbound service layer for the Lark Bot MVP, sitting on top
of the migration + service-boundary scaffold from the previous commits.
What ships:

- sqlc queries for all seven lark_* tables (idempotent dedup insert,
  CAS WS-lease, single-use binding-token consume, etc.) plus
  GetMostRecentUserChatMessage for the /issue fallback.
- AuditLogger backed by lark_inbound_audit; signature deliberately
  body-free so callers cannot leak content into the drop log.
- ChatSessionService: find-or-create chat_session via the binding
  table (winner-takes-all on the UNIQUE race), append-with-dedup, /issue
  parser, "previous user message" fallback for bare `/issue` invocation.
- Dispatcher orchestrates the inbound pipeline in one place:
  installation routing → group-mention filter → identity check → ensure
  session → append+dedup → /issue → enqueue chat task. Group sessions
  use the installer as creator (stable workspace identity); p2p uses
  the sender. Agent-offline path falls through with OutcomeAgentOffline
  so the WS adapter can reply with the offline notice from §4.6.
- BindingTokenService: random URL-safe token, SHA-256 stored hash,
  15-min TTL pinned at the application AND the DB CHECK; Redeem
  returns the same opaque error for all rejection cases (no timing
  oracle on replay).
- Unit tests for the parser (13 cases), dispatcher (8 cases via fake
  Queries/Chat/Audit/IssueCreator/Enqueuer), and binding-token
  hash/entropy. Real-DB integration tests for OAuth + token redeem
  land alongside the HTTP handlers in the next commit.
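
For the BindingTokenService bullet above, a hedged sketch of minting a
URL-safe token and storing only its SHA-256 hash. The function names,
the 32-byte entropy size, and the hex encoding of the hash are
assumptions; the 15-minute TTL is the value stated above.

```go
package main

import (
	"crypto/rand"
	"crypto/sha256"
	"encoding/base64"
	"encoding/hex"
	"fmt"
	"time"
)

// bindingTokenTTL mirrors the 15-minute cap enforced by the app and the DB CHECK.
const bindingTokenTTL = 15 * time.Minute

// hashToken returns the value that would be stored; the raw token never is.
func hashToken(token string) string {
	sum := sha256.Sum256([]byte(token))
	return hex.EncodeToString(sum[:])
}

// mintToken returns a random URL-safe token plus the hash to persist.
// 32 random bytes is an assumed size, not taken from the commit.
func mintToken() (token, hash string, err error) {
	raw := make([]byte, 32)
	if _, err = rand.Read(raw); err != nil {
		return "", "", err
	}
	token = base64.RawURLEncoding.EncodeToString(raw)
	return token, hashToken(token), nil
}

func main() {
	token, hash, err := mintToken()
	if err != nil {
		panic(err)
	}
	fmt.Println("token length:", len(token))
	fmt.Println("stored hash length:", len(hash))
	fmt.Println("expires in:", bindingTokenTTL)
}
```

On redeem, the service would hash the presented token the same way and
look the row up by hash, so a DB dump alone is not enough to redeem.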

Out of scope for this commit (next ones on the same feature branch):
OAuth callback, HTTP routes, WebSocket hub, outbound card patcher,
frontend.

Co-authored-by: multica-agent <github@multica.ai>

* feat(integrations/lark): installation HTTP surface + secretbox-gated wiring (MUL-2671)

Lands the HTTP boundary on top of the inbound services from the
previous commit. What ships:

- InstallationService.Upsert: the only path that writes
  lark_installation. Encrypts app_secret with the secretbox passed in
  at construction time; refuses to fall back to plaintext storage
  (returns an error from the constructor if no Box is supplied), so a
  misconfigured dev environment cannot accidentally land a row with
  cleartext credentials. Revoke flips status without DELETE so audit
  trail survives.

- HTTP handlers under /api/workspaces/{id}/lark/:
  * GET  /installations           — member-visible (Integrations tab
                                    renders for non-admins). Soft 200
                                    with empty list + configured:false
                                    when MULTICA_LARK_SECRET_KEY is
                                    unset, so the tab does not error
                                    on self-host that has not opted in.
  * POST /installations           — admin-only; 503 when not configured.
                                    Re-validates agent_id ∈ workspace
                                    before accepting credentials so a
                                    cross-workspace agent UUID is
                                    rejected.
  * DELETE /installations/{id}    — admin-only; workspace-scoped lookup
                                    so one workspace cannot revoke
                                    another's installation by UUID
                                    guess.

- POST /api/lark/binding/redeem (user-scoped, no workspace context):
  the only path that mints a lark_user_binding row from user action.
  Redeemer identity comes from the session, not the token, so a stolen
  link cannot bind an open_id to an attacker's Multica user. The
  composite FK on lark_user_binding rejects the insert if the user is
  not a workspace member (and cascades the binding away if membership
  is later removed), so a non-member who steals the link is rejected
  at the DB layer and receives a 403.

- Two new event-bus types in protocol.events:
  EventLarkInstallationCreated, EventLarkInstallationRevoked.

- Router wiring: MULTICA_LARK_SECRET_KEY drives a conditional
  initialization of h.LarkInstallations + h.LarkBindingTokens. When
  unset, the integration disables itself with an INFO log and the
  rest of the server boots normally.

- Handler tests cover all four not-configured short-circuits.
  Happy-path integration tests (real DB, full create→list→revoke
  cycle and token mint→redeem) ship alongside the WS hub PR.

Co-authored-by: multica-agent <github@multica.ai>

* fix(integrations/lark): close binding-token rebind & typed task errors (MUL-2671)

Two must-fixes from PR review on HEAD 87ad15e1:

1. Binding-token redeem could be used to grab an already-bound Lark
   open_id. Two changes harden the path:
   - lark.sql `CreateLarkUserBinding` now gates ON CONFLICT DO UPDATE
     on `multica_user_id = EXCLUDED.multica_user_id`, so a cross-user
     rebind via a second valid token returns zero rows instead of
     silently switching ownership.
   - `BindingTokenService.RedeemAndBind` consumes the token and writes
     the binding row inside one transaction. A failed bind no longer
     burns the token; a successful bind never leaves a consumed-but-
     unused token. Distinct typed errors: ErrBindingTokenInvalid (410),
     ErrBindingAlreadyAssigned (409), ErrBindingNotWorkspaceMember
     (403). The handler maps each to its own status code.

2. Dispatcher collapsed every `EnqueueChatTask` error to
   `OutcomeAgentOffline`, hiding infra failure and misusing the
   "offline" label for cases (e.g. archived agent) where it doesn't
   fit. Now:
   - `service.EnqueueChatTask` returns `ErrChatTaskAgentNoRuntime` and
     `ErrChatTaskAgentArchived` as sentinel errors; DB / load / insert
     failures stay wrapped as ordinary errors.
   - Dispatcher uses `errors.Is` to map only the productizable cases
     (`OutcomeAgentOffline`, new `OutcomeAgentArchived`); any other
     error is returned to the WS adapter so it can retry or page
     instead of disguising the outage as an offline card.

A daemon that's merely disconnected is still NOT an error — as long
as `agent.runtime_id` is set the chat task enqueues and waits for the
daemon to claim it on next online (returns `OutcomeIngested`).
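
The dispatcher mapping in item 2 can be sketched as follows. The
sentinel and Outcome names come from this commit; the error messages,
the Outcome string values, and the classifyEnqueue and wrapped helpers
are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

var (
	ErrChatTaskAgentNoRuntime = errors.New("chat task: agent has no runtime")
	ErrChatTaskAgentArchived  = errors.New("chat task: agent archived")
)

type Outcome string

const (
	OutcomeIngested      Outcome = "ingested"
	OutcomeAgentOffline  Outcome = "agent_offline"
	OutcomeAgentArchived Outcome = "agent_archived"
)

// classifyEnqueue maps only the productizable sentinels to outcomes.
// Anything else is returned as an error so the caller can retry or page.
func classifyEnqueue(err error) (Outcome, error) {
	switch {
	case err == nil:
		return OutcomeIngested, nil
	case errors.Is(err, ErrChatTaskAgentNoRuntime):
		return OutcomeAgentOffline, nil
	case errors.Is(err, ErrChatTaskAgentArchived):
		return OutcomeAgentArchived, nil
	default:
		return "", err
	}
}

// wrapped simulates a service layer that adds context with %w.
func wrapped(err error) error { return fmt.Errorf("enqueue chat task: %w", err) }

func main() {
	for _, err := range []error{
		nil,
		wrapped(ErrChatTaskAgentNoRuntime),
		wrapped(ErrChatTaskAgentArchived),
		errors.New("db: connection refused"),
	} {
		outcome, infraErr := classifyEnqueue(err)
		fmt.Printf("outcome=%q err=%v\n", outcome, infraErr)
	}
}
```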

Co-authored-by: multica-agent <github@multica.ai>

* ci: re-trigger workflow on lark MVP must-fix HEAD

Co-authored-by: multica-agent <github@multica.ai>

* ci: re-trigger workflow on lark MVP must-fix HEAD (retry)

Co-authored-by: multica-agent <github@multica.ai>

* test(integrations/lark): guard binding-token sentinel contract (MUL-2671)

Two unit tests that document and protect the must-fix invariants
without requiring a DB:

1. TestRedeemAndBindRequiresTxStarter — if a future refactor wires
   up BindingTokenService without a TxStarter, RedeemAndBind must
   fail fast with a clear error rather than nil-panic on Begin.
   The atomicity contract (consume + bind commit together) depends
   on that transaction existing.

2. TestBindingErrorSentinelsAreDistinct — the HTTP handler maps
   ErrBindingTokenInvalid → 410, ErrBindingAlreadyAssigned → 409,
   ErrBindingNotWorkspaceMember → 403. Accidentally aliasing them
   (e.g. var ErrBindingAlreadyAssigned = ErrBindingTokenInvalid)
   would silently regress the response codes without any other
   test catching it.
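
A sketch of the contract the second test protects, assuming a handler
helper named bindingStatus and a pairwise check named sentinelsDistinct
(both hypothetical). The 200 and 500 branches are assumptions; the
410/409/403 mapping is the one stated above.

```go
package main

import (
	"errors"
	"fmt"
	"net/http"
)

var (
	ErrBindingTokenInvalid       = errors.New("lark: binding token invalid")
	ErrBindingAlreadyAssigned    = errors.New("lark: binding already assigned")
	ErrBindingNotWorkspaceMember = errors.New("lark: not a workspace member")
)

// bindingStatus translates a redeem error into an HTTP status code.
func bindingStatus(err error) int {
	switch {
	case err == nil:
		return http.StatusOK // assumed success code
	case errors.Is(err, ErrBindingTokenInvalid):
		return http.StatusGone
	case errors.Is(err, ErrBindingAlreadyAssigned):
		return http.StatusConflict
	case errors.Is(err, ErrBindingNotWorkspaceMember):
		return http.StatusForbidden
	default:
		return http.StatusInternalServerError
	}
}

// sentinelsDistinct reports whether no sentinel matches any other.
// If two were aliased, errors.Is would match and the first case in
// bindingStatus would win for both.
func sentinelsDistinct(errs ...error) bool {
	for i := range errs {
		for j := range errs {
			if i != j && errors.Is(errs[i], errs[j]) {
				return false
			}
		}
	}
	return true
}

func main() {
	fmt.Println(bindingStatus(ErrBindingTokenInvalid))
	fmt.Println(bindingStatus(ErrBindingAlreadyAssigned))
	fmt.Println(bindingStatus(ErrBindingNotWorkspaceMember))
	fmt.Println(sentinelsDistinct(
		ErrBindingTokenInvalid, ErrBindingAlreadyAssigned, ErrBindingNotWorkspaceMember))
}
```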

Co-authored-by: multica-agent <github@multica.ai>

* feat(integrations/lark): WS hub orchestrator + outbound card patcher (MUL-2671)

The hub owns one supervisor goroutine per active installation. Each
supervisor acquires the WS lease via the existing CAS query, runs an
EventConnector (interface — real Lark wire protocol lands in a follow-up
behind it), renews the lease on a tighter cadence than the TTL, and
backs off (with jitter) on connector failure. Lease loss tears the
connector down cleanly; revocation is reaped on the next sweep. Per-
process node id satisfies §4.4 multi-replica safety: at most one Hub
globally holds the lease for any installation.
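
The supervisor's back-off with jitter might be shaped like the sketch
below. The doubling schedule, the [d/2, d] jitter window, and the 1s /
30s bounds are assumptions; the commit only states that the supervisor
backs off with jitter on connector failure.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// backoff returns the delay before retry number attempt (0-based).
// The nominal delay doubles from base up to limit, and the result is
// drawn uniformly from [d/2, d] so replicas do not retry in lockstep.
func backoff(attempt int, base, limit time.Duration, rng *rand.Rand) time.Duration {
	d := base
	for i := 0; i < attempt && d < limit; i++ {
		d *= 2
	}
	if d > limit {
		d = limit
	}
	half := d / 2
	return half + time.Duration(rng.Int63n(int64(half)+1))
}

// boundsHold checks that delays for attempts 0..n-1 stay within [base/2, limit].
func boundsHold(n int) bool {
	const base, limit = time.Second, 30 * time.Second
	rng := rand.New(rand.NewSource(1))
	for attempt := 0; attempt < n; attempt++ {
		d := backoff(attempt, base, limit, rng)
		if d < base/2 || d > limit {
			return false
		}
	}
	return true
}

func main() {
	rng := rand.New(rand.NewSource(time.Now().UnixNano()))
	for attempt := 0; attempt < 8; attempt++ {
		fmt.Printf("attempt %d: wait %v\n", attempt,
			backoff(attempt, time.Second, 30*time.Second, rng))
	}
	fmt.Println("bounds hold:", boundsHold(50))
}
```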

The patcher subscribes to task / chat-done events on the existing
events.Bus and keeps the per-task Lark interactive card in sync
(thinking → streaming → final | error). Card binding is per-task as
required by §4.5; throttled patches via an in-memory last-patched map;
final / error transitions bypass the throttle so the user always sees
the terminal state. The Renderer is plug-replaceable so the product
card template can evolve without touching transport.
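
A minimal sketch of the throttle described above, assuming a
per-task minimum gap and a CardKind enum with these four values. The
type names, the Allow method, and the one-second gap are illustrative,
not the Patcher's actual API.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

type CardKind string

const (
	KindThinking  CardKind = "thinking"
	KindStreaming CardKind = "streaming"
	KindFinal     CardKind = "final"
	KindError     CardKind = "error"
)

// throttle remembers when each task's card was last patched.
type throttle struct {
	mu     sync.Mutex
	minGap time.Duration
	last   map[string]time.Time // task id -> time of last allowed patch
}

func newThrottle(minGap time.Duration) *throttle {
	return &throttle{minGap: minGap, last: make(map[string]time.Time)}
}

// Allow reports whether a patch may go out now. Terminal kinds always
// pass so the user sees the final state; their entry is then dropped.
func (t *throttle) Allow(taskID string, kind CardKind, now time.Time) bool {
	t.mu.Lock()
	defer t.mu.Unlock()
	if kind == KindFinal || kind == KindError {
		delete(t.last, taskID)
		return true
	}
	if prev, ok := t.last[taskID]; ok && now.Sub(prev) < t.minGap {
		return false
	}
	t.last[taskID] = now
	return true
}

// demo replays four patches for one task against a 1s gap.
func demo() []bool {
	th := newThrottle(time.Second)
	t0 := time.Unix(0, 0)
	return []bool{
		th.Allow("task-1", KindStreaming, t0),
		th.Allow("task-1", KindStreaming, t0.Add(200*time.Millisecond)),
		th.Allow("task-1", KindStreaming, t0.Add(1200*time.Millisecond)),
		th.Allow("task-1", KindFinal, t0.Add(1300*time.Millisecond)),
	}
}

func main() {
	fmt.Println(demo())
}
```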

The APIClient interface centralizes the Lark Open Platform surface
this package needs (send card, patch card, send binding prompt,
exchange OAuth code). The default stubAPIClient returns
ErrAPIClientNotConfigured for every transport call so a misconfigured
deployment fails loudly instead of dropping cards silently. Real
implementation lands in a follow-up; OAuth callback + frontend entries
land in the next commits on this branch.

Co-authored-by: multica-agent <github@multica.ai>

* feat(integrations/lark): OAuth install start / callback (MUL-2671)

OAuthService builds a signed-state Lark authorization URL the frontend
can render as a QR (or open directly), then on callback verifies the
HMAC-protected state, exchanges the OAuth code for installation
credentials via APIClient.ExchangeOAuthCode, and persists the row via
InstallationService.Upsert (which keeps app_secret encryption inside a
single chokepoint).

State token format: workspaceID.agentID.initiatorID.expiresUnix.nonce.sig
— HMAC-SHA256 over the first five fields with a deployment-level
secret. TTL defaults to 10 minutes (covered by tests). Three failure
modes (invalid state / expired state / missing code) map to typed
errors so the HTTP handler can emit a single lark_error= query param
the frontend uses to pick copy.

Both endpoints degrade cleanly: the at-rest key gate (already in place)
returns 503 from /install/start when the InstallationService is nil,
and the OAuth gate (MULTICA_LARK_OAUTH_APP_ID / _SECRET / _REDIRECT_URI
/ _STATE_SECRET) returns configured:false from /install/start so the
frontend can render "configure manually instead" without an error
banner. /install/callback always finishes with a redirect to
/settings?tab=lark carrying either lark_installed=1 or lark_error=<code>.

Tests cover signed-URL shape, missing-config rejection, tampered state,
expired state, propagated exchange error, and the no-config redirect
path on the HTTP handler.

Co-authored-by: multica-agent <github@multica.ai>

* feat(views/lark): settings tab + agent bind button + /lark/bind redemption page (MUL-2671)

Adds the user-facing Lark surface across the shared packages:

- packages/core/types/lark.ts — wire shapes that mirror server/internal/
  handler/lark.go. Optional fields default to undefined so older desktop
  builds keep parsing if the server adds new keys (CLAUDE.md → API
  Response Compatibility).
- packages/core/lark/{queries,index}.ts — TanStack Query options keyed
  by workspace id; realtime sync invalidates `installations(wsId)` on
  `lark_installation:*` events.
- packages/core/api/client.ts — listLarkInstallations,
  getLarkInstallURL, deleteLarkInstallation, redeemLarkBindingToken.
- packages/views/settings/components/lark-tab.tsx — Settings → Lark
  panel. Listing is member-visible (matches backend); disconnect is
  admin-only. Empty state points users at the per-Agent bind entry,
  matching the (workspace_id, agent_id) UNIQUE: there is no
  "pick an agent" UI here because the bind URL is per-agent.
- LarkAgentBindButton (same file) is the per-Agent CTA the Agent
  detail page imports. Opens the OAuth URL in a new tab; the callback
  bounces back to /settings?tab=lark with a query param the panel
  reads for inline confirmation copy.
- packages/views/lark/bind-page.tsx — the Bot's "you need to bind"
  destination. Requires session before redeeming, distinguishes the
  410/409/403 backend responses into distinct copy.
- apps/web/app/lark/bind/page.tsx — Next.js route wrapping the shared
  bind page in a Suspense boundary (Next 15 useSearchParams rule).

i18n: all user-facing strings land in en/zh-Hans, settings tab nav
includes a Sparkles-iconed Lark entry, bind-page copy lives under
common.lark_bind so it works pre-workspace-context too. typecheck +
lint clean.

Co-authored-by: multica-agent <github@multica.ai>

* chore(integrations/lark): wire outbound Patcher into server bootstrap (MUL-2671)

Constructs the Patcher next to the existing Installation/BindingToken
wiring in router.go and Register()s it on the event bus. With the stub
APIClient any actual transport call surfaces ErrAPIClientNotConfigured;
once the real Lark client lands, swap NewStubAPIClient for the real
implementation here without touching the Patcher's subscription logic.

doc.go updated to reflect everything the package now contains (Hub,
Patcher, OAuthService, APIClient interface). The Hub itself is NOT
booted here yet — it needs an EventConnector implementation for the
Lark long-connection wire protocol, which lands in a follow-up; the
orchestrator code and its unit tests are in place so that follow-up
can focus on the WS protocol rather than lifecycle plumbing.

Co-authored-by: multica-agent <github@multica.ai>

* fix(integrations/lark): address 5 must-fix items from Elon's second review (MUL-2671)

- Hub: renewer cancels run ctx on lease loss so the connector exits
  even if its wire I/O is blocked, keeping the §4.4 ownership
  invariant intact under lease theft.
- Hub: EventEmitter returns (DispatchResult, error) so the real
  connector can post the matching Lark-side card (needs_binding,
  agent_offline, agent_archived) and react to infra failures instead
  of silently logging at the seam.
- Dispatcher: top-level message_id dedup runs before group filter
  and identity check, so a reconnect storm cannot re-fire binding
  prompts or re-spam not_addressed_in_group audit rows; the in-
  AppendUserMessage dedup is removed since the table-level UNIQUE
  is the ultimate backstop.
- OAuth: HandleCallback auto-binds the installer via the new
  InstallerBinder seam (BindingTokenService implements it), so the
  §2.1 "scan to bind, you're done" promise holds end-to-end.
  validateExchangeResult now requires installer open_id; new error
  reason codes wired through the callback redirect.
- Frontend / handler: install_supported listing field +
  StartLarkInstall short-circuit on stub APIClient hide install entry
  points (Settings tab + per-agent button) while no real Lark HTTP
  client is wired, so users do not land in an OAuth flow that fails
  at exchange.

Includes tests for each fix (lease-loss cancel, emit error
propagation, dedup ordering, OAuth installer-bind contract, stub-
client install gate) and i18n strings for the new preview state.

Co-authored-by: multica-agent <github@multica.ai>

* fix(integrations/lark): two-phase dedup so infra failures do not swallow messages (MUL-2671)

The pre-fix top-level dedup wrote the lark_inbound_message_dedup row before
EnsureChatSession / AppendUserMessage. An infra error in either step left
the row in place and a WS-adapter retry was mis-classified as a duplicate,
so the user's Lark message was permanently lost without ever landing in
chat_session.

Make dedup two-phase:

  - ClaimLarkInboundDedup acquires an in-flight claim (processed_at NULL).
    Stale claims older than 60 s are re-takeable so a process crash does
    not strand the message_id.
  - MarkLarkInboundDedupProcessed flips processed_at on durable success
    (audit row OR chat_message + session touch).
  - ReleaseLarkInboundDedup deletes the in-flight row on infra failure
    before any durable side effect, so the retry can re-claim immediately.

Dispatcher.Handle now finalizes the claim exactly once based on whether
the inner pipeline reached a durable outcome — chat_message commit being
the transition point (errors past it Mark, errors before it Release).
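
An in-memory model of the Claim / Mark / Release protocol above, as a
single-threaded sketch rather than the sqlc queries. The dedupStore
type, the handle function, and the outcome strings are hypothetical;
the 60 s staleness window is the one stated above. The pipeline
callback is assumed to report durable=true whenever it returns nil.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

const staleAfter = 60 * time.Second

type dedupRow struct {
	claimedAt time.Time
	processed bool // models processed_at IS NOT NULL
}

type dedupStore struct{ rows map[string]*dedupRow }

// Claim returns true when the caller now owns the in-flight claim.
func (s *dedupStore) Claim(id string, now time.Time) bool {
	row, ok := s.rows[id]
	switch {
	case !ok:
		s.rows[id] = &dedupRow{claimedAt: now}
		return true
	case row.processed:
		return false // terminal: a true duplicate
	case now.Sub(row.claimedAt) > staleAfter:
		row.claimedAt = now // stale claim from a crashed worker: re-take
		return true
	default:
		return false // someone else is in flight
	}
}

func (s *dedupStore) MarkProcessed(id string) {
	if row, ok := s.rows[id]; ok {
		row.processed = true
	}
}

func (s *dedupStore) Release(id string) {
	if row, ok := s.rows[id]; ok && !row.processed {
		delete(s.rows, id)
	}
}

// handle finalizes the claim exactly once: Mark after a durable
// outcome, Release when the pipeline failed before anything durable.
func handle(s *dedupStore, id string, now time.Time, pipeline func() (durable bool, err error)) (string, error) {
	if !s.Claim(id, now) {
		return "duplicate", nil
	}
	durable, err := pipeline()
	if durable {
		s.MarkProcessed(id)
	} else {
		s.Release(id)
	}
	if err != nil {
		return "", err
	}
	return "processed", nil
}

// demo: an infra failure, an immediate retry, then a replay.
func demo() []string {
	s := &dedupStore{rows: map[string]*dedupRow{}}
	now := time.Unix(0, 0)
	infraFail := func() (bool, error) { return false, errors.New("db unavailable") }
	succeed := func() (bool, error) { return true, nil }
	var out []string
	for _, p := range []func() (bool, error){infraFail, succeed, succeed} {
		outcome, err := handle(s, "om_1", now, p)
		if err != nil {
			outcome = "error"
		}
		out = append(out, outcome)
	}
	return out
}

func main() {
	fmt.Println(demo())
}
```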

Regression tests cover the two failure variants Elon flagged plus the
inverse invariants (durable-error Marks, drops Mark, in-flight replays
drop, stale claims re-claim).

Co-authored-by: multica-agent <github@multica.ai>

* fix(integrations/lark): owner-fence dedup claim to close the double-write windows (MUL-2671)

The two-phase Claim/Mark/Release fix from the previous commit closed the
"infra error swallows a replay" gap but left two windows that could still
write a chat_message twice for the same Lark message_id:

  1. Stale-reclaim race. Worker A claims at t=0, runs slowly past the
     60 s staleness TTL but is still alive. Worker B sees the row as
     stale and re-takes the claim. A reaches AppendUserMessage and
     commits a second chat_message.

  2. Mark window. Worker A commits chat_message but the post-pipeline
     MarkLarkInboundDedupProcessed fails (DB hiccup) or the process
     crashes before it runs. 60 s later a retry treats the in-flight
     row as stale, re-claims it, and writes a second chat_message.

Close both with owner fencing + same-tx Mark:

  - lark_inbound_message_dedup now carries a `claim_token` UUID;
    ClaimLarkInboundDedup mints a fresh one on insert and on stale
    re-take, so a reclaim ROTATES the token.

  - MarkLarkInboundDedupProcessed and ReleaseLarkInboundDedup are
    fenced on (message_id, claim_token, processed_at IS NULL) and
    return rowsAffected. Zero means our token is no longer live, and
    the caller treats it as a no-op (not an error).

  - AppendUserMessage invokes MarkLarkInboundDedupProcessed INSIDE its
    chat_message+session tx (qtx). If the token has been rotated by a
    concurrent reclaim, the Mark matches zero rows and the method
    returns ErrClaimLost; the deferred Rollback unwinds the
    chat_message insert, so the other holder is the sole writer. The
    durable write and the Mark therefore commit (or roll back)
    atomically — there is no "committed but not yet Marked" window
    for a crash or retry to exploit.

Dispatcher.processClaimed now returns a tri-state dedupFinalize directive
(none / mark / release): finalizeNone for the in-tx Mark path (and
ErrClaimLost), finalizeMark for audit-drop branches and the defensive
post-Append-success fallback, finalizeRelease for pre-durable infra
errors. ErrClaimLost is translated into OutcomeDropped +
DropReasonDuplicate at the Handle boundary, matching what the WS
adapter expects for an "another worker is the writer" outcome.
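
The owner-fencing idea can be modeled in a few lines. This is a
single-row, single-threaded sketch: the claim_token UUID is replaced by
an integer counter, the staleness check is elided (every claim on an
unprocessed row is treated as a legitimate re-take), and the db type
and its methods are hypothetical. ErrClaimLost is the name used above.

```go
package main

import (
	"errors"
	"fmt"
)

var ErrClaimLost = errors.New("lark: dedup claim lost")

// db models one dedup row plus the chat_message table.
type db struct {
	token     int // current claim_token; 0 means never claimed
	processed bool
	nextToken int
	messages  []string
}

// claim mints a fresh token on insert or re-take, rotating any old
// holder's token. It fails once the row is terminal.
func (d *db) claim() (token int, ok bool) {
	if d.processed {
		return 0, false
	}
	d.nextToken++
	d.token = d.nextToken
	return d.token, true
}

// mark models UPDATE ... WHERE claim_token = $1 AND processed_at IS NULL
// and returns the number of rows affected.
func (d *db) mark(token int) int {
	if token == 0 || d.token != token || d.processed {
		return 0
	}
	d.processed = true
	return 1
}

// appendUserMessage stages the insert and runs the fenced Mark as one
// unit. Zero rows affected means the token was rotated: the staged
// insert is discarded (the rollback) and ErrClaimLost is returned.
func (d *db) appendUserMessage(token int, body string) error {
	staged := append(append([]string(nil), d.messages...), body)
	if d.mark(token) == 0 {
		return ErrClaimLost
	}
	d.messages = staged // commit
	return nil
}

// demo: worker A claims, worker B re-takes, then both try to append.
func demo() (aErr, bErr error, count int, replayOK bool) {
	d := &db{}
	tokenA, _ := d.claim()
	tokenB, _ := d.claim() // rotates the token away from A
	aErr = d.appendUserMessage(tokenA, "hello")
	bErr = d.appendUserMessage(tokenB, "hello")
	_, replayOK = d.claim()
	return aErr, bErr, len(d.messages), replayOK
}

func main() {
	aErr, bErr, count, replayOK := demo()
	fmt.Println("worker A:", aErr)
	fmt.Println("worker B:", bErr)
	fmt.Println("messages written:", count, "replay claimable:", replayOK)
}
```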

Regression tests:

  - TestDispatcher_StaleReclaimRaceDoesNotDoubleWrite injects worker
    B's reclaim via a beforeAppend hook so the claim_token rotates
    between Claim and AppendUserMessage. Asserts worker A's
    AppendUserMessage returns ErrClaimLost (no chat_message
    committed), the dispatcher surfaces a duplicate drop, the token
    rotated to a value distinct from A's original, and a follow-up
    replay still duplicate-drops.

  - TestDispatcher_InTxMarkPreventsPostCommitReclaim verifies the
    "Mark window" case is unreachable: a successful in-tx Mark
    produces exactly one Mark call (no post-finalize duplicate), the
    row is terminal, and a retry with dedupReclaim=true still
    duplicate-drops without re-rotating the token.

  - TestDispatcher_InTxMarkSucceedsAndSkipsPostFinalize pins the
    positive contract: DedupMarked=true must make applyFinalize a
    no-op (no extra Mark, no Release).

fakeQueries gains a fakeDedupRow model carrying (processed, token,
rotations) so the test seam matches production's UPDATE-with-WHERE
semantics; fakeChat gains a beforeAppend hook to inject race timing.

go test ./... and go vet ./... pass.

Co-authored-by: multica-agent <github@multica.ai>

* feat(integrations/lark): real Lark HTTP APIClient for IM v1 send/patch (MUL-2671)

Lands the production Lark Open Platform HTTP APIClient that replaces
the stub for outbound transport. The patcher's "thinking → streaming
→ final | error" card lifecycle and the dispatcher's binding-prompt
card both now reach Lark for real once MULTICA_LARK_HTTP_ENABLED=true.

Scope of this stage:

  - tenant_access_token retrieval via
    /open-apis/auth/v3/tenant_access_token/internal, cached in-process
    per app_id with a 60s safety margin against Lark's `expire` value.
    `expire` values under 2 minutes are clamped to 120s so we never
    cache an entry that's already past its safe window.
  - SendInteractiveCard: POST /open-apis/im/v1/messages?receive_id_type=chat_id
    returning the Lark message_id the Patcher persists in
    lark_outbound_card_message for later patches.
  - PatchInteractiveCard: PATCH /open-apis/im/v1/messages/:id with
    the full re-rendered card body (Lark's update endpoint replaces,
    not deep-merges).
  - SendBindingPromptCard: open_id-targeted interactive card with a
    primary "去绑定" ("Go bind") CTA pointing at the redemption URL. Template is
    co-located with the transport so the dispatcher never has to know
    about Lark's card schema.
  - Token-error invalidation: Lark codes 99991663 (expired) /
    99991664 (invalid) drop the cached token so the next call
    refreshes from /tenant_access_token/internal instead of looping
    on a stale entry.
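
One reading of the caching arithmetic above, as a sketch: clamp the
reported lifetime to at least 120s, then subtract the 60s margin. The
function names are hypothetical and the real client may compute the
window differently; the two error codes are the ones listed above.

```go
package main

import (
	"fmt"
	"time"
)

const (
	safetyMargin = 60 * time.Second
	minExpire    = 120 * time.Second
)

// cacheTTL returns how long to keep a tenant_access_token whose
// Lark-reported lifetime is expire.
func cacheTTL(expire time.Duration) time.Duration {
	if expire < minExpire {
		expire = minExpire
	}
	return expire - safetyMargin
}

// cacheTTLSeconds is a convenience wrapper for whole-second inputs.
func cacheTTLSeconds(expireSeconds int) int {
	return int(cacheTTL(time.Duration(expireSeconds)*time.Second) / time.Second)
}

// isTokenError reports whether a Lark response code means the cached
// token should be dropped so the next call refreshes it.
func isTokenError(code int) bool {
	return code == 99991663 || code == 99991664
}

func main() {
	for _, s := range []int{7200, 120, 10} {
		fmt.Printf("expire=%ds -> cache for %ds\n", s, cacheTTLSeconds(s))
	}
	fmt.Println("99991663 invalidates cache:", isTokenError(99991663))
}
```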

Out of scope (deferred to follow-up stages):

  - ExchangeOAuthCode stays unimplemented behind
    ErrAPIClientNotConfigured. The PersonalAgent install handshake's
    response shape (returning per-installation app credentials in a
    single call) is not yet verified against the production endpoint,
    and a silent mis-fill of OAuthExchangeResult would corrupt
    lark_installation rows past validateExchangeResult. Operators
    continue to use the manual-paste InstallationService path until
    the OAuth stage lands.
  - Inbound WS EventConnector — Hub's ConnectorFactory still needs a
    real wire-protocol implementation.

Wiring:

  - MULTICA_LARK_HTTP_ENABLED=true switches router.go from the stub
    to the real client. MULTICA_LARK_HTTP_BASE_URL overrides the
    default open.feishu.cn host (set to open.larksuite.com for the
    Lark international tenant, or to an httptest URL for integration
    tests).
  - The OAuth handler now also receives the real client (its
    ExchangeOAuthCode still surfaces ErrAPIClientNotConfigured, so
    callback behavior is unchanged until that stage lands).

Tests (19 new cases against an httptest.Server fake):

  - happy path send/patch/binding-prompt round trips, asserting URL
    query params, body shape, Authorization header
  - token cache: 3 sends share one /tenant_access_token/internal hit
  - token refresh after clock-driven expiry
  - sub-margin expire clamping (10s expire → cached for >= safety
    margin of wall-clock)
  - Lark error code surfacing (230001 send, 230002 patch, 10003 auth)
  - token-expired (99991663) invalidates the cache; caller's retry
    re-fetches and succeeds
  - non-2xx HTTP status surfaces "http 500: …"
  - input validation: missing chat_id short-circuits BEFORE auth
    round-trip, missing card json / open_id / bind url all fail
    pre-flight without hitting Lark
  - ExchangeOAuthCode still returns ErrAPIClientNotConfigured
  - binding-prompt template carries the BindURL and the localized
    "去绑定" CTA in valid JSON

go build ./..., go vet ./..., and go test ./internal/integrations/lark/...
pass. Pre-existing handler/router integration tests that require a
real Postgres connection are unaffected by this change.

Co-authored-by: multica-agent <github@multica.ai>

* fix(integrations/lark): split outbound vs OAuth-install capability + card update_multi (MUL-2671)

Address Elon's two must-fix items from the HEAD a09993b1 review:

1. HTTP outbound and OAuth-install are now distinct APIClient
   capabilities. The new SupportsOAuthInstall() reports whether the
   install flow can succeed end-to-end (i.e. ExchangeOAuthCode is
   implemented); the real httpAPIClient still returns IsConfigured()
   = true (send / patch / binding prompt work) but
   SupportsOAuthInstall() = false until the PersonalAgent install-time
   response shape is pinned. Handler-side `install_supported` and
   StartLarkInstall now gate on SupportsOAuthInstall, so a half-wired
   client never reveals the scan-to-bind UI. larkOAuthErrorReason also
   maps ErrAPIClientNotConfigured to a dedicated
   `oauth_exchange_unimplemented` reason so a raw callback hit no
   longer masquerades as `internal_error`.

2. defaultRenderer now emits config.update_multi=true on every Kind.
   Lark refuses to apply PatchInteractiveCard to a card whose initial
   config doesn't declare it shared/updatable, so the absent flag
   would make every patch after the first send silently no-op on the
   wire while the local outbound status row still flipped to
   streaming/final.
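
A sketch of a renderer that sets config.update_multi on every card
kind. The struct names, the kind strings, and the single-div element
body are placeholders for illustration and do not reproduce the real
defaultRenderer template.

```go
package main

import (
	"encoding/json"
	"fmt"
)

type cardConfig struct {
	UpdateMulti bool `json:"update_multi"`
}

type card struct {
	Config   cardConfig       `json:"config"`
	Elements []map[string]any `json:"elements"`
}

// renderCard builds a card body whose config always declares
// update_multi, so later patches to the same message are applied.
func renderCard(text string) ([]byte, error) {
	return json.Marshal(card{
		Config: cardConfig{UpdateMulti: true},
		Elements: []map[string]any{{
			"tag":  "div",
			"text": map[string]any{"tag": "plain_text", "content": text},
		}},
	})
}

// hasUpdateMulti decodes a wire body and checks the flag.
func hasUpdateMulti(body []byte) bool {
	var c card
	if err := json.Unmarshal(body, &c); err != nil {
		return false
	}
	return c.Config.UpdateMulti
}

// allKindsCarryUpdateMulti renders one card per kind and checks each.
func allKindsCarryUpdateMulti() bool {
	for _, kind := range []string{"thinking", "streaming", "final", "error"} {
		body, err := renderCard(kind)
		if err != nil || !hasUpdateMulti(body) {
			return false
		}
	}
	return true
}

func main() {
	body, err := renderCard("thinking")
	if err != nil {
		panic(err)
	}
	fmt.Println(string(body))
	fmt.Println("all kinds carry update_multi:", allKindsCarryUpdateMulti())
}
```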

Tests cover both halves of each fix:
- TestHTTPClient_SupportsOAuthInstall_FalseUntilExchangeLands +
  TestHTTPClient_StubReportsBothCapabilitiesFalse pin the new
  capability surface.
- TestStartLarkInstall_TransportOnlyClientReportsNotConfigured +
  TestListLarkInstallations_TransportOnlyClientReportsInstallNotSupported
  pin the handler gate at exactly the half-wired state.
- TestLarkOAuthErrorReason_APIClientNotConfigured pins the mapping
  for both the bare sentinel and the fmt.Errorf-wrapped form
  HandleCallback produces.
- TestDefaultRendererConfigCarriesUpdateMulti covers every CardKind.
- TestHTTPClient_(Send|Patch)InteractiveCard_DefaultRendererBodyHasUpdateMulti
  verify the wire body Lark actually receives carries update_multi
  through both send and patch transport paths.

Co-authored-by: multica-agent <github@multica.ai>

* feat(integrations/lark): real OAuth code exchange + agent-detail bind entry (MUL-2671)

Stages the install side of the MVP critical path on top of the real
HTTP outbound work:

- httpAPIClient.ExchangeOAuthCode runs the production Lark v2 OAuth
  flow: POST /authen/v2/oauth/token to swap the authorization code
  for the installer's open_id, then GET /bot/v3/info under the parent
  app's tenant_access_token to fetch bot_open_id. Result feeds
  InstallationParams unchanged so OAuthService.HandleCallback's
  auto-bind step lights up automatically.

- HTTPClientConfig gains OAuthAppID/OAuthAppSecret, read from the same
  MULTICA_LARK_OAUTH_APP_ID/_APP_SECRET env vars the OAuthConfig
  consumes. SupportsOAuthInstall now mirrors that pair so the install
  capability gate is honest: outbound transport without OAuth creds
  reports configured-but-not-install-supported, exactly like before.

- Agent detail inspector wires the LarkAgentBindButton in a new
  Integrations section, viewer-hidden by canEdit. The button still
  self-hides when SupportsOAuthInstall is false, so a deployment
  without OAuth creds renders the section empty rather than CTA-broken.

- Capability wording cleaned across handler / router / lark-tab to say
  "OAuth-install capability" instead of "real APIClient wired", and
  the misleading TransportOnly... test was renamed/refocused on the
  early-return branch it actually exercises (Elon non-blocking note).

Co-authored-by: multica-agent <github@multica.ai>

* fix(integrations/lark): identity-only OAuth + atomic bind (MUL-2671)

Addresses Elon's round-4 must-fix items on PR #3277:

1. OAuth v2 token → user_info chain now matches Lark's official
   user-OAuth shape. `httpAPIClient.ExchangeOAuthCode` POSTs
   /open-apis/authen/v2/oauth/token (RFC 6749: top-level
   access_token, NO open_id), then GETs /open-apis/authen/v1/user_info
   with the user_access_token as Bearer to obtain the installer's
   open_id / union_id. The test fixture now reflects the real
   wire shape (separate user_info handler; no synthetic open_id in
   the token response).

2. `OAuthExchangeResult` is identity-only — drops the synthesized
   shared-parent AppID / AppSecret / BotOpenID return that broke
   the UNIQUE(app_id) constraint and the dispatcher's per-app_id
   routing. `OAuthService.HandleCallback` no longer Upserts an
   installation row: it looks up the lark_installation already
   provisioned via the manual-paste POST /lark/installations route
   and binds the installer onto it. Two new typed errors —
   ErrInstallationNotProvisioned and ErrInstallationRevoked — map
   to `installation_not_provisioned` / `installation_revoked`
   reasons at the HTTP boundary so the UI can guide the admin.
   The PersonalAgent install API (which would deliver
   per-installation bot credentials at scan time) remains a
   follow-up; until it lands the OAuth flow is identity-binding
   only and the agent-detail bind button stays hidden on
   deployments without OAuth env (capability gate unchanged).

3. The installation lookup + installer bind run inside a single
   DB transaction so a concurrent revoke / re-provision between
   the read and the binding insert cannot leak a half-applied
   state. `InstallerBinder.BindInstaller` is renamed to
   `BindInstallerTx` and accepts the OAuth-service-owned
   transaction's qtx; the binding_token redemption path is
   unchanged.

4. `validateExchangeResult` is simplified to require only the
   installer's open_id; the obsolete ErrExchangeMissingAppID /
   AppSecret / BotOpenID sentinels are removed (no caller can
   trip them now). The oauth_test suite is rewritten to use a
   stub failTxStarter so tests covering state-token verification
   and exchange-error propagation remain DB-free, while a new
   TestOAuthCallbackOpensTxAfterValidExchange pins the post-must-fix
   order (state ok + exchange ok ⇒ Begin runs before any lookup
   or bind, and a Begin failure aborts cleanly with no bind).

Verified locally:
  - go build ./... / go vet ./... clean
  - go test ./internal/integrations/lark/... ✓
  - go test ./internal/handler -run 'Lark|Binding|OAuth' ✓
  - go test ./internal/util/secretbox/... ./internal/service/... ✓

Co-authored-by: multica-agent <github@multica.ai>

* feat(integrations/lark): device-flow scan-to-install (MUL-2671)

Replaces the manual paste-credentials install path + identity-only
OAuth callback (rejected in product review: too many steps before a
user sees value) with a true single-step scan-to-install built on
Lark's RFC 8628 device-flow registration endpoint
(POST accounts.feishu.cn/oauth/v1/app/registration) — the same
protocol the official larksuite/oapi-sdk-go/scene/registration
package and zarazhangrui/feishu-claude-code-bridge use.

User journey: admin clicks "Bind to Lark" on the Agent detail page
→ QR dialog opens → admin scans in the Lark app on their phone →
authorizes the new PersonalAgent → dialog auto-closes with the new
installation visible. No app_id / app_secret to copy, no Lark
developer console visit, no Multica-side OAuth env to configure.

Backend (server/internal/integrations/lark):
- registration.go — inline ~280-line RFC 8628 client. Begin posts
  archetype=PersonalAgent / auth_method=client_secret /
  request_user_info=open_id; Poll follows the upstream SDK's
  state machine including the tenant-brand mid-stream domain swap
  to accounts.larksuite.com when a Lark-international account
  authorizes. SDK is NOT vendored: one endpoint isn't worth
  dragging in the full oapi-sdk-go + transitive deps.
- registration_service.go — owns the in-process session store
  + background polling goroutine. On success calls APIClient.GetBotInfo
  (the new IM-side endpoint added below) and writes
  lark_installation + the installer's lark_user_binding inside
  one DB transaction so a half-applied install can never land.
  Stable error_reason codes (expired / access_denied /
  lark_protocol_error / bot_info_failed / installation_conflict /
  installer_bind_failed / internal_error) drive the UI copy
  without parsing prose.
- client.go / http_client.go — drops ExchangeOAuthCode and
  SupportsOAuthInstall (no longer applicable: device-flow returns
  identity alongside credentials in one response); adds GetBotInfo
  which mints a tenant_access_token from the freshly-minted
  client_id / client_secret and calls /open-apis/bot/v3/info for
  the bot_open_id. install_supported now gates on IsConfigured()
  (real HTTP client wired) instead of a separate OAuth capability.
- binding_token.go — absorbs InstallerBindParams / InstallerBinder
  (previously in oauth.go), retargets the doc-comment from the
  OAuth caller to the device-flow caller.
- Deletes oauth.go + oauth_test.go entirely.

Handler & router (server/internal/handler, server/cmd/server):
- POST /api/workspaces/{id}/lark/install/begin — opens a new
  registration session, returns {session_id, qr_code_url,
  expires_in_seconds, poll_interval_seconds}. Admin-only.
- GET /api/workspaces/{id}/lark/install/{sessionId}/status —
  polling endpoint, returns {status, installation_id?, error_reason?,
  error_message?}. Workspace-scoped lookup so a stolen session_id
  cannot be polled from another workspace. Admin-only.
- Removes POST /lark/installations (paste form),
  GET /lark/install/start (OAuth-redirect entry), and
  GET /api/lark/install/callback (OAuth redirect target).
- Removes MULTICA_LARK_OAUTH_APP_ID / _APP_SECRET / _REDIRECT_URI /
  _STATE_SECRET / _AUTHORIZE_URL / _SUCCESS_URL env vars. Self-host
  operators no longer need a parent Lark app at all.

Frontend (packages/core, packages/views):
- New types BeginLarkInstallResponse / LarkInstallStatusResponse
  + matching API methods (beginLarkInstall / getLarkInstallStatus);
  drops getLarkInstallURL.
- LarkAgentBindButton opens LarkInstallDialog instead of a
  window.open() to Lark's authorize page. The dialog uses
  react-qr-code (catalog) to render the verification_uri_complete
  inline as SVG (no external CDN image), polls status at the
  server-supplied cadence, auto-closes on success, offers
  "scan again" on terminal failure. Per CLAUDE.md "Enum drift
  downgrades, not crashes", error_reason switch has a default
  fallback so an older desktop build on a newer server still
  renders the generic failure copy.
- Adds the device-flow strings to en + zh-Hans settings.json;
  removes the obsolete OAuth-not-configured copy.

Verified locally:
  - go build ./... / go vet ./... clean
  - go test ./internal/integrations/lark/... — all green
    (existing tests + 15 new registration / GetBotInfo tests)
  - go test ./internal/handler -run 'Lark|Binding' — all green
  - pnpm typecheck — all 6 packages clean
  - pnpm lint — 0 errors (15 pre-existing warnings, none in changed files)
  - pnpm --filter @multica/views test — 859/859 pass

Pre-existing failures in server/internal/middleware (column
"profile_description" missing from local test DB) reproduce against
the parent commit and are unrelated to this change.

Co-authored-by: multica-agent <github@multica.ai>

* fix(integrations/lark): gate bind CTA to workspace admins, terminate QR polling on 4xx (MUL-2671)

Two frontend must-fixes from the PR #3277 second-round review:

1. LarkAgentBindButton now self-hides for non-admin viewers in addition
   to the existing install_supported check. The agent-detail page mounts
   the button under `canEdit`, and canEditAgent lets agent owners
   through even when they are not workspace admins. The backend, however,
   gates POST /lark/install/begin and the status poll on owner/admin
   (router.go:478-487), so the previous behavior shipped a CTA that was
   guaranteed to 403. The new gate reads workspace role from the same
   member list the settings tab already uses.

2. The status polling loop now terminates on 404 (session gone — server
   restarted, multi-instance routing, or in-process GC swept it) and
   403/401 (permission revoked mid-session). Previously every error
   path scheduled another setTimeout, which trapped the user on a stale
   QR forever. ApiError gives us the HTTP status verbatim; terminal
   responses set status=error with stable error_reason codes
   (session_lost, forbidden) that flow through the existing dialog
   switch + retry/close affordances. 5xx + network blips still retry.

i18n: new install_error_session_lost / install_error_forbidden in en
and zh-Hans, with default fallback preserved per the enum-drift rule.

Coverage: 6 new vitest cases — admin/owner allow, member deny,
unsupported-install deny, and the two terminal-error polling paths
using fake timers to assert the loop stops scheduling.

Also clears a handful of stale OAuth/manual-install doc comments
flagged in the review (non-blocker cleanup): doc.go's §10 now points
at RegistrationService, installation.go's input-shape doc loses the
OAuth-callback half, and client.go's stubAPIClient comments no longer
reference OAuth callbacks.

Co-authored-by: multica-agent <github@multica.ai>

* docs(integrations/lark): describe gate as device-flow install in agent-detail integrations comment (MUL-2671)

The comment block above the agent-detail Integrations section still
described the capability gate as 'server-side OAuth-install'. The
OAuth path is gone — install is now device-flow per RFC 8628 — so the
comment now reads 'server-side device-flow install capability gate'.

Pure comment change; behavior is unchanged. Cleans up the nit Elon
called out in the PR #3277 second-round review (MUL-2671).

Co-authored-by: multica-agent <github@multica.ai>

* feat(integrations/lark): wire inbound pipeline + WS Hub at boot (MUL-2671)

Stage 3.a of MUL-2671. Hub class, Dispatcher, ChatSessionService and
AuditLogger have all been implemented and tested in prior PRs but
none of them was constructed at boot, so the in-process plumbing
was never exercised end-to-end. This change wires them together
behind the same `MULTICA_LARK_SECRET_KEY` gate that already gates
InstallationService / RegistrationService, and starts the Hub under
the existing `sweepCtx` so it winds down alongside the other
long-running workers after HTTP drain.

The real long-conn EventConnector is still pending; the factory
hands every supervisor a shared NoopConnector that holds the lease
and emits nothing. That lets staging exercise the lease /
supervisor / shutdown lifecycle against real DB rows without
committing to the Lark wire protocol implementation. Swapping in
the real connector is a single line change in the same router
block; the Dispatcher / ChatSessionService / Hub seams stay frozen.

## Why a noop placeholder, not a stub-or-skip

The Hub's value is mostly its lifecycle: §4.4 ownership lease,
LeaseRenewInterval / LeaseTTL, supervisor reap on revoke, clean
release on shutdown. None of that runs unless the Hub is actually
started. Holding off until the real connector lands means the next
PR has to debut both pieces simultaneously; wiring the supervisor
loop first lets the real connector PR be a focused, reviewable
swap.

## Changes

- `internal/integrations/lark/noop_connector.go` — `NoopConnector`
  implementing `EventConnector`: blocks on ctx until the Hub
  cancels (lease loss / shutdown / revoke), emits no events, logs
  on enter/exit so operators see exactly which installation the
  supervisor is holding the lease for.
- `internal/integrations/lark/noop_connector_test.go` — verifies
  the connector blocks until ctx cancel, returns nil on clean exit,
  never invokes the emit callback, and the factory shares a single
  connector instance across installations.
- `internal/handler/handler.go` — new `LarkHub *lark.Hub` field on
  `Handler`. Nil when the Lark integration is disabled.
- `cmd/server/router.go` — inside the existing Lark wiring block,
  construct `AuditLogger`, `ChatSessionService` (with `*pgxpool.Pool`
  for the in-tx dedup Mark), `Dispatcher` (wiring `h.IssueService`
  and `h.TaskService` so `/issue`-created issues share counter /
  duplicate guard / project boundary / broadcast / analytics with
  the rest of the product), and the `Hub` with the
  `NoopConnectorFactory`. `NewRouterWithOptions` now returns
  `(chi.Router, *handler.Handler)` so main.go can drive Hub
  lifecycle; `NewRouter` discards the handler.
- `cmd/server/main.go` — start the Hub under `sweepCtx` after the
  other background workers, and `Wait` on it after HTTP drain +
  sweep cancel so the lease renewer can issue a final release
  before exit. Skipped entirely when `h.LarkHub == nil`.

## Test plan

- [x] `go build ./...` clean
- [x] `go vet ./...` clean
- [x] `go test ./internal/integrations/lark/...` (new noop tests +
      existing hub / dispatcher / chat_service / registration /
      binding_token / outbound / issue_command suites) — all pass
- [x] `go test ./internal/handler -run 'TestLark|TestRedeemLarkBinding'`
      pass — handler-side Lark surfaces unchanged
- [x] `go test ./internal/service/... ./internal/util/secretbox/...`
      pass
- [x] `pnpm --filter @multica/views exec vitest run settings/components/lark-tab`
      pass (6/6) — frontend lark surfaces unchanged
- [ ] Local broad `go test ./internal/handler/...` still blocked by
      the pre-existing test DB schema drift Elon flagged in the
      previous round (`column "metadata" does not exist`,
      unrelated to this change); CI is the authoritative check.
- [ ] Manual end-to-end deferred until the real long-conn
      EventConnector lands (next stage).

MUL-2671

Co-authored-by: multica-agent <github@multica.ai>

* fix(integrations/lark): bound Hub lease release + shutdown wait (MUL-2671)

Lease release used context.Background(); a stalled DB pool could pin
shutdown indefinitely. Add LeaseReleaseTimeout (5s default) and
ShutdownTimeout (15s default) to HubConfig, route releaseLease through
a bounded context, and expose WaitWithTimeout for main.go so a wedged
supervisor degrades to LeaseTTL expiry on the next replica instead of
blocking process exit. Also correct the LarkHub field comment in
handler.go: the Hub is wired whenever the at-rest secret key is set,
independent of whether the outbound HTTP APIClient is configured.
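
A rough sketch of the two bounds described above, assuming the release step is a function that honours its context. `releaseLease`, `waitWithTimeout`, `stalledRelease`, and `demoBoundedShutdown` are illustrative names and signatures, not the Hub's actual API, and the 20ms timeouts stand in for the 5s / 15s defaults.

```go
package main

import (
	"context"
	"sync"
	"time"
)

// releaseLease runs release under a bounded context instead of
// context.Background(), so a stalled DB pool cannot pin shutdown.
func releaseLease(release func(context.Context) error, timeout time.Duration) error {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	return release(ctx)
}

// waitWithTimeout reports whether wg drained before the timeout elapsed.
// On timeout the helper goroutine stays parked until wg finishes; that is
// acceptable here because the process is about to exit.
func waitWithTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()
	select {
	case <-done:
		return true
	case <-time.After(timeout):
		return false
	}
}

// stalledRelease simulates a release query stuck on a wedged pool:
// it returns only once its context is cancelled.
func stalledRelease(ctx context.Context) error {
	<-ctx.Done()
	return ctx.Err()
}

// demoBoundedShutdown exercises both bounds against a wedged supervisor.
func demoBoundedShutdown() (drained bool, releaseErr error) {
	releaseErr = releaseLease(stalledRelease, 20*time.Millisecond)

	var wg sync.WaitGroup
	wg.Add(1) // a supervisor that never calls Done
	drained = waitWithTimeout(&wg, 20*time.Millisecond)
	return drained, releaseErr
}
```

In the wedged case both calls give up after their timeout, and the lease is left to expire via LeaseTTL on another replica, which is the degradation the commit describes.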

Co-authored-by: multica-agent <github@multica.ai>

* feat(integrations/lark): real WS long-conn connector + ctx-cancel-breaks-read (MUL-2671)

Replaces NoopConnectorFactory with a production EventConnector that
opens Lark's event-subscription WebSocket. Gated behind
MULTICA_LARK_WS_ENABLED so staging boots stay on the noop path until
operators opt in, and falls back to noop with a warning when the WS
flag is set without MULTICA_LARK_HTTP_ENABLED (the real connector
needs the cached tenant_access_token).

Why this connector exists separately from the Hub: gorilla/websocket
ReadMessage blocks on the underlying TCP socket and does not observe
context. The watchdog goroutine inside WSLongConnConnector.Run closes
the conn the moment ctx fires, so lease loss / shutdown breaks the
blocking read in bounded time — exactly the invariant Hub
renewLeaseUntil's runCancel depends on for the "at most one active WS
per installation across replicas" guarantee. Tests cover this
explicitly (TestWSConnectorRunReturnsOnCtxCancelEvenWhenReadIsBlocked).
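
The watchdog pattern can be sketched with the standard library alone. This is an illustration under assumptions, not the connector's code: `net.Pipe` stands in for the WebSocket conn, `readUntilCancelled` and `demoWatchdog` are hypothetical names, and returning `ctx.Err()` on cancellation is a choice made for the sketch.

```go
package main

import (
	"context"
	"errors"
	"io"
	"net"
	"time"
)

// readUntilCancelled loops on conn.Read, which does not observe ctx.
// A watchdog goroutine closes conn as soon as ctx fires, which makes the
// pending Read return an error and lets the loop exit in bounded time.
func readUntilCancelled(ctx context.Context, conn io.ReadCloser) error {
	stop := make(chan struct{})
	defer close(stop) // lets the watchdog exit when the read loop ends first

	go func() {
		select {
		case <-ctx.Done():
			conn.Close() // unblocks the blocked Read
		case <-stop:
		}
	}()

	buf := make([]byte, 1024)
	for {
		if _, err := conn.Read(buf); err != nil {
			if ctx.Err() != nil {
				return ctx.Err() // the close was caused by cancellation
			}
			return err
		}
	}
}

// demoWatchdog blocks a read on a pipe whose peer never writes and
// reports whether cancellation alone was enough to break it.
func demoWatchdog() bool {
	client, server := net.Pipe()
	defer server.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 20*time.Millisecond)
	defer cancel()

	err := readUntilCancelled(ctx, client)
	return errors.Is(err, context.DeadlineExceeded)
}
```

Without the watchdog the loop would sit in `Read` until the peer sent data or dropped the connection, however long ago the lease was lost.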

The Lark wire surface is split into three swappable seams so the
transport layer stays tested in isolation:

  - EndpointFetcher (POST /event-subscription/v1/connection_token)
    resolves a one-shot wss URL per Run. No caching — replaying a
    one-shot token would look like a Lark outage.
  - FrameDecoder turns one raw JSON envelope into an InboundMessage
    or a "control / heartbeat / drop" verdict. Decoder errors log
    + drop the frame; they do NOT tear down the connection.
  - CredentialsProvider wraps InstallationService.DecryptAppSecret
    so plaintext app_secret lives in memory only during a Run.

Also fixes the handler.go LarkHub comment: it still said "joins on
Wait during graceful shutdown" but main.go has used WaitWithTimeout
(bounded wait) for several commits. Comment now matches.

Co-authored-by: multica-agent <github@multica.ai>

* feat(integrations/lark): align WS to official binary Frame protocol + DispatchResult outbound replies (MUL-2671)

Two must-fix items from Elon's review of PR #3277:

1. WS protocol layer rewritten to match the official Lark Go SDK
   (`larksuite/oapi-sdk-go/v3/ws`):
   - Bootstrap is `POST /callback/ws/endpoint` with AppID/AppSecret
     in the body (no tenant_access_token bearer). Response carries
     wss URL + ClientConfig (PingInterval / ReconnectInterval /
     ReconnectNonce / ReconnectCount).
   - `service_id` is parsed from the wss URL query and used as
     Frame.Service on every outbound frame.
   - Wire envelope is the binary protobuf `pbbp2.Frame` (hand-rolled
     via protowire to avoid pulling the whole SDK in, byte-identical
     field tags). JSON payloads are nested inside Frame.Payload.
   - Inbound data frames are ACKed with a `Response{code:200,...}`
     JSON payload that reuses the inbound headers; infra failures
     produce code=500 so Lark retries.
   - Ping is the app-layer binary `NewPingFrame(serviceID)` at the
     server-supplied cadence; WebSocket protocol PING is removed
     (Lark ignores it). Server-initiated pings get a pong reply.
   - ctx-cancel-breaks-read invariant preserved via the watchdog
     goroutine that closes the conn on ctx.Done; the read loop and
     ping goroutine serialize their writes through a single mutex.

2. `DispatchResult` outbound replies wired via a new `OutcomeReplier`:
   - `OutcomeNeedsBinding` mints a one-shot binding token and sends
     the binding prompt card to the sender's open_id.
   - `OutcomeAgentOffline` / `OutcomeAgentArchived` push a notice
     card into the chat with the agent name + Chinese copy matching
     §4.6.
   - `OutcomeIngested` stays owned by the Patcher; `OutcomeDropped`
     is silent.
   - The replier is best-effort: outbound failures are logged and
     swallowed so a Lark outage cannot stall the inbound pipeline.
   - Hub installs the noop replier by default; router wires the
     production `LarkOutcomeReplier` when APIClient.IsConfigured().

PersonalAgent long-conn risk surfaced (open per Feishu docs:
`长连接模式仅支持企业自建应用`, i.e. "long-connection mode supports only
enterprise self-built apps"). The implementation works for any
app archetype; the open question is whether `/callback/ws/endpoint`
accepts PersonalAgent credentials in practice. The bootstrap response's
Lark code+msg is surfaced verbatim so an operator running the smoke
test sees the exact failure rather than a generic timeout.

Co-authored-by: multica-agent <github@multica.ai>

* fix(integrations/lark): byte-compat Frame marshal, chunk reassembly, ACK off reply critical path (MUL-2671)

Three protocol blockers from Elon's review of 9540008a:

1. Frame.Marshal is now byte-identical to oapi-sdk-go/v3/ws/pbbp2.Frame:
   - SeqID/LogID/Service/Method (proto2 req) emit unconditionally even at zero
   - PayloadEncoding/PayloadType/LogIDNew emit unconditionally per gogo
     generated MarshalToSizedBuffer (no zero-guard)
   - Payload uses the SDK's `!= nil` guard (nil omits, []byte{} emits 0-length)
   - ACK payload JSON matches SDK's NewResponseByCode + json.Marshal output
     ({"code":N,"headers":null,"data":null})

   Golden tests pin exact byte sequences for ping/pong/ACK/full/zero
   frames; verified against the real SDK pbbp2.pb.go MarshalToSizedBuffer
   producing identical bytes.

2. Multi-frame events (sum>1) are reassembled via the new chunkAssembler:
   - 5s sliding TTL (matches SDK combine() cache TTL)
   - Lazy GC on admit (no separate sweeper goroutine)
   - Out-of-order seq + duplicate seq idempotent
   - Partial chunks are NOT ACKed (SDK behaviour: only the final chunk's
     ACK confirms the whole event so Lark can retry on partial loss)
   - Connector wires assembler per-Run; state dies with the session

3. OutcomeReplier detached from ACK critical path:
   - HubConfig.ReplyTimeout default 2.5s, strictly under Lark's 3s ACK deadline
   - handleEvent dispatches synchronously (fast DB path), then spawns the
     replier under a fresh background ctx with WithTimeout(ReplyTimeout)
   - Hub.replyWg tracks in-flight replies; Hub.Wait / WaitWithTimeout
     drain them so shutdown is bounded
   - Noop replier short-circuits inline (no goroutine cost when outbound
     APIClient isn't configured)

   Proof tests:
   - TestHubScheduleReplyReturnsImmediately: scheduleReply with a 10s
     slow replier returns in <50ms
   - TestHubReplyTimeoutCancelsHungReplier: hung replier ctx fires at
     ReplyTimeout
   - TestHubWaitDrainsInFlightReplies: Wait blocks until replies finish
   - TestHubACKNotBlockedByOutboundReply: end-to-end through the
     connector — data-frame ACK lands within 500ms even when the
     replier hangs 5s
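
The reassembly rules in item 2 can be sketched as follows. This is a simplified illustration, not the connector's chunkAssembler: the type layout, the `admit` signature, and the 0-based `seq` are assumptions, and the sketch is not safe for concurrent use (it assumes a single read loop per Run).

```go
package main

import "time"

// pendingEvent holds the chunks received so far for one multi-frame event.
type pendingEvent struct {
	parts    [][]byte
	received int
	deadline time.Time
}

// chunkAssembler reassembles events that arrive split across frames.
type chunkAssembler struct {
	ttl     time.Duration
	pending map[string]*pendingEvent
}

func newChunkAssembler(ttl time.Duration) *chunkAssembler {
	return &chunkAssembler{ttl: ttl, pending: make(map[string]*pendingEvent)}
}

// admit records chunk seq of sum for messageID. It returns the full
// payload and true only when the last missing chunk arrives; callers
// would ACK only in that case.
func (a *chunkAssembler) admit(now time.Time, messageID string, sum, seq int, data []byte) ([]byte, bool) {
	// Lazy GC on admit: expired partial events are dropped here,
	// so no separate sweeper goroutine is needed.
	for id, p := range a.pending {
		if now.After(p.deadline) {
			delete(a.pending, id)
		}
	}
	if sum <= 1 {
		return data, true // single-frame event, nothing to assemble
	}
	if seq < 0 || seq >= sum {
		return nil, false // malformed chunk index
	}
	p, ok := a.pending[messageID]
	if !ok || len(p.parts) != sum {
		p = &pendingEvent{parts: make([][]byte, sum)}
		a.pending[messageID] = p
	}
	p.deadline = now.Add(a.ttl) // sliding TTL: every chunk extends it
	if p.parts[seq] == nil {    // duplicate seq is idempotent
		p.parts[seq] = append([]byte{}, data...)
		p.received++
	}
	if p.received < sum {
		return nil, false
	}
	delete(a.pending, messageID)
	var out []byte
	for _, part := range p.parts {
		out = append(out, part...)
	}
	return out, true
}

// demoReassembly feeds out-of-order and duplicate chunks.
func demoReassembly() string {
	a := newChunkAssembler(5 * time.Second)
	now := time.Now()
	a.admit(now, "m1", 3, 2, []byte("C"))
	a.admit(now, "m1", 3, 0, []byte("A"))
	a.admit(now, "m1", 3, 0, []byte("A")) // duplicate, ignored
	out, _ := a.admit(now, "m1", 3, 1, []byte("B"))
	return string(out)
}

// demoExpiry reports whether an event completes when its second chunk
// arrives after the TTL has lapsed.
func demoExpiry() bool {
	a := newChunkAssembler(5 * time.Second)
	now := time.Now()
	a.admit(now, "m2", 2, 0, []byte("A"))
	_, complete := a.admit(now.Add(6*time.Second), "m2", 2, 1, []byte("B"))
	return complete
}
```

Passing `now` in explicitly keeps the TTL behaviour testable without sleeping, which suits the golden-style tests this commit favours.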

PersonalAgent real-env smoke remains Bohan's decision; this PR closes
the technical blockers Elon flagged.

Co-authored-by: multica-agent <github@multica.ai>

* docs(service/issue): narrow position concurrency claim to create-create (MUL-2671)

Elon's review of the merge resolution flagged that the comment on the
new NextTopPosition call promised more than the code guarantees:
concurrent manual reorder via UpdateIssue(position) does NOT take the
workspace row lock that IncrementIssueCounter holds, so a create
racing a reorder can still land on the same position. Rewrite the
comment to only claim create-create serialization, which is the
behaviour the lock actually delivers. No code change.

Co-authored-by: multica-agent <github@multica.ai>

* fix(integrations/lark): keep device-flow polling on RFC 8628 HTTP 400 (MUL-2671)

Lark's device-flow polling endpoint returns HTTP 400 with the JSON
body `{"error":"authorization_pending"}` while the user hasn't scanned
the QR yet. This is the behaviour RFC 8628 specifies, and the upstream
oapi-sdk-go handles it the same way. Our previous doForm treated ANY non-2xx
as a terminal protocol error, so every install session was killed by
the first poll (~5s after begin) and the install dialog appeared
silently empty: the frontend received status=error +
lark_protocol_error before the user could even read the description.

Fix: doForm now decodes the JSON body first; if it parses, the caller
(Begin / Poll) routes on the body's `error` field, where the existing
switch correctly maps authorization_pending / slow_down to "keep
polling" and access_denied / expired_token to terminal failure. Only
unparseable bodies (5xx HTML proxy pages, gateway timeouts) still
surface as a typed http_NNN RegistrationError.
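
A sketch of the routing described in the fix, assuming the poll response is classified by one pure function. `classifyPoll` and the `pollVerdict` values are hypothetical names; the error strings are the RFC 8628 codes named above, and treating unknown error codes as terminal is an assumption of the sketch.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// pollVerdict is what the polling loop should do next.
type pollVerdict int

const (
	pollSuccess     pollVerdict = iota // credentials are in the body
	pollKeepPolling                    // user has not finished authorizing
	pollTerminal                       // stop and surface the code
)

// classifyPoll decodes the JSON body first and routes on its `error`
// field. The HTTP status is consulted only when the body is not JSON
// or carries no error code.
func classifyPoll(status int, body []byte) (pollVerdict, string) {
	var parsed struct {
		Error string `json:"error"`
	}
	if err := json.Unmarshal(body, &parsed); err != nil {
		// Proxy HTML pages, gateway timeouts, truncated bodies.
		return pollTerminal, fmt.Sprintf("http_%d", status)
	}
	switch parsed.Error {
	case "":
		if status/100 != 2 {
			return pollTerminal, fmt.Sprintf("http_%d", status)
		}
		return pollSuccess, ""
	case "authorization_pending", "slow_down":
		return pollKeepPolling, parsed.Error
	default:
		// access_denied, expired_token, and any code not known here.
		return pollTerminal, parsed.Error
	}
}
```

For example, `classifyPoll(400, []byte(`{"error":"authorization_pending"}`))` yields `pollKeepPolling`, whereas the pre-fix behaviour would have ended the session on the 400 alone.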

Three regression tests pin the new behaviour:
- HTTP 400 + authorization_pending → res.Status="authorization_pending"
- HTTP 400 + access_denied → res.Err.Code="access_denied" (terminal)
- HTTP 502 + HTML body → http_502 RegistrationError

Verified against the live local env: install/begin -> 200, status
stays "pending" through the first poll cycle, no longer flips to
"error" within seconds.

Co-authored-by: multica-agent <github@multica.ai>

* fix(views/lark): reset closedRef on every mount so StrictMode double-mount renders QR (MUL-2671)

Empty QR dialog body in the dev env: Bohan opened the bind dialog and
got an empty white area where the QR should have been — no QR, no
"starting" placeholder, no error text. Backend was returning the QR
URL correctly; the bug was on the frontend.

Root cause: React 19 / Next.js dev StrictMode mounts every component
twice (mount → cleanup → mount). The component instance is REUSED
across the simulated remount, which means useRef objects are
preserved. The dialog's `closedRef` lifecycle:

  1. Mount #1: closedRef={current:false}, beginSession() kicked off
     (HTTP request still in flight)
  2. Cleanup runs: closedRef.current=true
  3. Mount #2: beginSession() kicked off again, BUT the ref still
     reads {current:true} from step 2
  4. Both promises resolve. Both hit the post-await guard
     `if (closedRef.current) return;` and bail out before setSession().
  5. Result: session stays null forever. Every conditional in the
     dialog body (beginning/session-pending/success/error) is false →
     empty body.

Fix: reset closedRef.current=false at the START of the effect, not
just at component construction. The cleanup-then-mount pair now
re-arms the guard so subsequent setSession calls actually land.

Regression test wraps the dialog in <StrictMode> and asserts the
QR appears within 2s with the correct value — fails closed if anyone
removes the reset.

Co-authored-by: multica-agent <github@multica.ai>

* fix(integrations/lark): drop EventTaskCompleted subscription so the chat reply doesn't get overwritten by "Done." (MUL-2671)

Bohan reproduced on the live dev env: agent replies show only a card
saying "Done." in Lark, even though Multica's own chat panel has the
real "Hello! I'm cc…" reply. Tasks succeed end-to-end, but the user
loses the reply on the Lark side.

Root cause: TaskService.CompleteTask publishes two events for every
chat task IN ORDER:

  1. broadcastChatDone(...)       → ChatDonePayload{Content: "Hello!..."}
  2. broadcastTaskEvent(Completed) → map[string]any{task_id, agent_id,...}
                                     (no `content` key)

The Patcher subscribed to BOTH and routed each to finalize(). The
first patch correctly rendered the reply text, the second
patched the same card with an empty payload — chatDoneContent()
returned "" and the renderer fell back to "Done." (default empty-body
copy). The second patch wins because Lark stores whatever was last
applied.

Fix: stop subscribing to EventTaskCompleted in the Patcher and remove
the corresponding switch arm. EventChatDone is the canonical "agent
finished replying" signal for the Lark card path; EventTaskCompleted
is still emitted to the bus for other listeners (web UI, analytics,
task usage) where the lack of content doesn't matter.

Regression test TestPatcherIgnoresEventTaskCompletedForChatTasks
emits ChatDone followed by TaskCompleted on a streaming card and
asserts: exactly one patch, body contains the agent reply, body does
NOT contain "Done.". If anyone re-adds the EventTaskCompleted
subscription, this fails immediately.

Co-authored-by: multica-agent <github@multica.ai>

* feat(integrations/lark): chat replies as plain text IM messages, not card chrome (MUL-2671)

Bohan reported on the live dev env that even with the agent's reply
shown correctly, every message is wrapped in an interactive card with
the agent name as the header — it feels like a system notification,
not a normal chat reply. He wants the reply to land as a regular Lark
text bubble.

Changes:

- Add APIClient.SendTextMessage backed by Lark's
  /open-apis/im/v1/messages with msg_type=text. JSON-encodes the
  {"text": ...} envelope Lark requires so callers pass raw strings.
- Patcher.Register no longer subscribes to EventTaskQueued /
  EventTaskRunning. There is no more thinking → running → final
  card lifecycle on the success path: it added card chrome without
  buying anything for free-form chat.
- On EventChatDone, the new sendChatReply path posts the assistant
  message content as plain text. Empty content is silently dropped
  rather than rendered as "Done." (the prior fallback that
  confused Bohan).
- Failure path keeps a one-shot error card on EventTaskFailed —
  the visual distinction from a normal reply is genuinely useful,
  and failures are rare enough that the chrome isn't noisy.
- Throttle / lastPatched map / MinPatchInterval / shouldPatch /
  markPatched / loadCardOrSkip are all removed; nothing in the new
  flow patches.

Tests:

- TestPatcherSendsPlainTextOnChatDone pins the new contract: exactly
  one SendTextMessage call, no card sends or patches, content
  matches the ChatDonePayload.
- TestPatcherDropsEmptyChatReply pins the "no more Done. fallback"
  decision — empty content drops, period.
- TestPatcherFailEventSendsErrorCard pins the failure path still
  uses a card (one-shot, no patching).
- TestPatcherIgnoresEventTaskCompletedForChatTasks rewritten for
  text path: ChatDone then TaskCompleted yields exactly one text
  send, no duplicate.
- TestPatcherSkipsWhenNoChatSessionBinding and
  TestPatcherSwallowsInstallationLoadErrors rewritten to drive
  EventChatDone (the new entry point) instead of TaskQueued.
- TestPatcherSendsThinkingCardOnTaskQueued deleted (no more
  thinking card).

Co-authored-by: multica-agent <github@multica.ai>

* feat(integrations/lark): pre-fill PersonalAgent bot name as "<agent> - Multica" (MUL-2823) (#3520)

The device-flow install left the bot at Lark's auto-generated
"{用户姓名}的智能助手" ("{user's name}'s smart assistant"). Lark's registration
scene supports pre-filling the name via a `name` query param on the
verification/QR URL (mirrors the upstream SDK's AppPreset.Name). It is a
user-editable default that rides on the QR URL, not the begin POST body
(which has no name field).

BeginInstall already loads the agent for its ownership check, so we keep
it and thread `<agent.Name> - Multica` through Begin → decorateQRCodeURL.
A blank name degrades to plain "Multica".
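
A minimal sketch of the pre-fill, assuming the name is appended with `net/url`. `decorateQRCodeURL` is named in this commit but its signature here is a guess, `presetBotName` is a hypothetical helper, and the example URL is a placeholder.

```go
package main

import (
	"net/url"
	"strings"
)

// presetBotName builds the default bot name; a blank agent name
// degrades to plain "Multica".
func presetBotName(agentName string) string {
	name := strings.TrimSpace(agentName)
	if name == "" {
		return "Multica"
	}
	return name + " - Multica"
}

// decorateQRCodeURL adds the `name` query param to the verification URL
// while keeping any params the registration endpoint already set.
func decorateQRCodeURL(rawURL, agentName string) (string, error) {
	u, err := url.Parse(rawURL)
	if err != nil {
		return "", err
	}
	q := u.Query()
	q.Set("name", presetBotName(agentName))
	u.RawQuery = q.Encode()
	return u.String(), nil
}
```

Going through `url.Values` rather than string concatenation keeps spaces and non-ASCII agent names correctly escaped in the QR payload.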

There is no post-install rename API (bot/v3 is read-only; no
bot/v3/update), so the install-time pre-fill is the only programmatic
lever; the user can still edit the name on the creation form.

Co-authored-by: J <j@multica.ai>
Co-authored-by: multica-agent <github@multica.ai>

* fix(integrations/lark): restore /issue confirmation + pin SendTextMessage wire (MUL-2671)

Two recovered/added contracts off Trump's review of HEAD fe381a07:

1) /issue confirmation in Lark was a casualty of the plain-text
   refactor. The pre-refactor `RenderInput.IssueNumber` field was
   declared but never actually rendered into the card body, so even
   in the original card-based flow the user never saw a "Created
   [MUL-42]" confirmation. Now the OutcomeReplier handles
   OutcomeIngested + IssueID.Valid by sending a plain text message:

     Created MUL-42 — fix login bug
     https://multica.example/issues/MUL-42

   Composed from a new DispatchResult.IssueIdentifier +
   IssueTitle, populated by the Dispatcher from
   workspace.IssuePrefix + issue.Number / issue.Title. Workspace
   lookup is best-effort: a Postgres blip on workspace gets a "#42"
   fallback rather than silently dropping the confirmation.

   The agent's own chat reply (if any) continues to land separately
   via the Patcher on EventChatDone — these are two semantically
   distinct messages and the user benefits from seeing both.

2) SendTextMessage is the wire layer Trump flagged for missing
   coverage. Three new wire tests pin:
   - happy path: POST /open-apis/im/v1/messages?receive_id_type=chat_id,
     msg_type=text, Bearer <tenant_access_token>, double-JSON
     content envelope
   - special-character round trip: newlines, double quotes,
     backslashes, tabs, Chinese + emoji, JSON-lookalike strings.
     The inner {"text": ...} is encoded once at JSON.Marshal time
     and once again when the outer body serializes; losing either
     pass corrupts the message and the bug is invisible without a
     contract pin.
   - Lark error path: non-zero `code` surfaces as a wrapped error
     with the code embedded.
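
The double-JSON envelope pinned by the wire tests can be sketched like this. It assumes the outer body carries `receive_id`, `msg_type`, and a string `content` field; the struct and helper names (`textMessageBody`, `buildTextBody`, `decodeText`) are illustrative and not taken from the repository.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// textMessageBody is an illustrative outer request body. Content holds a
// JSON document encoded as a string, not a nested object.
type textMessageBody struct {
	ReceiveID string `json:"receive_id"`
	MsgType   string `json:"msg_type"`
	Content   string `json:"content"`
}

// buildTextBody encodes the inner {"text": ...} object first, then embeds
// the result as a string in the outer body, which is encoded a second time.
func buildTextBody(chatID, text string) ([]byte, error) {
	inner, err := json.Marshal(map[string]string{"text": text})
	if err != nil {
		return nil, err
	}
	return json.Marshal(textMessageBody{
		ReceiveID: chatID,
		MsgType:   "text",
		Content:   string(inner),
	})
}

// decodeText reverses both encoding passes and returns the original text.
func decodeText(body []byte) (string, error) {
	var outer textMessageBody
	if err := json.Unmarshal(body, &outer); err != nil {
		return "", err
	}
	var inner map[string]string
	if err := json.Unmarshal([]byte(outer.Content), &inner); err != nil {
		return "", err
	}
	return inner["text"], nil
}

func main() {
	original := "line one\n\"quoted\" back\\slash\ttab {\"text\":\"lookalike\"}"
	body, err := buildTextBody("oc_example", original)
	if err != nil {
		panic(err)
	}
	decoded, err := decodeText(body)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(body))
	fmt.Println(decoded == original)
}
```

Skipping either Marshal pass would make `decodeText` fail or return a different string, which is the corruption the special-character test is meant to catch.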

Tests:
- TestDispatcher_IssueCreationFromCommand asserts IssueIdentifier
  ("MUL-42") and IssueTitle propagate through DispatchResult.
- TestDispatcher_IssueIdentifierFallsBackToNumberOnWorkspaceLookupErr
  pins the "#7" degrade-graceful fallback.
- TestLarkOutcomeReplierIssueCreatedSendsConfirmation pins the
  text body (identifier + title + deep link) and asserts no card
  send on this path.
- TestLarkOutcomeReplierOutcomeIngestedSilentWithoutIssue pins
  the silent-on-plain-chat default so we don't accidentally start
  emitting a confirmation for every message.
- TestHTTPClient_SendTextMessage_* covers the wire contract.

Frontend locale parity (en + zh-Hans, 53 tests) is currently green
on this HEAD; no changes needed.

Co-authored-by: multica-agent <github@multica.ai>

* fix(views/locales): add missing ko keys for Lark MVP (MUL-2671)

Trump flagged on PR #3277 review that the ko bundle was missing the
Lark-MVP-only keys that en + zh-Hans both carry. The parity test
caught it cleanly after main was merged in (Korean PR landed on main
between the prior review and this one):

  common.lark_bind.*                       (13 keys)
  settings.page.tabs.lark                  (1 key)
  settings.lark.*                          (45 keys)
  agents.inspector.section_integrations    (1 key)

Korean translations are professional/concise — "Lark" stays as the
brand name (matches how en keeps "Lark" + "(飞书)" parenthetically;
ko users searching for the product expect "Lark"), and product copy
follows the zh-Hans tone where Multica nouns ("에이전트", "워크스페이스")
are transliterated loan words consistent with the rest of the ko bundle.

Slot ordering preserved against EN:
  - page.tabs.lark sits between github and integrations
  - inspector.section_integrations sits right after section_skills

Verified: pnpm exec vitest run locales/parity → 105/105 pass.

Co-authored-by: multica-agent <github@multica.ai>

* fix(integrations/lark): /issue origin_type CHECK + Hub restart on credentials rotation (MUL-2671)

Two live-env bugs Bohan reproduced:

1) /issue command crashed the WS connector. Dispatcher writes
   origin_type='lark_chat' on issues born from `/issue`, but the
   issue_origin_type_check CHECK constraint was last extended in
   migration 060 for quick_create — it doesn't list lark_chat, so
   every Lark /issue tripped SQLSTATE 23514 and bubbled up as an
   infra error. The infra error tore down the WS connector, Lark
   retried the same message, the new connector tripped the same
   constraint and crashed again. Repro in the live env: three
   crashes from the same /issue event over ~40s, each leaving the
   user with no confirmation in Lark.

   Migration 111 extends the CHECK list:
     CHECK (origin_type IN ('autopilot', 'quick_create', 'lark_chat'))

2) Re-scanning an already-bound agent silenced the bot. The device
   flow re-registers with Lark, which mints a brand-new bot (fresh
   app_id + app_secret); RegistrationService.finishSuccess upserts
   into lark_installation by agent_id, so the row's credentials
   rotate in place. But the running supervisor held the OLD inst
   struct by value and kept a WS open against the OLD bot's app_id —
   so all events to the NEW bot went nowhere. Bohan's "claude code can
   no longer reply in Feishu" symptom maps exactly to this:

      log timeline:
      16:29:57  cc connector connected with app_id=cli_aa9398dd...  (OLD)
      16:34:07  lark registration: install complete                  (rotation)
      → row.app_id is now cli_aa93f36f...                            (NEW)
      → old WS still subscribed to OLD app_id; new app_id receives nothing

   Fix: Hub.sweep now compares each installation row's credentials
   fingerprint (app_id + bot_open_id + sha256(app_secret_encrypted))
   against the snapshot the running supervisor was started with. On
   diff, cancel the old supervisor and start a fresh one inline. A
   monotonic gen counter on the supervisor entry disambiguates the
   old goroutine's deferred cleanup from the new entry the rotation
   path already swapped in.
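
A minimal sketch of the fingerprint comparison, assuming the three inputs named above. The `installation` struct, the `|` separator, and the function name are illustrative choices, not the repository's implementation.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// installation holds only the fields that feed the fingerprint. The struct
// is a stand-in for the real row type.
type installation struct {
	AppID              string
	BotOpenID          string
	AppSecretEncrypted []byte
}

// credentialsFingerprint combines app_id, bot_open_id, and the SHA-256 of
// the encrypted secret. Hashing the ciphertext lets the snapshot detect a
// rotated secret without keeping a second copy of the secret bytes.
func credentialsFingerprint(inst installation) string {
	sum := sha256.Sum256(inst.AppSecretEncrypted)
	return inst.AppID + "|" + inst.BotOpenID + "|" + hex.EncodeToString(sum[:])
}

func main() {
	running := installation{AppID: "app_one", BotOpenID: "bot_one", AppSecretEncrypted: []byte("encrypted_one")}
	row := installation{AppID: "app_two", BotOpenID: "bot_two", AppSecretEncrypted: []byte("encrypted_two")}

	// A sweep would restart the supervisor only when the two values differ.
	fmt.Println(credentialsFingerprint(running) != credentialsFingerprint(row))
}
```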

Tests:
- TestHubRestartsSupervisorOnCredentialsRotation pins the new path:
  starts hub on app_one, rotates the row to app_two, asserts the
  connector factory is called again with the fresh AppID.
- TestHubDoesNotRestartSupervisorOnUnchangedRow pins the negative
  case so an unchanged row doesn't degenerate into a per-sweep
  busy-loop.
- Existing hub tests (lease, supervise, shutdown, ACK timing,
  noop replier) all green.

Verification:
- go test ./internal/integrations/lark/... -race -count=1   ok
- go build ./... clean
- migration applied locally; \d+ issue confirms lark_chat in CHECK

Co-authored-by: multica-agent <github@multica.ai>

* fix(integrations/lark): per-supervisor lease token to fence rotation handoff (MUL-2671)

Elon flagged a race in HEAD be8d4cef's rotation path: both the old
and the new supervisors of the same Hub used the hub-wide nodeID as
their WS lease token, so an old supervisor's post-cancel
releaseLease(nodeID) would CAS-match the lease row the successor had
just acquired with the SAME token and DELETE it. Symptom would be a
silently empty lease row a few hundred ms after every device-flow
re-scan — no replica owning the install, no events delivered, the
"bot goes quiet" pattern Bohan hit the first time but now from the
fencing side rather than the credentials side.

Fix: leaseToken(nodeID, gen) composes "<nodeID>-g<gen>", where gen is
the monotonic counter already attached to each supervisorEntry. The
nodeID prefix keeps cross-replica observability (an operator
inspecting lark_installation.ws_lease_token can still map back to a
process) while the -g suffix makes the OLD supervisor's release
target the OLD row state. Once the rotation path swaps in the new
supervisor, the row's CurrentToken is the new -g(N+1) token, so the
old -gN release's WHERE clause no-ops instead of clobbering.
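
The fencing effect can be sketched with an in-memory lease table. Only the `"<nodeID>-g<gen>"` format comes from the description above; the `uint64` counter type and the `leaseTable` stand-in for the compare-and-swap on the lease row are assumptions.

```go
package main

import "fmt"

// leaseToken composes "<nodeID>-g<gen>" so that two supervisors on the same
// node never share a token.
func leaseToken(nodeID string, gen uint64) string {
	return fmt.Sprintf("%s-g%d", nodeID, gen)
}

// leaseTable is an illustrative stand-in for the lease column:
// installation_id -> current token.
type leaseTable map[string]string

// release clears the lease only when the caller still holds it, mirroring a
// DELETE guarded by a WHERE clause on the current token.
func (l leaseTable) release(installationID, currentToken string) bool {
	if l[installationID] != currentToken {
		return false // stale release: no-op
	}
	delete(l, installationID)
	return true
}

func main() {
	leases := leaseTable{}
	oldToken := leaseToken("node-a", 1)
	newToken := leaseToken("node-a", 2)

	leases["inst-1"] = oldToken // old supervisor acquires
	leases["inst-1"] = newToken // rotation: successor takes over

	fmt.Println(leases.release("inst-1", oldToken)) // stale release from the old supervisor
	fmt.Println(leases["inst-1"] == newToken)
}
```

With a hub-wide token both supervisors would pass the same string, and the stale release would match and clear the successor's lease.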

acquireLease / renewLeaseUntil / releaseLease now take an explicit
token argument; supervise threads its leaseToken through. The
plumbing isn't pretty, but having an explicit argument at every call
site is the only way the rotation invariant survives subsequent
refactors — without it, a future caller could quietly reintroduce
"just use h.nodeID" and the race is back.

Two regression tests:

- TestHubRotationStaleReleaseDoesNotClearSuccessorLease drives the
  fake lease state machine directly:
    1. old acquires(tokenA)
    2. rotation lands; new acquires(tokenB)
    3. old's stale release(tokenA) fires
  Asserts owner ends up still tokenB. Hub-wide-nodeID code would fail
  step 3 by clearing the entry.

- TestHubRotationEndToEndKeepsSuccessorLeased runs the same scenario
  through the live supervise loop: starts hub, rotates the row, waits
  for sup2 to take over with a distinct token, sleeps past sup1's
  unwind, asserts the row is still held by a non-sup1 token. Catches
  the bug even when the goroutine timing is non-deterministic.

Verification:
- go test ./internal/integrations/lark/... -race -count=1   ok
- go build ./...                                            clean
- go vet ./...                                              clean

Co-authored-by: multica-agent <github@multica.ai>

* fix(integrations/lark): route group @-mentions via union_id, not open_id (MUL-2671)

In a Lark group with multiple Multica bots installed, the bot whose WS
received the event sometimes failed to recognize that it was the @-target
while the OTHER bot's supervisor falsely fired. Bohan's controlled three-
message test (only @A, only @B, @both) hit this: @A and @B alone went
unanswered, @both got picked up by A only.

Root cause: the `mentions[].id.open_id` field Lark puts on the WS event
is structurally INVERSE to `/bot/v3/info`'s `bot.open_id` across the two
WSes. From A's WS perspective, the wire-form open_id for "A was @-ed"
is NOT equal to A's API-side open_id, but IS equal to what B's WS sees
on its side, and vice versa. The decoder's `mention.open_id ==
inst.BotOpenID` match therefore fires on the wrong bot in multi-bot
groups. Only `union_id` (the Lark-tenant-scoped stable identifier) is
consistent across both WSes.

Changes:
- migration 112 adds nullable `lark_installation.bot_union_id`
- sqlc query exposes UpsertLarkInstallation/CreateLarkInstallation
  with bot_union_id, plus a focused SetLarkInstallationBotUnionID for
  the backfill path
- httpAPIClient.GetBotInfo now follows /bot/v3/info with /contact/v3/
  users/{open_id}?user_id_type=open_id and returns both identifiers
  on BotInfo. Soft-fails on contact-scope denial: install still
  succeeds with an empty UnionID, and the decoder falls back to the
  legacy open_id match for single-bot deployments.
- RegistrationService.finishSuccess persists union_id alongside
  open_id during the device-flow finalize.
- ws_frame_decoder.containsMention prefers union_id and only walks
  open_id when the installation row has not been backfilled yet.
- BackfillBotUnionIDs runs once at server boot for installations
  created before migration 112; bounded per-row 10s timeout and a
  pure soft-fail policy so a slow Lark round-trip cannot block
  startup.
- regression tests cover the three decoder paths: union_id match
  wins over open_id mismatch, union_id mismatch overrides open_id
  match, and open_id fallback when union_id is unknown.
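
The union_id-first policy reduces to a small predicate. The sketch below uses hypothetical `mention` and `botIdentity` types and a hypothetical `mentionsBot` function; it illustrates the three decoder paths listed above, not the actual containsMention code.

```go
package main

import "fmt"

// mention and botIdentity are illustrative types for this sketch.
type mention struct {
	UnionID string
	OpenID  string
}

type botIdentity struct {
	UnionID string // empty until the installation row is backfilled
	OpenID  string
}

// mentionsBot trusts union_id exclusively when the installation knows it and
// falls back to open_id only when union_id is unknown.
func mentionsBot(mentions []mention, bot botIdentity) bool {
	for _, m := range mentions {
		if bot.UnionID != "" {
			if m.UnionID == bot.UnionID {
				return true
			}
			continue // never consult open_id once union_id is known
		}
		if bot.OpenID != "" && m.OpenID == bot.OpenID {
			return true
		}
	}
	return false
}

func main() {
	bot := botIdentity{UnionID: "on_self", OpenID: "ou_self"}
	fmt.Println(mentionsBot([]mention{{UnionID: "on_self", OpenID: "ou_other"}}, bot))
	fmt.Println(mentionsBot([]mention{{UnionID: "on_other", OpenID: "ou_self"}}, bot))
}
```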

Co-authored-by: multica-agent <github@multica.ai>

* chore: drop trailing blank lines at EOF on four files (MUL-2671)

git diff --check origin/main..origin/pr-3277 flagged these as new
blank lines at EOF; clearing so the diff stays clean for review.

Co-authored-by: multica-agent <github@multica.ai>

* fix(views/locales): add missing ja keys for Lark MVP + section_integrations (MUL-2671)

CI frontend job tripped on the ja locale parity check: ja is missing
the lark_bind block in common.json, the lark block + page.tabs.lark
in settings.json, and inspector.section_integrations in agents.json.
The ko fix earlier covered Korean; ja was added separately on main
and the merge surfaced these gaps. Translations mirror the en source
and follow the same voice as the existing ja bundle.

Co-authored-by: multica-agent <github@multica.ai>

* fix(integrations/lark): rewrite @_user_N placeholders into clean body (MUL-2671)

When Lark dispatches a group `im.message.receive_v1`, the message
text contains opaque `@_user_1`, `@_user_2`, … placeholders and the
real identity is in `mentions[]`. We were forwarding the raw text to
the agent, so a Bohan-typed "@Bot ping test" arrived as "@_user_1
ping test" — neither human-readable nor useful as LLM context, and
the agent was paying tokens to figure out which `@_user_N` was even
itself.

The new resolveMentions pass:
  * strips the bot's own mention entirely (the dispatcher already
    routes the event on AddressedToBot; re-emitting @<self> in front
    of every message adds zero signal and pollutes context),
  * substitutes other participants with `@<displayName>` so a
    follow-up "@Alice" reads naturally,
  * collapses horizontal whitespace introduced by the strip while
    preserving original newlines.

Bot identity check uses the same union_id-preferred + open_id
fallback as containsMention, so the rewrite stays consistent with
the routing path. Tests cover the four shapes: bot self-mention,
mixed bot + other-user mention, multi-line body with stripped
mention, and a no-mention body that should be left untouched.

Co-authored-by: multica-agent <github@multica.ai>

* fix(integrations/lark): union_id-first self mention strip + token-aware scan + local whitespace cleanup (MUL-2671)

Three review blockers on the mention rewrite from PR review:

1. isBotMention now mirrors containsMention's union_id-first policy.
   When the installation row knows our union_id, we trust it
   exclusively (open_id is structurally inverted in multi-bot
   groups — matching on it would re-introduce the routing bug we
   fixed two commits ago). open_id fallback fires only when
   union_id is absent. New tests: @-ing both bots in one message
   correctly strips only self and renders the sibling as @<name>;
   open_id-matches-but-union_id-differs does NOT strip.

2. resolveMentions no longer collapses or trims whitespace globally.
   Indentation, tabs, code blocks, tables — all preserved verbatim.
   When the self mention is removed we eat exactly one adjacent
   horizontal space (the one after the placeholder, or, when the
   mention sits at end-of-input, a single space already emitted
   right before it). New test exercises a multi-line indented +
   tabbed body and asserts the whole shape survives.

3. Prefix-collision-safe replacement. A chat with 11+ participants
   exposes both `@_user_1` and `@_user_10`; naive ReplaceAll for
   `@_user_1` would mangle the substring of `@_user_10`. The
   resolver now does a single-pass token scan with the mention
   list sorted longest-key-first, so the longer placeholder always
   wins at any scan position. New test covers the @_user_1 /
   @_user_10 case explicitly.
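
The prefix-collision point in item 3 can be illustrated with a single-pass scan that tries longer placeholders first. This sketch covers only the substitution step (no self-mention strip or whitespace handling), and `replacePlaceholders` is a hypothetical name.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// replacePlaceholders substitutes placeholder keys in one left-to-right
// pass. Keys are tried longest first, so "@_user_10" is matched before
// "@_user_1" at any scan position.
func replacePlaceholders(text string, names map[string]string) string {
	keys := make([]string, 0, len(names))
	for k := range names {
		if k != "" { // an empty key would never advance the scan
			keys = append(keys, k)
		}
	}
	sort.Slice(keys, func(i, j int) bool {
		if len(keys[i]) != len(keys[j]) {
			return len(keys[i]) > len(keys[j])
		}
		return keys[i] < keys[j]
	})

	var b strings.Builder
	for i := 0; i < len(text); {
		matched := false
		for _, k := range keys {
			if strings.HasPrefix(text[i:], k) {
				b.WriteString(names[k])
				i += len(k)
				matched = true
				break
			}
		}
		if !matched {
			b.WriteByte(text[i]) // copy bytes verbatim, preserving whitespace
			i++
		}
	}
	return b.String()
}

func main() {
	names := map[string]string{"@_user_1": "@Alice", "@_user_10": "@Bob"}
	fmt.Println(replacePlaceholders("@_user_1 please sync with @_user_10", names))
}
```

A chain of `strings.ReplaceAll` calls in arbitrary order could instead turn `@_user_10` into `@Alice0`.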

Also drops the temporary INFO-level diag logging the previous
commit added — root cause was confirmed (union_id swap in the
manual backfill; not a decoder bug).

Co-authored-by: multica-agent <github@multica.ai>

* fix(integrations/lark): scope inbound dedup per (installation_id, message_id) (MUL-2671)

Root cause of the residual "@Cc gets dropped as not_addressed_in_group"
even after the union_id swap landed: lark_inbound_message_dedup was
keyed on `message_id` alone. In a Lark group chat where the workspace
has multiple Multica bots installed, Lark delivers the SAME message_id
to every bot's WS supervisor. Whichever WS claimed first then ran its
own AddressedToBot check; the bot that was actually @-ed lost the dedup
race, found the row already terminal (`processed_at IS NOT NULL`), and
was dropped as `duplicate` BEFORE it could evaluate its own mention.
Net: every @ silently disappeared if Lark happened to route the OTHER
bot's WS first.

The dedup gate's original purpose (idempotency against WS reconnect
replay) is per-installation by definition, so the right key is
composite (installation_id, message_id).

Changes:
- migration 113 drops + recreates lark_inbound_message_dedup with
  installation_id NOT NULL REFERENCES lark_installation(id) ON DELETE
  CASCADE and PRIMARY KEY (installation_id, message_id). The table is
  a 24h transient cache, so dropping existing rows is safe.
- sqlc queries: ClaimLarkInboundDedup / MarkLarkInboundDedupProcessed /
  ReleaseLarkInboundDedup all now take installation_id.
- AppendUserMessageParams carries InstallationID through to the
  in-tx Mark call so the chat_message+dedup atomicity stays intact.
- Dispatcher passes inst.ID to claim + applyFinalize + AppendUserMessage.
- Test fakes key dedup state on (installation_id, message_id) via a
  composite map key; all existing pre-seeded rows use a seedDedupKey
  helper bound to the default activeInstallation fixture so the prior
  staleness / token-rotation / in-tx mark tests still exercise the
  same regression they did before.
- New regression TestDispatcher_DedupIsScopedPerInstallation pins the
  multi-bot invariant: a row pre-seeded for installation A does NOT
  block installation B's first delivery of the same message_id; B
  runs through its own group-filter / identity / ingest pipeline.
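
The effect of the composite key can be shown with an in-memory stand-in for the dedup table. The `dedupKey` and `dedupCache` types below are illustrative only; the real gate lives in Postgres behind the sqlc queries listed above.

```go
package main

import (
	"fmt"
	"sync"
)

// dedupKey scopes a claim to one installation, matching the composite
// primary key (installation_id, message_id).
type dedupKey struct {
	InstallationID string
	MessageID      string
}

// dedupCache is an in-memory stand-in for the dedup table.
type dedupCache struct {
	mu   sync.Mutex
	seen map[dedupKey]struct{}
}

func newDedupCache() *dedupCache {
	return &dedupCache{seen: make(map[dedupKey]struct{})}
}

// claim reports whether this installation is seeing the message for the
// first time. A second claim with the same pair returns false.
func (c *dedupCache) claim(installationID, messageID string) bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	key := dedupKey{InstallationID: installationID, MessageID: messageID}
	if _, ok := c.seen[key]; ok {
		return false
	}
	c.seen[key] = struct{}{}
	return true
}

func main() {
	cache := newDedupCache()
	fmt.Println(cache.claim("inst-A", "om_1")) // first delivery to bot A
	fmt.Println(cache.claim("inst-A", "om_1")) // reconnect replay on bot A
	fmt.Println(cache.claim("inst-B", "om_1")) // same message delivered to bot B
}
```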

Co-authored-by: multica-agent <github@multica.ai>

* feat(integrations/lark): render markdown chat replies via schema-2.0 card (MUL-2671)

The agent's chat replies were going out as msg_type=text, so every
`**bold**`, fenced code block, list, table, and link in the body
showed up as literal markdown characters in Lark — the user saw raw
asterisks, hashes, pipes instead of formatted text. Bohan reported
this and pointed at zarazhangrui/lark-coding-agent-bridge as the
shape to emulate.

The bridge repo uses Lark interactive cards with the schema-2.0
envelope and a `tag: "markdown"` body element; Lark's client
renders that to formatted text (GFM-ish: bold/italic, headings,
lists, links, fenced code blocks, tables, blockquotes). They expose
multiple reply modes (card / markdown-as-post / text) gated by user
config; we go a step simpler — auto-detect markdown syntax in the
agent's body and route accordingly:

- containsMarkdown(): cheap substring + regex pass for fenced code
  blocks, headings, list markers, bold/italic, tables, links,
  blockquotes, horizontal rules, inline code. Biases toward false-
  positive — wrapping prose in a card still renders fine, but
  missing a real markdown block leaves raw characters visible.

- APIClient gains SendMarkdownCard / SendMarkdownCardParams.
  Implementation marshals the schema-2.0 envelope verbatim:
  {schema:"2.0", body:{elements:[{tag:"markdown", content: md}]}}.
  Stub returns ErrAPIClientNotConfigured.

- Patcher.sendChatReply now branches on containsMarkdown:
  markdown → SendMarkdownCard, plain prose → SendTextMessage. A
  one-liner "sure, on it" stays as a normal IM bubble (no card
  chrome); anything with markdown gets the rendered card.
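
A reduced sketch of the routing idea follows. The regular expressions cover only a subset of the shapes listed above, and `looksLikeMarkdown`, `newMarkdownCard`, and the card struct types are hypothetical names; only the envelope layout (schema "2.0", one markdown element in body.elements) is taken from the description.

```go
package main

import (
	"encoding/json"
	"fmt"
	"regexp"
	"strings"
)

var (
	// lineMarker matches headings, list items, blockquotes, and table rows
	// at the start of a line.
	lineMarker = regexp.MustCompile(`(?m)^[ \t]*(#{1,6} |[-*+] |\d+\. |> |\|.+\|)`)
	// inlineLink matches [label](target).
	inlineLink = regexp.MustCompile(`\[[^\]]+\]\([^)]+\)`)
)

// looksLikeMarkdown is a cheap heuristic that biases toward false positives.
func looksLikeMarkdown(s string) bool {
	if strings.Contains(s, "`") || strings.Contains(s, "**") {
		return true
	}
	return lineMarker.MatchString(s) || inlineLink.MatchString(s)
}

type cardElement struct {
	Tag     string `json:"tag"`
	Content string `json:"content"`
}

type cardBody struct {
	Elements []cardElement `json:"elements"`
}

type markdownCard struct {
	Schema string   `json:"schema"`
	Body   cardBody `json:"body"`
}

// newMarkdownCard wraps the body verbatim in a schema-2.0 envelope.
func newMarkdownCard(md string) markdownCard {
	return markdownCard{
		Schema: "2.0",
		Body:   cardBody{Elements: []cardElement{{Tag: "markdown", Content: md}}},
	}
}

func main() {
	for _, reply := range []string{"sure, on it", "## Plan\n- step one\n- step two"} {
		if !looksLikeMarkdown(reply) {
			fmt.Println("text:", reply)
			continue
		}
		payload, err := json.Marshal(newMarkdownCard(reply))
		if err != nil {
			panic(err)
		}
		fmt.Println("card:", string(payload))
	}
}
```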

Tests: TestContainsMarkdown pins the heuristic across plain prose
and ten markdown shapes; TestPatcherRoutesMarkdownReplyToCard and
TestPatcherRoutesPlainReplyToText cover the router; new HTTP wire
test TestHTTPClient_SendMarkdownCard_HappyPath contract-pins the
card envelope (msg_type=interactive, schema 2.0, markdown tag,
verbatim body). Full lark suite passes.

Co-authored-by: multica-agent <github@multica.ai>

* fix(service/issue): route analytics.IssueCreated through obsmetrics.RecordEvent (MUL-2671)

CI's TestNoNakedAnalyticsCaptureInHandlersOrServices guard caught the
post-merge analytics call in IssueService.captureCreatedAnalytics
that still used s.Analytics.Capture(...) directly. Main added that
lint to prevent the Prometheus and PostHog sides from drifting — any
new analytics.* event must go through obsmetrics.RecordEvent so the
business-metrics collector and the PostHog client fire from the same
call site.

Fix mirrors how TaskService handles it: IssueService gains a
Metrics *obsmetrics.BusinessMetrics field (router wires it via
h.IssueService.Metrics = opts.BusinessMetrics next to the existing
TaskService line), and the in-service Capture call becomes
obsmetrics.RecordEvent(s.Analytics, s.Metrics, ...). nil-safe by
construction — RecordEvent treats a nil Metrics as PostHog-only.

Co-authored-by: multica-agent <github@multica.ai>

* feat(views/lark): swap Bind CTA for Connected+Manage link when agent already has an installation (MUL-2671)

Bohan reported the agent-detail Bind button keeps inviting the user to
re-scan the QR even when the agent already has an active Lark
PersonalAgent connected — and re-scanning silently upserts the
installation row, leaving the previously-created Lark bot dangling
as a zombie. Frustrating UX and an actual product footgun.

Anti-zombie guard at the only entry point: LarkAgentBindButton now
checks the cached installations listing for an active row pinned to
this agent_id. When one exists, the install CTA is gone — replaced
by a small Connected pill + a "Manage in Lark" link that opens the
Bot's app page in Lark's developer console (open.feishu.cn/app/<app_id>)
in a new tab. That's where scopes, display name, and additional
permission requests actually live; re-scanning never was the right
answer for managing an existing bot.

Scoping is per-agent: an active installation on a DIFFERENT agent
in the same workspace doesn't affect this agent's button, and a
revoked installation falls back to the bind CTA so the user can
re-create. Tests cover all four states (own-active / own-revoked /
other-agent-active / no-installation) and pin the Manage link's
href + target=_blank + noopener.

i18n: three new keys in settings.json (en / zh-Hans / ja / ko):
agent_bot_connected_label, agent_bot_manage_link,
agent_bot_manage_tooltip. Locale parity test still 157/157.

The dev console host is hardcoded to open.feishu.cn, so operators
on the Lark international tenant currently get the wrong host. A
future-proof fix would have the backend surface a per-installation
dev_console_url on the listings response; this is called out in a
code comment.

Co-authored-by: multica-agent <github@multica.ai>

* feat(views/settings): collapse Lark into Integrations + render agent identity (MUL-2671)

Lark was its own top-level workspace settings tab while Integrations sat
empty next to it. As more integrations land, the sidebar would balloon
with one tab per provider. Move the Lark surface into Integrations as
the first hosted integration; the old ?tab=lark URL redirects through
LEGACY_WORKSPACE_TAB_REDIRECTS so bookmarks still resolve.

The Connected bots list was leaking the raw Lark app_id (cli_…) as the
row title with bot_open_id (ou_…) underneath — meaningless to product
users. Since the binding is 1:1 with a Multica Agent, join on agent_id
and render the agent's avatar + name via the workspace-standard
ActorAvatar + useActorName.getAgentName. Deleted agents fall back to
"Unknown Agent" so the row is still actionable for cleanup.

Tests: stub useActorName + ActorAvatar in lark-tab.test.tsx and add
LarkTab connected-bot tests covering the agent identity render and the
deleted-agent fallback. Drop the now-dead integrations.* + page.tabs.lark
+ lark.bot_open_id_label keys across all four locales — parity still
157/157, views suite 1141/1141.

Co-authored-by: multica-agent <github@multica.ai>

* feat(views/settings): wrap Lark in a named section inside Integrations (MUL-2671)

Integrations is meant to host multiple providers (Slack, Linear etc. as
they land), so the Lark content should sit under a Lark heading rather
than fill the tab directly — otherwise the first additional integration
would feel like it broke the IA. Add a "Lark" / "飞书" section heading
above LarkTab using the same h2 chrome the other settings tabs use, and
pin lark.section_title across all four locales (parity 169/169).

Co-authored-by: multica-agent <github@multica.ai>

---------

Co-authored-by: multica-agent <github@multica.ai>
Co-authored-by: J <j@multica.ai>
2026-06-03 19:12:14 +08:00


package lark

import (
	"bytes"
	"context"
	"errors"
	"io"
	"log/slog"
	"sync"
	"sync/atomic"
	"testing"
	"time"

	"github.com/jackc/pgx/v5/pgtype"
	db "github.com/multica-ai/multica/server/pkg/db/generated"
)

// fakeHubQueries is the unit-test seam for HubQueries. The lease state
// is held in memory so a single fake can play both "we hold the lease"
// and "another replica holds the lease" scenarios across one test.
type fakeHubQueries struct {
	mu             sync.Mutex
	installations  []db.LarkInstallation
	listErr        error
	leaseOwner     map[string]string    // installation_id -> ws_lease_token
	leaseExpiresAt map[string]time.Time // installation_id -> expiry
	acquireErr     error
	releaseErr     error
	now            func() time.Time
	acquireCount   int32
	// releaseBlock, if non-nil, makes ReleaseLarkWSLease block until
	// either the channel is closed/sent on OR the caller's ctx fires.
	// Used to simulate a frozen DB pool so the bounded-release timeout
	// can be exercised without standing up real infrastructure.
	releaseBlock chan struct{}
	// releaseObservedCtxErr captures the ctx error (typically
	// context.DeadlineExceeded) the blocked release call observed
	// when its bounded ctx fired. Inspected by tests to prove the
	// bound actually fired instead of the test happening to win the
	// race naturally.
	releaseObservedCtxErr error
}

func newFakeHubQueries() *fakeHubQueries {
	return &fakeHubQueries{
		leaseOwner:     make(map[string]string),
		leaseExpiresAt: make(map[string]time.Time),
		now:            time.Now,
	}
}

func (f *fakeHubQueries) ListActiveLarkInstallations(ctx context.Context) ([]db.LarkInstallation, error) {
	f.mu.Lock()
	defer f.mu.Unlock()
	if f.listErr != nil {
		return nil, f.listErr
	}
	out := make([]db.LarkInstallation, len(f.installations))
	copy(out, f.installations)
	return out, nil
}

func (f *fakeHubQueries) AcquireLarkWSLease(ctx context.Context, arg db.AcquireLarkWSLeaseParams) (db.LarkInstallation, error) {
	atomic.AddInt32(&f.acquireCount, 1)
	f.mu.Lock()
	defer f.mu.Unlock()
	if f.acquireErr != nil {
		return db.LarkInstallation{}, f.acquireErr
	}
	id := uuidString(arg.ID)
	owner, hasOwner := f.leaseOwner[id]
	exp := f.leaseExpiresAt[id]
	now := f.now()
	// CAS: accept when no holder, holder expired, or holder is us.
	if !hasOwner || exp.Before(now) || owner == arg.NewToken.String {
		f.leaseOwner[id] = arg.NewToken.String
		f.leaseExpiresAt[id] = arg.NewExpiresAt.Time
		// Return the (synthetic) row; the supervise loop only checks
		// the error, not the row contents.
		return db.LarkInstallation{ID: arg.ID}, nil
	}
	// Live lease held by someone else.
	return db.LarkInstallation{}, errPgxNoRows
}

func (f *fakeHubQueries) ReleaseLarkWSLease(ctx context.Context, arg db.ReleaseLarkWSLeaseParams) error {
	f.mu.Lock()
	block := f.releaseBlock
	f.mu.Unlock()
	if block != nil {
		select {
		case <-block:
			// Released by the test; fall through to the normal path.
		case <-ctx.Done():
			f.mu.Lock()
			f.releaseObservedCtxErr = ctx.Err()
			f.mu.Unlock()
			return ctx.Err()
		}
	}
	f.mu.Lock()
	defer f.mu.Unlock()
	if f.releaseErr != nil {
		return f.releaseErr
	}
	id := uuidString(arg.ID)
	if f.leaseOwner[id] == arg.CurrentToken.String {
		delete(f.leaseOwner, id)
		delete(f.leaseExpiresAt, id)
	}
	return nil
}

// presetLease forcibly assigns a lease to a holder other than the hub
// under test. Used to verify "another replica owns it" branches.
func (f *fakeHubQueries) presetLease(id pgtype.UUID, token string, expires time.Time) {
	f.mu.Lock()
	defer f.mu.Unlock()
	f.leaseOwner[uuidString(id)] = token
	f.leaseExpiresAt[uuidString(id)] = expires
}

// fakeConnector counts how many times Run was invoked and behaves
// according to the script provided per-call. The default behavior
// (script nil) blocks on ctx.Done — useful for the "owns lease, stays
// connected" test.
type fakeConnector struct {
	mu     sync.Mutex
	runs   int
	script []func(ctx context.Context, emit EventEmitter) error
	emit   EventEmitter
}

func (f *fakeConnector) Run(ctx context.Context, _ db.LarkInstallation, emit EventEmitter) error {
	f.mu.Lock()
	idx := f.runs
	f.runs++
	if idx < len(f.script) {
		fn := f.script[idx]
		f.mu.Unlock()
		return fn(ctx, emit)
	}
	f.mu.Unlock()
	// Default: hold until cancelled.
	f.emit = emit
	<-ctx.Done()
	return nil
}

func (f *fakeConnector) Runs() int {
	f.mu.Lock()
	defer f.mu.Unlock()
	return f.runs
}

func uuidFromString(t *testing.T, s string) pgtype.UUID {
	t.Helper()
	var u pgtype.UUID
	if err := u.Scan(s); err != nil {
		t.Fatalf("scan uuid %q: %v", s, err)
	}
	return u
}

func newDiscardLogger() *slog.Logger {
	return slog.New(slog.NewTextHandler(io.Discard, nil))
}

func TestHubAcquiresLeaseAndStartsSupervisor(t *testing.T) {
	q := newFakeHubQueries()
	instID := uuidFromString(t, "11111111-1111-1111-1111-111111111111")
	q.installations = []db.LarkInstallation{{ID: instID, Status: "active"}}
	conn := &fakeConnector{}
	factory := func(_ db.LarkInstallation) (EventConnector, error) { return conn, nil }
	hub := NewHub(q, factory, nil, HubConfig{
		LeaseTTL:           500 * time.Millisecond,
		LeaseRenewInterval: 50 * time.Millisecond,
		PollInterval:       10 * time.Millisecond,
		MinBackoff:         5 * time.Millisecond,
		MaxBackoff:         50 * time.Millisecond,
		ResetBackoffAfter:  1 * time.Second,
		Logger:             newDiscardLogger(),
	})
	ctx, cancel := context.WithCancel(context.Background())
	go hub.Run(ctx)
	// Wait until the supervisor has started the connector at least once.
	if !waitFor(200*time.Millisecond, func() bool { return conn.Runs() >= 1 }) {
		t.Fatalf("expected connector to start; runs=%d", conn.Runs())
	}
	cancel()
	hub.Wait()
	// After shutdown the lease should be released so another replica
	// can take over without waiting for the TTL to elapse.
	q.mu.Lock()
	defer q.mu.Unlock()
	if _, ok := q.leaseOwner[uuidString(instID)]; ok {
		t.Fatalf("lease should be released after shutdown, got owner %q", q.leaseOwner[uuidString(instID)])
	}
}

func TestHubSkipsInstallationWhenAnotherReplicaHoldsLease(t *testing.T) {
	q := newFakeHubQueries()
	instID := uuidFromString(t, "22222222-2222-2222-2222-222222222222")
	q.installations = []db.LarkInstallation{{ID: instID, Status: "active"}}
	// Another replica already owns the lease for the next 10 seconds.
	q.presetLease(instID, "other-replica", time.Now().Add(10*time.Second))
	conn := &fakeConnector{}
	factory := func(_ db.LarkInstallation) (EventConnector, error) { return conn, nil }
	hub := NewHub(q, factory, nil, HubConfig{
		LeaseTTL:           500 * time.Millisecond,
		LeaseRenewInterval: 20 * time.Millisecond,
		PollInterval:       20 * time.Millisecond,
		MinBackoff:         5 * time.Millisecond,
		MaxBackoff:         20 * time.Millisecond,
		ResetBackoffAfter:  1 * time.Second,
		Logger:             newDiscardLogger(),
	})
	ctx, cancel := context.WithCancel(context.Background())
	go hub.Run(ctx)
	// Give the hub plenty of opportunity to try to take over.
	time.Sleep(150 * time.Millisecond)
	if conn.Runs() != 0 {
		t.Fatalf("connector should not run while another replica owns lease; runs=%d", conn.Runs())
	}
	cancel()
	hub.Wait()
}

func TestHubReclaimsLeaseAfterAnotherReplicaExpires(t *testing.T) {
	q := newFakeHubQueries()
	instID := uuidFromString(t, "33333333-3333-3333-3333-333333333333")
	q.installations = []db.LarkInstallation{{ID: instID, Status: "active"}}
	// Set the other replica's lease to expire in 80ms so the hub
	// (which polls/renews on 20ms intervals) will pick it up.
	q.presetLease(instID, "other-replica", time.Now().Add(80*time.Millisecond))
	conn := &fakeConnector{}
	factory := func(_ db.LarkInstallation) (EventConnector, error) { return conn, nil }
	hub := NewHub(q, factory, nil, HubConfig{
		LeaseTTL:           500 * time.Millisecond,
		LeaseRenewInterval: 20 * time.Millisecond,
		PollInterval:       20 * time.Millisecond,
		MinBackoff:         5 * time.Millisecond,
		MaxBackoff:         20 * time.Millisecond,
		ResetBackoffAfter:  1 * time.Second,
		Logger:             newDiscardLogger(),
	})
	ctx, cancel := context.WithCancel(context.Background())
	go hub.Run(ctx)
	if !waitFor(500*time.Millisecond, func() bool { return conn.Runs() >= 1 }) {
		t.Fatalf("expected connector to start after lease expiry; runs=%d", conn.Runs())
	}
	cancel()
	hub.Wait()
}

func TestHubReapsSupervisorWhenInstallationRevoked(t *testing.T) {
	q := newFakeHubQueries()
	instID := uuidFromString(t, "44444444-4444-4444-4444-444444444444")
	q.installations = []db.LarkInstallation{{ID: instID, Status: "active"}}
	conn := &fakeConnector{}
	factory := func(_ db.LarkInstallation) (EventConnector, error) { return conn, nil }
	hub := NewHub(q, factory, nil, HubConfig{
		LeaseTTL:           500 * time.Millisecond,
		LeaseRenewInterval: 20 * time.Millisecond,
		PollInterval:       20 * time.Millisecond,
		MinBackoff:         5 * time.Millisecond,
		MaxBackoff:         20 * time.Millisecond,
		ResetBackoffAfter:  1 * time.Second,
		Logger:             newDiscardLogger(),
	})
	ctx, cancel := context.WithCancel(context.Background())
	go hub.Run(ctx)
	defer func() { cancel(); hub.Wait() }()
	if !waitFor(200*time.Millisecond, func() bool { return conn.Runs() >= 1 }) {
		t.Fatalf("expected connector to start; runs=%d", conn.Runs())
	}
	// Simulate revocation: the installation disappears from
	// ListActiveLarkInstallations. The Hub should cancel its
	// supervisor on the next sweep, which releases the lease.
	q.mu.Lock()
	q.installations = nil
	q.mu.Unlock()
	if !waitFor(500*time.Millisecond, func() bool {
		q.mu.Lock()
		defer q.mu.Unlock()
		_, stillHeld := q.leaseOwner[uuidString(instID)]
		return !stillHeld
	}) {
		t.Fatalf("expected lease to be released after revocation")
	}
}

// TestHubRestartsSupervisorOnCredentialsRotation pins the rotation
// invariant Bohan hit on the live env: re-scanning the same agent
// runs the device flow again, which mints a brand-new Lark bot with
// a fresh app_id / encrypted app_secret. The lark_installation row is
// updated in place (UNIQUE(agent_id)), but the running supervisor
// holds the OLD inst struct by value. Without a fingerprint-driven
// restart the connector keeps a WS open against the OLD bot's app_id
// and the new bot silently goes dark: exactly the "claude code is not
// responding" symptom Bohan reported.
//
// Repro: start the hub with installation A (app_id=app_one), wait for
// the connector factory to be called, then mutate the row to a new
// app_id (app_two). The next sweep MUST cancel the first supervisor
// and start a second one with the new credentials.
func TestHubRestartsSupervisorOnCredentialsRotation(t *testing.T) {
q := newFakeHubQueries()
instID := uuidFromString(t, "abcdabcd-abcd-abcd-abcd-abcdabcdabcd")
q.installations = []db.LarkInstallation{{
ID: instID,
Status: "active",
AppID: "app_one",
BotOpenID: "bot_open_id_one",
AppSecretEncrypted: []byte("encrypted_one"),
}}
type seenInst struct{ AppID string }
var mu sync.Mutex
var seen []seenInst
factory := func(inst db.LarkInstallation) (EventConnector, error) {
mu.Lock()
seen = append(seen, seenInst{AppID: inst.AppID})
mu.Unlock()
return &fakeConnector{}, nil
}
hub := NewHub(q, factory, nil, HubConfig{
LeaseTTL: 500 * time.Millisecond,
LeaseRenewInterval: 50 * time.Millisecond,
PollInterval: 20 * time.Millisecond,
MinBackoff: 5 * time.Millisecond,
MaxBackoff: 20 * time.Millisecond,
ResetBackoffAfter: 1 * time.Second,
Logger: newDiscardLogger(),
})
ctx, cancel := context.WithCancel(context.Background())
go hub.Run(ctx)
defer func() { cancel(); hub.Wait() }()
if !waitFor(300*time.Millisecond, func() bool {
mu.Lock()
defer mu.Unlock()
return len(seen) >= 1
}) {
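// Read len(seen) under the mutex; the factory goroutine may still append.
mu.Lock()
n := len(seen)
mu.Unlock()
t.Fatalf("expected initial connector start; got %d", n)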
}
// Rotate credentials in place — same installation_id, new app_id +
// new encrypted secret. This is exactly what device-flow re-scan
// produces.
q.mu.Lock()
q.installations[0].AppID = "app_two"
q.installations[0].BotOpenID = "bot_open_id_two"
q.installations[0].AppSecretEncrypted = []byte("encrypted_two")
q.mu.Unlock()
if !waitFor(500*time.Millisecond, func() bool {
mu.Lock()
defer mu.Unlock()
for _, s := range seen {
if s.AppID == "app_two" {
return true
}
}
return false
}) {
mu.Lock()
got := append([]seenInst(nil), seen...)
mu.Unlock()
t.Fatalf("expected a second connector start with rotated app_id; got %+v", got)
}
}
// TestHubDoesNotRestartSupervisorOnUnchangedRow pins the negative case:
// a sweep that observes an installation row identical to the fingerprint
// the supervisor was started with MUST NOT restart. Without this guard
// the rotation logic would degrade into a busy-loop, tearing the
// connector down on every poll tick.
func TestHubDoesNotRestartSupervisorOnUnchangedRow(t *testing.T) {
q := newFakeHubQueries()
instID := uuidFromString(t, "bcdebcde-bcde-bcde-bcde-bcdebcdebcde")
q.installations = []db.LarkInstallation{{
ID: instID,
Status: "active",
AppID: "app_stable",
BotOpenID: "bot_stable",
AppSecretEncrypted: []byte("stable"),
}}
starts := int32(0)
factory := func(_ db.LarkInstallation) (EventConnector, error) {
atomic.AddInt32(&starts, 1)
return &fakeConnector{}, nil
}
hub := NewHub(q, factory, nil, HubConfig{
LeaseTTL: 500 * time.Millisecond,
LeaseRenewInterval: 50 * time.Millisecond,
PollInterval: 10 * time.Millisecond,
MinBackoff: 5 * time.Millisecond,
MaxBackoff: 20 * time.Millisecond,
ResetBackoffAfter: 1 * time.Second,
Logger: newDiscardLogger(),
})
ctx, cancel := context.WithCancel(context.Background())
go hub.Run(ctx)
defer func() { cancel(); hub.Wait() }()
// Wait until the supervisor has called the factory at least once.
if !waitFor(200*time.Millisecond, func() bool { return atomic.LoadInt32(&starts) >= 1 }) {
t.Fatalf("expected initial connector start; got %d", atomic.LoadInt32(&starts))
}
// Let several sweep ticks happen with an unchanged row.
time.Sleep(120 * time.Millisecond)
got := atomic.LoadInt32(&starts)
if got > 2 {
// Allow one extra start in case the fakeConnector returns
// immediately and the supervise loop re-enters the connector
// factory under backoff. More than that is the busy-loop bug.
t.Fatalf("supervisor restarted unexpectedly on unchanged row: %d factory calls within ~320ms", got)
}
}
// TestHubRotationStaleReleaseDoesNotClearSuccessorLease pins the
// fix for Elon's review of HEAD be8d4cef. Before per-supervisor lease
// tokens, both old and new supervisors of the same Hub used the same
// hub-wide nodeID as their lease token. The rotation race went:
//
// 1. Old supervisor cancelled by maybeRestartOnRotation.
// 2. New supervisor started; acquireLease takes the lease.
// 3. Old supervisor finishes post-cancel unwind and calls
// releaseLease(nodeID).
// 4. DB row's CurrentToken still equals nodeID (because new
// supervisor wrote the SAME token). DELETE matches → DB row
// cleared → new supervisor's lease silently lost.
//
// Per-supervisor tokens (nodeID + "-g" + gen) mean the CurrentToken
// passed in step 3 belongs to the OLD supervisor while the token stored
// in the DB row belongs to the NEW supervisor, so the DELETE no-ops
// and the successor keeps its lease.
//
// This test drives the lease state machine directly through the
// fakeHubQueries, simulating the exact ordering: old acquires
// (token_A) → rotation → new acquires (token_B) → old's stale
// release(token_A). The lease must remain held by token_B.
func TestHubRotationStaleReleaseDoesNotClearSuccessorLease(t *testing.T) {
q := newFakeHubQueries()
instID := uuidFromString(t, "deadbeef-dead-beef-dead-beefdeadbeef")
expires := time.Now().Add(time.Minute)
tokenA := leaseToken("node_xyz", 1)
tokenB := leaseToken("node_xyz", 2)
if tokenA == tokenB {
t.Fatalf("per-supervisor tokens must differ; got %q for both", tokenA)
}
// Old supervisor acquires.
if _, err := q.AcquireLarkWSLease(context.Background(), db.AcquireLarkWSLeaseParams{
ID: instID,
NewToken: pgtype.Text{String: tokenA, Valid: true},
NewExpiresAt: pgtype.Timestamptz{Time: expires, Valid: true},
}); err != nil {
t.Fatalf("old acquire: %v", err)
}
q.mu.Lock()
if got := q.leaseOwner[uuidString(instID)]; got != tokenA {
t.Fatalf("after old acquire, owner = %q; want %q", got, tokenA)
}
q.mu.Unlock()
// Rotation: the new supervisor takes over. The production CAS in
// acquireLease accepts a matching token (renewal), and the fake's CAS
// also accepts expired rows. tokenB is neither, so clear the row by
// hand to stand in for the old lease having lapsed at the rotation
// boundary. In production the rotation cancels the old loop and the
// old supervisor's defer eventually calls releaseLease; here the new
// supervisor acquires BEFORE that release lands.
q.mu.Lock()
delete(q.leaseOwner, uuidString(instID))
delete(q.leaseExpiresAt, uuidString(instID))
q.mu.Unlock()
if _, err := q.AcquireLarkWSLease(context.Background(), db.AcquireLarkWSLeaseParams{
ID: instID,
NewToken: pgtype.Text{String: tokenB, Valid: true},
NewExpiresAt: pgtype.Timestamptz{Time: expires, Valid: true},
}); err != nil {
t.Fatalf("new acquire: %v", err)
}
q.mu.Lock()
if got := q.leaseOwner[uuidString(instID)]; got != tokenB {
t.Fatalf("after new acquire, owner = %q; want %q", got, tokenB)
}
q.mu.Unlock()
// Old supervisor's defer / cleanup fires AFTER new acquired.
// Without per-supervisor tokens this would clobber tokenB's lease.
if err := q.ReleaseLarkWSLease(context.Background(), db.ReleaseLarkWSLeaseParams{
ID: instID,
CurrentToken: pgtype.Text{String: tokenA, Valid: true},
}); err != nil {
t.Fatalf("old release: %v", err)
}
// Successor's lease must still be held by tokenB.
q.mu.Lock()
got, ok := q.leaseOwner[uuidString(instID)]
q.mu.Unlock()
if !ok {
t.Fatalf("successor lease silently cleared by stale release — the rotation race is back")
}
if got != tokenB {
t.Fatalf("after stale release, owner = %q; want %q (successor's token)", got, tokenB)
}
}
// TestHubRotationEndToEndKeepsSuccessorLeased exercises the same
// rotation race through the live supervise loop — not just the lease
// state machine in isolation. Drives a hub through:
//
// 1. install row with credentials A → supervisor1 acquires lease(A)
// 2. credentials rotate to B → maybeRestartOnRotation cancels sup1
// 3. supervisor2 starts, acquires lease(B)
// 4. sup1's post-cancel releaseLease(A) runs; must NOT clear lease(B)
//
// Even with the timing being non-deterministic (real goroutines), the
// fake's lease map either ends up with sup2's token or empty — empty
// means the successor lost its lease and would never deliver events,
// which is exactly the bug Elon flagged. We assert the lease ends up
// held by sup2's token.
func TestHubRotationEndToEndKeepsSuccessorLeased(t *testing.T) {
q := newFakeHubQueries()
instID := uuidFromString(t, "feedfeed-feed-feed-feed-feedfeedfeed")
q.installations = []db.LarkInstallation{{
ID: instID,
Status: "active",
AppID: "app_one",
BotOpenID: "bot_one",
AppSecretEncrypted: []byte("secret_one"),
}}
// The default fakeConnector blocks on ctx.Done, so the renewer keeps
// running and refreshes the lease at each tick, mirroring
// production timing for the rotation handoff.
factoryCalls := int32(0)
factory := func(_ db.LarkInstallation) (EventConnector, error) {
atomic.AddInt32(&factoryCalls, 1)
return &fakeConnector{}, nil
}
hub := NewHub(q, factory, nil, HubConfig{
LeaseTTL: 500 * time.Millisecond,
LeaseRenewInterval: 30 * time.Millisecond,
PollInterval: 15 * time.Millisecond,
MinBackoff: 5 * time.Millisecond,
MaxBackoff: 20 * time.Millisecond,
ResetBackoffAfter: 1 * time.Second,
Logger: newDiscardLogger(),
})
ctx, cancel := context.WithCancel(context.Background())
go hub.Run(ctx)
defer func() { cancel(); hub.Wait() }()
if !waitFor(300*time.Millisecond, func() bool { return atomic.LoadInt32(&factoryCalls) >= 1 }) {
t.Fatalf("expected sup1 to start; got %d factory calls", atomic.LoadInt32(&factoryCalls))
}
q.mu.Lock()
sup1Token := q.leaseOwner[uuidString(instID)]
q.mu.Unlock()
if sup1Token == "" {
t.Fatalf("sup1 did not write a lease token")
}
// Rotation.
q.mu.Lock()
q.installations[0].AppID = "app_two"
q.installations[0].BotOpenID = "bot_two"
q.installations[0].AppSecretEncrypted = []byte("secret_two")
q.mu.Unlock()
// Wait for sup2 to start AND its lease token to differ from sup1's.
// The successor's token must end up owning the row regardless of
// when sup1's stale release lands.
if !waitFor(500*time.Millisecond, func() bool {
if atomic.LoadInt32(&factoryCalls) < 2 {
return false
}
q.mu.Lock()
defer q.mu.Unlock()
curr, ok := q.leaseOwner[uuidString(instID)]
return ok && curr != sup1Token
}) {
q.mu.Lock()
got, ok := q.leaseOwner[uuidString(instID)]
q.mu.Unlock()
t.Fatalf("successor lease never present; ok=%v owner=%q factoryCalls=%d",
ok, got, atomic.LoadInt32(&factoryCalls))
}
// Give sup1's deferred release a chance to land, then re-check.
time.Sleep(150 * time.Millisecond)
q.mu.Lock()
owner, ok := q.leaseOwner[uuidString(instID)]
q.mu.Unlock()
if !ok {
t.Fatalf("successor lease cleared after sup1's stale release — rotation fencing broken")
}
if owner == sup1Token {
t.Fatalf("lease still held by sup1 token %q; successor never took over", sup1Token)
}
}
func TestHubBacksOffOnFactoryError(t *testing.T) {
q := newFakeHubQueries()
instID := uuidFromString(t, "55555555-5555-5555-5555-555555555555")
q.installations = []db.LarkInstallation{{ID: instID, Status: "active"}}
factoryCalls := int32(0)
factory := func(_ db.LarkInstallation) (EventConnector, error) {
atomic.AddInt32(&factoryCalls, 1)
return nil, errors.New("boom")
}
hub := NewHub(q, factory, nil, HubConfig{
LeaseTTL: 500 * time.Millisecond,
LeaseRenewInterval: 20 * time.Millisecond,
PollInterval: 20 * time.Millisecond,
MinBackoff: 5 * time.Millisecond,
MaxBackoff: 20 * time.Millisecond,
ResetBackoffAfter: 1 * time.Second,
Logger: newDiscardLogger(),
})
ctx, cancel := context.WithCancel(context.Background())
go hub.Run(ctx)
// Let the supervisor retry under backoff. We want > 1 call to
// prove the loop is alive but the increasing delay should keep
// the rate sane.
if !waitFor(200*time.Millisecond, func() bool { return atomic.LoadInt32(&factoryCalls) >= 2 }) {
t.Fatalf("expected factory retries under backoff; got %d", atomic.LoadInt32(&factoryCalls))
}
calls := atomic.LoadInt32(&factoryCalls)
cancel()
hub.Wait()
if calls > 200 {
t.Fatalf("backoff appears broken: %d factory calls in 200ms", calls)
}
}
// TestHubLeaseLossCancelsConnector pins the §4.4 ownership invariant.
// When another replica steals the lease, the renewer must cancel the
// connector's run context so the connector exits even if its wire I/O
// is currently blocked. Without that cancel, replica A could keep
// reading Lark events for an unbounded window while replica B already
// believes it is the sole owner — duplicate consumption, exactly what
// the lease is supposed to prevent.
func TestHubLeaseLossCancelsConnector(t *testing.T) {
q := newFakeHubQueries()
instID := uuidFromString(t, "66666666-6666-6666-6666-666666666666")
q.installations = []db.LarkInstallation{{ID: instID, Status: "active"}}
// fakeConnector default behavior blocks on ctx.Done — perfect for
// "simulate a socket that never returns until we explicitly cancel
// it" scenarios. We capture the ctx the supervisor handed it so we
// can wait on its done channel directly.
connCtxCh := make(chan context.Context, 1)
conn := &fakeConnector{
script: []func(ctx context.Context, emit EventEmitter) error{
func(ctx context.Context, _ EventEmitter) error {
connCtxCh <- ctx
<-ctx.Done()
return ctx.Err()
},
},
}
factory := func(_ db.LarkInstallation) (EventConnector, error) { return conn, nil }
hub := NewHub(q, factory, nil, HubConfig{
LeaseTTL: 500 * time.Millisecond,
LeaseRenewInterval: 20 * time.Millisecond,
PollInterval: 1 * time.Hour, // disable sweep noise; we drive lease state by hand.
MinBackoff: 5 * time.Millisecond,
MaxBackoff: 20 * time.Millisecond,
ResetBackoffAfter: 10 * time.Second,
Logger: newDiscardLogger(),
})
ctx, cancel := context.WithCancel(context.Background())
defer func() { cancel(); hub.Wait() }()
go hub.Run(ctx)
// Wait for the supervisor to hand the connector a run context.
var runCtx context.Context
select {
case runCtx = <-connCtxCh:
case <-time.After(500 * time.Millisecond):
t.Fatalf("connector never started")
}
// Simulate lease theft: rewrite the lease row to point at another
// replica with a fresh expiry. The next renewal CAS will fail
// because the row's token no longer matches this supervisor's lease
// token, the renewer returns leased=false, and (with the fix) cancels
// the run ctx.
q.presetLease(instID, "thief-replica", time.Now().Add(10*time.Second))
select {
case <-runCtx.Done():
// Expected: renewer cancelled runCtx within a few renewal ticks.
case <-time.After(500 * time.Millisecond):
t.Fatalf("renewer did not cancel run ctx after lease loss")
}
}
// TestHubEmitReturnsDispatchResultAndError pins the connector-facing
// emit contract: the supervisor's emit shim wraps the Dispatcher and
// surfaces both the typed DispatchResult and any infra error so the
// real Lark connector can post the right outbound (binding card,
// offline card, etc.) and react to infra failures.
func TestHubEmitReturnsDispatchResultAndError(t *testing.T) {
q := newFakeHubQueries()
instID := uuidFromString(t, "77777777-7777-7777-7777-777777777777")
q.installations = []db.LarkInstallation{{ID: instID, Status: "active"}}
// Capture what emit returned on the first invocation so the
// connector goroutine can stash it for the test.
var (
gotRes DispatchResult
gotErr error
gotMu sync.Mutex
)
emitDone := make(chan struct{})
conn := &fakeConnector{
script: []func(ctx context.Context, emit EventEmitter) error{
func(ctx context.Context, emit EventEmitter) error {
res, err := emit(ctx, InboundMessage{
EventID: "evt-1",
EventType: "im.message.receive_v1",
})
gotMu.Lock()
gotRes = res
gotErr = err
gotMu.Unlock()
close(emitDone)
<-ctx.Done()
return ctx.Err()
},
},
}
factory := func(_ db.LarkInstallation) (EventConnector, error) { return conn, nil }
// No dispatcher wired -> emit must return ErrDispatcherNotConfigured.
// The point is the error surfaces back to the connector instead of
// being silently dropped at the Hub.
hub := NewHub(q, factory, nil, HubConfig{
LeaseTTL: 500 * time.Millisecond,
LeaseRenewInterval: 20 * time.Millisecond,
PollInterval: 1 * time.Hour,
MinBackoff: 5 * time.Millisecond,
MaxBackoff: 20 * time.Millisecond,
ResetBackoffAfter: 10 * time.Second,
Logger: newDiscardLogger(),
})
ctx, cancel := context.WithCancel(context.Background())
defer func() { cancel(); hub.Wait() }()
go hub.Run(ctx)
select {
case <-emitDone:
case <-time.After(500 * time.Millisecond):
t.Fatalf("connector never invoked emit")
}
gotMu.Lock()
defer gotMu.Unlock()
if !errors.Is(gotErr, ErrDispatcherNotConfigured) {
t.Fatalf("emit should propagate dispatcher errors; got %v", gotErr)
}
if gotRes.Outcome != "" {
t.Fatalf("emit should not invent an outcome on dispatcher error; got %q", gotRes.Outcome)
}
}
// TestHubReleaseLeaseBoundedByTimeout pins the shutdown-safety
// invariant: a frozen DB pool must NOT keep the supervisor blocked
// on releaseLease past the configured LeaseReleaseTimeout. Without
// the bound, context.Background()-rooted release calls could hang
// forever on a stalled pool, dragging out process shutdown well
// past the operator's expected drain budget.
func TestHubReleaseLeaseBoundedByTimeout(t *testing.T) {
q := newFakeHubQueries()
q.releaseBlock = make(chan struct{}) // never closed; release always sees ctx.Done
instID := uuidFromString(t, "88888888-8888-8888-8888-888888888888")
q.installations = []db.LarkInstallation{{ID: instID, Status: "active"}}
conn := &fakeConnector{}
factory := func(_ db.LarkInstallation) (EventConnector, error) { return conn, nil }
releaseTimeout := 50 * time.Millisecond
hub := NewHub(q, factory, nil, HubConfig{
LeaseTTL: 500 * time.Millisecond,
LeaseRenewInterval: 20 * time.Millisecond,
PollInterval: 1 * time.Hour,
MinBackoff: 5 * time.Millisecond,
MaxBackoff: 20 * time.Millisecond,
ResetBackoffAfter: 10 * time.Second,
LeaseReleaseTimeout: releaseTimeout,
ShutdownTimeout: 2 * time.Second, // generous; we want WaitWithTimeout to succeed
Logger: newDiscardLogger(),
})
ctx, cancel := context.WithCancel(context.Background())
go hub.Run(ctx)
if !waitFor(500*time.Millisecond, func() bool { return conn.Runs() >= 1 }) {
cancel()
hub.Wait()
t.Fatalf("expected connector to start; runs=%d", conn.Runs())
}
start := time.Now()
cancel()
// WaitWithTimeout MUST return true: the bound on releaseLease
// has to let the supervisor unwind even though our fake release
// never returns on its own.
if !hub.WaitWithTimeout(2 * time.Second) {
t.Fatalf("supervisor stuck despite bounded release; lease release timeout did not fire")
}
elapsed := time.Since(start)
// Sanity bound: shutdown must complete in roughly the release
// timeout plus a small jitter, NOT seconds. If the bound regressed
// (e.g. someone reintroduced context.Background() without a deadline),
// this assertion catches it.
if elapsed > 500*time.Millisecond {
t.Fatalf("shutdown took %s; expected ≈ %s + slack", elapsed, releaseTimeout)
}
q.mu.Lock()
gotErr := q.releaseObservedCtxErr
q.mu.Unlock()
if !errors.Is(gotErr, context.DeadlineExceeded) {
t.Fatalf("release should have observed DeadlineExceeded from its bounded ctx; got %v", gotErr)
}
}
// TestHubWaitWithTimeoutReturnsTrueWhenSupervisorsExit covers the
// happy path: everything stops cleanly within the deadline, so the
// caller can proceed without logging a timeout warning.
func TestHubWaitWithTimeoutReturnsTrueWhenSupervisorsExit(t *testing.T) {
q := newFakeHubQueries()
instID := uuidFromString(t, "99999999-9999-9999-9999-999999999999")
q.installations = []db.LarkInstallation{{ID: instID, Status: "active"}}
conn := &fakeConnector{}
factory := func(_ db.LarkInstallation) (EventConnector, error) { return conn, nil }
hub := NewHub(q, factory, nil, HubConfig{
LeaseTTL: 500 * time.Millisecond,
LeaseRenewInterval: 20 * time.Millisecond,
PollInterval: 1 * time.Hour,
MinBackoff: 5 * time.Millisecond,
MaxBackoff: 20 * time.Millisecond,
ResetBackoffAfter: 10 * time.Second,
Logger: newDiscardLogger(),
})
ctx, cancel := context.WithCancel(context.Background())
go hub.Run(ctx)
if !waitFor(500*time.Millisecond, func() bool { return conn.Runs() >= 1 }) {
cancel()
hub.Wait()
t.Fatalf("expected connector to start; runs=%d", conn.Runs())
}
cancel()
if !hub.WaitWithTimeout(1 * time.Second) {
t.Fatalf("WaitWithTimeout returned false despite supervisor exiting promptly")
}
}
// TestHubWaitWithTimeoutReturnsFalseWhenSupervisorStuck pins the
// bound on the join itself: if a (future real) connector or release
// path ignores ctx and refuses to exit, WaitWithTimeout MUST return
// false so main.go can log + proceed with shutdown rather than block
// the process forever.
func TestHubWaitWithTimeoutReturnsFalseWhenSupervisorStuck(t *testing.T) {
q := newFakeHubQueries()
q.releaseBlock = make(chan struct{}) // never closed
instID := uuidFromString(t, "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa")
q.installations = []db.LarkInstallation{{ID: instID, Status: "active"}}
conn := &fakeConnector{}
factory := func(_ db.LarkInstallation) (EventConnector, error) { return conn, nil }
// LeaseReleaseTimeout (5s) far exceeds the 50ms join deadline passed
// to WaitWithTimeout below, so the release is still blocked when the
// join deadline expires. This pins the "join deadline trips before
// the supervisor unwinds" branch.
hub := NewHub(q, factory, nil, HubConfig{
LeaseTTL: 500 * time.Millisecond,
LeaseRenewInterval: 20 * time.Millisecond,
PollInterval: 1 * time.Hour,
MinBackoff: 5 * time.Millisecond,
MaxBackoff: 20 * time.Millisecond,
ResetBackoffAfter: 10 * time.Second,
LeaseReleaseTimeout: 5 * time.Second,
Logger: newDiscardLogger(),
})
ctx, cancel := context.WithCancel(context.Background())
go hub.Run(ctx)
if !waitFor(500*time.Millisecond, func() bool { return conn.Runs() >= 1 }) {
cancel()
hub.Wait() // unbounded fallback; test will time out instead of hanging
t.Fatalf("expected connector to start; runs=%d", conn.Runs())
}
cancel()
if hub.WaitWithTimeout(50 * time.Millisecond) {
t.Fatalf("WaitWithTimeout returned true while release was still blocked")
}
// Unblock the release so the supervisor can finally exit and the
// test doesn't leak a goroutine.
close(q.releaseBlock)
hub.Wait()
}
// TestHubConfigDefaultsCoverShutdownKnobs documents that callers
// that omit the new shutdown knobs still get sensible defaults
// (matching the behavior router.go relies on by passing HubConfig{}).
// If the defaults regress to zero, releaseLease would derive a
// 0-deadline ctx that fails instantly — the real symptom would be
// "release lease failed: context deadline exceeded" warnings on
// every shutdown.
func TestHubConfigDefaultsCoverShutdownKnobs(t *testing.T) {
c := HubConfig{}.withDefaults()
if c.LeaseReleaseTimeout <= 0 {
t.Fatalf("LeaseReleaseTimeout default must be > 0; got %s", c.LeaseReleaseTimeout)
}
if c.ShutdownTimeout <= 0 {
t.Fatalf("ShutdownTimeout default must be > 0; got %s", c.ShutdownTimeout)
}
}
// waitFor polls cond until it returns true or the deadline is reached.
// Returns true on success. Tests use this instead of time.Sleep so they
// remain robust on slow CI runners without slowing fast ones down.
func waitFor(timeout time.Duration, cond func() bool) bool {
deadline := time.Now().Add(timeout)
for time.Now().Before(deadline) {
if cond() {
return true
}
time.Sleep(time.Millisecond)
}
return cond()
}
// slowReplier blocks Reply for the configured duration unless the ctx
// fires first. Used to prove the Hub's reply-off-critical-path
// invariant: a Reply that exceeds ReplyTimeout MUST get its ctx
// cancelled instead of running unbounded.
type slowReplier struct {
delay time.Duration
startCh chan struct{}
finishCh chan struct{}
mu sync.Mutex
callCount int
lastCtxErr error // ctx.Err() observed when Reply returned
}
func newSlowReplier(delay time.Duration) *slowReplier {
return &slowReplier{
delay: delay,
startCh: make(chan struct{}, 16),
finishCh: make(chan struct{}, 16),
}
}
func (s *slowReplier) Reply(ctx context.Context, _ db.LarkInstallation, _ InboundMessage, _ DispatchResult) {
s.mu.Lock()
s.callCount++
s.mu.Unlock()
select {
case s.startCh <- struct{}{}:
default:
}
select {
case <-time.After(s.delay):
case <-ctx.Done():
}
s.mu.Lock()
s.lastCtxErr = ctx.Err()
s.mu.Unlock()
select {
case s.finishCh <- struct{}{}:
default:
}
}
func (s *slowReplier) calls() int {
s.mu.Lock()
defer s.mu.Unlock()
return s.callCount
}
func (s *slowReplier) ctxErr() error {
s.mu.Lock()
defer s.mu.Unlock()
return s.lastCtxErr
}
// TestHubScheduleReplyReturnsImmediately verifies the core invariant
// behind the OutcomeReplier refactor: a slow replier MUST NOT block
// the dispatch caller. handleEvent is on the ACK critical path — the
// connector ACKs as soon as it returns — so coupling it to outbound
// Lark HTTP would let any slow card-send stall ACK past Lark's 3s
// deadline. This test puts a 10s sleep in the replier and asserts the
// scheduling call still returns in < 50ms.
func TestHubScheduleReplyReturnsImmediately(t *testing.T) {
t.Parallel()
rep := newSlowReplier(10 * time.Second)
hub := NewHub(nil, nil, nil, HubConfig{
ReplyTimeout: 100 * time.Millisecond,
Logger: newDiscardLogger(),
})
hub.SetOutcomeReplier(rep)
start := time.Now()
hub.scheduleReply(db.LarkInstallation{}, InboundMessage{EventID: "e1"},
DispatchResult{Outcome: OutcomeNeedsBinding}, newDiscardLogger())
elapsed := time.Since(start)
if elapsed > 50*time.Millisecond {
t.Fatalf("scheduleReply took %s; ACK critical path would be blocked by outbound HTTP", elapsed)
}
// The replier should still be in flight — proves it really did
// detach into a goroutine.
select {
case <-rep.startCh:
// good — Reply was invoked async
case <-time.After(500 * time.Millisecond):
t.Fatal("detached replier never ran")
}
// Drain the timeout-bounded reply so the test doesn't leak goroutines.
if !hub.WaitWithTimeout(1 * time.Second) {
t.Fatal("reply goroutine did not exit after ReplyTimeout fired")
}
if !errors.Is(rep.ctxErr(), context.DeadlineExceeded) {
t.Fatalf("replier ctx.Err() = %v; want DeadlineExceeded", rep.ctxErr())
}
}
// TestHubReplyTimeoutCancelsHungReplier pins the bound on the detached
// reply: a replier that would otherwise run far past the deadline (for
// example, outbound HTTP that has stalled) MUST have its ctx cancelled
// at ReplyTimeout. The test gives the replier a deliberately long sleep
// and a short timeout, then asserts the reply goroutine exits within
// roughly the timeout, not the sleep.
func TestHubReplyTimeoutCancelsHungReplier(t *testing.T) {
t.Parallel()
timeout := 80 * time.Millisecond
rep := newSlowReplier(10 * time.Second)
hub := NewHub(nil, nil, nil, HubConfig{
ReplyTimeout: timeout,
Logger: newDiscardLogger(),
})
hub.SetOutcomeReplier(rep)
start := time.Now()
hub.scheduleReply(db.LarkInstallation{}, InboundMessage{EventID: "e2"},
DispatchResult{Outcome: OutcomeAgentOffline}, newDiscardLogger())
select {
case <-rep.finishCh:
case <-time.After(1 * time.Second):
t.Fatal("replier never exited; ReplyTimeout did not fire")
}
elapsed := time.Since(start)
// Sanity bound: the replier must exit in roughly timeout + jitter.
if elapsed > 500*time.Millisecond {
t.Fatalf("replier exit took %s; expected ≈ %s", elapsed, timeout)
}
if !errors.Is(rep.ctxErr(), context.DeadlineExceeded) {
t.Fatalf("replier should observe DeadlineExceeded; got %v", rep.ctxErr())
}
hub.Wait()
}
// TestHubWaitDrainsInFlightReplies verifies that Hub.Wait (and
// WaitWithTimeout) joins on the replier goroutines, not just the
// supervisor goroutines. Without this, shutdown could close the
// process while a binding-card send is still in flight — the user
// gets no card, no log entry, and the binding token they were going
// to receive is orphaned with no observability.
func TestHubWaitDrainsInFlightReplies(t *testing.T) {
t.Parallel()
rep := newSlowReplier(30 * time.Millisecond) // shorter than ReplyTimeout
hub := NewHub(nil, nil, nil, HubConfig{
ReplyTimeout: 1 * time.Second,
Logger: newDiscardLogger(),
})
hub.SetOutcomeReplier(rep)
hub.scheduleReply(db.LarkInstallation{}, InboundMessage{EventID: "e3"},
DispatchResult{Outcome: OutcomeNeedsBinding}, newDiscardLogger())
// Wait should block until the reply finishes its 30ms work.
start := time.Now()
hub.Wait()
elapsed := time.Since(start)
if elapsed < 20*time.Millisecond {
t.Fatalf("Wait returned in %s; should have blocked until reply completed", elapsed)
}
if rep.calls() != 1 {
t.Fatalf("reply call count = %d; want 1", rep.calls())
}
// Reply finished naturally (sleep returned), not via cancellation.
if rep.ctxErr() != nil {
t.Errorf("reply ctxErr = %v; want nil (slept normally)", rep.ctxErr())
}
}
// TestHubNoopReplierInlineNoGoroutine verifies the optimisation: the
// noop replier (used when outbound APIClient isn't configured) runs
// inline, not under a goroutine. This avoids the cost of a goroutine
// per inbound event on a deployment that hasn't wired outbound replies
// yet. Indirectly proven by observing replyWg is not bumped (Wait
// returns immediately without any reply goroutine to drain).
func TestHubNoopReplierInlineNoGoroutine(t *testing.T) {
t.Parallel()
hub := NewHub(nil, nil, nil, HubConfig{
Logger: newDiscardLogger(),
})
// hub.replier defaults to noop. Call scheduleReply many times — if
// it spawned goroutines, Wait would wait at least until those
// goroutines schedule, but with the fast-path it must return
// instantly.
for i := 0; i < 1000; i++ {
hub.scheduleReply(db.LarkInstallation{}, InboundMessage{EventID: "e"},
DispatchResult{Outcome: OutcomeNeedsBinding}, newDiscardLogger())
}
done := make(chan struct{})
go func() { hub.Wait(); close(done) }()
select {
case <-done:
case <-time.After(100 * time.Millisecond):
t.Fatal("Wait should return immediately when no goroutine replies were scheduled")
}
}
// TestHubReplyTimeoutDefaultIsUnder3s pins the value the production
// path uses — Lark requires ACK within 3 seconds, and the replier
// runs ON TOP of the dispatch latency, so it must complete strictly
// under 3s even if dispatch took 500ms. Defaulting to 2.5s leaves
// headroom both for outbound HTTP and for shutdown to drain replies
// without hitting the broader ShutdownTimeout.
func TestHubReplyTimeoutDefaultIsUnder3s(t *testing.T) {
t.Parallel()
c := HubConfig{}.withDefaults()
if c.ReplyTimeout <= 0 {
t.Fatalf("ReplyTimeout default must be > 0; got %s", c.ReplyTimeout)
}
if c.ReplyTimeout >= 3*time.Second {
t.Fatalf("ReplyTimeout default %s is too close to Lark's 3s ACK deadline; outbound HTTP would race ACK", c.ReplyTimeout)
}
}
// TestHubACKNotBlockedByOutboundReply proves the full integrated
// invariant: even when the OutcomeReplier hangs for far longer than
// the Lark ACK deadline, the connector's data-frame ACK still lands
// on the wire promptly. This is the end-to-end version of
// TestHubScheduleReplyReturnsImmediately, exercising connector ->
// emit -> scheduleReply against a fakeWSConn so we can observe the
// actual binary ACK frame timing.
//
// Stub the emit function so it returns OutcomeNeedsBinding (the outcome
// most prone to expensive outbound work: token mint + card send) and
// schedules the reply the way handleEvent does. Wire a slowReplier that
// sleeps 5s. Push one event. Assert: an ACK frame appears in
// conn.writes within 500ms (well under Lark's 3s budget).
func TestHubACKNotBlockedByOutboundReply(t *testing.T) {
t.Parallel()
conn := newFakeWSConn()
decoder := FrameDecoderFunc(func(payload []byte, _ db.LarkInstallation) (InboundMessage, bool, error) {
return InboundMessage{EventID: string(payload)}, true, nil
})
c := quietConnector(t, conn, decoder, time.Hour) // disable ping
// Slow replier that would block ACK if the critical path coupling
// regressed. 5s sleep, ReplyTimeout 2.5s — replier must be
// cancelled at ~2.5s and the ACK must NOT have waited for it.
rep := newSlowReplier(5 * time.Second)
hub := NewHub(nil, nil, nil, HubConfig{
ReplyTimeout: 2500 * time.Millisecond,
Logger: newDiscardLogger(),
})
hub.SetOutcomeReplier(rep)
emit := func(ctx context.Context, msg InboundMessage) (DispatchResult, error) {
// Simulate the dispatcher's "successful binding-needed" verdict
// without standing up the full Dispatcher (concrete struct +
// service deps). The async reply is the only behaviour under
// test here, and scheduleReply is what the real handleEvent
// hands to the replier.
res := DispatchResult{
Outcome: OutcomeNeedsBinding,
SenderOpenID: "ou_user_42",
}
hub.scheduleReply(db.LarkInstallation{}, msg, res, newDiscardLogger())
return res, nil
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
done := make(chan error, 1)
go func() {
done <- c.Run(ctx, db.LarkInstallation{AppID: "test_app"}, emit)
}()
start := time.Now()
pushDataFrame(conn, []byte("evt-binding"), "om-binding")
// The data-frame ACK MUST appear well under Lark's 3s budget.
if !waitFor(500*time.Millisecond, func() bool {
for _, w := range conn.snapshot() {
f, err := UnmarshalFrame(w)
if err != nil {
continue
}
if f.Method == FrameMethodData && bytes.Contains(f.Payload, []byte(`"code":200`)) {
return true
}
}
return false
}) {
t.Fatalf("data-frame ACK did not land within 500ms; outbound reply blocked the critical path (replier still running? %v)", rep.calls() == 1)
}
ackLatency := time.Since(start)
if ackLatency >= 3*time.Second {
t.Fatalf("ACK landed in %s, past Lark's 3s deadline", ackLatency)
}
// Verify the replier was indeed launched (proves we didn't accidentally
// short-circuit it).
select {
case <-rep.startCh:
case <-time.After(200 * time.Millisecond):
t.Fatal("replier never ran; the reply path is silently broken")
}
cancel()
<-done
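// Drain the timeout-bounded reply so the test doesn't leak goroutines.
if !hub.WaitWithTimeout(5 * time.Second) {
t.Fatal("reply goroutine did not exit after ReplyTimeout fired")
}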
}