Compare commits

...

2 Commits

Author SHA1 Message Date
J
37ca88be7e feat(channel): generalize lark_* tables into channel_* (DB layer)
Migration 123 creates channel_installation / channel_user_binding /
channel_chat_session_binding / channel_inbound_message_dedup /
channel_inbound_audit / channel_outbound_card_message /
channel_binding_token. Each carries a channel_type discriminator and a
JSONB config for platform-specific identifiers/credentials; cross-platform
columns stay flat. Existing Feishu rows are backfilled (channel_type=
'feishu', app_secret_encrypted via base64). NO foreign keys / cascades
(MUL-3515 §4) — integrity moves to the app layer in the cutover.

queries/channel.sql ports the lark query surface to channel_*, JSONB-aware,
plus DeleteChannelUserBindingsByWorkspaceMember /
DeleteChannelChatSessionBindingBySession for the app-layer cleanup that
replaces the removed cascades.

lark_* tables/queries are left in place here and removed once the Go
cutover lands, so this commit ships green on its own.

Verified: sqlc generate, go build ./..., full migrate chain (1..123) on
Postgres 17, and a real-data backfill spot-check (base64 round-trip,
NULL-strip, functional unique index on (channel_type, app_id)).

MUL-3515

Co-authored-by: multica-agent <github@multica.ai>
2026-06-22 18:21:06 +08:00
J
5374c21a99 feat(integrations): add platform-agnostic channel foundation
Introduce server/internal/integrations/channel — the contract every
inbound IM integration implements, so the core never learns a platform's
event JSON. Four pieces:

- Channel interface (Type/Connect/Disconnect/Send/Capabilities) + Factory
  + Config (channel_type + opaque JSON blob, maps to channel_installation).
- Normalized InboundMessage/OutboundMessage envelopes + Source/MediaRef/
  ReplyCtx/MsgType/ChatType. Envelope holds only cross-platform-true
  fields; platform specifics live in Raw, read only by the adapter.
- Capability bitmask: declaration only, no degrade logic in core.
- Registry: Type->Factory map, last-writer-wins, concurrency-safe.

Pure package (no DB/network/platform deps). Foundation for MUL-3515; the
lark cutover + lark_*->channel_* generalization land in follow-up PRs.

MUL-3515

Co-authored-by: multica-agent <github@multica.ai>
2026-06-22 17:54:06 +08:00
13 changed files with 2397 additions and 0 deletions

View File

@@ -0,0 +1,94 @@
package channel
import "strings"
// Capability is a bitmask a Channel uses to DECLARE what it supports. It
// is declaration only: this package contains no degrade logic. A caller
// that wants to degrade output (rich card → plain text when CapRichCard
// is absent) reads the bitmask and decides for itself, so adding a new
// platform never forces a branch into the core. Capabilities() returns a
// Channel's fixed set; the zero value declares nothing.
type Capability uint64
const (
// CapText — can deliver a plain text message. Every Channel is
// expected to declare at least this.
CapText Capability = 1 << iota
// CapRichCard — can render a rich / interactive card (Lark
// interactive card, Slack Block Kit, …).
CapRichCard
// CapThreadReply — can post a reply into a thread / topic.
CapThreadReply
// CapQuoteReply — can quote-reply to a specific message.
CapQuoteReply
// CapAttachment — can send and/or receive media attachments.
CapAttachment
// CapVoice — can handle voice / audio messages.
CapVoice
// CapTypingIndicator — can show a typing / "thinking" indicator.
CapTypingIndicator
// CapMessageEdit — can edit a message after it was sent (Lark card
// patch, Slack chat.update, …).
CapMessageEdit
)
// capabilityNames maps single-bit capabilities to a stable, lower-case
// name for String(). Order matches the bit order above so String() reads
// least-significant-bit first.
var capabilityNames = []struct {
bit Capability
name string
}{
{CapText, "text"},
{CapRichCard, "rich_card"},
{CapThreadReply, "thread_reply"},
{CapQuoteReply, "quote_reply"},
{CapAttachment, "attachment"},
{CapVoice, "voice"},
{CapTypingIndicator, "typing_indicator"},
{CapMessageEdit, "message_edit"},
}
// Has reports whether c declares every capability in want. Has(0) is
// true (the empty requirement is always satisfied). Because want may be a
// combination of bits, this is an "includes all of" test, not "any of".
func (c Capability) Has(want Capability) bool {
return c&want == want
}
// String renders the set bits as a "|"-joined list of names ("text|
// thread_reply"), "none" for the zero value, and appends any unknown
// high bits as a hex remainder so a forgotten name never silently
// vanishes from logs. It is for diagnostics only.
func (c Capability) String() string {
if c == 0 {
return "none"
}
var (
parts []string
remaining = c
)
for _, cn := range capabilityNames {
if remaining&cn.bit == cn.bit {
parts = append(parts, cn.name)
remaining &^= cn.bit
}
}
if remaining != 0 {
parts = append(parts, "0x"+strings.TrimLeft(hex(uint64(remaining)), "0"))
}
return strings.Join(parts, "|")
}
// hex formats v as a fixed lower-case hex string without importing fmt
// (keeps this leaf file dependency-free). Only used by String() for the
// rare unknown-bit remainder.
func hex(v uint64) string {
const digits = "0123456789abcdef"
var buf [16]byte
for i := 15; i >= 0; i-- {
buf[i] = digits[v&0xf]
v >>= 4
}
return string(buf[:])
}

View File

@@ -0,0 +1,67 @@
package channel
import "testing"
func TestCapability_Has(t *testing.T) {
c := CapText | CapThreadReply
if !c.Has(CapText) {
t.Errorf("Has(CapText) = false, want true")
}
if !c.Has(CapText | CapThreadReply) {
t.Errorf("Has(text|thread) = false, want true (includes-all)")
}
if c.Has(CapRichCard) {
t.Errorf("Has(CapRichCard) = true, want false")
}
if c.Has(CapText | CapRichCard) {
t.Errorf("Has(text|rich) = true, want false (one bit missing)")
}
// The empty requirement is always satisfied.
if !c.Has(0) {
t.Errorf("Has(0) = false, want true")
}
}
func TestCapability_String(t *testing.T) {
tests := []struct {
name string
cap Capability
want string
}{
{"zero", 0, "none"},
{"single", CapText, "text"},
{"ordered_lsb_first", CapThreadReply | CapText, "text|thread_reply"},
{"all_named", CapMessageEdit, "message_edit"},
{"unknown_high_bit", CapText | (1 << 40), "text|0x10000000000"},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := tt.cap.String(); got != tt.want {
t.Errorf("Capability(%#x).String() = %q, want %q", uint64(tt.cap), got, tt.want)
}
})
}
}
// TestCapability_BitsDistinct guards against two constants accidentally
// sharing a bit after an edit to the iota block.
func TestCapability_BitsDistinct(t *testing.T) {
all := []Capability{
CapText, CapRichCard, CapThreadReply, CapQuoteReply,
CapAttachment, CapVoice, CapTypingIndicator, CapMessageEdit,
}
var seen Capability
for i, c := range all {
if c == 0 {
t.Fatalf("capability index %d is zero", i)
}
if c&(c-1) != 0 {
t.Fatalf("capability index %d (%#x) is not a single bit", i, uint64(c))
}
if seen&c != 0 {
t.Fatalf("capability index %d (%#x) overlaps an earlier bit", i, uint64(c))
}
seen |= c
}
}

View File

@@ -0,0 +1,81 @@
package channel
import (
"context"
"encoding/json"
)
// Type identifies an inbound channel platform — the discriminator the
// Registry keys on and the value persisted in the channel_type column of
// the generalized channel_* tables. Use the lower-case platform slug
// ("feishu", "slack", "wecom", …); keep it stable, it is durable data.
type Type string
const (
// TypeFeishu is the Feishu / Lark adapter — the only implementation
// in phase 1. It serves both the mainland Feishu cloud and the Lark
// international cloud; the cloud (region) is per-installation config,
// not a separate Type.
TypeFeishu Type = "feishu"
)
// Channel is the platform-agnostic contract every IM integration
// implements. An adapter keeps ALL platform specifics behind these five
// methods: the core supervisor calls Connect/Disconnect to manage the
// link, Send to deliver an outbound reply, and reads Capabilities to
// decide how to render; it never touches platform SDKs or wire formats.
//
// Inbound is intentionally NOT on this interface. A Channel pushes
// normalized InboundMessage values into the core router via the wiring
// established at construction (the adapter owns its receive loop); the
// core does not poll the Channel for messages.
type Channel interface {
// Type reports the platform discriminator. It MUST equal the Type
// the Channel was registered under, and is stable for the lifetime
// of the instance.
Type() Type
// Connect establishes the platform link (e.g. dials the outbound
// WebSocket long-conn, or starts the inbound HTTP listener). The
// connection mode is the implementation's choice and invisible to
// the core. Connect blocks only until the link is established (or
// fails); the ongoing receive loop runs in the background and is
// torn down by Disconnect or by ctx cancellation.
Connect(ctx context.Context) error
// Disconnect tears the platform link down and releases its
// resources. It is safe to call after a failed Connect and safe to
// call more than once; a Channel that is already disconnected
// returns nil.
Disconnect(ctx context.Context) error
// Send delivers a single outbound message and returns the platform's
// identifier for the delivered message. A non-nil error is reserved
// for real delivery failures (network, auth, rate limit) that the
// caller may retry.
Send(ctx context.Context, out OutboundMessage) (SendResult, error)
// Capabilities declares what this Channel supports. It is a pure
// declaration with no side effects and a stable result; callers read
// it to choose a rendering and degrade on their own (this package
// performs no degradation — see the Capability docs).
Capabilities() Capability
}
// Config is the normalized per-installation configuration a Factory
// consumes. Type is the platform discriminator; Raw is the platform's
// own credential/config blob (Feishu's app_id / encrypted app_secret /
// tenant_key / region, Slack's bot/app tokens, …), carried opaquely so
// the foundation never grows a per-platform field. It maps directly onto
// the channel_type column + JSONB config of a channel_installation row
// (MUL-3515 decision §3).
type Config struct {
Type Type
Raw json.RawMessage
}
// Factory builds a Channel from its per-installation Config. Each adapter
// registers exactly one Factory under its Type; the Registry calls it to
// instantiate a per-installation Channel. A Factory should validate Raw
// and return an error rather than a half-built Channel.
type Factory func(cfg Config) (Channel, error)

View File

@@ -0,0 +1,45 @@
// Package channel is the platform-agnostic foundation for Multica's
// inbound IM integrations (Feishu/Lark, Slack, WeCom, …). It owns the
// contract every integration implements so the core never learns what a
// given platform's event JSON looks like — design tracked in MUL-3506,
// phase-1 foundation in MUL-3515.
//
// The contract has four pieces:
//
// 1. Channel — the per-integration interface
// (Type / Connect / Disconnect / Send / Capabilities). An adapter
// translates platform payloads in both directions and owns its own
// connection mode (outbound WebSocket long-conn, inbound HTTP, …);
// the core only calls these five methods.
//
// 2. InboundMessage / OutboundMessage — the normalized message
// envelopes. Every platform's inbound payload is translated by its
// adapter into one InboundMessage; the core routes, dedups, and
// persists only this struct. Outbound is the minimal text reply
// (ChatID + Text + optional thread / reply target) — rich cards,
// media, and outbound webhooks are deliberately out of scope here
// and stay inside the adapter that supports them.
//
// 3. Capability — a bitmask each Channel uses to DECLARE what it can
// do (rich cards, threads, attachments, …). This package only
// models the declaration; it intentionally contains no degrade
// logic. Callers that want to degrade (rich card → plain text)
// read the bitmask and decide for themselves, so adding a platform
// never forces an if/else into the core.
//
// 4. Registry — a Type→Factory map with last-writer-wins semantics.
// Adding a platform is "register a factory", not "edit the core".
//
// Boundary rule (MUL-3515 decision §2): the envelope holds ONLY fields
// that are true across every platform — Text, a normalized message-type
// enum, media references, the reply/thread anchors, the routing Source,
// and the event/message ids used for dedup. Anything platform-specific
// (a Lark raw msg_type, parent_id, root_id, …) lives in Raw and is read
// ONLY by the adapter that produced it. The core never reads Raw.
//
// This package is pure: it has no database, network, or platform
// dependencies, and nothing in it imports another integration package.
// The concrete Feishu/Lark adapter, the DB-backed installation/identity/
// session resolvers, and the supervisor that drives Connect/Disconnect
// are wired in the follow-up cutover (see MUL-3515).
package channel

