Files
multica/server/pkg/db/generated/lark.sql.go
Bohan Jiang 6ac8314711 feat(lark): support both Feishu and Lark from one deployment (MUL-3083) (#3815)
* feat(lark): serve Feishu and Lark from one deployment, per installation

The Lark integration was locked to a single open-platform host chosen
deployment-wide (MULTICA_LARK_HTTP_BASE_URL / _CALLBACK_BASE_URL,
defaulting to open.feishu.cn), so one deployment could talk to only the
mainland Feishu cloud OR Lark international — never both. Teams on the
other tenant could not use the integration at all.

Make the host per-installation. The device-flow installer already
auto-detects the tenant (Lark emits tenant_brand="lark" mid-poll); we now
persist that as lark_installation.region, carry it on
InstallationCredentials.Region, and resolve the open-platform host per
call (REST + WS bootstrap) from the region. An explicit cfg.BaseURL
(env / httptest) still overrides every region, so existing tests and
staging/proxy setups keep working.

- migration 116: lark_installation.region TEXT NOT NULL DEFAULT 'feishu'
  CHECK (region IN ('feishu','lark')) — existing rows are all mainland.
- lark.Region enum + OpenPlatformBaseURL/RegionOrDefault helpers.
- registration: thread the detected region into finishSuccess so the
  install-time GetBotInfo hits the right cloud AND the row records it.
- every credential-build site (patcher, replier, WS provider, union_id
  backfill) copies region off the installation row.
- region is part of the WS supervisor fingerprint so a re-install that
  switches cloud restarts the connection.
- API: surface region on the installation listing DTO.

MUL-3083

Co-authored-by: multica-agent <github@multica.ai>

* feat(lark): surface installation region in settings UI

Read the per-installation region off the listings response: build the
"Manage in Lark" dev-console host from it (open.feishu.cn vs
open.larksuite.com instead of a hardcoded mainland host) and render a
Feishu / Lark badge on each connected bot. The field is optional and
defaults to Feishu when an older server omits it (API-compat). Adds the
region_feishu / region_lark labels to all four locales.

MUL-3083

Co-authored-by: multica-agent <github@multica.ai>

* docs(lark): document simultaneous Feishu + Lark support

The cloud each bot belongs to is now auto-detected at install and stored
per installation, so one deployment serves both. Replace the old
"point MULTICA_LARK_HTTP_BASE_URL at larksuite for international tenants"
guidance (now just an optional override) in all four locales.

MUL-3083

Co-authored-by: multica-agent <github@multica.ai>

* fix(lark): repair legacy Lark-international installs on upgrade

Review follow-up (MUL-3083). Migration 116 backfilled every existing
lark_installation to region='feishu', assuming all historical rows were
mainland. But self-host deployments could already run Lark international
via the deployment-wide MULTICA_LARK_HTTP_BASE_URL override, so those
rows are really Lark — clearing the override after upgrade (which the new
docs invite) would route them to open.feishu.cn and break them.

Add a one-shot startup repair, BackfillRegionFromLegacyOverride, fired
off the hot path like BackfillBotUnionIDs: when the deployment's global
base-URL override targets open.larksuite.com, relabel the still-default
'feishu' rows to 'lark'. Gating on the deployment-wide override is what
makes it safe — every pre-existing install on such a deployment was Lark.
Idempotent; no-op on mainland / fresh deployments. Verified end-to-end
against a scratch DB (flip then 0-row idempotent re-run).

Also document that a Lark/飞书 app_id is globally unique across both
clouds, which is what makes the app_id-keyed token cache and the
UNIQUE(app_id) constraint safe across regions (review nit).

MUL-3083

Co-authored-by: multica-agent <github@multica.ai>

* docs(lark): fix ops guidance to match auto per-installation region

Review follow-up (MUL-3083). .env.example and docker-compose.selfhost.yml
still told operators that international Lark requires pointing both base
URLs at open.larksuite.com — now wrong, and it would push a fresh
deployment back into a single-cloud override. Rewrite them: the base
URLs are optional deployment-wide overrides; normal dual-cloud operation
keeps them empty. Document the first-boot auto-relabel for deployments
migrating off the old single-cloud override, across the integration docs
(en/zh/ja/ko).

MUL-3083

Co-authored-by: multica-agent <github@multica.ai>

---------

Co-authored-by: J <j@multica.ai>
Co-authored-by: multica-agent <github@multica.ai>
2026-06-05 16:03:13 +08:00

1145 lines
38 KiB
Go

// Code generated by sqlc. DO NOT EDIT.
// versions:
// sqlc v1.31.1
// source: lark.sql
package db
import (
"context"
"github.com/jackc/pgx/v5/pgtype"
)
const acquireLarkWSLease = `-- name: AcquireLarkWSLease :one
UPDATE lark_installation
SET ws_lease_token = $1,
ws_lease_expires_at = $2,
updated_at = now()
WHERE id = $3
AND status = 'active'
AND (
ws_lease_token IS NULL
OR ws_lease_expires_at < now()
OR ws_lease_token = $1
)
RETURNING id, workspace_id, agent_id, app_id, app_secret_encrypted, tenant_key, bot_open_id, installer_user_id, status, ws_lease_token, ws_lease_expires_at, installed_at, created_at, updated_at, bot_union_id, region
`
type AcquireLarkWSLeaseParams struct {
NewToken pgtype.Text `json:"new_token"`
NewExpiresAt pgtype.Timestamptz `json:"new_expires_at"`
ID pgtype.UUID `json:"id"`
}
// Atomically claims the WebSocket lease for an installation. The CAS
// predicate accepts the lease when (a) no current holder exists, (b)
// the holder's lease has expired, or (c) the holder is us (renewal).
// Returns the row when the lease was successfully claimed; returns no
// rows when another live holder still owns it.
func (q *Queries) AcquireLarkWSLease(ctx context.Context, arg AcquireLarkWSLeaseParams) (LarkInstallation, error) {
row := q.db.QueryRow(ctx, acquireLarkWSLease, arg.NewToken, arg.NewExpiresAt, arg.ID)
var i LarkInstallation
err := row.Scan(
&i.ID,
&i.WorkspaceID,
&i.AgentID,
&i.AppID,
&i.AppSecretEncrypted,
&i.TenantKey,
&i.BotOpenID,
&i.InstallerUserID,
&i.Status,
&i.WsLeaseToken,
&i.WsLeaseExpiresAt,
&i.InstalledAt,
&i.CreatedAt,
&i.UpdatedAt,
&i.BotUnionID,
&i.Region,
)
return i, err
}
const backfillLarkInstallationRegionToLark = `-- name: BackfillLarkInstallationRegionToLark :execrows
UPDATE lark_installation
SET region = 'lark',
updated_at = now()
WHERE region = 'feishu'
`
// Upgrade repair: flip every installation still carrying the migration-116
// default ('feishu') to 'lark'. Called ONLY by
// BackfillRegionFromLegacyOverride, and ONLY when the deployment's global
// base-URL override pointed at Lark international — on such a deployment the
// whole integration talked to open.larksuite.com, so every existing install
// is really Lark and the migration's mainland default mislabels it.
// Idempotent: once flipped there is nothing left at 'feishu' to update, and
// new installs already carry the device-flow-detected region.
func (q *Queries) BackfillLarkInstallationRegionToLark(ctx context.Context) (int64, error) {
result, err := q.db.Exec(ctx, backfillLarkInstallationRegionToLark)
if err != nil {
return 0, err
}
return result.RowsAffected(), nil
}
const claimLarkInboundDedup = `-- name: ClaimLarkInboundDedup :one
INSERT INTO lark_inbound_message_dedup (installation_id, message_id, claim_token)
VALUES ($1, $2, gen_random_uuid())
ON CONFLICT (installation_id, message_id) DO UPDATE
SET received_at = now(),
claim_token = gen_random_uuid()
WHERE lark_inbound_message_dedup.processed_at IS NULL
AND lark_inbound_message_dedup.received_at < now() - INTERVAL '60 seconds'
RETURNING installation_id, message_id, received_at, processed_at, claim_token
`
type ClaimLarkInboundDedupParams struct {
InstallationID pgtype.UUID `json:"installation_id"`
MessageID string `json:"message_id"`
}
// =====================
// lark_inbound_message_dedup
// =====================
// The two-phase idempotency gate. The dispatcher uses this BEFORE
// group filter / identity check / chat-session lookup so a WebSocket
// reconnect that replays an event cannot re-trigger binding prompts,
// re-write drop audit rows, or re-touch chat_session.
//
// Returns the row when a claim is acquired:
// - newly inserted (first delivery of this message_id), OR
// - re-taken from a stale in-flight claim. A claim is stale when
// processed_at IS NULL AND received_at is older than 60 seconds —
// the previous worker crashed or lost its DB connection between
// claim and finalize, and a retry should be allowed to proceed.
//
// Returns NO rows (pgx.ErrNoRows) when the claim cannot be acquired:
// - the row exists with processed_at IS NOT NULL (terminal: prior
// attempt reached a durable outcome), OR
// - the row exists with processed_at IS NULL AND received_at within
// the last 60 seconds (another worker is actively processing).
//
// Owner fencing: every successful Claim mints a fresh UUID into
// `claim_token`. The Caller passes that token to MarkLarkInbound-
// DedupProcessed / ReleaseLarkInboundDedup; mismatched tokens are
// ignored. A stale-reclaim that re-takes the row ROTATES the token,
// so the previous (slow but still alive) worker can no longer Mark
// the row — its same-tx Mark returns zero rows and the chat_message
// write rolls back. See lark_inbound_message_dedup table comment.
//
// The dispatcher MUST follow up every successful claim with exactly one
// of MarkLarkInboundDedupProcessed (durable outcome) or
// ReleaseLarkInboundDedup (infra failure before durable outcome),
// supplying the returned claim_token. Otherwise the row sits as an
// in-flight claim and the next replay attempt must wait for the
// staleness TTL.
func (q *Queries) ClaimLarkInboundDedup(ctx context.Context, arg ClaimLarkInboundDedupParams) (LarkInboundMessageDedup, error) {
row := q.db.QueryRow(ctx, claimLarkInboundDedup, arg.InstallationID, arg.MessageID)
var i LarkInboundMessageDedup
err := row.Scan(
&i.InstallationID,
&i.MessageID,
&i.ReceivedAt,
&i.ProcessedAt,
&i.ClaimToken,
)
return i, err
}
const consumeLarkBindingToken = `-- name: ConsumeLarkBindingToken :one
UPDATE lark_binding_token
SET consumed_at = now()
WHERE token_hash = $1
AND consumed_at IS NULL
AND expires_at > now()
RETURNING token_hash, workspace_id, installation_id, lark_open_id, expires_at, consumed_at, created_at
`
// Atomic redemption. Returns the row only if (a) the hash exists, (b)
// it has not been consumed, and (c) it has not expired. The UPDATE +
// RETURNING pattern guarantees that two simultaneous redemptions of
// the same token cannot both succeed — exactly one row update wins,
// the other sees zero rows.
func (q *Queries) ConsumeLarkBindingToken(ctx context.Context, tokenHash string) (LarkBindingToken, error) {
row := q.db.QueryRow(ctx, consumeLarkBindingToken, tokenHash)
var i LarkBindingToken
err := row.Scan(
&i.TokenHash,
&i.WorkspaceID,
&i.InstallationID,
&i.LarkOpenID,
&i.ExpiresAt,
&i.ConsumedAt,
&i.CreatedAt,
)
return i, err
}
const createLarkBindingToken = `-- name: CreateLarkBindingToken :one
INSERT INTO lark_binding_token (
token_hash, workspace_id, installation_id, lark_open_id, expires_at
) VALUES (
$1, $2, $3, $4, $5
)
RETURNING token_hash, workspace_id, installation_id, lark_open_id, expires_at, consumed_at, created_at
`
type CreateLarkBindingTokenParams struct {
TokenHash string `json:"token_hash"`
WorkspaceID pgtype.UUID `json:"workspace_id"`
InstallationID pgtype.UUID `json:"installation_id"`
LarkOpenID string `json:"lark_open_id"`
ExpiresAt pgtype.Timestamptz `json:"expires_at"`
}
// =====================
// lark_binding_token
// =====================
// Mints a single-use binding token for an unbound Lark user. The TTL
// cap (`expires_at <= created_at + INTERVAL '15 minutes'`) is enforced
// by the DB CHECK on the table, in lockstep with lark.BindingTokenTTL.
// We store the HASH, not the raw token; the raw value is returned to
// the caller exactly once (in the URL it embeds in the Bot's reply
// card) and never persisted server-side.
func (q *Queries) CreateLarkBindingToken(ctx context.Context, arg CreateLarkBindingTokenParams) (LarkBindingToken, error) {
row := q.db.QueryRow(ctx, createLarkBindingToken,
arg.TokenHash,
arg.WorkspaceID,
arg.InstallationID,
arg.LarkOpenID,
arg.ExpiresAt,
)
var i LarkBindingToken
err := row.Scan(
&i.TokenHash,
&i.WorkspaceID,
&i.InstallationID,
&i.LarkOpenID,
&i.ExpiresAt,
&i.ConsumedAt,
&i.CreatedAt,
)
return i, err
}
const createLarkChatSessionBinding = `-- name: CreateLarkChatSessionBinding :one
INSERT INTO lark_chat_session_binding (
chat_session_id, installation_id, lark_chat_id, lark_chat_type
) VALUES (
$1, $2, $3, $4
)
RETURNING id, chat_session_id, installation_id, lark_chat_id, lark_chat_type, created_at
`
type CreateLarkChatSessionBindingParams struct {
ChatSessionID pgtype.UUID `json:"chat_session_id"`
InstallationID pgtype.UUID `json:"installation_id"`
LarkChatID string `json:"lark_chat_id"`
LarkChatType string `json:"lark_chat_type"`
}
// =====================
// lark_chat_session_binding
// =====================
func (q *Queries) CreateLarkChatSessionBinding(ctx context.Context, arg CreateLarkChatSessionBindingParams) (LarkChatSessionBinding, error) {
row := q.db.QueryRow(ctx, createLarkChatSessionBinding,
arg.ChatSessionID,
arg.InstallationID,
arg.LarkChatID,
arg.LarkChatType,
)
var i LarkChatSessionBinding
err := row.Scan(
&i.ID,
&i.ChatSessionID,
&i.InstallationID,
&i.LarkChatID,
&i.LarkChatType,
&i.CreatedAt,
)
return i, err
}
const createLarkInstallation = `-- name: CreateLarkInstallation :one
INSERT INTO lark_installation (
workspace_id, agent_id, app_id, app_secret_encrypted,
tenant_key, bot_open_id, bot_union_id, installer_user_id
) VALUES (
$1, $2, $3, $4, $7, $5, $8, $6
)
RETURNING id, workspace_id, agent_id, app_id, app_secret_encrypted, tenant_key, bot_open_id, installer_user_id, status, ws_lease_token, ws_lease_expires_at, installed_at, created_at, updated_at, bot_union_id, region
`
type CreateLarkInstallationParams struct {
WorkspaceID pgtype.UUID `json:"workspace_id"`
AgentID pgtype.UUID `json:"agent_id"`
AppID string `json:"app_id"`
AppSecretEncrypted []byte `json:"app_secret_encrypted"`
BotOpenID string `json:"bot_open_id"`
InstallerUserID pgtype.UUID `json:"installer_user_id"`
TenantKey pgtype.Text `json:"tenant_key"`
BotUnionID pgtype.Text `json:"bot_union_id"`
}
// Lark (飞书) Bot integration queries. The migration that defines these
// tables lives at server/migrations/109_lark_integration.up.sql; the
// architectural boundaries the package enforces on top of them are
// documented in server/internal/integrations/lark/doc.go.
//
// Scoping convention: every public-facing read goes through a
// workspace-scoped variant where one exists. The lookups that take only
// a UUID PK (e.g. GetLarkInstallation) are reserved for internal trusted
// callers (the WS lease scanner, the inbound dispatcher after identity
// resolution); HTTP handlers should prefer the *InWorkspace forms.
// =====================
// lark_installation
// =====================
// Used by the OAuth callback. `app_secret_encrypted` is the ciphertext
// produced by internal/util/secretbox — never plaintext. The
// (workspace_id, agent_id) UNIQUE constraint enforces the spec rule
// "one Multica Agent ↔ one Lark Bot"; re-installing on the same agent
// goes through UpsertLarkInstallation instead.
func (q *Queries) CreateLarkInstallation(ctx context.Context, arg CreateLarkInstallationParams) (LarkInstallation, error) {
row := q.db.QueryRow(ctx, createLarkInstallation,
arg.WorkspaceID,
arg.AgentID,
arg.AppID,
arg.AppSecretEncrypted,
arg.BotOpenID,
arg.InstallerUserID,
arg.TenantKey,
arg.BotUnionID,
)
var i LarkInstallation
err := row.Scan(
&i.ID,
&i.WorkspaceID,
&i.AgentID,
&i.AppID,
&i.AppSecretEncrypted,
&i.TenantKey,
&i.BotOpenID,
&i.InstallerUserID,
&i.Status,
&i.WsLeaseToken,
&i.WsLeaseExpiresAt,
&i.InstalledAt,
&i.CreatedAt,
&i.UpdatedAt,
&i.BotUnionID,
&i.Region,
)
return i, err
}
const createLarkOutboundCardMessage = `-- name: CreateLarkOutboundCardMessage :one
INSERT INTO lark_outbound_card_message (
chat_session_id, task_id, lark_chat_id, lark_card_message_id, status
) VALUES (
$1, $5, $2, $3, $4
)
RETURNING id, chat_session_id, task_id, lark_chat_id, lark_card_message_id, status, last_patched_at, created_at
`
type CreateLarkOutboundCardMessageParams struct {
ChatSessionID pgtype.UUID `json:"chat_session_id"`
LarkChatID string `json:"lark_chat_id"`
LarkCardMessageID string `json:"lark_card_message_id"`
Status string `json:"status"`
TaskID pgtype.UUID `json:"task_id"`
}
// =====================
// lark_outbound_card_message
// =====================
func (q *Queries) CreateLarkOutboundCardMessage(ctx context.Context, arg CreateLarkOutboundCardMessageParams) (LarkOutboundCardMessage, error) {
row := q.db.QueryRow(ctx, createLarkOutboundCardMessage,
arg.ChatSessionID,
arg.LarkChatID,
arg.LarkCardMessageID,
arg.Status,
arg.TaskID,
)
var i LarkOutboundCardMessage
err := row.Scan(
&i.ID,
&i.ChatSessionID,
&i.TaskID,
&i.LarkChatID,
&i.LarkCardMessageID,
&i.Status,
&i.LastPatchedAt,
&i.CreatedAt,
)
return i, err
}
const createLarkUserBinding = `-- name: CreateLarkUserBinding :one
INSERT INTO lark_user_binding (
workspace_id, multica_user_id, installation_id, lark_open_id, union_id
) VALUES (
$1, $2, $3, $4, $5
)
ON CONFLICT (installation_id, lark_open_id) DO UPDATE SET
union_id = COALESCE(EXCLUDED.union_id, lark_user_binding.union_id),
bound_at = now()
WHERE lark_user_binding.multica_user_id = EXCLUDED.multica_user_id
RETURNING id, workspace_id, multica_user_id, installation_id, lark_open_id, union_id, bound_at
`
type CreateLarkUserBindingParams struct {
WorkspaceID pgtype.UUID `json:"workspace_id"`
MulticaUserID pgtype.UUID `json:"multica_user_id"`
InstallationID pgtype.UUID `json:"installation_id"`
LarkOpenID string `json:"lark_open_id"`
UnionID pgtype.Text `json:"union_id"`
}
// =====================
// lark_user_binding
// =====================
// Records that a Lark open_id (per-installation) maps to a Multica
// user.
//
// Two structural guarantees:
// 1. The composite FK to member(workspace_id, user_id) makes this
// statement fail when the redeemer is not (or no longer) a
// workspace member — that is §4.3 of the design.
// 2. ON CONFLICT DO UPDATE is gated on `multica_user_id` matching
// the existing binding, so a second redeemer holding their own
// valid binding token CANNOT silently steal an already-bound
// open_id. If the conflict row points at a different user, the
// UPDATE is skipped and the statement returns ZERO rows — the
// caller (lark.BindingTokenService.RedeemAndBind) translates
// that into ErrBindingAlreadyAssigned.
//
// The same-user case still updates metadata (union_id refresh,
// bound_at bump) so an idempotent re-bind by the original user
// continues to work; only a cross-user re-assignment is rejected.
// True account changes must go through an explicit unbind flow, not
// through a binding token.
func (q *Queries) CreateLarkUserBinding(ctx context.Context, arg CreateLarkUserBindingParams) (LarkUserBinding, error) {
row := q.db.QueryRow(ctx, createLarkUserBinding,
arg.WorkspaceID,
arg.MulticaUserID,
arg.InstallationID,
arg.LarkOpenID,
arg.UnionID,
)
var i LarkUserBinding
err := row.Scan(
&i.ID,
&i.WorkspaceID,
&i.MulticaUserID,
&i.InstallationID,
&i.LarkOpenID,
&i.UnionID,
&i.BoundAt,
)
return i, err
}
const deleteLarkUserBinding = `-- name: DeleteLarkUserBinding :exec
DELETE FROM lark_user_binding WHERE id = $1
`
func (q *Queries) DeleteLarkUserBinding(ctx context.Context, id pgtype.UUID) error {
_, err := q.db.Exec(ctx, deleteLarkUserBinding, id)
return err
}
const getLarkChatSessionBinding = `-- name: GetLarkChatSessionBinding :one
SELECT id, chat_session_id, installation_id, lark_chat_id, lark_chat_type, created_at FROM lark_chat_session_binding
WHERE installation_id = $1 AND lark_chat_id = $2
`
type GetLarkChatSessionBindingParams struct {
InstallationID pgtype.UUID `json:"installation_id"`
LarkChatID string `json:"lark_chat_id"`
}
// Lookup-by-Lark-chat path. Used by the inbound dispatcher to find the
// existing chat_session before deciding whether to create one. The
// UNIQUE (installation_id, lark_chat_id) constraint means at most one
// row matches.
func (q *Queries) GetLarkChatSessionBinding(ctx context.Context, arg GetLarkChatSessionBindingParams) (LarkChatSessionBinding, error) {
row := q.db.QueryRow(ctx, getLarkChatSessionBinding, arg.InstallationID, arg.LarkChatID)
var i LarkChatSessionBinding
err := row.Scan(
&i.ID,
&i.ChatSessionID,
&i.InstallationID,
&i.LarkChatID,
&i.LarkChatType,
&i.CreatedAt,
)
return i, err
}
const getLarkChatSessionBindingBySession = `-- name: GetLarkChatSessionBindingBySession :one
SELECT id, chat_session_id, installation_id, lark_chat_id, lark_chat_type, created_at FROM lark_chat_session_binding
WHERE chat_session_id = $1
`
// Reverse lookup: given a chat_session_id, find its Lark binding. Used
// by the outbound card patcher to know which (installation, chat_id)
// to PATCH when an agent emits a stream event for this session.
func (q *Queries) GetLarkChatSessionBindingBySession(ctx context.Context, chatSessionID pgtype.UUID) (LarkChatSessionBinding, error) {
row := q.db.QueryRow(ctx, getLarkChatSessionBindingBySession, chatSessionID)
var i LarkChatSessionBinding
err := row.Scan(
&i.ID,
&i.ChatSessionID,
&i.InstallationID,
&i.LarkChatID,
&i.LarkChatType,
&i.CreatedAt,
)
return i, err
}
const getLarkInstallation = `-- name: GetLarkInstallation :one
SELECT id, workspace_id, agent_id, app_id, app_secret_encrypted, tenant_key, bot_open_id, installer_user_id, status, ws_lease_token, ws_lease_expires_at, installed_at, created_at, updated_at, bot_union_id, region FROM lark_installation WHERE id = $1
`
func (q *Queries) GetLarkInstallation(ctx context.Context, id pgtype.UUID) (LarkInstallation, error) {
row := q.db.QueryRow(ctx, getLarkInstallation, id)
var i LarkInstallation
err := row.Scan(
&i.ID,
&i.WorkspaceID,
&i.AgentID,
&i.AppID,
&i.AppSecretEncrypted,
&i.TenantKey,
&i.BotOpenID,
&i.InstallerUserID,
&i.Status,
&i.WsLeaseToken,
&i.WsLeaseExpiresAt,
&i.InstalledAt,
&i.CreatedAt,
&i.UpdatedAt,
&i.BotUnionID,
&i.Region,
)
return i, err
}
const getLarkInstallationByAgent = `-- name: GetLarkInstallationByAgent :one
SELECT id, workspace_id, agent_id, app_id, app_secret_encrypted, tenant_key, bot_open_id, installer_user_id, status, ws_lease_token, ws_lease_expires_at, installed_at, created_at, updated_at, bot_union_id, region FROM lark_installation
WHERE workspace_id = $1 AND agent_id = $2
`
type GetLarkInstallationByAgentParams struct {
WorkspaceID pgtype.UUID `json:"workspace_id"`
AgentID pgtype.UUID `json:"agent_id"`
}
func (q *Queries) GetLarkInstallationByAgent(ctx context.Context, arg GetLarkInstallationByAgentParams) (LarkInstallation, error) {
row := q.db.QueryRow(ctx, getLarkInstallationByAgent, arg.WorkspaceID, arg.AgentID)
var i LarkInstallation
err := row.Scan(
&i.ID,
&i.WorkspaceID,
&i.AgentID,
&i.AppID,
&i.AppSecretEncrypted,
&i.TenantKey,
&i.BotOpenID,
&i.InstallerUserID,
&i.Status,
&i.WsLeaseToken,
&i.WsLeaseExpiresAt,
&i.InstalledAt,
&i.CreatedAt,
&i.UpdatedAt,
&i.BotUnionID,
&i.Region,
)
return i, err
}
const getLarkInstallationByAppID = `-- name: GetLarkInstallationByAppID :one
SELECT id, workspace_id, agent_id, app_id, app_secret_encrypted, tenant_key, bot_open_id, installer_user_id, status, ws_lease_token, ws_lease_expires_at, installed_at, created_at, updated_at, bot_union_id, region FROM lark_installation WHERE app_id = $1
`
// Used by the OAuth callback to detect re-install vs first-install,
// and by the inbound dispatcher to route an event payload (which only
// carries app_id) to its installation row.
func (q *Queries) GetLarkInstallationByAppID(ctx context.Context, appID string) (LarkInstallation, error) {
row := q.db.QueryRow(ctx, getLarkInstallationByAppID, appID)
var i LarkInstallation
err := row.Scan(
&i.ID,
&i.WorkspaceID,
&i.AgentID,
&i.AppID,
&i.AppSecretEncrypted,
&i.TenantKey,
&i.BotOpenID,
&i.InstallerUserID,
&i.Status,
&i.WsLeaseToken,
&i.WsLeaseExpiresAt,
&i.InstalledAt,
&i.CreatedAt,
&i.UpdatedAt,
&i.BotUnionID,
&i.Region,
)
return i, err
}
const getLarkInstallationInWorkspace = `-- name: GetLarkInstallationInWorkspace :one
SELECT id, workspace_id, agent_id, app_id, app_secret_encrypted, tenant_key, bot_open_id, installer_user_id, status, ws_lease_token, ws_lease_expires_at, installed_at, created_at, updated_at, bot_union_id, region FROM lark_installation
WHERE id = $1 AND workspace_id = $2
`
type GetLarkInstallationInWorkspaceParams struct {
ID pgtype.UUID `json:"id"`
WorkspaceID pgtype.UUID `json:"workspace_id"`
}
func (q *Queries) GetLarkInstallationInWorkspace(ctx context.Context, arg GetLarkInstallationInWorkspaceParams) (LarkInstallation, error) {
row := q.db.QueryRow(ctx, getLarkInstallationInWorkspace, arg.ID, arg.WorkspaceID)
var i LarkInstallation
err := row.Scan(
&i.ID,
&i.WorkspaceID,
&i.AgentID,
&i.AppID,
&i.AppSecretEncrypted,
&i.TenantKey,
&i.BotOpenID,
&i.InstallerUserID,
&i.Status,
&i.WsLeaseToken,
&i.WsLeaseExpiresAt,
&i.InstalledAt,
&i.CreatedAt,
&i.UpdatedAt,
&i.BotUnionID,
&i.Region,
)
return i, err
}
const getLarkOutboundCardByTask = `-- name: GetLarkOutboundCardByTask :one
SELECT id, chat_session_id, task_id, lark_chat_id, lark_card_message_id, status, last_patched_at, created_at FROM lark_outbound_card_message
WHERE task_id = $1
`
// Most card patches arrive keyed by task_id (we're streaming an agent
// run's output). The partial unique index on (task_id) WHERE task_id IS
// NOT NULL guarantees this returns at most one row.
func (q *Queries) GetLarkOutboundCardByTask(ctx context.Context, taskID pgtype.UUID) (LarkOutboundCardMessage, error) {
row := q.db.QueryRow(ctx, getLarkOutboundCardByTask, taskID)
var i LarkOutboundCardMessage
err := row.Scan(
&i.ID,
&i.ChatSessionID,
&i.TaskID,
&i.LarkChatID,
&i.LarkCardMessageID,
&i.Status,
&i.LastPatchedAt,
&i.CreatedAt,
)
return i, err
}
const getLarkUserBindingByOpenID = `-- name: GetLarkUserBindingByOpenID :one
SELECT id, workspace_id, multica_user_id, installation_id, lark_open_id, union_id, bound_at FROM lark_user_binding
WHERE installation_id = $1 AND lark_open_id = $2
`
type GetLarkUserBindingByOpenIDParams struct {
InstallationID pgtype.UUID `json:"installation_id"`
LarkOpenID string `json:"lark_open_id"`
}
// The inbound identity check. A row here means: this open_id maps to a
// Multica user who IS currently a workspace member (the composite FK
// cascades the binding away when membership is revoked, so a row's
// existence is itself the membership proof).
func (q *Queries) GetLarkUserBindingByOpenID(ctx context.Context, arg GetLarkUserBindingByOpenIDParams) (LarkUserBinding, error) {
row := q.db.QueryRow(ctx, getLarkUserBindingByOpenID, arg.InstallationID, arg.LarkOpenID)
var i LarkUserBinding
err := row.Scan(
&i.ID,
&i.WorkspaceID,
&i.MulticaUserID,
&i.InstallationID,
&i.LarkOpenID,
&i.UnionID,
&i.BoundAt,
)
return i, err
}
const listActiveLarkInstallations = `-- name: ListActiveLarkInstallations :many
SELECT id, workspace_id, agent_id, app_id, app_secret_encrypted, tenant_key, bot_open_id, installer_user_id, status, ws_lease_token, ws_lease_expires_at, installed_at, created_at, updated_at, bot_union_id, region FROM lark_installation
WHERE status = 'active'
ORDER BY created_at ASC
`
// Boot path for the WebSocket hub: enumerate every active installation
// so the hub can claim leases and open long connections. Excludes
// revoked rows — their WS should already be torn down.
func (q *Queries) ListActiveLarkInstallations(ctx context.Context) ([]LarkInstallation, error) {
rows, err := q.db.Query(ctx, listActiveLarkInstallations)
if err != nil {
return nil, err
}
defer rows.Close()
items := []LarkInstallation{}
for rows.Next() {
var i LarkInstallation
if err := rows.Scan(
&i.ID,
&i.WorkspaceID,
&i.AgentID,
&i.AppID,
&i.AppSecretEncrypted,
&i.TenantKey,
&i.BotOpenID,
&i.InstallerUserID,
&i.Status,
&i.WsLeaseToken,
&i.WsLeaseExpiresAt,
&i.InstalledAt,
&i.CreatedAt,
&i.UpdatedAt,
&i.BotUnionID,
&i.Region,
); err != nil {
return nil, err
}
items = append(items, i)
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const listLarkInboundAuditByInstallation = `-- name: ListLarkInboundAuditByInstallation :many
SELECT id, installation_id, lark_chat_id, event_type, lark_event_id, lark_message_id, drop_reason, received_at FROM lark_inbound_audit
WHERE installation_id = $1
ORDER BY received_at DESC
LIMIT $2 OFFSET $3
`
type ListLarkInboundAuditByInstallationParams struct {
InstallationID pgtype.UUID `json:"installation_id"`
Limit int32 `json:"limit"`
Offset int32 `json:"offset"`
}
// Ops debugging view; paged via the (installation_id, received_at) idx.
func (q *Queries) ListLarkInboundAuditByInstallation(ctx context.Context, arg ListLarkInboundAuditByInstallationParams) ([]LarkInboundAudit, error) {
rows, err := q.db.Query(ctx, listLarkInboundAuditByInstallation, arg.InstallationID, arg.Limit, arg.Offset)
if err != nil {
return nil, err
}
defer rows.Close()
items := []LarkInboundAudit{}
for rows.Next() {
var i LarkInboundAudit
if err := rows.Scan(
&i.ID,
&i.InstallationID,
&i.LarkChatID,
&i.EventType,
&i.LarkEventID,
&i.LarkMessageID,
&i.DropReason,
&i.ReceivedAt,
); err != nil {
return nil, err
}
items = append(items, i)
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const listLarkInstallationsByWorkspace = `-- name: ListLarkInstallationsByWorkspace :many
SELECT id, workspace_id, agent_id, app_id, app_secret_encrypted, tenant_key, bot_open_id, installer_user_id, status, ws_lease_token, ws_lease_expires_at, installed_at, created_at, updated_at, bot_union_id, region FROM lark_installation
WHERE workspace_id = $1
ORDER BY created_at ASC
`
func (q *Queries) ListLarkInstallationsByWorkspace(ctx context.Context, workspaceID pgtype.UUID) ([]LarkInstallation, error) {
rows, err := q.db.Query(ctx, listLarkInstallationsByWorkspace, workspaceID)
if err != nil {
return nil, err
}
defer rows.Close()
items := []LarkInstallation{}
for rows.Next() {
var i LarkInstallation
if err := rows.Scan(
&i.ID,
&i.WorkspaceID,
&i.AgentID,
&i.AppID,
&i.AppSecretEncrypted,
&i.TenantKey,
&i.BotOpenID,
&i.InstallerUserID,
&i.Status,
&i.WsLeaseToken,
&i.WsLeaseExpiresAt,
&i.InstalledAt,
&i.CreatedAt,
&i.UpdatedAt,
&i.BotUnionID,
&i.Region,
); err != nil {
return nil, err
}
items = append(items, i)
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const listLarkUserBindingsByInstallation = `-- name: ListLarkUserBindingsByInstallation :many
SELECT id, workspace_id, multica_user_id, installation_id, lark_open_id, union_id, bound_at FROM lark_user_binding
WHERE installation_id = $1
ORDER BY bound_at DESC
`
func (q *Queries) ListLarkUserBindingsByInstallation(ctx context.Context, installationID pgtype.UUID) ([]LarkUserBinding, error) {
rows, err := q.db.Query(ctx, listLarkUserBindingsByInstallation, installationID)
if err != nil {
return nil, err
}
defer rows.Close()
items := []LarkUserBinding{}
for rows.Next() {
var i LarkUserBinding
if err := rows.Scan(
&i.ID,
&i.WorkspaceID,
&i.MulticaUserID,
&i.InstallationID,
&i.LarkOpenID,
&i.UnionID,
&i.BoundAt,
); err != nil {
return nil, err
}
items = append(items, i)
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const markLarkInboundDedupProcessed = `-- name: MarkLarkInboundDedupProcessed :execrows
UPDATE lark_inbound_message_dedup
SET processed_at = now()
WHERE installation_id = $1
AND message_id = $2
AND claim_token = $3
AND processed_at IS NULL
`
type MarkLarkInboundDedupProcessedParams struct {
InstallationID pgtype.UUID `json:"installation_id"`
MessageID string `json:"message_id"`
ClaimToken pgtype.UUID `json:"claim_token"`
}
// Locks in a claim as permanently processed. Called by the dispatcher
// after a durable outcome has been reached:
// - a drop audit row was persisted (group filter / unbound user /
// revoked / invalid event), OR
// - chat_message + chat_session.updated_at were committed (ingest
// path, including ingest paths that subsequently fail at issue
// creation / task enqueue — the user-visible message is already in
// the session).
//
// For the chat_message ingest path the dispatcher invokes this query
// INSIDE the chat_message+session transaction (via qtx), so the
// durable write and the Mark commit atomically. A token mismatch
// (another worker has re-claimed the row in the meantime) returns
// zero rows; the caller treats that as a lost claim and rolls back the
// in-tx invocation, so no second chat_message is written.
//
// Guarded by processed_at IS NULL so a successful Mark is itself
// idempotent: replaying it cannot resurrect a row that was already
// terminal.
func (q *Queries) MarkLarkInboundDedupProcessed(ctx context.Context, arg MarkLarkInboundDedupProcessedParams) (int64, error) {
result, err := q.db.Exec(ctx, markLarkInboundDedupProcessed, arg.InstallationID, arg.MessageID, arg.ClaimToken)
if err != nil {
return 0, err
}
return result.RowsAffected(), nil
}
const purgeExpiredLarkBindingTokens = `-- name: PurgeExpiredLarkBindingTokens :exec
DELETE FROM lark_binding_token
WHERE expires_at < $1
`
// Tokens are tiny but unbounded over time. The same vacuum cron that
// handles dedup can sweep these too.
func (q *Queries) PurgeExpiredLarkBindingTokens(ctx context.Context, expiresAt pgtype.Timestamptz) error {
_, err := q.db.Exec(ctx, purgeExpiredLarkBindingTokens, expiresAt)
return err
}
const purgeLarkInboundDedup = `-- name: PurgeLarkInboundDedup :exec
DELETE FROM lark_inbound_message_dedup
WHERE received_at < $1
`
// Removes dedup rows older than the supplied cutoff. The vacuum job
// (separate cron) calls this with cutoff = now() - INTERVAL '24h'.
// Sweeps both processed and (very old) abandoned in-flight rows.
func (q *Queries) PurgeLarkInboundDedup(ctx context.Context, receivedAt pgtype.Timestamptz) error {
_, err := q.db.Exec(ctx, purgeLarkInboundDedup, receivedAt)
return err
}
const recordLarkInboundDrop = `-- name: RecordLarkInboundDrop :exec
INSERT INTO lark_inbound_audit (
installation_id, lark_chat_id, event_type,
lark_event_id, lark_message_id, drop_reason
) VALUES (
$3,
$4,
$1,
$5,
$6,
$2
)
`
type RecordLarkInboundDropParams struct {
EventType string `json:"event_type"`
DropReason string `json:"drop_reason"`
InstallationID pgtype.UUID `json:"installation_id"`
LarkChatID pgtype.Text `json:"lark_chat_id"`
LarkEventID pgtype.Text `json:"lark_event_id"`
LarkMessageID pgtype.Text `json:"lark_message_id"`
}
// =====================
// lark_inbound_audit
// =====================
// The ONLY write path for events that fail identity check or the
// group-mention filter. Deliberately accepts no body column — the
// AuditLogger interface in internal/integrations/lark mirrors that
// shape so a caller cannot accidentally hand a body to this row.
func (q *Queries) RecordLarkInboundDrop(ctx context.Context, arg RecordLarkInboundDropParams) error {
_, err := q.db.Exec(ctx, recordLarkInboundDrop,
arg.EventType,
arg.DropReason,
arg.InstallationID,
arg.LarkChatID,
arg.LarkEventID,
arg.LarkMessageID,
)
return err
}
const releaseLarkInboundDedup = `-- name: ReleaseLarkInboundDedup :execrows
DELETE FROM lark_inbound_message_dedup
WHERE installation_id = $1
AND message_id = $2
AND claim_token = $3
AND processed_at IS NULL
`
type ReleaseLarkInboundDedupParams struct {
InstallationID pgtype.UUID `json:"installation_id"`
MessageID string `json:"message_id"`
ClaimToken pgtype.UUID `json:"claim_token"`
}
// Releases an in-flight claim. Called by the dispatcher when an infra
// error occurred BEFORE any durable side effect (e.g. EnsureChatSession
// or AppendUserMessage returned an error and its transaction rolled
// back). Deleting the row lets the WS adapter's retry re-acquire the
// claim immediately, instead of waiting for the 60-second staleness
// TTL. Guarded by processed_at IS NULL so an out-of-order Release
// cannot undo a Mark; guarded by claim_token so a slow-but-alive worker
// whose claim was reclaimed cannot delete the new holder's row.
func (q *Queries) ReleaseLarkInboundDedup(ctx context.Context, arg ReleaseLarkInboundDedupParams) (int64, error) {
result, err := q.db.Exec(ctx, releaseLarkInboundDedup, arg.InstallationID, arg.MessageID, arg.ClaimToken)
if err != nil {
return 0, err
}
return result.RowsAffected(), nil
}
const releaseLarkWSLease = `-- name: ReleaseLarkWSLease :exec
UPDATE lark_installation
SET ws_lease_token = NULL,
ws_lease_expires_at = NULL,
updated_at = now()
WHERE id = $1
AND ws_lease_token = $2
`
type ReleaseLarkWSLeaseParams struct {
ID pgtype.UUID `json:"id"`
CurrentToken pgtype.Text `json:"current_token"`
}
// Drops the lease iff we're still the holder. A racing acquirer that
// already took over will not have its lease cleared.
func (q *Queries) ReleaseLarkWSLease(ctx context.Context, arg ReleaseLarkWSLeaseParams) error {
_, err := q.db.Exec(ctx, releaseLarkWSLease, arg.ID, arg.CurrentToken)
return err
}
const setLarkInstallationBotUnionID = `-- name: SetLarkInstallationBotUnionID :exec
UPDATE lark_installation
SET bot_union_id = $2,
updated_at = now()
WHERE id = $1
`
type SetLarkInstallationBotUnionIDParams struct {
ID pgtype.UUID `json:"id"`
BotUnionID pgtype.Text `json:"bot_union_id"`
}
// Operator-only backfill for installations created before the
// bot_union_id column existed (migration 112). Production reads do
// NOT use this — finishSuccess writes union_id during install, and
// the upsert path writes it on re-install. Kept as a focused single-
// column UPDATE so the backfill cannot accidentally overwrite app
// credentials, status, or lease state.
func (q *Queries) SetLarkInstallationBotUnionID(ctx context.Context, arg SetLarkInstallationBotUnionIDParams) error {
_, err := q.db.Exec(ctx, setLarkInstallationBotUnionID, arg.ID, arg.BotUnionID)
return err
}
const setLarkInstallationStatus = `-- name: SetLarkInstallationStatus :exec
UPDATE lark_installation
SET status = $2, updated_at = now()
WHERE id = $1
`
type SetLarkInstallationStatusParams struct {
ID pgtype.UUID `json:"id"`
Status string `json:"status"`
}
func (q *Queries) SetLarkInstallationStatus(ctx context.Context, arg SetLarkInstallationStatusParams) error {
_, err := q.db.Exec(ctx, setLarkInstallationStatus, arg.ID, arg.Status)
return err
}
const updateLarkOutboundCardStatus = `-- name: UpdateLarkOutboundCardStatus :exec
UPDATE lark_outbound_card_message
SET status = $2,
last_patched_at = now()
WHERE id = $1
`
type UpdateLarkOutboundCardStatusParams struct {
ID pgtype.UUID `json:"id"`
Status string `json:"status"`
}
func (q *Queries) UpdateLarkOutboundCardStatus(ctx context.Context, arg UpdateLarkOutboundCardStatusParams) error {
_, err := q.db.Exec(ctx, updateLarkOutboundCardStatus, arg.ID, arg.Status)
return err
}
const upsertLarkInstallation = `-- name: UpsertLarkInstallation :one
INSERT INTO lark_installation (
workspace_id, agent_id, app_id, app_secret_encrypted,
tenant_key, bot_open_id, bot_union_id, installer_user_id, region
) VALUES (
$1, $2, $3, $4, $7, $5, $8, $6, $9
)
ON CONFLICT (workspace_id, agent_id) DO UPDATE SET
app_id = EXCLUDED.app_id,
app_secret_encrypted = EXCLUDED.app_secret_encrypted,
tenant_key = EXCLUDED.tenant_key,
bot_open_id = EXCLUDED.bot_open_id,
bot_union_id = EXCLUDED.bot_union_id,
installer_user_id = EXCLUDED.installer_user_id,
region = EXCLUDED.region,
status = 'active',
installed_at = now(),
updated_at = now()
RETURNING id, workspace_id, agent_id, app_id, app_secret_encrypted, tenant_key, bot_open_id, installer_user_id, status, ws_lease_token, ws_lease_expires_at, installed_at, created_at, updated_at, bot_union_id, region
`
type UpsertLarkInstallationParams struct {
WorkspaceID pgtype.UUID `json:"workspace_id"`
AgentID pgtype.UUID `json:"agent_id"`
AppID string `json:"app_id"`
AppSecretEncrypted []byte `json:"app_secret_encrypted"`
BotOpenID string `json:"bot_open_id"`
InstallerUserID pgtype.UUID `json:"installer_user_id"`
TenantKey pgtype.Text `json:"tenant_key"`
BotUnionID pgtype.Text `json:"bot_union_id"`
Region string `json:"region"`
}
// Re-install path: a user who already bound this agent to Lark scans
// the QR again (e.g. they rotated their Lark app secret, or revoked +
// reinstalled). We refresh the app credentials, bot identity, and
// installer attribution, and force status back to 'active'. The WS
// lease is intentionally NOT reset here — the inbound hub owns lease
// lifecycle.
func (q *Queries) UpsertLarkInstallation(ctx context.Context, arg UpsertLarkInstallationParams) (LarkInstallation, error) {
row := q.db.QueryRow(ctx, upsertLarkInstallation,
arg.WorkspaceID,
arg.AgentID,
arg.AppID,
arg.AppSecretEncrypted,
arg.BotOpenID,
arg.InstallerUserID,
arg.TenantKey,
arg.BotUnionID,
arg.Region,
)
var i LarkInstallation
err := row.Scan(
&i.ID,
&i.WorkspaceID,
&i.AgentID,
&i.AppID,
&i.AppSecretEncrypted,
&i.TenantKey,
&i.BotOpenID,
&i.InstallerUserID,
&i.Status,
&i.WsLeaseToken,
&i.WsLeaseExpiresAt,
&i.InstalledAt,
&i.CreatedAt,
&i.UpdatedAt,
&i.BotUnionID,
&i.Region,
)
return i, err
}