View File

@@ -0,0 +1,175 @@
package channel
import "encoding/json"
// ChatType discriminates a 1:1 direct conversation with the bot from a
// multi-party group chat. Product behavior differs: direct chats ingest
// every message; group chats only ingest messages explicitly addressed
// to the bot (@-mention or reply to a bot message). The wire values match
// the existing lark_chat_session_binding.lark_chat_type constraint so the
// generalized channel_* table backfills 1:1.
type ChatType string
const (
// ChatTypeP2P is a direct (peer-to-peer) conversation with the bot.
ChatTypeP2P ChatType = "p2p"
// ChatTypeGroup is a multi-party group conversation.
ChatTypeGroup ChatType = "group"
)
// MsgType is the normalized, cross-platform message kind. Adapters map
// their platform's native type onto this small closed set; the platform's
// raw type string (Lark "post" / "merge_forward" / "interactive", …) is
// NOT represented here — it stays in InboundMessage.Raw and is read only
// by the adapter. The core only ever needs to know "text vs media, and
// which media".
type MsgType string
const (
// MsgTypeText is a plain or rich text message. The human-readable
// content is flattened into InboundMessage.Text by the adapter.
MsgTypeText MsgType = "text"
// MsgTypeImage is an image attachment.
MsgTypeImage MsgType = "image"
// MsgTypeFile is a generic file attachment.
MsgTypeFile MsgType = "file"
// MsgTypeAudio is a voice / audio attachment.
MsgTypeAudio MsgType = "audio"
// MsgTypeVideo is a video attachment.
MsgTypeVideo MsgType = "video"
// MsgTypeUnknown is the fallback for a platform type the adapter does
// not map. The core treats it as a non-text, non-actionable message.
MsgTypeUnknown MsgType = "unknown"
)
// Source carries the cross-platform routing identity of an inbound
// message — every field here is true on every platform. Platform-specific
// routing keys (a Lark app_id, a Slack team id) are resolved to an
// installation by the adapter and do NOT appear on Source.
type Source struct {
// ChannelType is the platform the message arrived on; it equals the
// owning Channel's Type.
ChannelType Type
// ChatID is the platform conversation identifier. One ChatID maps to
// one Multica chat_session via the channel_chat_session_binding.
ChatID string
// ChatType discriminates direct from group conversations.
ChatType ChatType
// SenderID is the platform-native, per-installation user identifier
// (Lark open_id, Slack user id, …). It is stable WITHIN one
// installation and is the key the identity binding is stored under.
// It is NOT comparable across installations.
SenderID string
// SenderStableID is the platform's cross-installation stable identity
// for the sender when one exists (Lark union_id, …), otherwise empty.
// Captured opportunistically for future cross-installation identity
// merging; the core treats an empty value as "not available".
SenderStableID string
// ThreadID is the platform thread / topic the message belongs to,
// when threading applies and the message is inside a thread. Empty
// means a top-level conversation message. The core persists it so a
// decoupled outbound reply can be threaded back into the same topic.
ThreadID string
}
// MediaRef references a media attachment that the adapter has ALREADY
// persisted to object storage before the message reaches the core. The
// core never holds raw bytes — only this reference — so the envelope
// stays small and platform-neutral.
type MediaRef struct {
// Type is the normalized media kind (image / file / audio / video).
Type MsgType
// StorageKey locates the persisted object in Multica object storage.
StorageKey string
// Filename is the original display name, when the platform supplies
// one.
Filename string
// MimeType is the content type, when known.
MimeType string
// SizeBytes is the object size in bytes, or 0 when unknown.
SizeBytes int64
}
// ReplyCtx describes the message an inbound message quotes / replies to.
// It is nil when the inbound message is not a reply.
type ReplyCtx struct {
// MessageID is the immediate parent message's platform id (the
// message being quoted).
MessageID string
// RootID is the thread/root anchor the platform reports, when any.
RootID string
}
// InboundMessage is the single normalized shape the core consumes. Every
// adapter translates its platform's raw payload into this struct; the
// core's router, dedup, identity check, and persistence read ONLY these
// fields. Per the boundary rule (MUL-3515 §2) the struct holds only
// cross-platform-true fields; everything platform-specific lives in Raw.
type InboundMessage struct {
// EventID is the platform's delivery/event identifier and MessageID
// is the platform's message identifier. Together they back the
// idempotency layer: a platform may redeliver the same event on
// reconnect, and dedup keys on (installation, MessageID).
EventID string
MessageID string
// Source is the routing identity (chat, sender, thread).
Source Source
// Type is the normalized message kind.
Type MsgType
// Text is the human-readable content, flattened by the adapter. For
// non-text messages it may be empty or a short placeholder; the media
// itself is in MediaRefs.
Text string
// MediaRefs are the attachments, already persisted to object storage.
MediaRefs []MediaRef
// ReplyTo is the quoted/replied-to context, or nil.
ReplyTo *ReplyCtx
// AddressedToBot is the adapter's normalized verdict on whether a
// GROUP message is an interaction with the bot (@-mention or reply to
// a bot message). It is meaningless for direct (p2p) chats and the
// core ignores it there. It is a normalized boolean, not platform
// data — the platform-specific signals it was derived from (mention
// arrays, parent ids) stay in Raw.
AddressedToBot bool
// Raw is the untouched platform payload. Adapters stash platform-
// specific fields here (Lark raw msg_type / parent_id / root_id /
// mention arrays, …) and read them back only inside the adapter. The
// core never reads Raw — that is the whole point of the boundary.
Raw json.RawMessage
}
// OutboundMessage is the minimal outbound reply the core can ask any
// Channel to deliver: a text body into a chat, optionally threaded or
// quoting a specific message. Rich cards, media uploads, and outbound
// webhooks are deliberately NOT modeled here (MUL-3515 decision §6) — an
// adapter that supports richer output exposes it on its own type, not on
// this cross-platform envelope.
type OutboundMessage struct {
// ChatID is the destination conversation (the platform chat id).
ChatID string
// Text is the message body.
Text string
// ThreadID, when set, threads the reply into the given platform
// thread / topic. Empty sends at the chat level.
ThreadID string
// ReplyTo, when set, quote-replies to the given platform message id.
ReplyTo string
}
// SendResult is the outcome of Channel.Send.
type SendResult struct {
// MessageID is the platform's identifier for the delivered message.
MessageID string
}

View File

@@ -0,0 +1,82 @@
package channel
import (
"encoding/json"
"testing"
)
// TestInboundMessage_RawIsOpaque proves the boundary contract: a
// platform-specific payload survives a JSON round-trip through Raw
// byte-for-byte, so an adapter can stash anything there and read it back
// while the core leaves it untouched.
func TestInboundMessage_RawIsOpaque(t *testing.T) {
raw := json.RawMessage(`{"msg_type":"merge_forward","parent_id":"om_x","mentions":[{"id":"ou_1"}]}`)
in := InboundMessage{
EventID: "evt_1",
MessageID: "om_1",
Source: Source{
ChannelType: TypeFeishu,
ChatID: "oc_1",
ChatType: ChatTypeGroup,
SenderID: "ou_sender",
SenderStableID: "on_union",
ThreadID: "omt_1",
},
Type: MsgTypeText,
Text: "hello",
AddressedToBot: true,
ReplyTo: &ReplyCtx{MessageID: "om_parent", RootID: "om_root"},
Raw: raw,
}
encoded, err := json.Marshal(in)
if err != nil {
t.Fatalf("Marshal: %v", err)
}
var got InboundMessage
if err := json.Unmarshal(encoded, &got); err != nil {
t.Fatalf("Unmarshal: %v", err)
}
if got.Source != in.Source {
t.Errorf("Source round-trip mismatch:\n got %+v\nwant %+v", got.Source, in.Source)
}
if got.ReplyTo == nil || *got.ReplyTo != *in.ReplyTo {
t.Errorf("ReplyTo round-trip mismatch: got %+v", got.ReplyTo)
}
if !got.AddressedToBot {
t.Errorf("AddressedToBot lost in round-trip")
}
// Raw must be semantically identical (compare as parsed JSON to be
// whitespace-insensitive).
var a, b any
if err := json.Unmarshal(got.Raw, &a); err != nil {
t.Fatalf("Raw not valid JSON after round-trip: %v", err)
}
if err := json.Unmarshal(raw, &b); err != nil {
t.Fatalf("seed Raw not valid JSON: %v", err)
}
if string(mustMarshal(t, a)) != string(mustMarshal(t, b)) {
t.Errorf("Raw payload changed across round-trip:\n got %s\nwant %s", got.Raw, raw)
}
}
func mustMarshal(t *testing.T, v any) []byte {
t.Helper()
b, err := json.Marshal(v)
if err != nil {
t.Fatalf("Marshal: %v", err)
}
return b
}
// TestOutboundMessage_Minimal documents that the outbound envelope is the
// minimal text reply (decision §6): a text body, optionally threaded or
// quoting, and nothing card/media-shaped.
func TestOutboundMessage_Minimal(t *testing.T) {
out := OutboundMessage{ChatID: "oc_1", Text: "hi", ThreadID: "omt_1", ReplyTo: "om_1"}
if out.ChatID == "" || out.Text == "" {
t.Fatalf("ChatID and Text are the required fields")
}
}

View File

@@ -0,0 +1,77 @@
package channel
import (
"fmt"
"sort"
"sync"
)
// ErrUnknownType is returned by Registry.Build when no Factory is
// registered for the requested Type. Callers can test for it with
// errors.Is.
var ErrUnknownType = fmt.Errorf("channel: no factory registered for type")
// Registry maps a channel Type to the Factory that builds it. Adding a
// platform is "register a factory here", never "edit the core". The
// Registry is safe for concurrent use.
//
// Registration is last-writer-wins: registering a Type that already has a
// Factory replaces it silently. This mirrors the plugin-registry pattern
// from the reference design (MUL-3506) where the last adapter to register
// a type wins, so a deployment can override a built-in adapter by
// registering its own afterwards without a removal step.
type Registry struct {
mu sync.RWMutex
factories map[Type]Factory
}
// NewRegistry returns an empty Registry ready for use.
func NewRegistry() *Registry {
return &Registry{factories: make(map[Type]Factory)}
}
// Register binds factory to t, replacing any factory previously
// registered for t (last-writer-wins). A nil factory or an empty Type is
// ignored — registering either would only set up a guaranteed failure at
// Build time, so the Registry refuses to record it.
func (r *Registry) Register(t Type, factory Factory) {
if t == "" || factory == nil {
return
}
r.mu.Lock()
defer r.mu.Unlock()
r.factories[t] = factory
}
// Lookup returns the Factory registered for t and whether one exists.
func (r *Registry) Lookup(t Type) (Factory, bool) {
r.mu.RLock()
defer r.mu.RUnlock()
factory, ok := r.factories[t]
return factory, ok
}
// Build instantiates a Channel for cfg.Type using the registered Factory.
// It returns ErrUnknownType (wrapped, with the type name) when no Factory
// is registered, and otherwise returns whatever the Factory returns.
func (r *Registry) Build(cfg Config) (Channel, error) {
factory, ok := r.Lookup(cfg.Type)
if !ok {
return nil, fmt.Errorf("%w: %q", ErrUnknownType, cfg.Type)
}
return factory(cfg)
}
// Types returns the registered types sorted lexicographically, so the
// result is stable across calls (map iteration order is not). Useful for
// diagnostics and for enumerating which platforms a deployment supports.
func (r *Registry) Types() []Type {
r.mu.RLock()
defer r.mu.RUnlock()
out := make([]Type, 0, len(r.factories))
for t := range r.factories {
out = append(out, t)
}
sort.Slice(out, func(i, j int) bool { return out[i] < out[j] })
return out
}

View File

@@ -0,0 +1,130 @@
package channel
import (
"context"
"errors"
"reflect"
"sync"
"testing"
)
// fakeChannel is a minimal Channel used to assert which Factory the
// Registry returned. id distinguishes instances built by different
// factories.
type fakeChannel struct {
typ Type
id string
}
func (f fakeChannel) Type() Type { return f.typ }
func (f fakeChannel) Connect(context.Context) error { return nil }
func (f fakeChannel) Disconnect(context.Context) error { return nil }
func (f fakeChannel) Send(context.Context, OutboundMessage) (SendResult, error) {
return SendResult{}, nil
}
func (f fakeChannel) Capabilities() Capability { return CapText }
func factoryReturning(typ Type, id string) Factory {
return func(Config) (Channel, error) { return fakeChannel{typ: typ, id: id}, nil }
}
func TestRegistry_LookupAndBuild(t *testing.T) {
r := NewRegistry()
r.Register(TypeFeishu, factoryReturning(TypeFeishu, "feishu-a"))
if _, ok := r.Lookup(TypeFeishu); !ok {
t.Fatalf("Lookup(%q) ok = false, want true", TypeFeishu)
}
ch, err := r.Build(Config{Type: TypeFeishu})
if err != nil {
t.Fatalf("Build returned error: %v", err)
}
got, ok := ch.(fakeChannel)
if !ok {
t.Fatalf("Build returned %T, want fakeChannel", ch)
}
if got.id != "feishu-a" {
t.Fatalf("Build used factory id %q, want %q", got.id, "feishu-a")
}
if got.Type() != TypeFeishu {
t.Fatalf("built channel Type() = %q, want %q", got.Type(), TypeFeishu)
}
}
// TestRegistry_LastWriterWins is the explicit acceptance item: a second
// Register for the same Type silently replaces the first, and Build/Lookup
// resolve to the LATEST factory.
func TestRegistry_LastWriterWins(t *testing.T) {
r := NewRegistry()
r.Register(TypeFeishu, factoryReturning(TypeFeishu, "first"))
r.Register(TypeFeishu, factoryReturning(TypeFeishu, "second"))
ch, err := r.Build(Config{Type: TypeFeishu})
if err != nil {
t.Fatalf("Build returned error: %v", err)
}
if got := ch.(fakeChannel).id; got != "second" {
t.Fatalf("last-writer-wins failed: Build used %q, want %q", got, "second")
}
// Registering a type must not create duplicate Types() entries.
if types := r.Types(); !reflect.DeepEqual(types, []Type{TypeFeishu}) {
t.Fatalf("Types() = %v, want exactly [%q]", types, TypeFeishu)
}
}
func TestRegistry_BuildUnknownType(t *testing.T) {
r := NewRegistry()
_, err := r.Build(Config{Type: "nope"})
if !errors.Is(err, ErrUnknownType) {
t.Fatalf("Build unknown type err = %v, want errors.Is ErrUnknownType", err)
}
}
func TestRegistry_RegisterIgnoresInvalid(t *testing.T) {
r := NewRegistry()
r.Register("", factoryReturning(TypeFeishu, "x")) // empty type ignored
r.Register(TypeFeishu, nil) // nil factory ignored
if _, ok := r.Lookup(""); ok {
t.Fatalf("empty Type was registered, want ignored")
}
if _, ok := r.Lookup(TypeFeishu); ok {
t.Fatalf("nil factory was registered, want ignored")
}
if len(r.Types()) != 0 {
t.Fatalf("Types() = %v, want empty", r.Types())
}
}
func TestRegistry_TypesSorted(t *testing.T) {
r := NewRegistry()
r.Register("wecom", factoryReturning("wecom", "w"))
r.Register("feishu", factoryReturning("feishu", "f"))
r.Register("slack", factoryReturning("slack", "s"))
got := r.Types()
want := []Type{"feishu", "slack", "wecom"}
if !reflect.DeepEqual(got, want) {
t.Fatalf("Types() = %v, want sorted %v", got, want)
}
}
// TestRegistry_ConcurrentAccess exercises the RWMutex under the race
// detector: concurrent Register/Lookup/Build/Types must not data-race.
func TestRegistry_ConcurrentAccess(t *testing.T) {
r := NewRegistry()
var wg sync.WaitGroup
for i := 0; i < 50; i++ {
wg.Add(1)
go func() {
defer wg.Done()
r.Register(TypeFeishu, factoryReturning(TypeFeishu, "x"))
_, _ = r.Lookup(TypeFeishu)
_, _ = r.Build(Config{Type: TypeFeishu})
_ = r.Types()
}()
}
wg.Wait()
}

View File

@@ -0,0 +1,10 @@
-- Reverse 123_channel_generalization.up.sql. The lark_* tables were left
-- in place by the up migration, so rolling back only needs to drop the
-- channel_* tables that were added.
DROP TABLE IF EXISTS channel_binding_token;
DROP TABLE IF EXISTS channel_outbound_card_message;
DROP TABLE IF EXISTS channel_inbound_audit;
DROP TABLE IF EXISTS channel_inbound_message_dedup;
DROP TABLE IF EXISTS channel_chat_session_binding;
DROP TABLE IF EXISTS channel_user_binding;
DROP TABLE IF EXISTS channel_installation;

View File

@@ -0,0 +1,267 @@
-- Generalize the Feishu/Lark-specific integration tables into
-- platform-agnostic channel_* tables (MUL-3515, parent MUL-3506). Each
-- lark_* table gains a `channel_type` discriminator and moves its
-- platform-specific identifiers/config into a JSONB `config` column; the
-- cross-platform columns stay flat. Existing Feishu rows are backfilled
-- with channel_type='feishu'.
--
-- Two hard rules from the design:
--
-- * NO foreign keys and NO cascades (MUL-3515 §4). The lark_* tables
-- leaned on composite FKs to enforce "a binding's workspace matches
-- its installation" and "a binding dies when workspace membership is
-- revoked / a chat_session is deleted". Those integrity rules now
-- live in the application layer (the cutover PR adds the membership
-- check + cleanup). The columns are kept so the app can still join,
-- but the database enforces nothing.
--
-- * The lark_* tables are NOT dropped here — that happens in a later
-- migration once the Go cutover has landed, so this migration can
-- ship green on its own. This migration only ADDS channel_* and
-- copies the data forward.
--
-- app_secret_encrypted is BYTEA; it is carried into the JSONB config as a
-- base64 string (encode(...,'base64')). Go's encoding/json decodes a
-- base64 string straight back into a []byte field, so the round-trip is
-- symmetric and the ciphertext is never stored in plaintext.
-- =====================
-- channel_installation
-- =====================
CREATE TABLE channel_installation (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
workspace_id UUID NOT NULL,
agent_id UUID NOT NULL,
channel_type TEXT NOT NULL,
-- Platform-specific identifiers/config. For feishu:
-- app_id, app_secret_encrypted (base64), tenant_key, bot_open_id,
-- bot_union_id, region.
config JSONB NOT NULL DEFAULT '{}'::jsonb,
status TEXT NOT NULL DEFAULT 'active'
CHECK (status IN ('active', 'revoked')),
ws_lease_token TEXT,
ws_lease_expires_at TIMESTAMPTZ,
installer_user_id UUID NOT NULL,
installed_at TIMESTAMPTZ NOT NULL DEFAULT now(),
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
UNIQUE (workspace_id, agent_id)
);
CREATE INDEX idx_channel_installation_workspace ON channel_installation(workspace_id);
CREATE INDEX idx_channel_installation_agent ON channel_installation(agent_id);
CREATE INDEX idx_channel_installation_lease ON channel_installation(ws_lease_expires_at)
WHERE status = 'active';
-- Routing key. Inbound events carry only the platform app identifier
-- (Feishu app_id); the dispatcher routes on (channel_type, app_id). The
-- functional unique index replaces the old global UNIQUE(app_id) and is
-- scoped per channel_type. Rows without an app_id (a future channel that
-- routes differently) store JSON null here, and Postgres allows many
-- NULLs in a unique index, so they do not collide.
CREATE UNIQUE INDEX idx_channel_installation_type_appid
ON channel_installation(channel_type, (config ->> 'app_id'));
INSERT INTO channel_installation (
id, workspace_id, agent_id, channel_type, config, status,
ws_lease_token, ws_lease_expires_at, installer_user_id,
installed_at, created_at, updated_at
)
SELECT
id, workspace_id, agent_id, 'feishu',
jsonb_strip_nulls(jsonb_build_object(
'app_id', app_id,
'app_secret_encrypted', encode(app_secret_encrypted, 'base64'),
'tenant_key', tenant_key,
'bot_open_id', bot_open_id,
'bot_union_id', bot_union_id,
'region', region
)),
status, ws_lease_token, ws_lease_expires_at, installer_user_id,
installed_at, created_at, updated_at
FROM lark_installation;
-- =====================
-- channel_user_binding
-- =====================
-- channel_user_id is the platform-native, per-installation user id
-- (Feishu open_id). union_id and any other secondary identity goes in
-- config. The member-FK that used to make a row's existence proof of
-- workspace membership is gone; the cutover PR validates membership in
-- the identity check and prunes bindings on member removal.
CREATE TABLE channel_user_binding (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
workspace_id UUID NOT NULL,
multica_user_id UUID NOT NULL,
installation_id UUID NOT NULL,
channel_type TEXT NOT NULL,
channel_user_id TEXT NOT NULL,
config JSONB NOT NULL DEFAULT '{}'::jsonb,
bound_at TIMESTAMPTZ NOT NULL DEFAULT now(),
UNIQUE (installation_id, channel_user_id)
);
CREATE INDEX idx_channel_user_binding_user
ON channel_user_binding(multica_user_id, workspace_id);
CREATE INDEX idx_channel_user_binding_workspace_user
ON channel_user_binding(workspace_id, channel_user_id);
INSERT INTO channel_user_binding (
id, workspace_id, multica_user_id, installation_id,
channel_type, channel_user_id, config, bound_at
)
SELECT
id, workspace_id, multica_user_id, installation_id,
'feishu', lark_open_id,
jsonb_strip_nulls(jsonb_build_object('union_id', union_id)),
bound_at
FROM lark_user_binding;
-- =====================
-- channel_chat_session_binding
-- =====================
CREATE TABLE channel_chat_session_binding (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
chat_session_id UUID NOT NULL,
installation_id UUID NOT NULL,
channel_type TEXT NOT NULL,
channel_chat_id TEXT NOT NULL,
chat_type TEXT NOT NULL
CHECK (chat_type IN ('p2p', 'group')),
-- Most-recent inbound trigger, so the decoupled outbound patcher can
-- thread its reply back into the originating topic. Nullable; a NULL
-- thread id keeps the chat-level send path.
last_message_id TEXT,
last_thread_id TEXT,
config JSONB NOT NULL DEFAULT '{}'::jsonb,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
UNIQUE (installation_id, channel_chat_id),
UNIQUE (chat_session_id)
);
CREATE INDEX idx_channel_chat_session_binding_session
ON channel_chat_session_binding(chat_session_id);
INSERT INTO channel_chat_session_binding (
id, chat_session_id, installation_id, channel_type,
channel_chat_id, chat_type, last_message_id, last_thread_id, created_at
)
SELECT
id, chat_session_id, installation_id, 'feishu',
lark_chat_id, lark_chat_type, last_lark_message_id, last_lark_thread_id, created_at
FROM lark_chat_session_binding;
-- =====================
-- channel_inbound_message_dedup
-- =====================
-- Two-phase idempotency with owner fencing, unchanged in shape from
-- lark_inbound_message_dedup (keyed per installation + message). Transient
-- 24h cache; copied forward for completeness.
CREATE TABLE channel_inbound_message_dedup (
installation_id UUID NOT NULL,
message_id TEXT NOT NULL,
received_at TIMESTAMPTZ NOT NULL DEFAULT now(),
processed_at TIMESTAMPTZ,
claim_token UUID NOT NULL DEFAULT gen_random_uuid(),
PRIMARY KEY (installation_id, message_id)
);
CREATE INDEX idx_channel_inbound_dedup_received
ON channel_inbound_message_dedup(received_at);
INSERT INTO channel_inbound_message_dedup (
installation_id, message_id, received_at, processed_at, claim_token
)
SELECT installation_id, message_id, received_at, processed_at, claim_token
FROM lark_inbound_message_dedup;
-- =====================
-- channel_inbound_audit
-- =====================
-- Non-content drop audit. installation_id is nullable (the old ON DELETE
-- SET NULL is now just a nullable column the app may leave NULL).
CREATE TABLE channel_inbound_audit (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
installation_id UUID,
channel_type TEXT NOT NULL,
channel_chat_id TEXT,
event_type TEXT NOT NULL,
channel_event_id TEXT,
channel_message_id TEXT,
drop_reason TEXT NOT NULL,
received_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
CREATE INDEX idx_channel_inbound_audit_installation
ON channel_inbound_audit(installation_id, received_at DESC);
CREATE INDEX idx_channel_inbound_audit_reason
ON channel_inbound_audit(drop_reason, received_at DESC);
INSERT INTO channel_inbound_audit (
id, installation_id, channel_type, channel_chat_id, event_type,
channel_event_id, channel_message_id, drop_reason, received_at
)
SELECT
id, installation_id, 'feishu', lark_chat_id, event_type,
lark_event_id, lark_message_id, drop_reason, received_at
FROM lark_inbound_audit;
-- =====================
-- channel_outbound_card_message
-- =====================
CREATE TABLE channel_outbound_card_message (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
chat_session_id UUID NOT NULL,
task_id UUID,
channel_type TEXT NOT NULL,
channel_chat_id TEXT NOT NULL,
channel_card_message_id TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'pending'
CHECK (status IN ('pending', 'streaming', 'final', 'error')),
last_patched_at TIMESTAMPTZ,
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
CREATE UNIQUE INDEX idx_channel_outbound_card_task
ON channel_outbound_card_message(task_id)
WHERE task_id IS NOT NULL;
CREATE INDEX idx_channel_outbound_card_session
ON channel_outbound_card_message(chat_session_id, created_at DESC);
INSERT INTO channel_outbound_card_message (
id, chat_session_id, task_id, channel_type, channel_chat_id,
channel_card_message_id, status, last_patched_at, created_at
)
SELECT
id, chat_session_id, task_id, 'feishu', lark_chat_id,
lark_card_message_id, status, last_patched_at, created_at
FROM lark_outbound_card_message;
-- =====================
-- channel_binding_token
-- =====================
CREATE TABLE channel_binding_token (
token_hash TEXT PRIMARY KEY,
workspace_id UUID NOT NULL,
installation_id UUID NOT NULL,
channel_type TEXT NOT NULL,
channel_user_id TEXT NOT NULL,
expires_at TIMESTAMPTZ NOT NULL,
consumed_at TIMESTAMPTZ,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
-- Keep the product TTL cap in lockstep with channel.BindingTokenTTL
-- (15 minutes), same as the old lark_binding_token CHECK.
CONSTRAINT channel_binding_token_ttl_cap
CHECK (expires_at <= created_at + INTERVAL '15 minutes')
);
CREATE INDEX idx_channel_binding_token_installation
ON channel_binding_token(installation_id, expires_at);
INSERT INTO channel_binding_token (
token_hash, workspace_id, installation_id, channel_type,
channel_user_id, expires_at, consumed_at, created_at
)
SELECT
token_hash, workspace_id, installation_id, 'feishu',
lark_open_id, expires_at, consumed_at, created_at
FROM lark_binding_token;

View File

@@ -0,0 +1,970 @@
// Code generated by sqlc. DO NOT EDIT.
// versions:
// sqlc v1.31.1
// source: channel.sql
package db
import (
"context"
"github.com/jackc/pgx/v5/pgtype"
)
const acquireChannelWSLease = `-- name: AcquireChannelWSLease :one
UPDATE channel_installation
SET ws_lease_token = $1,
ws_lease_expires_at = $2,
updated_at = now()
WHERE id = $3
AND status = 'active'
AND (
ws_lease_token IS NULL
OR ws_lease_expires_at < now()
OR ws_lease_token = $1
)
RETURNING id, workspace_id, agent_id, channel_type, config, status, ws_lease_token, ws_lease_expires_at, installer_user_id, installed_at, created_at, updated_at
`
type AcquireChannelWSLeaseParams struct {
NewToken pgtype.Text `json:"new_token"`
NewExpiresAt pgtype.Timestamptz `json:"new_expires_at"`
ID pgtype.UUID `json:"id"`
}
// Atomically claims the WebSocket lease. CAS predicate accepts when no
// holder exists, the holder expired, or the holder is us (renewal).
func (q *Queries) AcquireChannelWSLease(ctx context.Context, arg AcquireChannelWSLeaseParams) (ChannelInstallation, error) {
row := q.db.QueryRow(ctx, acquireChannelWSLease, arg.NewToken, arg.NewExpiresAt, arg.ID)
var i ChannelInstallation
err := row.Scan(
&i.ID,
&i.WorkspaceID,
&i.AgentID,
&i.ChannelType,
&i.Config,
&i.Status,
&i.WsLeaseToken,
&i.WsLeaseExpiresAt,
&i.InstallerUserID,
&i.InstalledAt,
&i.CreatedAt,
&i.UpdatedAt,
)
return i, err
}
const backfillChannelInstallationRegionToFeishuLark = `-- name: BackfillChannelInstallationRegionToFeishuLark :execrows
UPDATE channel_installation
SET config = jsonb_set(config, '{region}', '"lark"'),
updated_at = now()
WHERE channel_type = 'feishu'
AND config ->> 'region' = 'feishu'
`
// Operator repair, feishu-only: flip every feishu installation still
// carrying region='feishu' to 'lark'. Called only on deployments whose
// legacy global base-URL override pointed at Lark international. Idempotent.
func (q *Queries) BackfillChannelInstallationRegionToFeishuLark(ctx context.Context) (int64, error) {
result, err := q.db.Exec(ctx, backfillChannelInstallationRegionToFeishuLark)
if err != nil {
return 0, err
}
return result.RowsAffected(), nil
}
const claimChannelInboundDedup = `-- name: ClaimChannelInboundDedup :one
INSERT INTO channel_inbound_message_dedup (installation_id, message_id, claim_token)
VALUES ($1, $2, gen_random_uuid())
ON CONFLICT (installation_id, message_id) DO UPDATE
SET received_at = now(),
claim_token = gen_random_uuid()
WHERE channel_inbound_message_dedup.processed_at IS NULL
AND channel_inbound_message_dedup.received_at < now() - INTERVAL '60 seconds'
RETURNING installation_id, message_id, received_at, processed_at, claim_token
`
type ClaimChannelInboundDedupParams struct {
InstallationID pgtype.UUID `json:"installation_id"`
MessageID string `json:"message_id"`
}
// =====================
// channel_inbound_message_dedup
// =====================
// Two-phase idempotency gate with owner fencing. Returns the row when a
// claim is acquired (fresh insert, or stale-reclaim of an in-flight claim
// older than 60s); returns no rows when terminal (processed) or actively
// in-flight. Every claim mints a fresh claim_token; Mark/Release are
// fenced on it. See the table comment in migration 123 / the lark
// predecessor for the full invariant set.
func (q *Queries) ClaimChannelInboundDedup(ctx context.Context, arg ClaimChannelInboundDedupParams) (ChannelInboundMessageDedup, error) {
row := q.db.QueryRow(ctx, claimChannelInboundDedup, arg.InstallationID, arg.MessageID)
var i ChannelInboundMessageDedup
err := row.Scan(
&i.InstallationID,
&i.MessageID,
&i.ReceivedAt,
&i.ProcessedAt,
&i.ClaimToken,
)
return i, err
}
const consumeChannelBindingToken = `-- name: ConsumeChannelBindingToken :one
UPDATE channel_binding_token
SET consumed_at = now()
WHERE token_hash = $1
AND consumed_at IS NULL
AND expires_at > now()
RETURNING token_hash, workspace_id, installation_id, channel_type, channel_user_id, expires_at, consumed_at, created_at
`
// Atomic redemption: returns the row only if the hash exists, is
// unconsumed, and unexpired. Two simultaneous redemptions cannot both win.
func (q *Queries) ConsumeChannelBindingToken(ctx context.Context, tokenHash string) (ChannelBindingToken, error) {
row := q.db.QueryRow(ctx, consumeChannelBindingToken, tokenHash)
var i ChannelBindingToken
err := row.Scan(
&i.TokenHash,
&i.WorkspaceID,
&i.InstallationID,
&i.ChannelType,
&i.ChannelUserID,
&i.ExpiresAt,
&i.ConsumedAt,
&i.CreatedAt,
)
return i, err
}
const createChannelBindingToken = `-- name: CreateChannelBindingToken :one
INSERT INTO channel_binding_token (
token_hash, workspace_id, installation_id, channel_type,
channel_user_id, expires_at
) VALUES (
$1, $2, $3, $4, $5, $6
)
RETURNING token_hash, workspace_id, installation_id, channel_type, channel_user_id, expires_at, consumed_at, created_at
`
type CreateChannelBindingTokenParams struct {
TokenHash string `json:"token_hash"`
WorkspaceID pgtype.UUID `json:"workspace_id"`
InstallationID pgtype.UUID `json:"installation_id"`
ChannelType string `json:"channel_type"`
ChannelUserID string `json:"channel_user_id"`
ExpiresAt pgtype.Timestamptz `json:"expires_at"`
}
// =====================
// channel_binding_token
// =====================
// Mints a single-use binding token for an unbound platform user. TTL cap
// (15 min) enforced by the table CHECK in lockstep with
// channel.BindingTokenTTL. The HASH is stored, never the raw token.
func (q *Queries) CreateChannelBindingToken(ctx context.Context, arg CreateChannelBindingTokenParams) (ChannelBindingToken, error) {
row := q.db.QueryRow(ctx, createChannelBindingToken,
arg.TokenHash,
arg.WorkspaceID,
arg.InstallationID,
arg.ChannelType,
arg.ChannelUserID,
arg.ExpiresAt,
)
var i ChannelBindingToken
err := row.Scan(
&i.TokenHash,
&i.WorkspaceID,
&i.InstallationID,
&i.ChannelType,
&i.ChannelUserID,
&i.ExpiresAt,
&i.ConsumedAt,
&i.CreatedAt,
)
return i, err
}
const createChannelChatSessionBinding = `-- name: CreateChannelChatSessionBinding :one
INSERT INTO channel_chat_session_binding (
chat_session_id, installation_id, channel_type, channel_chat_id, chat_type
) VALUES (
$1, $2, $3, $4, $5
)
RETURNING id, chat_session_id, installation_id, channel_type, channel_chat_id, chat_type, last_message_id, last_thread_id, config, created_at
`
type CreateChannelChatSessionBindingParams struct {
ChatSessionID pgtype.UUID `json:"chat_session_id"`
InstallationID pgtype.UUID `json:"installation_id"`
ChannelType string `json:"channel_type"`
ChannelChatID string `json:"channel_chat_id"`
ChatType string `json:"chat_type"`
}
// =====================
// channel_chat_session_binding
// =====================
func (q *Queries) CreateChannelChatSessionBinding(ctx context.Context, arg CreateChannelChatSessionBindingParams) (ChannelChatSessionBinding, error) {
row := q.db.QueryRow(ctx, createChannelChatSessionBinding,
arg.ChatSessionID,
arg.InstallationID,
arg.ChannelType,
arg.ChannelChatID,
arg.ChatType,
)
var i ChannelChatSessionBinding
err := row.Scan(
&i.ID,
&i.ChatSessionID,
&i.InstallationID,
&i.ChannelType,
&i.ChannelChatID,
&i.ChatType,
&i.LastMessageID,
&i.LastThreadID,
&i.Config,
&i.CreatedAt,
)
return i, err
}
const createChannelOutboundCardMessage = `-- name: CreateChannelOutboundCardMessage :one
INSERT INTO channel_outbound_card_message (
chat_session_id, task_id, channel_type, channel_chat_id,
channel_card_message_id, status
) VALUES (
$1, $6, $2, $3, $4, $5
)
RETURNING id, chat_session_id, task_id, channel_type, channel_chat_id, channel_card_message_id, status, last_patched_at, created_at
`
type CreateChannelOutboundCardMessageParams struct {
ChatSessionID pgtype.UUID `json:"chat_session_id"`
ChannelType string `json:"channel_type"`
ChannelChatID string `json:"channel_chat_id"`
ChannelCardMessageID string `json:"channel_card_message_id"`
Status string `json:"status"`
TaskID pgtype.UUID `json:"task_id"`
}
// =====================
// channel_outbound_card_message
// =====================
func (q *Queries) CreateChannelOutboundCardMessage(ctx context.Context, arg CreateChannelOutboundCardMessageParams) (ChannelOutboundCardMessage, error) {
row := q.db.QueryRow(ctx, createChannelOutboundCardMessage,
arg.ChatSessionID,
arg.ChannelType,
arg.ChannelChatID,
arg.ChannelCardMessageID,
arg.Status,
arg.TaskID,
)
var i ChannelOutboundCardMessage
err := row.Scan(
&i.ID,
&i.ChatSessionID,
&i.TaskID,
&i.ChannelType,
&i.ChannelChatID,
&i.ChannelCardMessageID,
&i.Status,
&i.LastPatchedAt,
&i.CreatedAt,
)
return i, err
}
const createChannelUserBinding = `-- name: CreateChannelUserBinding :one
INSERT INTO channel_user_binding (
workspace_id, multica_user_id, installation_id,
channel_type, channel_user_id, config
) VALUES (
$1, $2, $3, $4, $5, $6
)
ON CONFLICT (installation_id, channel_user_id) DO UPDATE SET
config = channel_user_binding.config || EXCLUDED.config,
bound_at = now()
WHERE channel_user_binding.multica_user_id = EXCLUDED.multica_user_id
RETURNING id, workspace_id, multica_user_id, installation_id, channel_type, channel_user_id, config, bound_at
`
type CreateChannelUserBindingParams struct {
WorkspaceID pgtype.UUID `json:"workspace_id"`
MulticaUserID pgtype.UUID `json:"multica_user_id"`
InstallationID pgtype.UUID `json:"installation_id"`
ChannelType string `json:"channel_type"`
ChannelUserID string `json:"channel_user_id"`
Config []byte `json:"config"`
}
// =====================
// channel_user_binding
// =====================
// Records that a platform user id (per-installation; Feishu open_id) maps
// to a Multica user. The old composite member-FK is gone, so this no
// longer fails when the redeemer is not a workspace member — the caller
// (BindingTokenService.RedeemAndBind) validates membership explicitly
// before calling. ON CONFLICT DO UPDATE is still gated on multica_user_id
// matching, so a second redeemer cannot steal an already-bound user id;
// a cross-user conflict updates zero rows and the caller maps that to
// ErrBindingAlreadyAssigned. config carries secondary identity (union_id).
func (q *Queries) CreateChannelUserBinding(ctx context.Context, arg CreateChannelUserBindingParams) (ChannelUserBinding, error) {
row := q.db.QueryRow(ctx, createChannelUserBinding,
arg.WorkspaceID,
arg.MulticaUserID,
arg.InstallationID,
arg.ChannelType,
arg.ChannelUserID,
arg.Config,
)
var i ChannelUserBinding
err := row.Scan(
&i.ID,
&i.WorkspaceID,
&i.MulticaUserID,
&i.InstallationID,
&i.ChannelType,
&i.ChannelUserID,
&i.Config,
&i.BoundAt,
)
return i, err
}
const deleteChannelChatSessionBindingBySession = `-- name: DeleteChannelChatSessionBindingBySession :exec
DELETE FROM channel_chat_session_binding
WHERE chat_session_id = $1
`
// Application-layer integrity (replaces the old chat_session-FK ON DELETE
// CASCADE): drop the binding when its chat_session is deleted.
func (q *Queries) DeleteChannelChatSessionBindingBySession(ctx context.Context, chatSessionID pgtype.UUID) error {
_, err := q.db.Exec(ctx, deleteChannelChatSessionBindingBySession, chatSessionID)
return err
}
const deleteChannelUserBindingsByWorkspaceMember = `-- name: DeleteChannelUserBindingsByWorkspaceMember :exec
DELETE FROM channel_user_binding
WHERE workspace_id = $1 AND multica_user_id = $2
`
type DeleteChannelUserBindingsByWorkspaceMemberParams struct {
WorkspaceID pgtype.UUID `json:"workspace_id"`
MulticaUserID pgtype.UUID `json:"multica_user_id"`
}
// Application-layer integrity (replaces the old member-FK ON DELETE
// CASCADE): prune every binding for a user who has been removed from a
// workspace, across all installations in that workspace.
func (q *Queries) DeleteChannelUserBindingsByWorkspaceMember(ctx context.Context, arg DeleteChannelUserBindingsByWorkspaceMemberParams) error {
_, err := q.db.Exec(ctx, deleteChannelUserBindingsByWorkspaceMember, arg.WorkspaceID, arg.MulticaUserID)
return err
}
const getChannelChatSessionBinding = `-- name: GetChannelChatSessionBinding :one
SELECT id, chat_session_id, installation_id, channel_type, channel_chat_id, chat_type, last_message_id, last_thread_id, config, created_at FROM channel_chat_session_binding
WHERE installation_id = $1 AND channel_chat_id = $2
`
type GetChannelChatSessionBindingParams struct {
InstallationID pgtype.UUID `json:"installation_id"`
ChannelChatID string `json:"channel_chat_id"`
}
// Lookup-by-channel-chat: the inbound dispatcher finds the existing
// chat_session before deciding whether to create one.
func (q *Queries) GetChannelChatSessionBinding(ctx context.Context, arg GetChannelChatSessionBindingParams) (ChannelChatSessionBinding, error) {
row := q.db.QueryRow(ctx, getChannelChatSessionBinding, arg.InstallationID, arg.ChannelChatID)
var i ChannelChatSessionBinding
err := row.Scan(
&i.ID,
&i.ChatSessionID,
&i.InstallationID,
&i.ChannelType,
&i.ChannelChatID,
&i.ChatType,
&i.LastMessageID,
&i.LastThreadID,
&i.Config,
&i.CreatedAt,
)
return i, err
}
const getChannelChatSessionBindingBySession = `-- name: GetChannelChatSessionBindingBySession :one
SELECT id, chat_session_id, installation_id, channel_type, channel_chat_id, chat_type, last_message_id, last_thread_id, config, created_at FROM channel_chat_session_binding
WHERE chat_session_id = $1
`
// Reverse lookup for the outbound patcher: given a chat_session_id, find
// its channel binding to know which (installation, chat_id) to send to.
func (q *Queries) GetChannelChatSessionBindingBySession(ctx context.Context, chatSessionID pgtype.UUID) (ChannelChatSessionBinding, error) {
row := q.db.QueryRow(ctx, getChannelChatSessionBindingBySession, chatSessionID)
var i ChannelChatSessionBinding
err := row.Scan(
&i.ID,
&i.ChatSessionID,
&i.InstallationID,
&i.ChannelType,
&i.ChannelChatID,
&i.ChatType,
&i.LastMessageID,
&i.LastThreadID,
&i.Config,
&i.CreatedAt,
)
return i, err
}
const getChannelInstallation = `-- name: GetChannelInstallation :one
SELECT id, workspace_id, agent_id, channel_type, config, status, ws_lease_token, ws_lease_expires_at, installer_user_id, installed_at, created_at, updated_at FROM channel_installation WHERE id = $1
`
func (q *Queries) GetChannelInstallation(ctx context.Context, id pgtype.UUID) (ChannelInstallation, error) {
row := q.db.QueryRow(ctx, getChannelInstallation, id)
var i ChannelInstallation
err := row.Scan(
&i.ID,
&i.WorkspaceID,
&i.AgentID,
&i.ChannelType,
&i.Config,
&i.Status,
&i.WsLeaseToken,
&i.WsLeaseExpiresAt,
&i.InstallerUserID,
&i.InstalledAt,
&i.CreatedAt,
&i.UpdatedAt,
)
return i, err
}
const getChannelInstallationByAppID = `-- name: GetChannelInstallationByAppID :one
SELECT id, workspace_id, agent_id, channel_type, config, status, ws_lease_token, ws_lease_expires_at, installer_user_id, installed_at, created_at, updated_at FROM channel_installation
WHERE channel_type = $1 AND config ->> 'app_id' = $2
`
type GetChannelInstallationByAppIDParams struct {
ChannelType string `json:"channel_type"`
Config []byte `json:"config"`
}
// Inbound routing. The platform event carries only the channel's app
// identifier (Feishu app_id); the dispatcher's installation resolver routes
// on (channel_type, config->>'app_id'). Backed by the functional unique
// index idx_channel_installation_type_appid.
func (q *Queries) GetChannelInstallationByAppID(ctx context.Context, arg GetChannelInstallationByAppIDParams) (ChannelInstallation, error) {
row := q.db.QueryRow(ctx, getChannelInstallationByAppID, arg.ChannelType, arg.Config)
var i ChannelInstallation
err := row.Scan(
&i.ID,
&i.WorkspaceID,
&i.AgentID,
&i.ChannelType,
&i.Config,
&i.Status,
&i.WsLeaseToken,
&i.WsLeaseExpiresAt,
&i.InstallerUserID,
&i.InstalledAt,
&i.CreatedAt,
&i.UpdatedAt,
)
return i, err
}
const getChannelInstallationInWorkspace = `-- name: GetChannelInstallationInWorkspace :one
SELECT id, workspace_id, agent_id, channel_type, config, status, ws_lease_token, ws_lease_expires_at, installer_user_id, installed_at, created_at, updated_at FROM channel_installation
WHERE id = $1 AND workspace_id = $2
`
type GetChannelInstallationInWorkspaceParams struct {
ID pgtype.UUID `json:"id"`
WorkspaceID pgtype.UUID `json:"workspace_id"`
}
func (q *Queries) GetChannelInstallationInWorkspace(ctx context.Context, arg GetChannelInstallationInWorkspaceParams) (ChannelInstallation, error) {
row := q.db.QueryRow(ctx, getChannelInstallationInWorkspace, arg.ID, arg.WorkspaceID)
var i ChannelInstallation
err := row.Scan(
&i.ID,
&i.WorkspaceID,
&i.AgentID,
&i.ChannelType,
&i.Config,
&i.Status,
&i.WsLeaseToken,
&i.WsLeaseExpiresAt,
&i.InstallerUserID,
&i.InstalledAt,
&i.CreatedAt,
&i.UpdatedAt,
)
return i, err
}
const getChannelOutboundCardByTask = `-- name: GetChannelOutboundCardByTask :one
SELECT id, chat_session_id, task_id, channel_type, channel_chat_id, channel_card_message_id, status, last_patched_at, created_at FROM channel_outbound_card_message
WHERE task_id = $1
`
// The partial unique index on (task_id) WHERE task_id IS NOT NULL
// guarantees at most one row.
func (q *Queries) GetChannelOutboundCardByTask(ctx context.Context, taskID pgtype.UUID) (ChannelOutboundCardMessage, error) {
row := q.db.QueryRow(ctx, getChannelOutboundCardByTask, taskID)
var i ChannelOutboundCardMessage
err := row.Scan(
&i.ID,
&i.ChatSessionID,
&i.TaskID,
&i.ChannelType,
&i.ChannelChatID,
&i.ChannelCardMessageID,
&i.Status,
&i.LastPatchedAt,
&i.CreatedAt,
)
return i, err
}
const getChannelUserBindingByUserID = `-- name: GetChannelUserBindingByUserID :one
SELECT id, workspace_id, multica_user_id, installation_id, channel_type, channel_user_id, config, bound_at FROM channel_user_binding
WHERE installation_id = $1 AND channel_user_id = $2
`
type GetChannelUserBindingByUserIDParams struct {
InstallationID pgtype.UUID `json:"installation_id"`
ChannelUserID string `json:"channel_user_id"`
}
// The inbound identity lookup: does this platform user id map to a Multica
// user for this installation? With the member-FK removed, a row's
// existence no longer proves current workspace membership — the dispatcher
// re-checks membership after this lookup.
func (q *Queries) GetChannelUserBindingByUserID(ctx context.Context, arg GetChannelUserBindingByUserIDParams) (ChannelUserBinding, error) {
row := q.db.QueryRow(ctx, getChannelUserBindingByUserID, arg.InstallationID, arg.ChannelUserID)
var i ChannelUserBinding
err := row.Scan(
&i.ID,
&i.WorkspaceID,
&i.MulticaUserID,
&i.InstallationID,
&i.ChannelType,
&i.ChannelUserID,
&i.Config,
&i.BoundAt,
)
return i, err
}
const listActiveChannelInstallations = `-- name: ListActiveChannelInstallations :many
SELECT id, workspace_id, agent_id, channel_type, config, status, ws_lease_token, ws_lease_expires_at, installer_user_id, installed_at, created_at, updated_at FROM channel_installation
WHERE status = 'active'
ORDER BY created_at ASC
`
// Boot path for the inbound hub: every active installation, any channel
// type, so the hub can claim leases and open connections.
func (q *Queries) ListActiveChannelInstallations(ctx context.Context) ([]ChannelInstallation, error) {
rows, err := q.db.Query(ctx, listActiveChannelInstallations)
if err != nil {
return nil, err
}
defer rows.Close()
items := []ChannelInstallation{}
for rows.Next() {
var i ChannelInstallation
if err := rows.Scan(
&i.ID,
&i.WorkspaceID,
&i.AgentID,
&i.ChannelType,
&i.Config,
&i.Status,
&i.WsLeaseToken,
&i.WsLeaseExpiresAt,
&i.InstallerUserID,
&i.InstalledAt,
&i.CreatedAt,
&i.UpdatedAt,
); err != nil {
return nil, err
}
items = append(items, i)
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const listChannelInboundAuditByInstallation = `-- name: ListChannelInboundAuditByInstallation :many
SELECT id, installation_id, channel_type, channel_chat_id, event_type, channel_event_id, channel_message_id, drop_reason, received_at FROM channel_inbound_audit
WHERE installation_id = $1
ORDER BY received_at DESC
LIMIT $2 OFFSET $3
`
type ListChannelInboundAuditByInstallationParams struct {
InstallationID pgtype.UUID `json:"installation_id"`
Limit int32 `json:"limit"`
Offset int32 `json:"offset"`
}
func (q *Queries) ListChannelInboundAuditByInstallation(ctx context.Context, arg ListChannelInboundAuditByInstallationParams) ([]ChannelInboundAudit, error) {
rows, err := q.db.Query(ctx, listChannelInboundAuditByInstallation, arg.InstallationID, arg.Limit, arg.Offset)
if err != nil {
return nil, err
}
defer rows.Close()
items := []ChannelInboundAudit{}
for rows.Next() {
var i ChannelInboundAudit
if err := rows.Scan(
&i.ID,
&i.InstallationID,
&i.ChannelType,
&i.ChannelChatID,
&i.EventType,
&i.ChannelEventID,
&i.ChannelMessageID,
&i.DropReason,
&i.ReceivedAt,
); err != nil {
return nil, err
}
items = append(items, i)
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const listChannelInstallationsByWorkspace = `-- name: ListChannelInstallationsByWorkspace :many
SELECT id, workspace_id, agent_id, channel_type, config, status, ws_lease_token, ws_lease_expires_at, installer_user_id, installed_at, created_at, updated_at FROM channel_installation
WHERE workspace_id = $1
ORDER BY created_at ASC
`
func (q *Queries) ListChannelInstallationsByWorkspace(ctx context.Context, workspaceID pgtype.UUID) ([]ChannelInstallation, error) {
rows, err := q.db.Query(ctx, listChannelInstallationsByWorkspace, workspaceID)
if err != nil {
return nil, err
}
defer rows.Close()
items := []ChannelInstallation{}
for rows.Next() {
var i ChannelInstallation
if err := rows.Scan(
&i.ID,
&i.WorkspaceID,
&i.AgentID,
&i.ChannelType,
&i.Config,
&i.Status,
&i.WsLeaseToken,
&i.WsLeaseExpiresAt,
&i.InstallerUserID,
&i.InstalledAt,
&i.CreatedAt,
&i.UpdatedAt,
); err != nil {
return nil, err
}
items = append(items, i)
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const markChannelInboundDedupProcessed = `-- name: MarkChannelInboundDedupProcessed :execrows
UPDATE channel_inbound_message_dedup
SET processed_at = now()
WHERE installation_id = $1
AND message_id = $2
AND claim_token = $3
AND processed_at IS NULL
`
type MarkChannelInboundDedupProcessedParams struct {
InstallationID pgtype.UUID `json:"installation_id"`
MessageID string `json:"message_id"`
ClaimToken pgtype.UUID `json:"claim_token"`
}
// Locks a claim in as permanently processed after a durable outcome.
// Invoked inside the chat_message tx (via qtx) on the ingest path so the
// durable write and the Mark commit atomically. Token mismatch returns
// zero rows (a reclaim happened); the caller rolls back its in-tx write.
func (q *Queries) MarkChannelInboundDedupProcessed(ctx context.Context, arg MarkChannelInboundDedupProcessedParams) (int64, error) {
result, err := q.db.Exec(ctx, markChannelInboundDedupProcessed, arg.InstallationID, arg.MessageID, arg.ClaimToken)
if err != nil {
return 0, err
}
return result.RowsAffected(), nil
}
const purgeChannelInboundDedup = `-- name: PurgeChannelInboundDedup :exec
DELETE FROM channel_inbound_message_dedup
WHERE received_at < $1
`
// Vacuum job: remove dedup rows older than the supplied cutoff (e.g. 24h).
func (q *Queries) PurgeChannelInboundDedup(ctx context.Context, receivedAt pgtype.Timestamptz) error {
_, err := q.db.Exec(ctx, purgeChannelInboundDedup, receivedAt)
return err
}
const purgeExpiredChannelBindingTokens = `-- name: PurgeExpiredChannelBindingTokens :exec
DELETE FROM channel_binding_token
WHERE expires_at < $1
`
func (q *Queries) PurgeExpiredChannelBindingTokens(ctx context.Context, expiresAt pgtype.Timestamptz) error {
_, err := q.db.Exec(ctx, purgeExpiredChannelBindingTokens, expiresAt)
return err
}
const recordChannelInboundDrop = `-- name: RecordChannelInboundDrop :exec
INSERT INTO channel_inbound_audit (
installation_id, channel_type, channel_chat_id, event_type,
channel_event_id, channel_message_id, drop_reason
) VALUES (
$4,
$1,
$5,
$2,
$6,
$7,
$3
)
`
type RecordChannelInboundDropParams struct {
ChannelType string `json:"channel_type"`
EventType string `json:"event_type"`
DropReason string `json:"drop_reason"`
InstallationID pgtype.UUID `json:"installation_id"`
ChannelChatID pgtype.Text `json:"channel_chat_id"`
ChannelEventID pgtype.Text `json:"channel_event_id"`
ChannelMessageID pgtype.Text `json:"channel_message_id"`
}
// =====================
// channel_inbound_audit
// =====================
// The only write path for dropped events. Deliberately carries no body
// column — only routing / identity / drop_reason / timestamp.
func (q *Queries) RecordChannelInboundDrop(ctx context.Context, arg RecordChannelInboundDropParams) error {
_, err := q.db.Exec(ctx, recordChannelInboundDrop,
arg.ChannelType,
arg.EventType,
arg.DropReason,
arg.InstallationID,
arg.ChannelChatID,
arg.ChannelEventID,
arg.ChannelMessageID,
)
return err
}
const releaseChannelInboundDedup = `-- name: ReleaseChannelInboundDedup :execrows
DELETE FROM channel_inbound_message_dedup
WHERE installation_id = $1
AND message_id = $2
AND claim_token = $3
AND processed_at IS NULL
`
type ReleaseChannelInboundDedupParams struct {
InstallationID pgtype.UUID `json:"installation_id"`
MessageID string `json:"message_id"`
ClaimToken pgtype.UUID `json:"claim_token"`
}
// Releases an in-flight claim when an infra error occurred before any
// durable side effect, so a retry can re-acquire immediately. Fenced on
// processed_at IS NULL and claim_token.
func (q *Queries) ReleaseChannelInboundDedup(ctx context.Context, arg ReleaseChannelInboundDedupParams) (int64, error) {
result, err := q.db.Exec(ctx, releaseChannelInboundDedup, arg.InstallationID, arg.MessageID, arg.ClaimToken)
if err != nil {
return 0, err
}
return result.RowsAffected(), nil
}
const releaseChannelWSLease = `-- name: ReleaseChannelWSLease :exec
UPDATE channel_installation
SET ws_lease_token = NULL,
ws_lease_expires_at = NULL,
updated_at = now()
WHERE id = $1
AND ws_lease_token = $2
`
type ReleaseChannelWSLeaseParams struct {
ID pgtype.UUID `json:"id"`
CurrentToken pgtype.Text `json:"current_token"`
}
// Drops the lease iff we are still the holder.
func (q *Queries) ReleaseChannelWSLease(ctx context.Context, arg ReleaseChannelWSLeaseParams) error {
_, err := q.db.Exec(ctx, releaseChannelWSLease, arg.ID, arg.CurrentToken)
return err
}
const setChannelInstallationConfig = `-- name: SetChannelInstallationConfig :exec
UPDATE channel_installation
SET config = $2, updated_at = now()
WHERE id = $1
`
type SetChannelInstallationConfigParams struct {
ID pgtype.UUID `json:"id"`
Config []byte `json:"config"`
}
// Replaces the whole config blob for one installation. Used by the
// operator backfills (e.g. setting a freshly-fetched bot_union_id) that
// read-modify-write the JSON in Go and persist it back atomically by id.
func (q *Queries) SetChannelInstallationConfig(ctx context.Context, arg SetChannelInstallationConfigParams) error {
_, err := q.db.Exec(ctx, setChannelInstallationConfig, arg.ID, arg.Config)
return err
}
const setChannelInstallationStatus = `-- name: SetChannelInstallationStatus :exec
UPDATE channel_installation
SET status = $2, updated_at = now()
WHERE id = $1
`
type SetChannelInstallationStatusParams struct {
ID pgtype.UUID `json:"id"`
Status string `json:"status"`
}
func (q *Queries) SetChannelInstallationStatus(ctx context.Context, arg SetChannelInstallationStatusParams) error {
_, err := q.db.Exec(ctx, setChannelInstallationStatus, arg.ID, arg.Status)
return err
}
const updateChannelChatSessionBindingReplyTarget = `-- name: UpdateChannelChatSessionBindingReplyTarget :exec
UPDATE channel_chat_session_binding
SET last_message_id = $2,
last_thread_id = $3
WHERE chat_session_id = $1
`
type UpdateChannelChatSessionBindingReplyTargetParams struct {
ChatSessionID pgtype.UUID `json:"chat_session_id"`
LastMessageID pgtype.Text `json:"last_message_id"`
LastThreadID pgtype.Text `json:"last_thread_id"`
}
// Records the most recent inbound trigger message + thread so the decoupled
// outbound patcher can thread its reply back into the originating topic.
func (q *Queries) UpdateChannelChatSessionBindingReplyTarget(ctx context.Context, arg UpdateChannelChatSessionBindingReplyTargetParams) error {
_, err := q.db.Exec(ctx, updateChannelChatSessionBindingReplyTarget, arg.ChatSessionID, arg.LastMessageID, arg.LastThreadID)
return err
}
const updateChannelOutboundCardStatus = `-- name: UpdateChannelOutboundCardStatus :exec
UPDATE channel_outbound_card_message
SET status = $2,
last_patched_at = now()
WHERE id = $1
`
type UpdateChannelOutboundCardStatusParams struct {
ID pgtype.UUID `json:"id"`
Status string `json:"status"`
}
func (q *Queries) UpdateChannelOutboundCardStatus(ctx context.Context, arg UpdateChannelOutboundCardStatusParams) error {
_, err := q.db.Exec(ctx, updateChannelOutboundCardStatus, arg.ID, arg.Status)
return err
}
const upsertChannelInstallation = `-- name: UpsertChannelInstallation :one
INSERT INTO channel_installation (
workspace_id, agent_id, channel_type, config, installer_user_id
) VALUES (
$1, $2, $3, $4, $5
)
ON CONFLICT (workspace_id, agent_id) DO UPDATE SET
channel_type = EXCLUDED.channel_type,
config = EXCLUDED.config,
installer_user_id = EXCLUDED.installer_user_id,
status = 'active',
installed_at = now(),
updated_at = now()
RETURNING id, workspace_id, agent_id, channel_type, config, status, ws_lease_token, ws_lease_expires_at, installer_user_id, installed_at, created_at, updated_at
`
type UpsertChannelInstallationParams struct {
WorkspaceID pgtype.UUID `json:"workspace_id"`
AgentID pgtype.UUID `json:"agent_id"`
ChannelType string `json:"channel_type"`
Config []byte `json:"config"`
InstallerUserID pgtype.UUID `json:"installer_user_id"`
}
// Platform-agnostic inbound channel queries (MUL-3515). These operate on
// the channel_* tables created in migration 123. Each installation carries
// a `channel_type` discriminator and a JSONB `config` blob for
// platform-specific identifiers/credentials; the cross-platform columns
// stay flat. The Go layer owns building/parsing config — these queries
// treat it as opaque JSON except for the routing index on config->>'app_id'.
//
// No foreign keys exist on these tables (MUL-3515 §4): the integrity the
// old composite FKs enforced (binding workspace matches installation;
// binding dies with membership / chat_session) is maintained in the
// application layer via the membership check in the inbound identity step
// and the *DeleteChannel*BindingsBy* cleanup queries below.
// =====================
// channel_installation
// =====================
// Install / re-install path. `config` is the opaque per-channel JSONB the
// Go layer assembles (for feishu: app_id, app_secret_encrypted, tenant_key,
// bot_open_id, bot_union_id, region). Re-installing the same agent replaces
// the whole config and forces status back to 'active'. The WS lease is
// intentionally NOT reset here — the inbound hub owns lease lifecycle.
func (q *Queries) UpsertChannelInstallation(ctx context.Context, arg UpsertChannelInstallationParams) (ChannelInstallation, error) {
row := q.db.QueryRow(ctx, upsertChannelInstallation,
arg.WorkspaceID,
arg.AgentID,
arg.ChannelType,
arg.Config,
arg.InstallerUserID,
)
var i ChannelInstallation
err := row.Scan(
&i.ID,
&i.WorkspaceID,
&i.AgentID,
&i.ChannelType,
&i.Config,
&i.Status,
&i.WsLeaseToken,
&i.WsLeaseExpiresAt,
&i.InstallerUserID,
&i.InstalledAt,
&i.CreatedAt,
&i.UpdatedAt,
)
return i, err
}

View File

@@ -177,6 +177,88 @@ type AutopilotTrigger struct {
EventFilters []byte `json:"event_filters"`
}
type ChannelBindingToken struct {
TokenHash string `json:"token_hash"`
WorkspaceID pgtype.UUID `json:"workspace_id"`
InstallationID pgtype.UUID `json:"installation_id"`
ChannelType string `json:"channel_type"`
ChannelUserID string `json:"channel_user_id"`
ExpiresAt pgtype.Timestamptz `json:"expires_at"`
ConsumedAt pgtype.Timestamptz `json:"consumed_at"`
CreatedAt pgtype.Timestamptz `json:"created_at"`
}
type ChannelChatSessionBinding struct {
ID pgtype.UUID `json:"id"`
ChatSessionID pgtype.UUID `json:"chat_session_id"`
InstallationID pgtype.UUID `json:"installation_id"`
ChannelType string `json:"channel_type"`
ChannelChatID string `json:"channel_chat_id"`
ChatType string `json:"chat_type"`
LastMessageID pgtype.Text `json:"last_message_id"`
LastThreadID pgtype.Text `json:"last_thread_id"`
Config []byte `json:"config"`
CreatedAt pgtype.Timestamptz `json:"created_at"`
}
type ChannelInboundAudit struct {
ID pgtype.UUID `json:"id"`
InstallationID pgtype.UUID `json:"installation_id"`
ChannelType string `json:"channel_type"`
ChannelChatID pgtype.Text `json:"channel_chat_id"`
EventType string `json:"event_type"`
ChannelEventID pgtype.Text `json:"channel_event_id"`
ChannelMessageID pgtype.Text `json:"channel_message_id"`
DropReason string `json:"drop_reason"`
ReceivedAt pgtype.Timestamptz `json:"received_at"`
}
type ChannelInboundMessageDedup struct {
InstallationID pgtype.UUID `json:"installation_id"`
MessageID string `json:"message_id"`
ReceivedAt pgtype.Timestamptz `json:"received_at"`
ProcessedAt pgtype.Timestamptz `json:"processed_at"`
ClaimToken pgtype.UUID `json:"claim_token"`
}
type ChannelInstallation struct {
ID pgtype.UUID `json:"id"`
WorkspaceID pgtype.UUID `json:"workspace_id"`
AgentID pgtype.UUID `json:"agent_id"`
ChannelType string `json:"channel_type"`
Config []byte `json:"config"`
Status string `json:"status"`
WsLeaseToken pgtype.Text `json:"ws_lease_token"`
WsLeaseExpiresAt pgtype.Timestamptz `json:"ws_lease_expires_at"`
InstallerUserID pgtype.UUID `json:"installer_user_id"`
InstalledAt pgtype.Timestamptz `json:"installed_at"`
CreatedAt pgtype.Timestamptz `json:"created_at"`
UpdatedAt pgtype.Timestamptz `json:"updated_at"`
}
type ChannelOutboundCardMessage struct {
ID pgtype.UUID `json:"id"`
ChatSessionID pgtype.UUID `json:"chat_session_id"`
TaskID pgtype.UUID `json:"task_id"`
ChannelType string `json:"channel_type"`
ChannelChatID string `json:"channel_chat_id"`
ChannelCardMessageID string `json:"channel_card_message_id"`
Status string `json:"status"`
LastPatchedAt pgtype.Timestamptz `json:"last_patched_at"`
CreatedAt pgtype.Timestamptz `json:"created_at"`
}
type ChannelUserBinding struct {
ID pgtype.UUID `json:"id"`
WorkspaceID pgtype.UUID `json:"workspace_id"`
MulticaUserID pgtype.UUID `json:"multica_user_id"`
InstallationID pgtype.UUID `json:"installation_id"`
ChannelType string `json:"channel_type"`
ChannelUserID string `json:"channel_user_id"`
Config []byte `json:"config"`
BoundAt pgtype.Timestamptz `json:"bound_at"`
}
type ChatMessage struct {
ID pgtype.UUID `json:"id"`
ChatSessionID pgtype.UUID `json:"chat_session_id"`

View File

@@ -0,0 +1,317 @@
-- Platform-agnostic inbound channel queries (MUL-3515). These operate on
-- the channel_* tables created in migration 123. Each installation carries
-- a `channel_type` discriminator and a JSONB `config` blob for
-- platform-specific identifiers/credentials; the cross-platform columns
-- stay flat. The Go layer owns building/parsing config — these queries
-- treat it as opaque JSON except for the routing index on config->>'app_id'.
--
-- No foreign keys exist on these tables (MUL-3515 §4): the integrity the
-- old composite FKs enforced (binding workspace matches installation;
-- binding dies with membership / chat_session) is maintained in the
-- application layer via the membership check in the inbound identity step
-- and the *DeleteChannel*BindingsBy* cleanup queries below.
-- =====================
-- channel_installation
-- =====================
-- name: UpsertChannelInstallation :one
-- Install / re-install path. `config` is the opaque per-channel JSONB the
-- Go layer assembles (for feishu: app_id, app_secret_encrypted, tenant_key,
-- bot_open_id, bot_union_id, region). Re-installing the same agent replaces
-- the whole config and forces status back to 'active'. The WS lease is
-- intentionally NOT reset here — the inbound hub owns lease lifecycle.
INSERT INTO channel_installation (
workspace_id, agent_id, channel_type, config, installer_user_id
) VALUES (
$1, $2, $3, $4, $5
)
ON CONFLICT (workspace_id, agent_id) DO UPDATE SET
channel_type = EXCLUDED.channel_type,
config = EXCLUDED.config,
installer_user_id = EXCLUDED.installer_user_id,
status = 'active',
installed_at = now(),
updated_at = now()
RETURNING *;
-- name: GetChannelInstallation :one
SELECT * FROM channel_installation WHERE id = $1;
-- name: GetChannelInstallationInWorkspace :one
SELECT * FROM channel_installation
WHERE id = $1 AND workspace_id = $2;
-- name: GetChannelInstallationByAppID :one
-- Inbound routing. The platform event carries only the channel's app
-- identifier (Feishu app_id); the dispatcher's installation resolver routes
-- on (channel_type, config->>'app_id'). Backed by the functional unique
-- index idx_channel_installation_type_appid.
SELECT * FROM channel_installation
WHERE channel_type = $1 AND config ->> 'app_id' = $2;
-- name: ListChannelInstallationsByWorkspace :many
SELECT * FROM channel_installation
WHERE workspace_id = $1
ORDER BY created_at ASC;
-- name: ListActiveChannelInstallations :many
-- Boot path for the inbound hub: every active installation, any channel
-- type, so the hub can claim leases and open connections.
SELECT * FROM channel_installation
WHERE status = 'active'
ORDER BY created_at ASC;
-- name: SetChannelInstallationStatus :exec
UPDATE channel_installation
SET status = $2, updated_at = now()
WHERE id = $1;
-- name: SetChannelInstallationConfig :exec
-- Replaces the whole config blob for one installation. Used by the
-- operator backfills (e.g. setting a freshly-fetched bot_union_id) that
-- read-modify-write the JSON in Go and persist it back atomically by id.
UPDATE channel_installation
SET config = $2, updated_at = now()
WHERE id = $1;
-- name: BackfillChannelInstallationRegionToFeishuLark :execrows
-- Operator repair, feishu-only: flip every feishu installation still
-- carrying region='feishu' to 'lark'. Called only on deployments whose
-- legacy global base-URL override pointed at Lark international. Idempotent.
UPDATE channel_installation
SET config = jsonb_set(config, '{region}', '"lark"'),
updated_at = now()
WHERE channel_type = 'feishu'
AND config ->> 'region' = 'feishu';
-- name: AcquireChannelWSLease :one
-- Atomically claims the WebSocket lease. CAS predicate accepts when no
-- holder exists, the holder expired, or the holder is us (renewal).
UPDATE channel_installation
SET ws_lease_token = sqlc.arg('new_token'),
ws_lease_expires_at = sqlc.arg('new_expires_at'),
updated_at = now()
WHERE id = sqlc.arg('id')
AND status = 'active'
AND (
ws_lease_token IS NULL
OR ws_lease_expires_at < now()
OR ws_lease_token = sqlc.arg('new_token')
)
RETURNING *;
-- name: ReleaseChannelWSLease :exec
-- Drops the lease iff we are still the holder.
UPDATE channel_installation
SET ws_lease_token = NULL,
ws_lease_expires_at = NULL,
updated_at = now()
WHERE id = $1
AND ws_lease_token = sqlc.arg('current_token');
-- =====================
-- channel_user_binding
-- =====================
-- name: CreateChannelUserBinding :one
-- Records that a platform user id (per-installation; Feishu open_id) maps
-- to a Multica user. The old composite member-FK is gone, so this no
-- longer fails when the redeemer is not a workspace member — the caller
-- (BindingTokenService.RedeemAndBind) validates membership explicitly
-- before calling. ON CONFLICT DO UPDATE is still gated on multica_user_id
-- matching, so a second redeemer cannot steal an already-bound user id;
-- a cross-user conflict updates zero rows and the caller maps that to
-- ErrBindingAlreadyAssigned. config carries secondary identity (union_id).
INSERT INTO channel_user_binding (
workspace_id, multica_user_id, installation_id,
channel_type, channel_user_id, config
) VALUES (
$1, $2, $3, $4, $5, $6
)
ON CONFLICT (installation_id, channel_user_id) DO UPDATE SET
config = channel_user_binding.config || EXCLUDED.config,
bound_at = now()
WHERE channel_user_binding.multica_user_id = EXCLUDED.multica_user_id
RETURNING *;
-- name: GetChannelUserBindingByUserID :one
-- The inbound identity lookup: does this platform user id map to a Multica
-- user for this installation? With the member-FK removed, a row's
-- existence no longer proves current workspace membership — the dispatcher
-- re-checks membership after this lookup.
SELECT * FROM channel_user_binding
WHERE installation_id = $1 AND channel_user_id = $2;
-- name: DeleteChannelUserBindingsByWorkspaceMember :exec
-- Application-layer integrity (replaces the old member-FK ON DELETE
-- CASCADE): prune every binding for a user who has been removed from a
-- workspace, across all installations in that workspace.
DELETE FROM channel_user_binding
WHERE workspace_id = $1 AND multica_user_id = $2;
-- =====================
-- channel_chat_session_binding
-- =====================
-- name: CreateChannelChatSessionBinding :one
INSERT INTO channel_chat_session_binding (
chat_session_id, installation_id, channel_type, channel_chat_id, chat_type
) VALUES (
$1, $2, $3, $4, $5
)
RETURNING *;
-- name: GetChannelChatSessionBinding :one
-- Lookup-by-channel-chat: the inbound dispatcher finds the existing
-- chat_session before deciding whether to create one.
SELECT * FROM channel_chat_session_binding
WHERE installation_id = $1 AND channel_chat_id = $2;
-- name: GetChannelChatSessionBindingBySession :one
-- Reverse lookup for the outbound patcher: given a chat_session_id, find
-- its channel binding to know which (installation, chat_id) to send to.
SELECT * FROM channel_chat_session_binding
WHERE chat_session_id = $1;
-- name: UpdateChannelChatSessionBindingReplyTarget :exec
-- Records the most recent inbound trigger message + thread so the decoupled
-- outbound patcher can thread its reply back into the originating topic.
UPDATE channel_chat_session_binding
SET last_message_id = sqlc.narg('last_message_id'),
last_thread_id = sqlc.narg('last_thread_id')
WHERE chat_session_id = $1;
-- name: DeleteChannelChatSessionBindingBySession :exec
-- Application-layer integrity (replaces the old chat_session-FK ON DELETE
-- CASCADE): drop the binding when its chat_session is deleted.
DELETE FROM channel_chat_session_binding
WHERE chat_session_id = $1;
-- =====================
-- channel_inbound_message_dedup
-- =====================
-- name: ClaimChannelInboundDedup :one
-- Two-phase idempotency gate with owner fencing. Returns the row when a
-- claim is acquired (fresh insert, or stale-reclaim of an in-flight claim
-- older than 60s); returns no rows when terminal (processed) or actively
-- in-flight. Every claim mints a fresh claim_token; Mark/Release are
-- fenced on it. See the table comment in migration 123 / the lark
-- predecessor for the full invariant set.
INSERT INTO channel_inbound_message_dedup (installation_id, message_id, claim_token)
VALUES ($1, $2, gen_random_uuid())
ON CONFLICT (installation_id, message_id) DO UPDATE
SET received_at = now(),
claim_token = gen_random_uuid()
WHERE channel_inbound_message_dedup.processed_at IS NULL
AND channel_inbound_message_dedup.received_at < now() - INTERVAL '60 seconds'
RETURNING installation_id, message_id, received_at, processed_at, claim_token;
-- name: MarkChannelInboundDedupProcessed :execrows
-- Locks a claim in as permanently processed after a durable outcome.
-- Invoked inside the chat_message tx (via qtx) on the ingest path so the
-- durable write and the Mark commit atomically. Token mismatch returns
-- zero rows (a reclaim happened); the caller rolls back its in-tx write.
UPDATE channel_inbound_message_dedup
SET processed_at = now()
WHERE installation_id = $1
AND message_id = $2
AND claim_token = $3
AND processed_at IS NULL;
-- name: ReleaseChannelInboundDedup :execrows
-- Releases an in-flight claim when an infra error occurred before any
-- durable side effect, so a retry can re-acquire immediately. Fenced on
-- processed_at IS NULL and claim_token.
DELETE FROM channel_inbound_message_dedup
WHERE installation_id = $1
AND message_id = $2
AND claim_token = $3
AND processed_at IS NULL;
-- name: PurgeChannelInboundDedup :exec
-- Vacuum job: remove dedup rows older than the supplied cutoff (e.g. 24h).
DELETE FROM channel_inbound_message_dedup
WHERE received_at < $1;
-- =====================
-- channel_inbound_audit
-- =====================
-- name: RecordChannelInboundDrop :exec
-- The only write path for dropped events. Deliberately carries no body
-- column — only routing / identity / drop_reason / timestamp.
INSERT INTO channel_inbound_audit (
installation_id, channel_type, channel_chat_id, event_type,
channel_event_id, channel_message_id, drop_reason
) VALUES (
sqlc.narg('installation_id'),
$1,
sqlc.narg('channel_chat_id'),
$2,
sqlc.narg('channel_event_id'),
sqlc.narg('channel_message_id'),
$3
);
-- name: ListChannelInboundAuditByInstallation :many
SELECT * FROM channel_inbound_audit
WHERE installation_id = $1
ORDER BY received_at DESC
LIMIT $2 OFFSET $3;
-- =====================
-- channel_outbound_card_message
-- =====================
-- name: CreateChannelOutboundCardMessage :one
INSERT INTO channel_outbound_card_message (
chat_session_id, task_id, channel_type, channel_chat_id,
channel_card_message_id, status
) VALUES (
$1, sqlc.narg('task_id'), $2, $3, $4, $5
)
RETURNING *;
-- name: GetChannelOutboundCardByTask :one
-- The partial unique index on (task_id) WHERE task_id IS NOT NULL
-- guarantees at most one row.
SELECT * FROM channel_outbound_card_message
WHERE task_id = $1;
-- name: UpdateChannelOutboundCardStatus :exec
UPDATE channel_outbound_card_message
SET status = $2,
last_patched_at = now()
WHERE id = $1;
-- =====================
-- channel_binding_token
-- =====================
-- name: CreateChannelBindingToken :one
-- Mints a single-use binding token for an unbound platform user. TTL cap
-- (15 min) enforced by the table CHECK in lockstep with
-- channel.BindingTokenTTL. The HASH is stored, never the raw token.
INSERT INTO channel_binding_token (
token_hash, workspace_id, installation_id, channel_type,
channel_user_id, expires_at
) VALUES (
$1, $2, $3, $4, $5, $6
)
RETURNING *;
-- name: ConsumeChannelBindingToken :one
-- Atomic redemption: returns the row only if the hash exists, is
-- unconsumed, and unexpired. Two simultaneous redemptions cannot both win.
UPDATE channel_binding_token
SET consumed_at = now()
WHERE token_hash = $1
AND consumed_at IS NULL
AND expires_at > now()
RETURNING *;
-- name: PurgeExpiredChannelBindingTokens :exec
DELETE FROM channel_binding_token
WHERE expires_at < $1;