mirror of
https://github.com/multica-ai/multica.git
synced 2026-06-22 15:09:22 +02:00
Compare commits
2 Commits
main
...
agent/j/c3
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
37ca88be7e | ||
|
|
5374c21a99 |
94
server/internal/integrations/channel/capability.go
Normal file
94
server/internal/integrations/channel/capability.go
Normal file
@@ -0,0 +1,94 @@
|
||||
package channel
|
||||
|
||||
import "strings"
|
||||
|
||||
// Capability is a bitmask a Channel uses to DECLARE what it supports. It
|
||||
// is declaration only: this package contains no degrade logic. A caller
|
||||
// that wants to degrade output (rich card → plain text when CapRichCard
|
||||
// is absent) reads the bitmask and decides for itself, so adding a new
|
||||
// platform never forces a branch into the core. Capabilities() returns a
|
||||
// Channel's fixed set; the zero value declares nothing.
|
||||
type Capability uint64
|
||||
|
||||
const (
|
||||
// CapText — can deliver a plain text message. Every Channel is
|
||||
// expected to declare at least this.
|
||||
CapText Capability = 1 << iota
|
||||
// CapRichCard — can render a rich / interactive card (Lark
|
||||
// interactive card, Slack Block Kit, …).
|
||||
CapRichCard
|
||||
// CapThreadReply — can post a reply into a thread / topic.
|
||||
CapThreadReply
|
||||
// CapQuoteReply — can quote-reply to a specific message.
|
||||
CapQuoteReply
|
||||
// CapAttachment — can send and/or receive media attachments.
|
||||
CapAttachment
|
||||
// CapVoice — can handle voice / audio messages.
|
||||
CapVoice
|
||||
// CapTypingIndicator — can show a typing / "thinking" indicator.
|
||||
CapTypingIndicator
|
||||
// CapMessageEdit — can edit a message after it was sent (Lark card
|
||||
// patch, Slack chat.update, …).
|
||||
CapMessageEdit
|
||||
)
|
||||
|
||||
// capabilityNames maps single-bit capabilities to a stable, lower-case
|
||||
// name for String(). Order matches the bit order above so String() reads
|
||||
// least-significant-bit first.
|
||||
var capabilityNames = []struct {
|
||||
bit Capability
|
||||
name string
|
||||
}{
|
||||
{CapText, "text"},
|
||||
{CapRichCard, "rich_card"},
|
||||
{CapThreadReply, "thread_reply"},
|
||||
{CapQuoteReply, "quote_reply"},
|
||||
{CapAttachment, "attachment"},
|
||||
{CapVoice, "voice"},
|
||||
{CapTypingIndicator, "typing_indicator"},
|
||||
{CapMessageEdit, "message_edit"},
|
||||
}
|
||||
|
||||
// Has reports whether c declares every capability in want. Has(0) is
|
||||
// true (the empty requirement is always satisfied). Because want may be a
|
||||
// combination of bits, this is an "includes all of" test, not "any of".
|
||||
func (c Capability) Has(want Capability) bool {
|
||||
return c&want == want
|
||||
}
|
||||
|
||||
// String renders the set bits as a "|"-joined list of names ("text|
|
||||
// thread_reply"), "none" for the zero value, and appends any unknown
|
||||
// high bits as a hex remainder so a forgotten name never silently
|
||||
// vanishes from logs. It is for diagnostics only.
|
||||
func (c Capability) String() string {
|
||||
if c == 0 {
|
||||
return "none"
|
||||
}
|
||||
var (
|
||||
parts []string
|
||||
remaining = c
|
||||
)
|
||||
for _, cn := range capabilityNames {
|
||||
if remaining&cn.bit == cn.bit {
|
||||
parts = append(parts, cn.name)
|
||||
remaining &^= cn.bit
|
||||
}
|
||||
}
|
||||
if remaining != 0 {
|
||||
parts = append(parts, "0x"+strings.TrimLeft(hex(uint64(remaining)), "0"))
|
||||
}
|
||||
return strings.Join(parts, "|")
|
||||
}
|
||||
|
||||
// hex formats v as a fixed lower-case hex string without importing fmt
|
||||
// (keeps this leaf file dependency-free). Only used by String() for the
|
||||
// rare unknown-bit remainder.
|
||||
func hex(v uint64) string {
|
||||
const digits = "0123456789abcdef"
|
||||
var buf [16]byte
|
||||
for i := 15; i >= 0; i-- {
|
||||
buf[i] = digits[v&0xf]
|
||||
v >>= 4
|
||||
}
|
||||
return string(buf[:])
|
||||
}
|
||||
67
server/internal/integrations/channel/capability_test.go
Normal file
67
server/internal/integrations/channel/capability_test.go
Normal file
@@ -0,0 +1,67 @@
|
||||
package channel
|
||||
|
||||
import "testing"
|
||||
|
||||
func TestCapability_Has(t *testing.T) {
|
||||
c := CapText | CapThreadReply
|
||||
|
||||
if !c.Has(CapText) {
|
||||
t.Errorf("Has(CapText) = false, want true")
|
||||
}
|
||||
if !c.Has(CapText | CapThreadReply) {
|
||||
t.Errorf("Has(text|thread) = false, want true (includes-all)")
|
||||
}
|
||||
if c.Has(CapRichCard) {
|
||||
t.Errorf("Has(CapRichCard) = true, want false")
|
||||
}
|
||||
if c.Has(CapText | CapRichCard) {
|
||||
t.Errorf("Has(text|rich) = true, want false (one bit missing)")
|
||||
}
|
||||
// The empty requirement is always satisfied.
|
||||
if !c.Has(0) {
|
||||
t.Errorf("Has(0) = false, want true")
|
||||
}
|
||||
}
|
||||
|
||||
func TestCapability_String(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
cap Capability
|
||||
want string
|
||||
}{
|
||||
{"zero", 0, "none"},
|
||||
{"single", CapText, "text"},
|
||||
{"ordered_lsb_first", CapThreadReply | CapText, "text|thread_reply"},
|
||||
{"all_named", CapMessageEdit, "message_edit"},
|
||||
{"unknown_high_bit", CapText | (1 << 40), "text|0x10000000000"},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
if got := tt.cap.String(); got != tt.want {
|
||||
t.Errorf("Capability(%#x).String() = %q, want %q", uint64(tt.cap), got, tt.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// TestCapability_BitsDistinct guards against two constants accidentally
|
||||
// sharing a bit after an edit to the iota block.
|
||||
func TestCapability_BitsDistinct(t *testing.T) {
|
||||
all := []Capability{
|
||||
CapText, CapRichCard, CapThreadReply, CapQuoteReply,
|
||||
CapAttachment, CapVoice, CapTypingIndicator, CapMessageEdit,
|
||||
}
|
||||
var seen Capability
|
||||
for i, c := range all {
|
||||
if c == 0 {
|
||||
t.Fatalf("capability index %d is zero", i)
|
||||
}
|
||||
if c&(c-1) != 0 {
|
||||
t.Fatalf("capability index %d (%#x) is not a single bit", i, uint64(c))
|
||||
}
|
||||
if seen&c != 0 {
|
||||
t.Fatalf("capability index %d (%#x) overlaps an earlier bit", i, uint64(c))
|
||||
}
|
||||
seen |= c
|
||||
}
|
||||
}
|
||||
81
server/internal/integrations/channel/channel.go
Normal file
81
server/internal/integrations/channel/channel.go
Normal file
@@ -0,0 +1,81 @@
|
||||
package channel
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
)
|
||||
|
||||
// Type identifies an inbound channel platform — the discriminator the
|
||||
// Registry keys on and the value persisted in the channel_type column of
|
||||
// the generalized channel_* tables. Use the lower-case platform slug
|
||||
// ("feishu", "slack", "wecom", …); keep it stable, it is durable data.
|
||||
type Type string
|
||||
|
||||
const (
|
||||
// TypeFeishu is the Feishu / Lark adapter — the only implementation
|
||||
// in phase 1. It serves both the mainland Feishu cloud and the Lark
|
||||
// international cloud; the cloud (region) is per-installation config,
|
||||
// not a separate Type.
|
||||
TypeFeishu Type = "feishu"
|
||||
)
|
||||
|
||||
// Channel is the platform-agnostic contract every IM integration
|
||||
// implements. An adapter keeps ALL platform specifics behind these five
|
||||
// methods: the core supervisor calls Connect/Disconnect to manage the
|
||||
// link, Send to deliver an outbound reply, and reads Capabilities to
|
||||
// decide how to render; it never touches platform SDKs or wire formats.
|
||||
//
|
||||
// Inbound is intentionally NOT on this interface. A Channel pushes
|
||||
// normalized InboundMessage values into the core router via the wiring
|
||||
// established at construction (the adapter owns its receive loop); the
|
||||
// core does not poll the Channel for messages.
|
||||
type Channel interface {
|
||||
// Type reports the platform discriminator. It MUST equal the Type
|
||||
// the Channel was registered under, and is stable for the lifetime
|
||||
// of the instance.
|
||||
Type() Type
|
||||
|
||||
// Connect establishes the platform link (e.g. dials the outbound
|
||||
// WebSocket long-conn, or starts the inbound HTTP listener). The
|
||||
// connection mode is the implementation's choice and invisible to
|
||||
// the core. Connect blocks only until the link is established (or
|
||||
// fails); the ongoing receive loop runs in the background and is
|
||||
// torn down by Disconnect or by ctx cancellation.
|
||||
Connect(ctx context.Context) error
|
||||
|
||||
// Disconnect tears the platform link down and releases its
|
||||
// resources. It is safe to call after a failed Connect and safe to
|
||||
// call more than once; a Channel that is already disconnected
|
||||
// returns nil.
|
||||
Disconnect(ctx context.Context) error
|
||||
|
||||
// Send delivers a single outbound message and returns the platform's
|
||||
// identifier for the delivered message. A non-nil error is reserved
|
||||
// for real delivery failures (network, auth, rate limit) that the
|
||||
// caller may retry.
|
||||
Send(ctx context.Context, out OutboundMessage) (SendResult, error)
|
||||
|
||||
// Capabilities declares what this Channel supports. It is a pure
|
||||
// declaration with no side effects and a stable result; callers read
|
||||
// it to choose a rendering and degrade on their own (this package
|
||||
// performs no degradation — see the Capability docs).
|
||||
Capabilities() Capability
|
||||
}
|
||||
|
||||
// Config is the normalized per-installation configuration a Factory
|
||||
// consumes. Type is the platform discriminator; Raw is the platform's
|
||||
// own credential/config blob (Feishu's app_id / encrypted app_secret /
|
||||
// tenant_key / region, Slack's bot/app tokens, …), carried opaquely so
|
||||
// the foundation never grows a per-platform field. It maps directly onto
|
||||
// the channel_type column + JSONB config of a channel_installation row
|
||||
// (MUL-3515 decision §3).
|
||||
type Config struct {
|
||||
Type Type
|
||||
Raw json.RawMessage
|
||||
}
|
||||
|
||||
// Factory builds a Channel from its per-installation Config. Each adapter
|
||||
// registers exactly one Factory under its Type; the Registry calls it to
|
||||
// instantiate a per-installation Channel. A Factory should validate Raw
|
||||
// and return an error rather than a half-built Channel.
|
||||
type Factory func(cfg Config) (Channel, error)
|
||||
45
server/internal/integrations/channel/doc.go
Normal file
45
server/internal/integrations/channel/doc.go
Normal file
@@ -0,0 +1,45 @@
|
||||
// Package channel is the platform-agnostic foundation for Multica's
|
||||
// inbound IM integrations (Feishu/Lark, Slack, WeCom, …). It owns the
|
||||
// contract every integration implements so the core never learns what a
|
||||
// given platform's event JSON looks like — design tracked in MUL-3506,
|
||||
// phase-1 foundation in MUL-3515.
|
||||
//
|
||||
// The contract has four pieces:
|
||||
//
|
||||
// 1. Channel — the per-integration interface
|
||||
// (Type / Connect / Disconnect / Send / Capabilities). An adapter
|
||||
// translates platform payloads in both directions and owns its own
|
||||
// connection mode (outbound WebSocket long-conn, inbound HTTP, …);
|
||||
// the core only calls these five methods.
|
||||
//
|
||||
// 2. InboundMessage / OutboundMessage — the normalized message
|
||||
// envelopes. Every platform's inbound payload is translated by its
|
||||
// adapter into one InboundMessage; the core routes, dedups, and
|
||||
// persists only this struct. Outbound is the minimal text reply
|
||||
// (ChatID + Text + optional thread / reply target) — rich cards,
|
||||
// media, and outbound webhooks are deliberately out of scope here
|
||||
// and stay inside the adapter that supports them.
|
||||
//
|
||||
// 3. Capability — a bitmask each Channel uses to DECLARE what it can
|
||||
// do (rich cards, threads, attachments, …). This package only
|
||||
// models the declaration; it intentionally contains no degrade
|
||||
// logic. Callers that want to degrade (rich card → plain text)
|
||||
// read the bitmask and decide for themselves, so adding a platform
|
||||
// never forces an if/else into the core.
|
||||
//
|
||||
// 4. Registry — a Type→Factory map with last-writer-wins semantics.
|
||||
// Adding a platform is "register a factory", not "edit the core".
|
||||
//
|
||||
// Boundary rule (MUL-3515 decision §2): the envelope holds ONLY fields
|
||||
// that are true across every platform — Text, a normalized message-type
|
||||
// enum, media references, the reply/thread anchors, the routing Source,
|
||||
// and the event/message ids used for dedup. Anything platform-specific
|
||||
// (a Lark raw msg_type, parent_id, root_id, …) lives in Raw and is read
|
||||
// ONLY by the adapter that produced it. The core never reads Raw.
|
||||
//
|
||||
// This package is pure: it has no database, network, or platform
|
||||
// dependencies, and nothing in it imports another integration package.
|
||||
// The concrete Feishu/Lark adapter, the DB-backed installation/identity/
|
||||
// session resolvers, and the supervisor that drives Connect/Disconnect
|
||||
// are wired in the follow-up cutover (see MUL-3515).
|
||||
package channel
|
||||
175
server/internal/integrations/channel/message.go
Normal file
175
server/internal/integrations/channel/message.go
Normal file
@@ -0,0 +1,175 @@
|
||||
package channel
|
||||
|
||||
import "encoding/json"
|
||||
|
||||
// ChatType discriminates a 1:1 direct conversation with the bot from a
|
||||
// multi-party group chat. Product behavior differs: direct chats ingest
|
||||
// every message; group chats only ingest messages explicitly addressed
|
||||
// to the bot (@-mention or reply to a bot message). The wire values match
|
||||
// the existing lark_chat_session_binding.lark_chat_type constraint so the
|
||||
// generalized channel_* table backfills 1:1.
|
||||
type ChatType string
|
||||
|
||||
const (
|
||||
// ChatTypeP2P is a direct (peer-to-peer) conversation with the bot.
|
||||
ChatTypeP2P ChatType = "p2p"
|
||||
// ChatTypeGroup is a multi-party group conversation.
|
||||
ChatTypeGroup ChatType = "group"
|
||||
)
|
||||
|
||||
// MsgType is the normalized, cross-platform message kind. Adapters map
|
||||
// their platform's native type onto this small closed set; the platform's
|
||||
// raw type string (Lark "post" / "merge_forward" / "interactive", …) is
|
||||
// NOT represented here — it stays in InboundMessage.Raw and is read only
|
||||
// by the adapter. The core only ever needs to know "text vs media, and
|
||||
// which media".
|
||||
type MsgType string
|
||||
|
||||
const (
|
||||
// MsgTypeText is a plain or rich text message. The human-readable
|
||||
// content is flattened into InboundMessage.Text by the adapter.
|
||||
MsgTypeText MsgType = "text"
|
||||
// MsgTypeImage is an image attachment.
|
||||
MsgTypeImage MsgType = "image"
|
||||
// MsgTypeFile is a generic file attachment.
|
||||
MsgTypeFile MsgType = "file"
|
||||
// MsgTypeAudio is a voice / audio attachment.
|
||||
MsgTypeAudio MsgType = "audio"
|
||||
// MsgTypeVideo is a video attachment.
|
||||
MsgTypeVideo MsgType = "video"
|
||||
// MsgTypeUnknown is the fallback for a platform type the adapter does
|
||||
// not map. The core treats it as a non-text, non-actionable message.
|
||||
MsgTypeUnknown MsgType = "unknown"
|
||||
)
|
||||
|
||||
// Source carries the cross-platform routing identity of an inbound
|
||||
// message — every field here is true on every platform. Platform-specific
|
||||
// routing keys (a Lark app_id, a Slack team id) are resolved to an
|
||||
// installation by the adapter and do NOT appear on Source.
|
||||
type Source struct {
|
||||
// ChannelType is the platform the message arrived on; it equals the
|
||||
// owning Channel's Type.
|
||||
ChannelType Type
|
||||
|
||||
// ChatID is the platform conversation identifier. One ChatID maps to
|
||||
// one Multica chat_session via the channel_chat_session_binding.
|
||||
ChatID string
|
||||
|
||||
// ChatType discriminates direct from group conversations.
|
||||
ChatType ChatType
|
||||
|
||||
// SenderID is the platform-native, per-installation user identifier
|
||||
// (Lark open_id, Slack user id, …). It is stable WITHIN one
|
||||
// installation and is the key the identity binding is stored under.
|
||||
// It is NOT comparable across installations.
|
||||
SenderID string
|
||||
|
||||
// SenderStableID is the platform's cross-installation stable identity
|
||||
// for the sender when one exists (Lark union_id, …), otherwise empty.
|
||||
// Captured opportunistically for future cross-installation identity
|
||||
// merging; the core treats an empty value as "not available".
|
||||
SenderStableID string
|
||||
|
||||
// ThreadID is the platform thread / topic the message belongs to,
|
||||
// when threading applies and the message is inside a thread. Empty
|
||||
// means a top-level conversation message. The core persists it so a
|
||||
// decoupled outbound reply can be threaded back into the same topic.
|
||||
ThreadID string
|
||||
}
|
||||
|
||||
// MediaRef references a media attachment that the adapter has ALREADY
|
||||
// persisted to object storage before the message reaches the core. The
|
||||
// core never holds raw bytes — only this reference — so the envelope
|
||||
// stays small and platform-neutral.
|
||||
type MediaRef struct {
|
||||
// Type is the normalized media kind (image / file / audio / video).
|
||||
Type MsgType
|
||||
// StorageKey locates the persisted object in Multica object storage.
|
||||
StorageKey string
|
||||
// Filename is the original display name, when the platform supplies
|
||||
// one.
|
||||
Filename string
|
||||
// MimeType is the content type, when known.
|
||||
MimeType string
|
||||
// SizeBytes is the object size in bytes, or 0 when unknown.
|
||||
SizeBytes int64
|
||||
}
|
||||
|
||||
// ReplyCtx describes the message an inbound message quotes / replies to.
|
||||
// It is nil when the inbound message is not a reply.
|
||||
type ReplyCtx struct {
|
||||
// MessageID is the immediate parent message's platform id (the
|
||||
// message being quoted).
|
||||
MessageID string
|
||||
// RootID is the thread/root anchor the platform reports, when any.
|
||||
RootID string
|
||||
}
|
||||
|
||||
// InboundMessage is the single normalized shape the core consumes. Every
|
||||
// adapter translates its platform's raw payload into this struct; the
|
||||
// core's router, dedup, identity check, and persistence read ONLY these
|
||||
// fields. Per the boundary rule (MUL-3515 §2) the struct holds only
|
||||
// cross-platform-true fields; everything platform-specific lives in Raw.
|
||||
type InboundMessage struct {
|
||||
// EventID is the platform's delivery/event identifier and MessageID
|
||||
// is the platform's message identifier. Together they back the
|
||||
// idempotency layer: a platform may redeliver the same event on
|
||||
// reconnect, and dedup keys on (installation, MessageID).
|
||||
EventID string
|
||||
MessageID string
|
||||
|
||||
// Source is the routing identity (chat, sender, thread).
|
||||
Source Source
|
||||
|
||||
// Type is the normalized message kind.
|
||||
Type MsgType
|
||||
|
||||
// Text is the human-readable content, flattened by the adapter. For
|
||||
// non-text messages it may be empty or a short placeholder; the media
|
||||
// itself is in MediaRefs.
|
||||
Text string
|
||||
|
||||
// MediaRefs are the attachments, already persisted to object storage.
|
||||
MediaRefs []MediaRef
|
||||
|
||||
// ReplyTo is the quoted/replied-to context, or nil.
|
||||
ReplyTo *ReplyCtx
|
||||
|
||||
// AddressedToBot is the adapter's normalized verdict on whether a
|
||||
// GROUP message is an interaction with the bot (@-mention or reply to
|
||||
// a bot message). It is meaningless for direct (p2p) chats and the
|
||||
// core ignores it there. It is a normalized boolean, not platform
|
||||
// data — the platform-specific signals it was derived from (mention
|
||||
// arrays, parent ids) stay in Raw.
|
||||
AddressedToBot bool
|
||||
|
||||
// Raw is the untouched platform payload. Adapters stash platform-
|
||||
// specific fields here (Lark raw msg_type / parent_id / root_id /
|
||||
// mention arrays, …) and read them back only inside the adapter. The
|
||||
// core never reads Raw — that is the whole point of the boundary.
|
||||
Raw json.RawMessage
|
||||
}
|
||||
|
||||
// OutboundMessage is the minimal outbound reply the core can ask any
|
||||
// Channel to deliver: a text body into a chat, optionally threaded or
|
||||
// quoting a specific message. Rich cards, media uploads, and outbound
|
||||
// webhooks are deliberately NOT modeled here (MUL-3515 decision §6) — an
|
||||
// adapter that supports richer output exposes it on its own type, not on
|
||||
// this cross-platform envelope.
|
||||
type OutboundMessage struct {
|
||||
// ChatID is the destination conversation (the platform chat id).
|
||||
ChatID string
|
||||
// Text is the message body.
|
||||
Text string
|
||||
// ThreadID, when set, threads the reply into the given platform
|
||||
// thread / topic. Empty sends at the chat level.
|
||||
ThreadID string
|
||||
// ReplyTo, when set, quote-replies to the given platform message id.
|
||||
ReplyTo string
|
||||
}
|
||||
|
||||
// SendResult is the outcome of Channel.Send.
|
||||
type SendResult struct {
|
||||
// MessageID is the platform's identifier for the delivered message.
|
||||
MessageID string
|
||||
}
|
||||
82
server/internal/integrations/channel/message_test.go
Normal file
82
server/internal/integrations/channel/message_test.go
Normal file
@@ -0,0 +1,82 @@
|
||||
package channel
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"testing"
|
||||
)
|
||||
|
||||
// TestInboundMessage_RawIsOpaque proves the boundary contract: a
|
||||
// platform-specific payload survives a JSON round-trip through Raw
|
||||
// byte-for-byte, so an adapter can stash anything there and read it back
|
||||
// while the core leaves it untouched.
|
||||
func TestInboundMessage_RawIsOpaque(t *testing.T) {
|
||||
raw := json.RawMessage(`{"msg_type":"merge_forward","parent_id":"om_x","mentions":[{"id":"ou_1"}]}`)
|
||||
in := InboundMessage{
|
||||
EventID: "evt_1",
|
||||
MessageID: "om_1",
|
||||
Source: Source{
|
||||
ChannelType: TypeFeishu,
|
||||
ChatID: "oc_1",
|
||||
ChatType: ChatTypeGroup,
|
||||
SenderID: "ou_sender",
|
||||
SenderStableID: "on_union",
|
||||
ThreadID: "omt_1",
|
||||
},
|
||||
Type: MsgTypeText,
|
||||
Text: "hello",
|
||||
AddressedToBot: true,
|
||||
ReplyTo: &ReplyCtx{MessageID: "om_parent", RootID: "om_root"},
|
||||
Raw: raw,
|
||||
}
|
||||
|
||||
encoded, err := json.Marshal(in)
|
||||
if err != nil {
|
||||
t.Fatalf("Marshal: %v", err)
|
||||
}
|
||||
var got InboundMessage
|
||||
if err := json.Unmarshal(encoded, &got); err != nil {
|
||||
t.Fatalf("Unmarshal: %v", err)
|
||||
}
|
||||
|
||||
if got.Source != in.Source {
|
||||
t.Errorf("Source round-trip mismatch:\n got %+v\nwant %+v", got.Source, in.Source)
|
||||
}
|
||||
if got.ReplyTo == nil || *got.ReplyTo != *in.ReplyTo {
|
||||
t.Errorf("ReplyTo round-trip mismatch: got %+v", got.ReplyTo)
|
||||
}
|
||||
if !got.AddressedToBot {
|
||||
t.Errorf("AddressedToBot lost in round-trip")
|
||||
}
|
||||
|
||||
// Raw must be semantically identical (compare as parsed JSON to be
|
||||
// whitespace-insensitive).
|
||||
var a, b any
|
||||
if err := json.Unmarshal(got.Raw, &a); err != nil {
|
||||
t.Fatalf("Raw not valid JSON after round-trip: %v", err)
|
||||
}
|
||||
if err := json.Unmarshal(raw, &b); err != nil {
|
||||
t.Fatalf("seed Raw not valid JSON: %v", err)
|
||||
}
|
||||
if string(mustMarshal(t, a)) != string(mustMarshal(t, b)) {
|
||||
t.Errorf("Raw payload changed across round-trip:\n got %s\nwant %s", got.Raw, raw)
|
||||
}
|
||||
}
|
||||
|
||||
func mustMarshal(t *testing.T, v any) []byte {
|
||||
t.Helper()
|
||||
b, err := json.Marshal(v)
|
||||
if err != nil {
|
||||
t.Fatalf("Marshal: %v", err)
|
||||
}
|
||||
return b
|
||||
}
|
||||
|
||||
// TestOutboundMessage_Minimal documents that the outbound envelope is the
|
||||
// minimal text reply (decision §6): a text body, optionally threaded or
|
||||
// quoting, and nothing card/media-shaped.
|
||||
func TestOutboundMessage_Minimal(t *testing.T) {
|
||||
out := OutboundMessage{ChatID: "oc_1", Text: "hi", ThreadID: "omt_1", ReplyTo: "om_1"}
|
||||
if out.ChatID == "" || out.Text == "" {
|
||||
t.Fatalf("ChatID and Text are the required fields")
|
||||
}
|
||||
}
|
||||
77
server/internal/integrations/channel/registry.go
Normal file
77
server/internal/integrations/channel/registry.go
Normal file
@@ -0,0 +1,77 @@
|
||||
package channel
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sort"
|
||||
"sync"
|
||||
)
|
||||
|
||||
// ErrUnknownType is returned by Registry.Build when no Factory is
|
||||
// registered for the requested Type. Callers can test for it with
|
||||
// errors.Is.
|
||||
var ErrUnknownType = fmt.Errorf("channel: no factory registered for type")
|
||||
|
||||
// Registry maps a channel Type to the Factory that builds it. Adding a
|
||||
// platform is "register a factory here", never "edit the core". The
|
||||
// Registry is safe for concurrent use.
|
||||
//
|
||||
// Registration is last-writer-wins: registering a Type that already has a
|
||||
// Factory replaces it silently. This mirrors the plugin-registry pattern
|
||||
// from the reference design (MUL-3506) where the last adapter to register
|
||||
// a type wins, so a deployment can override a built-in adapter by
|
||||
// registering its own afterwards without a removal step.
|
||||
type Registry struct {
|
||||
mu sync.RWMutex
|
||||
factories map[Type]Factory
|
||||
}
|
||||
|
||||
// NewRegistry returns an empty Registry ready for use.
|
||||
func NewRegistry() *Registry {
|
||||
return &Registry{factories: make(map[Type]Factory)}
|
||||
}
|
||||
|
||||
// Register binds factory to t, replacing any factory previously
|
||||
// registered for t (last-writer-wins). A nil factory or an empty Type is
|
||||
// ignored — registering either would only set up a guaranteed failure at
|
||||
// Build time, so the Registry refuses to record it.
|
||||
func (r *Registry) Register(t Type, factory Factory) {
|
||||
if t == "" || factory == nil {
|
||||
return
|
||||
}
|
||||
r.mu.Lock()
|
||||
defer r.mu.Unlock()
|
||||
r.factories[t] = factory
|
||||
}
|
||||
|
||||
// Lookup returns the Factory registered for t and whether one exists.
|
||||
func (r *Registry) Lookup(t Type) (Factory, bool) {
|
||||
r.mu.RLock()
|
||||
defer r.mu.RUnlock()
|
||||
factory, ok := r.factories[t]
|
||||
return factory, ok
|
||||
}
|
||||
|
||||
// Build instantiates a Channel for cfg.Type using the registered Factory.
|
||||
// It returns ErrUnknownType (wrapped, with the type name) when no Factory
|
||||
// is registered, and otherwise returns whatever the Factory returns.
|
||||
func (r *Registry) Build(cfg Config) (Channel, error) {
|
||||
factory, ok := r.Lookup(cfg.Type)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("%w: %q", ErrUnknownType, cfg.Type)
|
||||
}
|
||||
return factory(cfg)
|
||||
}
|
||||
|
||||
// Types returns the registered types sorted lexicographically, so the
|
||||
// result is stable across calls (map iteration order is not). Useful for
|
||||
// diagnostics and for enumerating which platforms a deployment supports.
|
||||
func (r *Registry) Types() []Type {
|
||||
r.mu.RLock()
|
||||
defer r.mu.RUnlock()
|
||||
out := make([]Type, 0, len(r.factories))
|
||||
for t := range r.factories {
|
||||
out = append(out, t)
|
||||
}
|
||||
sort.Slice(out, func(i, j int) bool { return out[i] < out[j] })
|
||||
return out
|
||||
}
|
||||
130
server/internal/integrations/channel/registry_test.go
Normal file
130
server/internal/integrations/channel/registry_test.go
Normal file
@@ -0,0 +1,130 @@
|
||||
package channel
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"reflect"
|
||||
"sync"
|
||||
"testing"
|
||||
)
|
||||
|
||||
// fakeChannel is a minimal Channel used to assert which Factory the
|
||||
// Registry returned. id distinguishes instances built by different
|
||||
// factories.
|
||||
type fakeChannel struct {
|
||||
typ Type
|
||||
id string
|
||||
}
|
||||
|
||||
func (f fakeChannel) Type() Type { return f.typ }
|
||||
func (f fakeChannel) Connect(context.Context) error { return nil }
|
||||
func (f fakeChannel) Disconnect(context.Context) error { return nil }
|
||||
func (f fakeChannel) Send(context.Context, OutboundMessage) (SendResult, error) {
|
||||
return SendResult{}, nil
|
||||
}
|
||||
func (f fakeChannel) Capabilities() Capability { return CapText }
|
||||
|
||||
func factoryReturning(typ Type, id string) Factory {
|
||||
return func(Config) (Channel, error) { return fakeChannel{typ: typ, id: id}, nil }
|
||||
}
|
||||
|
||||
func TestRegistry_LookupAndBuild(t *testing.T) {
|
||||
r := NewRegistry()
|
||||
r.Register(TypeFeishu, factoryReturning(TypeFeishu, "feishu-a"))
|
||||
|
||||
if _, ok := r.Lookup(TypeFeishu); !ok {
|
||||
t.Fatalf("Lookup(%q) ok = false, want true", TypeFeishu)
|
||||
}
|
||||
|
||||
ch, err := r.Build(Config{Type: TypeFeishu})
|
||||
if err != nil {
|
||||
t.Fatalf("Build returned error: %v", err)
|
||||
}
|
||||
got, ok := ch.(fakeChannel)
|
||||
if !ok {
|
||||
t.Fatalf("Build returned %T, want fakeChannel", ch)
|
||||
}
|
||||
if got.id != "feishu-a" {
|
||||
t.Fatalf("Build used factory id %q, want %q", got.id, "feishu-a")
|
||||
}
|
||||
if got.Type() != TypeFeishu {
|
||||
t.Fatalf("built channel Type() = %q, want %q", got.Type(), TypeFeishu)
|
||||
}
|
||||
}
|
||||
|
||||
// TestRegistry_LastWriterWins is the explicit acceptance item: a second
|
||||
// Register for the same Type silently replaces the first, and Build/Lookup
|
||||
// resolve to the LATEST factory.
|
||||
func TestRegistry_LastWriterWins(t *testing.T) {
|
||||
r := NewRegistry()
|
||||
r.Register(TypeFeishu, factoryReturning(TypeFeishu, "first"))
|
||||
r.Register(TypeFeishu, factoryReturning(TypeFeishu, "second"))
|
||||
|
||||
ch, err := r.Build(Config{Type: TypeFeishu})
|
||||
if err != nil {
|
||||
t.Fatalf("Build returned error: %v", err)
|
||||
}
|
||||
if got := ch.(fakeChannel).id; got != "second" {
|
||||
t.Fatalf("last-writer-wins failed: Build used %q, want %q", got, "second")
|
||||
}
|
||||
|
||||
// Registering a type must not create duplicate Types() entries.
|
||||
if types := r.Types(); !reflect.DeepEqual(types, []Type{TypeFeishu}) {
|
||||
t.Fatalf("Types() = %v, want exactly [%q]", types, TypeFeishu)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRegistry_BuildUnknownType(t *testing.T) {
|
||||
r := NewRegistry()
|
||||
_, err := r.Build(Config{Type: "nope"})
|
||||
if !errors.Is(err, ErrUnknownType) {
|
||||
t.Fatalf("Build unknown type err = %v, want errors.Is ErrUnknownType", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRegistry_RegisterIgnoresInvalid(t *testing.T) {
|
||||
r := NewRegistry()
|
||||
r.Register("", factoryReturning(TypeFeishu, "x")) // empty type ignored
|
||||
r.Register(TypeFeishu, nil) // nil factory ignored
|
||||
|
||||
if _, ok := r.Lookup(""); ok {
|
||||
t.Fatalf("empty Type was registered, want ignored")
|
||||
}
|
||||
if _, ok := r.Lookup(TypeFeishu); ok {
|
||||
t.Fatalf("nil factory was registered, want ignored")
|
||||
}
|
||||
if len(r.Types()) != 0 {
|
||||
t.Fatalf("Types() = %v, want empty", r.Types())
|
||||
}
|
||||
}
|
||||
|
||||
func TestRegistry_TypesSorted(t *testing.T) {
|
||||
r := NewRegistry()
|
||||
r.Register("wecom", factoryReturning("wecom", "w"))
|
||||
r.Register("feishu", factoryReturning("feishu", "f"))
|
||||
r.Register("slack", factoryReturning("slack", "s"))
|
||||
|
||||
got := r.Types()
|
||||
want := []Type{"feishu", "slack", "wecom"}
|
||||
if !reflect.DeepEqual(got, want) {
|
||||
t.Fatalf("Types() = %v, want sorted %v", got, want)
|
||||
}
|
||||
}
|
||||
|
||||
// TestRegistry_ConcurrentAccess exercises the RWMutex under the race
|
||||
// detector: concurrent Register/Lookup/Build/Types must not data-race.
|
||||
func TestRegistry_ConcurrentAccess(t *testing.T) {
|
||||
r := NewRegistry()
|
||||
var wg sync.WaitGroup
|
||||
for i := 0; i < 50; i++ {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
r.Register(TypeFeishu, factoryReturning(TypeFeishu, "x"))
|
||||
_, _ = r.Lookup(TypeFeishu)
|
||||
_, _ = r.Build(Config{Type: TypeFeishu})
|
||||
_ = r.Types()
|
||||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
10
server/migrations/123_channel_generalization.down.sql
Normal file
10
server/migrations/123_channel_generalization.down.sql
Normal file
@@ -0,0 +1,10 @@
|
||||
-- Reverse 123_channel_generalization.up.sql. The lark_* tables were left
|
||||
-- in place by the up migration, so rolling back only needs to drop the
|
||||
-- channel_* tables that were added.
|
||||
DROP TABLE IF EXISTS channel_binding_token;
|
||||
DROP TABLE IF EXISTS channel_outbound_card_message;
|
||||
DROP TABLE IF EXISTS channel_inbound_audit;
|
||||
DROP TABLE IF EXISTS channel_inbound_message_dedup;
|
||||
DROP TABLE IF EXISTS channel_chat_session_binding;
|
||||
DROP TABLE IF EXISTS channel_user_binding;
|
||||
DROP TABLE IF EXISTS channel_installation;
|
||||
267
server/migrations/123_channel_generalization.up.sql
Normal file
267
server/migrations/123_channel_generalization.up.sql
Normal file
@@ -0,0 +1,267 @@
|
||||
-- Generalize the Feishu/Lark-specific integration tables into
|
||||
-- platform-agnostic channel_* tables (MUL-3515, parent MUL-3506). Each
|
||||
-- lark_* table gains a `channel_type` discriminator and moves its
|
||||
-- platform-specific identifiers/config into a JSONB `config` column; the
|
||||
-- cross-platform columns stay flat. Existing Feishu rows are backfilled
|
||||
-- with channel_type='feishu'.
|
||||
--
|
||||
-- Two hard rules from the design:
|
||||
--
|
||||
-- * NO foreign keys and NO cascades (MUL-3515 §4). The lark_* tables
|
||||
-- leaned on composite FKs to enforce "a binding's workspace matches
|
||||
-- its installation" and "a binding dies when workspace membership is
|
||||
-- revoked / a chat_session is deleted". Those integrity rules now
|
||||
-- live in the application layer (the cutover PR adds the membership
|
||||
-- check + cleanup). The columns are kept so the app can still join,
|
||||
-- but the database enforces nothing.
|
||||
--
|
||||
-- * The lark_* tables are NOT dropped here — that happens in a later
|
||||
-- migration once the Go cutover has landed, so this migration can
|
||||
-- ship green on its own. This migration only ADDS channel_* and
|
||||
-- copies the data forward.
|
||||
--
|
||||
-- app_secret_encrypted is BYTEA; it is carried into the JSONB config as a
|
||||
-- base64 string (encode(...,'base64')). Go's encoding/json decodes a
|
||||
-- base64 string straight back into a []byte field, so the round-trip is
|
||||
-- symmetric and the ciphertext is never stored in plaintext.
|
||||
|
||||
-- =====================
|
||||
-- channel_installation
|
||||
-- =====================
|
||||
CREATE TABLE channel_installation (
|
||||
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
|
||||
workspace_id UUID NOT NULL,
|
||||
agent_id UUID NOT NULL,
|
||||
channel_type TEXT NOT NULL,
|
||||
-- Platform-specific identifiers/config. For feishu:
|
||||
-- app_id, app_secret_encrypted (base64), tenant_key, bot_open_id,
|
||||
-- bot_union_id, region.
|
||||
config JSONB NOT NULL DEFAULT '{}'::jsonb,
|
||||
status TEXT NOT NULL DEFAULT 'active'
|
||||
CHECK (status IN ('active', 'revoked')),
|
||||
ws_lease_token TEXT,
|
||||
ws_lease_expires_at TIMESTAMPTZ,
|
||||
installer_user_id UUID NOT NULL,
|
||||
installed_at TIMESTAMPTZ NOT NULL DEFAULT now(),
|
||||
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
|
||||
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
|
||||
UNIQUE (workspace_id, agent_id)
|
||||
);
|
||||
|
||||
CREATE INDEX idx_channel_installation_workspace ON channel_installation(workspace_id);
|
||||
CREATE INDEX idx_channel_installation_agent ON channel_installation(agent_id);
|
||||
CREATE INDEX idx_channel_installation_lease ON channel_installation(ws_lease_expires_at)
|
||||
WHERE status = 'active';
|
||||
-- Routing key. Inbound events carry only the platform app identifier
|
||||
-- (Feishu app_id); the dispatcher routes on (channel_type, app_id). The
|
||||
-- functional unique index replaces the old global UNIQUE(app_id) and is
|
||||
-- scoped per channel_type. Rows without an app_id (a future channel that
|
||||
-- routes differently) store JSON null here, and Postgres allows many
|
||||
-- NULLs in a unique index, so they do not collide.
|
||||
CREATE UNIQUE INDEX idx_channel_installation_type_appid
|
||||
ON channel_installation(channel_type, (config ->> 'app_id'));
|
||||
|
||||
INSERT INTO channel_installation (
|
||||
id, workspace_id, agent_id, channel_type, config, status,
|
||||
ws_lease_token, ws_lease_expires_at, installer_user_id,
|
||||
installed_at, created_at, updated_at
|
||||
)
|
||||
SELECT
|
||||
id, workspace_id, agent_id, 'feishu',
|
||||
jsonb_strip_nulls(jsonb_build_object(
|
||||
'app_id', app_id,
|
||||
'app_secret_encrypted', encode(app_secret_encrypted, 'base64'),
|
||||
'tenant_key', tenant_key,
|
||||
'bot_open_id', bot_open_id,
|
||||
'bot_union_id', bot_union_id,
|
||||
'region', region
|
||||
)),
|
||||
status, ws_lease_token, ws_lease_expires_at, installer_user_id,
|
||||
installed_at, created_at, updated_at
|
||||
FROM lark_installation;
|
||||
|
||||
-- =====================
|
||||
-- channel_user_binding
|
||||
-- =====================
|
||||
-- channel_user_id is the platform-native, per-installation user id
|
||||
-- (Feishu open_id). union_id and any other secondary identity goes in
|
||||
-- config. The member-FK that used to make a row's existence proof of
|
||||
-- workspace membership is gone; the cutover PR validates membership in
|
||||
-- the identity check and prunes bindings on member removal.
|
||||
CREATE TABLE channel_user_binding (
|
||||
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
|
||||
workspace_id UUID NOT NULL,
|
||||
multica_user_id UUID NOT NULL,
|
||||
installation_id UUID NOT NULL,
|
||||
channel_type TEXT NOT NULL,
|
||||
channel_user_id TEXT NOT NULL,
|
||||
config JSONB NOT NULL DEFAULT '{}'::jsonb,
|
||||
bound_at TIMESTAMPTZ NOT NULL DEFAULT now(),
|
||||
UNIQUE (installation_id, channel_user_id)
|
||||
);
|
||||
|
||||
CREATE INDEX idx_channel_user_binding_user
|
||||
ON channel_user_binding(multica_user_id, workspace_id);
|
||||
CREATE INDEX idx_channel_user_binding_workspace_user
|
||||
ON channel_user_binding(workspace_id, channel_user_id);
|
||||
|
||||
INSERT INTO channel_user_binding (
|
||||
id, workspace_id, multica_user_id, installation_id,
|
||||
channel_type, channel_user_id, config, bound_at
|
||||
)
|
||||
SELECT
|
||||
id, workspace_id, multica_user_id, installation_id,
|
||||
'feishu', lark_open_id,
|
||||
jsonb_strip_nulls(jsonb_build_object('union_id', union_id)),
|
||||
bound_at
|
||||
FROM lark_user_binding;
|
||||
|
||||
-- =====================
|
||||
-- channel_chat_session_binding
|
||||
-- =====================
|
||||
CREATE TABLE channel_chat_session_binding (
|
||||
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
|
||||
chat_session_id UUID NOT NULL,
|
||||
installation_id UUID NOT NULL,
|
||||
channel_type TEXT NOT NULL,
|
||||
channel_chat_id TEXT NOT NULL,
|
||||
chat_type TEXT NOT NULL
|
||||
CHECK (chat_type IN ('p2p', 'group')),
|
||||
-- Most-recent inbound trigger, so the decoupled outbound patcher can
|
||||
-- thread its reply back into the originating topic. Nullable; a NULL
|
||||
-- thread id keeps the chat-level send path.
|
||||
last_message_id TEXT,
|
||||
last_thread_id TEXT,
|
||||
config JSONB NOT NULL DEFAULT '{}'::jsonb,
|
||||
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
|
||||
UNIQUE (installation_id, channel_chat_id),
|
||||
UNIQUE (chat_session_id)
|
||||
);
|
||||
|
||||
CREATE INDEX idx_channel_chat_session_binding_session
|
||||
ON channel_chat_session_binding(chat_session_id);
|
||||
|
||||
INSERT INTO channel_chat_session_binding (
|
||||
id, chat_session_id, installation_id, channel_type,
|
||||
channel_chat_id, chat_type, last_message_id, last_thread_id, created_at
|
||||
)
|
||||
SELECT
|
||||
id, chat_session_id, installation_id, 'feishu',
|
||||
lark_chat_id, lark_chat_type, last_lark_message_id, last_lark_thread_id, created_at
|
||||
FROM lark_chat_session_binding;
|
||||
|
||||
-- =====================
|
||||
-- channel_inbound_message_dedup
|
||||
-- =====================
|
||||
-- Two-phase idempotency with owner fencing, unchanged in shape from
|
||||
-- lark_inbound_message_dedup (keyed per installation + message). Transient
|
||||
-- 24h cache; copied forward for completeness.
|
||||
CREATE TABLE channel_inbound_message_dedup (
|
||||
installation_id UUID NOT NULL,
|
||||
message_id TEXT NOT NULL,
|
||||
received_at TIMESTAMPTZ NOT NULL DEFAULT now(),
|
||||
processed_at TIMESTAMPTZ,
|
||||
claim_token UUID NOT NULL DEFAULT gen_random_uuid(),
|
||||
PRIMARY KEY (installation_id, message_id)
|
||||
);
|
||||
|
||||
CREATE INDEX idx_channel_inbound_dedup_received
|
||||
ON channel_inbound_message_dedup(received_at);
|
||||
|
||||
INSERT INTO channel_inbound_message_dedup (
|
||||
installation_id, message_id, received_at, processed_at, claim_token
|
||||
)
|
||||
SELECT installation_id, message_id, received_at, processed_at, claim_token
|
||||
FROM lark_inbound_message_dedup;
|
||||
|
||||
-- =====================
|
||||
-- channel_inbound_audit
|
||||
-- =====================
|
||||
-- Non-content drop audit. installation_id is nullable (the old ON DELETE
|
||||
-- SET NULL is now just a nullable column the app may leave NULL).
|
||||
CREATE TABLE channel_inbound_audit (
|
||||
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
|
||||
installation_id UUID,
|
||||
channel_type TEXT NOT NULL,
|
||||
channel_chat_id TEXT,
|
||||
event_type TEXT NOT NULL,
|
||||
channel_event_id TEXT,
|
||||
channel_message_id TEXT,
|
||||
drop_reason TEXT NOT NULL,
|
||||
received_at TIMESTAMPTZ NOT NULL DEFAULT now()
|
||||
);
|
||||
|
||||
CREATE INDEX idx_channel_inbound_audit_installation
|
||||
ON channel_inbound_audit(installation_id, received_at DESC);
|
||||
CREATE INDEX idx_channel_inbound_audit_reason
|
||||
ON channel_inbound_audit(drop_reason, received_at DESC);
|
||||
|
||||
INSERT INTO channel_inbound_audit (
|
||||
id, installation_id, channel_type, channel_chat_id, event_type,
|
||||
channel_event_id, channel_message_id, drop_reason, received_at
|
||||
)
|
||||
SELECT
|
||||
id, installation_id, 'feishu', lark_chat_id, event_type,
|
||||
lark_event_id, lark_message_id, drop_reason, received_at
|
||||
FROM lark_inbound_audit;
|
||||
|
||||
-- =====================
|
||||
-- channel_outbound_card_message
|
||||
-- =====================
|
||||
CREATE TABLE channel_outbound_card_message (
|
||||
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
|
||||
chat_session_id UUID NOT NULL,
|
||||
task_id UUID,
|
||||
channel_type TEXT NOT NULL,
|
||||
channel_chat_id TEXT NOT NULL,
|
||||
channel_card_message_id TEXT NOT NULL,
|
||||
status TEXT NOT NULL DEFAULT 'pending'
|
||||
CHECK (status IN ('pending', 'streaming', 'final', 'error')),
|
||||
last_patched_at TIMESTAMPTZ,
|
||||
created_at TIMESTAMPTZ NOT NULL DEFAULT now()
|
||||
);
|
||||
|
||||
CREATE UNIQUE INDEX idx_channel_outbound_card_task
|
||||
ON channel_outbound_card_message(task_id)
|
||||
WHERE task_id IS NOT NULL;
|
||||
CREATE INDEX idx_channel_outbound_card_session
|
||||
ON channel_outbound_card_message(chat_session_id, created_at DESC);
|
||||
|
||||
INSERT INTO channel_outbound_card_message (
|
||||
id, chat_session_id, task_id, channel_type, channel_chat_id,
|
||||
channel_card_message_id, status, last_patched_at, created_at
|
||||
)
|
||||
SELECT
|
||||
id, chat_session_id, task_id, 'feishu', lark_chat_id,
|
||||
lark_card_message_id, status, last_patched_at, created_at
|
||||
FROM lark_outbound_card_message;
|
||||
|
||||
-- =====================
|
||||
-- channel_binding_token
|
||||
-- =====================
|
||||
CREATE TABLE channel_binding_token (
|
||||
token_hash TEXT PRIMARY KEY,
|
||||
workspace_id UUID NOT NULL,
|
||||
installation_id UUID NOT NULL,
|
||||
channel_type TEXT NOT NULL,
|
||||
channel_user_id TEXT NOT NULL,
|
||||
expires_at TIMESTAMPTZ NOT NULL,
|
||||
consumed_at TIMESTAMPTZ,
|
||||
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
|
||||
-- Keep the product TTL cap in lockstep with channel.BindingTokenTTL
|
||||
-- (15 minutes), same as the old lark_binding_token CHECK.
|
||||
CONSTRAINT channel_binding_token_ttl_cap
|
||||
CHECK (expires_at <= created_at + INTERVAL '15 minutes')
|
||||
);
|
||||
|
||||
CREATE INDEX idx_channel_binding_token_installation
|
||||
ON channel_binding_token(installation_id, expires_at);
|
||||
|
||||
INSERT INTO channel_binding_token (
|
||||
token_hash, workspace_id, installation_id, channel_type,
|
||||
channel_user_id, expires_at, consumed_at, created_at
|
||||
)
|
||||
SELECT
|
||||
token_hash, workspace_id, installation_id, 'feishu',
|
||||
lark_open_id, expires_at, consumed_at, created_at
|
||||
FROM lark_binding_token;
|
||||
970
server/pkg/db/generated/channel.sql.go
Normal file
970
server/pkg/db/generated/channel.sql.go
Normal file
@@ -0,0 +1,970 @@
|
||||
// Code generated by sqlc. DO NOT EDIT.
|
||||
// versions:
|
||||
// sqlc v1.31.1
|
||||
// source: channel.sql
|
||||
|
||||
package db
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/jackc/pgx/v5/pgtype"
|
||||
)
|
||||
|
||||
const acquireChannelWSLease = `-- name: AcquireChannelWSLease :one
|
||||
UPDATE channel_installation
|
||||
SET ws_lease_token = $1,
|
||||
ws_lease_expires_at = $2,
|
||||
updated_at = now()
|
||||
WHERE id = $3
|
||||
AND status = 'active'
|
||||
AND (
|
||||
ws_lease_token IS NULL
|
||||
OR ws_lease_expires_at < now()
|
||||
OR ws_lease_token = $1
|
||||
)
|
||||
RETURNING id, workspace_id, agent_id, channel_type, config, status, ws_lease_token, ws_lease_expires_at, installer_user_id, installed_at, created_at, updated_at
|
||||
`
|
||||
|
||||
type AcquireChannelWSLeaseParams struct {
|
||||
NewToken pgtype.Text `json:"new_token"`
|
||||
NewExpiresAt pgtype.Timestamptz `json:"new_expires_at"`
|
||||
ID pgtype.UUID `json:"id"`
|
||||
}
|
||||
|
||||
// Atomically claims the WebSocket lease. CAS predicate accepts when no
|
||||
// holder exists, the holder expired, or the holder is us (renewal).
|
||||
func (q *Queries) AcquireChannelWSLease(ctx context.Context, arg AcquireChannelWSLeaseParams) (ChannelInstallation, error) {
|
||||
row := q.db.QueryRow(ctx, acquireChannelWSLease, arg.NewToken, arg.NewExpiresAt, arg.ID)
|
||||
var i ChannelInstallation
|
||||
err := row.Scan(
|
||||
&i.ID,
|
||||
&i.WorkspaceID,
|
||||
&i.AgentID,
|
||||
&i.ChannelType,
|
||||
&i.Config,
|
||||
&i.Status,
|
||||
&i.WsLeaseToken,
|
||||
&i.WsLeaseExpiresAt,
|
||||
&i.InstallerUserID,
|
||||
&i.InstalledAt,
|
||||
&i.CreatedAt,
|
||||
&i.UpdatedAt,
|
||||
)
|
||||
return i, err
|
||||
}
|
||||
|
||||
const backfillChannelInstallationRegionToFeishuLark = `-- name: BackfillChannelInstallationRegionToFeishuLark :execrows
|
||||
UPDATE channel_installation
|
||||
SET config = jsonb_set(config, '{region}', '"lark"'),
|
||||
updated_at = now()
|
||||
WHERE channel_type = 'feishu'
|
||||
AND config ->> 'region' = 'feishu'
|
||||
`
|
||||
|
||||
// Operator repair, feishu-only: flip every feishu installation still
|
||||
// carrying region='feishu' to 'lark'. Called only on deployments whose
|
||||
// legacy global base-URL override pointed at Lark international. Idempotent.
|
||||
func (q *Queries) BackfillChannelInstallationRegionToFeishuLark(ctx context.Context) (int64, error) {
|
||||
result, err := q.db.Exec(ctx, backfillChannelInstallationRegionToFeishuLark)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return result.RowsAffected(), nil
|
||||
}
|
||||
|
||||
const claimChannelInboundDedup = `-- name: ClaimChannelInboundDedup :one
|
||||
|
||||
INSERT INTO channel_inbound_message_dedup (installation_id, message_id, claim_token)
|
||||
VALUES ($1, $2, gen_random_uuid())
|
||||
ON CONFLICT (installation_id, message_id) DO UPDATE
|
||||
SET received_at = now(),
|
||||
claim_token = gen_random_uuid()
|
||||
WHERE channel_inbound_message_dedup.processed_at IS NULL
|
||||
AND channel_inbound_message_dedup.received_at < now() - INTERVAL '60 seconds'
|
||||
RETURNING installation_id, message_id, received_at, processed_at, claim_token
|
||||
`
|
||||
|
||||
type ClaimChannelInboundDedupParams struct {
|
||||
InstallationID pgtype.UUID `json:"installation_id"`
|
||||
MessageID string `json:"message_id"`
|
||||
}
|
||||
|
||||
// =====================
|
||||
// channel_inbound_message_dedup
|
||||
// =====================
|
||||
// Two-phase idempotency gate with owner fencing. Returns the row when a
|
||||
// claim is acquired (fresh insert, or stale-reclaim of an in-flight claim
|
||||
// older than 60s); returns no rows when terminal (processed) or actively
|
||||
// in-flight. Every claim mints a fresh claim_token; Mark/Release are
|
||||
// fenced on it. See the table comment in migration 123 / the lark
|
||||
// predecessor for the full invariant set.
|
||||
func (q *Queries) ClaimChannelInboundDedup(ctx context.Context, arg ClaimChannelInboundDedupParams) (ChannelInboundMessageDedup, error) {
|
||||
row := q.db.QueryRow(ctx, claimChannelInboundDedup, arg.InstallationID, arg.MessageID)
|
||||
var i ChannelInboundMessageDedup
|
||||
err := row.Scan(
|
||||
&i.InstallationID,
|
||||
&i.MessageID,
|
||||
&i.ReceivedAt,
|
||||
&i.ProcessedAt,
|
||||
&i.ClaimToken,
|
||||
)
|
||||
return i, err
|
||||
}
|
||||
|
||||
const consumeChannelBindingToken = `-- name: ConsumeChannelBindingToken :one
|
||||
UPDATE channel_binding_token
|
||||
SET consumed_at = now()
|
||||
WHERE token_hash = $1
|
||||
AND consumed_at IS NULL
|
||||
AND expires_at > now()
|
||||
RETURNING token_hash, workspace_id, installation_id, channel_type, channel_user_id, expires_at, consumed_at, created_at
|
||||
`
|
||||
|
||||
// Atomic redemption: returns the row only if the hash exists, is
|
||||
// unconsumed, and unexpired. Two simultaneous redemptions cannot both win.
|
||||
func (q *Queries) ConsumeChannelBindingToken(ctx context.Context, tokenHash string) (ChannelBindingToken, error) {
|
||||
row := q.db.QueryRow(ctx, consumeChannelBindingToken, tokenHash)
|
||||
var i ChannelBindingToken
|
||||
err := row.Scan(
|
||||
&i.TokenHash,
|
||||
&i.WorkspaceID,
|
||||
&i.InstallationID,
|
||||
&i.ChannelType,
|
||||
&i.ChannelUserID,
|
||||
&i.ExpiresAt,
|
||||
&i.ConsumedAt,
|
||||
&i.CreatedAt,
|
||||
)
|
||||
return i, err
|
||||
}
|
||||
|
||||
const createChannelBindingToken = `-- name: CreateChannelBindingToken :one
|
||||
|
||||
INSERT INTO channel_binding_token (
|
||||
token_hash, workspace_id, installation_id, channel_type,
|
||||
channel_user_id, expires_at
|
||||
) VALUES (
|
||||
$1, $2, $3, $4, $5, $6
|
||||
)
|
||||
RETURNING token_hash, workspace_id, installation_id, channel_type, channel_user_id, expires_at, consumed_at, created_at
|
||||
`
|
||||
|
||||
type CreateChannelBindingTokenParams struct {
|
||||
TokenHash string `json:"token_hash"`
|
||||
WorkspaceID pgtype.UUID `json:"workspace_id"`
|
||||
InstallationID pgtype.UUID `json:"installation_id"`
|
||||
ChannelType string `json:"channel_type"`
|
||||
ChannelUserID string `json:"channel_user_id"`
|
||||
ExpiresAt pgtype.Timestamptz `json:"expires_at"`
|
||||
}
|
||||
|
||||
// =====================
|
||||
// channel_binding_token
|
||||
// =====================
|
||||
// Mints a single-use binding token for an unbound platform user. TTL cap
|
||||
// (15 min) enforced by the table CHECK in lockstep with
|
||||
// channel.BindingTokenTTL. The HASH is stored, never the raw token.
|
||||
func (q *Queries) CreateChannelBindingToken(ctx context.Context, arg CreateChannelBindingTokenParams) (ChannelBindingToken, error) {
|
||||
row := q.db.QueryRow(ctx, createChannelBindingToken,
|
||||
arg.TokenHash,
|
||||
arg.WorkspaceID,
|
||||
arg.InstallationID,
|
||||
arg.ChannelType,
|
||||
arg.ChannelUserID,
|
||||
arg.ExpiresAt,
|
||||
)
|
||||
var i ChannelBindingToken
|
||||
err := row.Scan(
|
||||
&i.TokenHash,
|
||||
&i.WorkspaceID,
|
||||
&i.InstallationID,
|
||||
&i.ChannelType,
|
||||
&i.ChannelUserID,
|
||||
&i.ExpiresAt,
|
||||
&i.ConsumedAt,
|
||||
&i.CreatedAt,
|
||||
)
|
||||
return i, err
|
||||
}
|
||||
|
||||
const createChannelChatSessionBinding = `-- name: CreateChannelChatSessionBinding :one
|
||||
|
||||
INSERT INTO channel_chat_session_binding (
|
||||
chat_session_id, installation_id, channel_type, channel_chat_id, chat_type
|
||||
) VALUES (
|
||||
$1, $2, $3, $4, $5
|
||||
)
|
||||
RETURNING id, chat_session_id, installation_id, channel_type, channel_chat_id, chat_type, last_message_id, last_thread_id, config, created_at
|
||||
`
|
||||
|
||||
type CreateChannelChatSessionBindingParams struct {
|
||||
ChatSessionID pgtype.UUID `json:"chat_session_id"`
|
||||
InstallationID pgtype.UUID `json:"installation_id"`
|
||||
ChannelType string `json:"channel_type"`
|
||||
ChannelChatID string `json:"channel_chat_id"`
|
||||
ChatType string `json:"chat_type"`
|
||||
}
|
||||
|
||||
// =====================
|
||||
// channel_chat_session_binding
|
||||
// =====================
|
||||
func (q *Queries) CreateChannelChatSessionBinding(ctx context.Context, arg CreateChannelChatSessionBindingParams) (ChannelChatSessionBinding, error) {
|
||||
row := q.db.QueryRow(ctx, createChannelChatSessionBinding,
|
||||
arg.ChatSessionID,
|
||||
arg.InstallationID,
|
||||
arg.ChannelType,
|
||||
arg.ChannelChatID,
|
||||
arg.ChatType,
|
||||
)
|
||||
var i ChannelChatSessionBinding
|
||||
err := row.Scan(
|
||||
&i.ID,
|
||||
&i.ChatSessionID,
|
||||
&i.InstallationID,
|
||||
&i.ChannelType,
|
||||
&i.ChannelChatID,
|
||||
&i.ChatType,
|
||||
&i.LastMessageID,
|
||||
&i.LastThreadID,
|
||||
&i.Config,
|
||||
&i.CreatedAt,
|
||||
)
|
||||
return i, err
|
||||
}
|
||||
|
||||
const createChannelOutboundCardMessage = `-- name: CreateChannelOutboundCardMessage :one
|
||||
|
||||
INSERT INTO channel_outbound_card_message (
|
||||
chat_session_id, task_id, channel_type, channel_chat_id,
|
||||
channel_card_message_id, status
|
||||
) VALUES (
|
||||
$1, $6, $2, $3, $4, $5
|
||||
)
|
||||
RETURNING id, chat_session_id, task_id, channel_type, channel_chat_id, channel_card_message_id, status, last_patched_at, created_at
|
||||
`
|
||||
|
||||
type CreateChannelOutboundCardMessageParams struct {
|
||||
ChatSessionID pgtype.UUID `json:"chat_session_id"`
|
||||
ChannelType string `json:"channel_type"`
|
||||
ChannelChatID string `json:"channel_chat_id"`
|
||||
ChannelCardMessageID string `json:"channel_card_message_id"`
|
||||
Status string `json:"status"`
|
||||
TaskID pgtype.UUID `json:"task_id"`
|
||||
}
|
||||
|
||||
// =====================
|
||||
// channel_outbound_card_message
|
||||
// =====================
|
||||
func (q *Queries) CreateChannelOutboundCardMessage(ctx context.Context, arg CreateChannelOutboundCardMessageParams) (ChannelOutboundCardMessage, error) {
|
||||
row := q.db.QueryRow(ctx, createChannelOutboundCardMessage,
|
||||
arg.ChatSessionID,
|
||||
arg.ChannelType,
|
||||
arg.ChannelChatID,
|
||||
arg.ChannelCardMessageID,
|
||||
arg.Status,
|
||||
arg.TaskID,
|
||||
)
|
||||
var i ChannelOutboundCardMessage
|
||||
err := row.Scan(
|
||||
&i.ID,
|
||||
&i.ChatSessionID,
|
||||
&i.TaskID,
|
||||
&i.ChannelType,
|
||||
&i.ChannelChatID,
|
||||
&i.ChannelCardMessageID,
|
||||
&i.Status,
|
||||
&i.LastPatchedAt,
|
||||
&i.CreatedAt,
|
||||
)
|
||||
return i, err
|
||||
}
|
||||
|
||||
const createChannelUserBinding = `-- name: CreateChannelUserBinding :one
|
||||
|
||||
INSERT INTO channel_user_binding (
|
||||
workspace_id, multica_user_id, installation_id,
|
||||
channel_type, channel_user_id, config
|
||||
) VALUES (
|
||||
$1, $2, $3, $4, $5, $6
|
||||
)
|
||||
ON CONFLICT (installation_id, channel_user_id) DO UPDATE SET
|
||||
config = channel_user_binding.config || EXCLUDED.config,
|
||||
bound_at = now()
|
||||
WHERE channel_user_binding.multica_user_id = EXCLUDED.multica_user_id
|
||||
RETURNING id, workspace_id, multica_user_id, installation_id, channel_type, channel_user_id, config, bound_at
|
||||
`
|
||||
|
||||
type CreateChannelUserBindingParams struct {
|
||||
WorkspaceID pgtype.UUID `json:"workspace_id"`
|
||||
MulticaUserID pgtype.UUID `json:"multica_user_id"`
|
||||
InstallationID pgtype.UUID `json:"installation_id"`
|
||||
ChannelType string `json:"channel_type"`
|
||||
ChannelUserID string `json:"channel_user_id"`
|
||||
Config []byte `json:"config"`
|
||||
}
|
||||
|
||||
// =====================
|
||||
// channel_user_binding
|
||||
// =====================
|
||||
// Records that a platform user id (per-installation; Feishu open_id) maps
|
||||
// to a Multica user. The old composite member-FK is gone, so this no
|
||||
// longer fails when the redeemer is not a workspace member — the caller
|
||||
// (BindingTokenService.RedeemAndBind) validates membership explicitly
|
||||
// before calling. ON CONFLICT DO UPDATE is still gated on multica_user_id
|
||||
// matching, so a second redeemer cannot steal an already-bound user id;
|
||||
// a cross-user conflict updates zero rows and the caller maps that to
|
||||
// ErrBindingAlreadyAssigned. config carries secondary identity (union_id).
|
||||
func (q *Queries) CreateChannelUserBinding(ctx context.Context, arg CreateChannelUserBindingParams) (ChannelUserBinding, error) {
|
||||
row := q.db.QueryRow(ctx, createChannelUserBinding,
|
||||
arg.WorkspaceID,
|
||||
arg.MulticaUserID,
|
||||
arg.InstallationID,
|
||||
arg.ChannelType,
|
||||
arg.ChannelUserID,
|
||||
arg.Config,
|
||||
)
|
||||
var i ChannelUserBinding
|
||||
err := row.Scan(
|
||||
&i.ID,
|
||||
&i.WorkspaceID,
|
||||
&i.MulticaUserID,
|
||||
&i.InstallationID,
|
||||
&i.ChannelType,
|
||||
&i.ChannelUserID,
|
||||
&i.Config,
|
||||
&i.BoundAt,
|
||||
)
|
||||
return i, err
|
||||
}
|
||||
|
||||
const deleteChannelChatSessionBindingBySession = `-- name: DeleteChannelChatSessionBindingBySession :exec
|
||||
DELETE FROM channel_chat_session_binding
|
||||
WHERE chat_session_id = $1
|
||||
`
|
||||
|
||||
// Application-layer integrity (replaces the old chat_session-FK ON DELETE
|
||||
// CASCADE): drop the binding when its chat_session is deleted.
|
||||
func (q *Queries) DeleteChannelChatSessionBindingBySession(ctx context.Context, chatSessionID pgtype.UUID) error {
|
||||
_, err := q.db.Exec(ctx, deleteChannelChatSessionBindingBySession, chatSessionID)
|
||||
return err
|
||||
}
|
||||
|
||||
const deleteChannelUserBindingsByWorkspaceMember = `-- name: DeleteChannelUserBindingsByWorkspaceMember :exec
|
||||
DELETE FROM channel_user_binding
|
||||
WHERE workspace_id = $1 AND multica_user_id = $2
|
||||
`
|
||||
|
||||
type DeleteChannelUserBindingsByWorkspaceMemberParams struct {
|
||||
WorkspaceID pgtype.UUID `json:"workspace_id"`
|
||||
MulticaUserID pgtype.UUID `json:"multica_user_id"`
|
||||
}
|
||||
|
||||
// Application-layer integrity (replaces the old member-FK ON DELETE
|
||||
// CASCADE): prune every binding for a user who has been removed from a
|
||||
// workspace, across all installations in that workspace.
|
||||
func (q *Queries) DeleteChannelUserBindingsByWorkspaceMember(ctx context.Context, arg DeleteChannelUserBindingsByWorkspaceMemberParams) error {
|
||||
_, err := q.db.Exec(ctx, deleteChannelUserBindingsByWorkspaceMember, arg.WorkspaceID, arg.MulticaUserID)
|
||||
return err
|
||||
}
|
||||
|
||||
const getChannelChatSessionBinding = `-- name: GetChannelChatSessionBinding :one
|
||||
SELECT id, chat_session_id, installation_id, channel_type, channel_chat_id, chat_type, last_message_id, last_thread_id, config, created_at FROM channel_chat_session_binding
|
||||
WHERE installation_id = $1 AND channel_chat_id = $2
|
||||
`
|
||||
|
||||
type GetChannelChatSessionBindingParams struct {
|
||||
InstallationID pgtype.UUID `json:"installation_id"`
|
||||
ChannelChatID string `json:"channel_chat_id"`
|
||||
}
|
||||
|
||||
// Lookup-by-channel-chat: the inbound dispatcher finds the existing
|
||||
// chat_session before deciding whether to create one.
|
||||
func (q *Queries) GetChannelChatSessionBinding(ctx context.Context, arg GetChannelChatSessionBindingParams) (ChannelChatSessionBinding, error) {
|
||||
row := q.db.QueryRow(ctx, getChannelChatSessionBinding, arg.InstallationID, arg.ChannelChatID)
|
||||
var i ChannelChatSessionBinding
|
||||
err := row.Scan(
|
||||
&i.ID,
|
||||
&i.ChatSessionID,
|
||||
&i.InstallationID,
|
||||
&i.ChannelType,
|
||||
&i.ChannelChatID,
|
||||
&i.ChatType,
|
||||
&i.LastMessageID,
|
||||
&i.LastThreadID,
|
||||
&i.Config,
|
||||
&i.CreatedAt,
|
||||
)
|
||||
return i, err
|
||||
}
|
||||
|
||||
const getChannelChatSessionBindingBySession = `-- name: GetChannelChatSessionBindingBySession :one
|
||||
SELECT id, chat_session_id, installation_id, channel_type, channel_chat_id, chat_type, last_message_id, last_thread_id, config, created_at FROM channel_chat_session_binding
|
||||
WHERE chat_session_id = $1
|
||||
`
|
||||
|
||||
// Reverse lookup for the outbound patcher: given a chat_session_id, find
|
||||
// its channel binding to know which (installation, chat_id) to send to.
|
||||
func (q *Queries) GetChannelChatSessionBindingBySession(ctx context.Context, chatSessionID pgtype.UUID) (ChannelChatSessionBinding, error) {
|
||||
row := q.db.QueryRow(ctx, getChannelChatSessionBindingBySession, chatSessionID)
|
||||
var i ChannelChatSessionBinding
|
||||
err := row.Scan(
|
||||
&i.ID,
|
||||
&i.ChatSessionID,
|
||||
&i.InstallationID,
|
||||
&i.ChannelType,
|
||||
&i.ChannelChatID,
|
||||
&i.ChatType,
|
||||
&i.LastMessageID,
|
||||
&i.LastThreadID,
|
||||
&i.Config,
|
||||
&i.CreatedAt,
|
||||
)
|
||||
return i, err
|
||||
}
|
||||
|
||||
const getChannelInstallation = `-- name: GetChannelInstallation :one
|
||||
SELECT id, workspace_id, agent_id, channel_type, config, status, ws_lease_token, ws_lease_expires_at, installer_user_id, installed_at, created_at, updated_at FROM channel_installation WHERE id = $1
|
||||
`
|
||||
|
||||
func (q *Queries) GetChannelInstallation(ctx context.Context, id pgtype.UUID) (ChannelInstallation, error) {
|
||||
row := q.db.QueryRow(ctx, getChannelInstallation, id)
|
||||
var i ChannelInstallation
|
||||
err := row.Scan(
|
||||
&i.ID,
|
||||
&i.WorkspaceID,
|
||||
&i.AgentID,
|
||||
&i.ChannelType,
|
||||
&i.Config,
|
||||
&i.Status,
|
||||
&i.WsLeaseToken,
|
||||
&i.WsLeaseExpiresAt,
|
||||
&i.InstallerUserID,
|
||||
&i.InstalledAt,
|
||||
&i.CreatedAt,
|
||||
&i.UpdatedAt,
|
||||
)
|
||||
return i, err
|
||||
}
|
||||
|
||||
const getChannelInstallationByAppID = `-- name: GetChannelInstallationByAppID :one
|
||||
SELECT id, workspace_id, agent_id, channel_type, config, status, ws_lease_token, ws_lease_expires_at, installer_user_id, installed_at, created_at, updated_at FROM channel_installation
|
||||
WHERE channel_type = $1 AND config ->> 'app_id' = $2
|
||||
`
|
||||
|
||||
type GetChannelInstallationByAppIDParams struct {
|
||||
ChannelType string `json:"channel_type"`
|
||||
Config []byte `json:"config"`
|
||||
}
|
||||
|
||||
// Inbound routing. The platform event carries only the channel's app
|
||||
// identifier (Feishu app_id); the dispatcher's installation resolver routes
|
||||
// on (channel_type, config->>'app_id'). Backed by the functional unique
|
||||
// index idx_channel_installation_type_appid.
|
||||
func (q *Queries) GetChannelInstallationByAppID(ctx context.Context, arg GetChannelInstallationByAppIDParams) (ChannelInstallation, error) {
|
||||
row := q.db.QueryRow(ctx, getChannelInstallationByAppID, arg.ChannelType, arg.Config)
|
||||
var i ChannelInstallation
|
||||
err := row.Scan(
|
||||
&i.ID,
|
||||
&i.WorkspaceID,
|
||||
&i.AgentID,
|
||||
&i.ChannelType,
|
||||
&i.Config,
|
||||
&i.Status,
|
||||
&i.WsLeaseToken,
|
||||
&i.WsLeaseExpiresAt,
|
||||
&i.InstallerUserID,
|
||||
&i.InstalledAt,
|
||||
&i.CreatedAt,
|
||||
&i.UpdatedAt,
|
||||
)
|
||||
return i, err
|
||||
}
|
||||
|
||||
const getChannelInstallationInWorkspace = `-- name: GetChannelInstallationInWorkspace :one
|
||||
SELECT id, workspace_id, agent_id, channel_type, config, status, ws_lease_token, ws_lease_expires_at, installer_user_id, installed_at, created_at, updated_at FROM channel_installation
|
||||
WHERE id = $1 AND workspace_id = $2
|
||||
`
|
||||
|
||||
type GetChannelInstallationInWorkspaceParams struct {
|
||||
ID pgtype.UUID `json:"id"`
|
||||
WorkspaceID pgtype.UUID `json:"workspace_id"`
|
||||
}
|
||||
|
||||
func (q *Queries) GetChannelInstallationInWorkspace(ctx context.Context, arg GetChannelInstallationInWorkspaceParams) (ChannelInstallation, error) {
|
||||
row := q.db.QueryRow(ctx, getChannelInstallationInWorkspace, arg.ID, arg.WorkspaceID)
|
||||
var i ChannelInstallation
|
||||
err := row.Scan(
|
||||
&i.ID,
|
||||
&i.WorkspaceID,
|
||||
&i.AgentID,
|
||||
&i.ChannelType,
|
||||
&i.Config,
|
||||
&i.Status,
|
||||
&i.WsLeaseToken,
|
||||
&i.WsLeaseExpiresAt,
|
||||
&i.InstallerUserID,
|
||||
&i.InstalledAt,
|
||||
&i.CreatedAt,
|
||||
&i.UpdatedAt,
|
||||
)
|
||||
return i, err
|
||||
}
|
||||
|
||||
const getChannelOutboundCardByTask = `-- name: GetChannelOutboundCardByTask :one
|
||||
SELECT id, chat_session_id, task_id, channel_type, channel_chat_id, channel_card_message_id, status, last_patched_at, created_at FROM channel_outbound_card_message
|
||||
WHERE task_id = $1
|
||||
`
|
||||
|
||||
// The partial unique index on (task_id) WHERE task_id IS NOT NULL
|
||||
// guarantees at most one row.
|
||||
func (q *Queries) GetChannelOutboundCardByTask(ctx context.Context, taskID pgtype.UUID) (ChannelOutboundCardMessage, error) {
|
||||
row := q.db.QueryRow(ctx, getChannelOutboundCardByTask, taskID)
|
||||
var i ChannelOutboundCardMessage
|
||||
err := row.Scan(
|
||||
&i.ID,
|
||||
&i.ChatSessionID,
|
||||
&i.TaskID,
|
||||
&i.ChannelType,
|
||||
&i.ChannelChatID,
|
||||
&i.ChannelCardMessageID,
|
||||
&i.Status,
|
||||
&i.LastPatchedAt,
|
||||
&i.CreatedAt,
|
||||
)
|
||||
return i, err
|
||||
}
|
||||
|
||||
const getChannelUserBindingByUserID = `-- name: GetChannelUserBindingByUserID :one
|
||||
SELECT id, workspace_id, multica_user_id, installation_id, channel_type, channel_user_id, config, bound_at FROM channel_user_binding
|
||||
WHERE installation_id = $1 AND channel_user_id = $2
|
||||
`
|
||||
|
||||
type GetChannelUserBindingByUserIDParams struct {
|
||||
InstallationID pgtype.UUID `json:"installation_id"`
|
||||
ChannelUserID string `json:"channel_user_id"`
|
||||
}
|
||||
|
||||
// The inbound identity lookup: does this platform user id map to a Multica
|
||||
// user for this installation? With the member-FK removed, a row's
|
||||
// existence no longer proves current workspace membership — the dispatcher
|
||||
// re-checks membership after this lookup.
|
||||
func (q *Queries) GetChannelUserBindingByUserID(ctx context.Context, arg GetChannelUserBindingByUserIDParams) (ChannelUserBinding, error) {
|
||||
row := q.db.QueryRow(ctx, getChannelUserBindingByUserID, arg.InstallationID, arg.ChannelUserID)
|
||||
var i ChannelUserBinding
|
||||
err := row.Scan(
|
||||
&i.ID,
|
||||
&i.WorkspaceID,
|
||||
&i.MulticaUserID,
|
||||
&i.InstallationID,
|
||||
&i.ChannelType,
|
||||
&i.ChannelUserID,
|
||||
&i.Config,
|
||||
&i.BoundAt,
|
||||
)
|
||||
return i, err
|
||||
}
|
||||
|
||||
const listActiveChannelInstallations = `-- name: ListActiveChannelInstallations :many
|
||||
SELECT id, workspace_id, agent_id, channel_type, config, status, ws_lease_token, ws_lease_expires_at, installer_user_id, installed_at, created_at, updated_at FROM channel_installation
|
||||
WHERE status = 'active'
|
||||
ORDER BY created_at ASC
|
||||
`
|
||||
|
||||
// Boot path for the inbound hub: every active installation, any channel
|
||||
// type, so the hub can claim leases and open connections.
|
||||
func (q *Queries) ListActiveChannelInstallations(ctx context.Context) ([]ChannelInstallation, error) {
|
||||
rows, err := q.db.Query(ctx, listActiveChannelInstallations)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
items := []ChannelInstallation{}
|
||||
for rows.Next() {
|
||||
var i ChannelInstallation
|
||||
if err := rows.Scan(
|
||||
&i.ID,
|
||||
&i.WorkspaceID,
|
||||
&i.AgentID,
|
||||
&i.ChannelType,
|
||||
&i.Config,
|
||||
&i.Status,
|
||||
&i.WsLeaseToken,
|
||||
&i.WsLeaseExpiresAt,
|
||||
&i.InstallerUserID,
|
||||
&i.InstalledAt,
|
||||
&i.CreatedAt,
|
||||
&i.UpdatedAt,
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
items = append(items, i)
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return items, nil
|
||||
}
|
||||
|
||||
const listChannelInboundAuditByInstallation = `-- name: ListChannelInboundAuditByInstallation :many
|
||||
SELECT id, installation_id, channel_type, channel_chat_id, event_type, channel_event_id, channel_message_id, drop_reason, received_at FROM channel_inbound_audit
|
||||
WHERE installation_id = $1
|
||||
ORDER BY received_at DESC
|
||||
LIMIT $2 OFFSET $3
|
||||
`
|
||||
|
||||
type ListChannelInboundAuditByInstallationParams struct {
|
||||
InstallationID pgtype.UUID `json:"installation_id"`
|
||||
Limit int32 `json:"limit"`
|
||||
Offset int32 `json:"offset"`
|
||||
}
|
||||
|
||||
func (q *Queries) ListChannelInboundAuditByInstallation(ctx context.Context, arg ListChannelInboundAuditByInstallationParams) ([]ChannelInboundAudit, error) {
|
||||
rows, err := q.db.Query(ctx, listChannelInboundAuditByInstallation, arg.InstallationID, arg.Limit, arg.Offset)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
items := []ChannelInboundAudit{}
|
||||
for rows.Next() {
|
||||
var i ChannelInboundAudit
|
||||
if err := rows.Scan(
|
||||
&i.ID,
|
||||
&i.InstallationID,
|
||||
&i.ChannelType,
|
||||
&i.ChannelChatID,
|
||||
&i.EventType,
|
||||
&i.ChannelEventID,
|
||||
&i.ChannelMessageID,
|
||||
&i.DropReason,
|
||||
&i.ReceivedAt,
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
items = append(items, i)
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return items, nil
|
||||
}
|
||||
|
||||
const listChannelInstallationsByWorkspace = `-- name: ListChannelInstallationsByWorkspace :many
|
||||
SELECT id, workspace_id, agent_id, channel_type, config, status, ws_lease_token, ws_lease_expires_at, installer_user_id, installed_at, created_at, updated_at FROM channel_installation
|
||||
WHERE workspace_id = $1
|
||||
ORDER BY created_at ASC
|
||||
`
|
||||
|
||||
func (q *Queries) ListChannelInstallationsByWorkspace(ctx context.Context, workspaceID pgtype.UUID) ([]ChannelInstallation, error) {
|
||||
rows, err := q.db.Query(ctx, listChannelInstallationsByWorkspace, workspaceID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
items := []ChannelInstallation{}
|
||||
for rows.Next() {
|
||||
var i ChannelInstallation
|
||||
if err := rows.Scan(
|
||||
&i.ID,
|
||||
&i.WorkspaceID,
|
||||
&i.AgentID,
|
||||
&i.ChannelType,
|
||||
&i.Config,
|
||||
&i.Status,
|
||||
&i.WsLeaseToken,
|
||||
&i.WsLeaseExpiresAt,
|
||||
&i.InstallerUserID,
|
||||
&i.InstalledAt,
|
||||
&i.CreatedAt,
|
||||
&i.UpdatedAt,
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
items = append(items, i)
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return items, nil
|
||||
}
|
||||
|
||||
const markChannelInboundDedupProcessed = `-- name: MarkChannelInboundDedupProcessed :execrows
|
||||
UPDATE channel_inbound_message_dedup
|
||||
SET processed_at = now()
|
||||
WHERE installation_id = $1
|
||||
AND message_id = $2
|
||||
AND claim_token = $3
|
||||
AND processed_at IS NULL
|
||||
`
|
||||
|
||||
type MarkChannelInboundDedupProcessedParams struct {
|
||||
InstallationID pgtype.UUID `json:"installation_id"`
|
||||
MessageID string `json:"message_id"`
|
||||
ClaimToken pgtype.UUID `json:"claim_token"`
|
||||
}
|
||||
|
||||
// Locks a claim in as permanently processed after a durable outcome.
|
||||
// Invoked inside the chat_message tx (via qtx) on the ingest path so the
|
||||
// durable write and the Mark commit atomically. Token mismatch returns
|
||||
// zero rows (a reclaim happened); the caller rolls back its in-tx write.
|
||||
func (q *Queries) MarkChannelInboundDedupProcessed(ctx context.Context, arg MarkChannelInboundDedupProcessedParams) (int64, error) {
|
||||
result, err := q.db.Exec(ctx, markChannelInboundDedupProcessed, arg.InstallationID, arg.MessageID, arg.ClaimToken)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return result.RowsAffected(), nil
|
||||
}
|
||||
|
||||
const purgeChannelInboundDedup = `-- name: PurgeChannelInboundDedup :exec
|
||||
DELETE FROM channel_inbound_message_dedup
|
||||
WHERE received_at < $1
|
||||
`
|
||||
|
||||
// Vacuum job: remove dedup rows older than the supplied cutoff (e.g. 24h).
|
||||
func (q *Queries) PurgeChannelInboundDedup(ctx context.Context, receivedAt pgtype.Timestamptz) error {
|
||||
_, err := q.db.Exec(ctx, purgeChannelInboundDedup, receivedAt)
|
||||
return err
|
||||
}
|
||||
|
||||
const purgeExpiredChannelBindingTokens = `-- name: PurgeExpiredChannelBindingTokens :exec
|
||||
DELETE FROM channel_binding_token
|
||||
WHERE expires_at < $1
|
||||
`
|
||||
|
||||
func (q *Queries) PurgeExpiredChannelBindingTokens(ctx context.Context, expiresAt pgtype.Timestamptz) error {
|
||||
_, err := q.db.Exec(ctx, purgeExpiredChannelBindingTokens, expiresAt)
|
||||
return err
|
||||
}
|
||||
|
||||
const recordChannelInboundDrop = `-- name: RecordChannelInboundDrop :exec
|
||||
|
||||
INSERT INTO channel_inbound_audit (
|
||||
installation_id, channel_type, channel_chat_id, event_type,
|
||||
channel_event_id, channel_message_id, drop_reason
|
||||
) VALUES (
|
||||
$4,
|
||||
$1,
|
||||
$5,
|
||||
$2,
|
||||
$6,
|
||||
$7,
|
||||
$3
|
||||
)
|
||||
`
|
||||
|
||||
type RecordChannelInboundDropParams struct {
|
||||
ChannelType string `json:"channel_type"`
|
||||
EventType string `json:"event_type"`
|
||||
DropReason string `json:"drop_reason"`
|
||||
InstallationID pgtype.UUID `json:"installation_id"`
|
||||
ChannelChatID pgtype.Text `json:"channel_chat_id"`
|
||||
ChannelEventID pgtype.Text `json:"channel_event_id"`
|
||||
ChannelMessageID pgtype.Text `json:"channel_message_id"`
|
||||
}
|
||||
|
||||
// =====================
|
||||
// channel_inbound_audit
|
||||
// =====================
|
||||
// The only write path for dropped events. Deliberately carries no body
|
||||
// column — only routing / identity / drop_reason / timestamp.
|
||||
func (q *Queries) RecordChannelInboundDrop(ctx context.Context, arg RecordChannelInboundDropParams) error {
|
||||
_, err := q.db.Exec(ctx, recordChannelInboundDrop,
|
||||
arg.ChannelType,
|
||||
arg.EventType,
|
||||
arg.DropReason,
|
||||
arg.InstallationID,
|
||||
arg.ChannelChatID,
|
||||
arg.ChannelEventID,
|
||||
arg.ChannelMessageID,
|
||||
)
|
||||
return err
|
||||
}
|
||||
|
||||
const releaseChannelInboundDedup = `-- name: ReleaseChannelInboundDedup :execrows
|
||||
DELETE FROM channel_inbound_message_dedup
|
||||
WHERE installation_id = $1
|
||||
AND message_id = $2
|
||||
AND claim_token = $3
|
||||
AND processed_at IS NULL
|
||||
`
|
||||
|
||||
type ReleaseChannelInboundDedupParams struct {
|
||||
InstallationID pgtype.UUID `json:"installation_id"`
|
||||
MessageID string `json:"message_id"`
|
||||
ClaimToken pgtype.UUID `json:"claim_token"`
|
||||
}
|
||||
|
||||
// Releases an in-flight claim when an infra error occurred before any
|
||||
// durable side effect, so a retry can re-acquire immediately. Fenced on
|
||||
// processed_at IS NULL and claim_token.
|
||||
func (q *Queries) ReleaseChannelInboundDedup(ctx context.Context, arg ReleaseChannelInboundDedupParams) (int64, error) {
|
||||
result, err := q.db.Exec(ctx, releaseChannelInboundDedup, arg.InstallationID, arg.MessageID, arg.ClaimToken)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return result.RowsAffected(), nil
|
||||
}
|
||||
|
||||
const releaseChannelWSLease = `-- name: ReleaseChannelWSLease :exec
|
||||
UPDATE channel_installation
|
||||
SET ws_lease_token = NULL,
|
||||
ws_lease_expires_at = NULL,
|
||||
updated_at = now()
|
||||
WHERE id = $1
|
||||
AND ws_lease_token = $2
|
||||
`
|
||||
|
||||
type ReleaseChannelWSLeaseParams struct {
|
||||
ID pgtype.UUID `json:"id"`
|
||||
CurrentToken pgtype.Text `json:"current_token"`
|
||||
}
|
||||
|
||||
// Drops the lease iff we are still the holder.
|
||||
func (q *Queries) ReleaseChannelWSLease(ctx context.Context, arg ReleaseChannelWSLeaseParams) error {
|
||||
_, err := q.db.Exec(ctx, releaseChannelWSLease, arg.ID, arg.CurrentToken)
|
||||
return err
|
||||
}
|
||||
|
||||
const setChannelInstallationConfig = `-- name: SetChannelInstallationConfig :exec
|
||||
UPDATE channel_installation
|
||||
SET config = $2, updated_at = now()
|
||||
WHERE id = $1
|
||||
`
|
||||
|
||||
type SetChannelInstallationConfigParams struct {
|
||||
ID pgtype.UUID `json:"id"`
|
||||
Config []byte `json:"config"`
|
||||
}
|
||||
|
||||
// Replaces the whole config blob for one installation. Used by the
|
||||
// operator backfills (e.g. setting a freshly-fetched bot_union_id) that
|
||||
// read-modify-write the JSON in Go and persist it back atomically by id.
|
||||
func (q *Queries) SetChannelInstallationConfig(ctx context.Context, arg SetChannelInstallationConfigParams) error {
|
||||
_, err := q.db.Exec(ctx, setChannelInstallationConfig, arg.ID, arg.Config)
|
||||
return err
|
||||
}
|
||||
|
||||
const setChannelInstallationStatus = `-- name: SetChannelInstallationStatus :exec
|
||||
UPDATE channel_installation
|
||||
SET status = $2, updated_at = now()
|
||||
WHERE id = $1
|
||||
`
|
||||
|
||||
type SetChannelInstallationStatusParams struct {
|
||||
ID pgtype.UUID `json:"id"`
|
||||
Status string `json:"status"`
|
||||
}
|
||||
|
||||
func (q *Queries) SetChannelInstallationStatus(ctx context.Context, arg SetChannelInstallationStatusParams) error {
|
||||
_, err := q.db.Exec(ctx, setChannelInstallationStatus, arg.ID, arg.Status)
|
||||
return err
|
||||
}
|
||||
|
||||
const updateChannelChatSessionBindingReplyTarget = `-- name: UpdateChannelChatSessionBindingReplyTarget :exec
|
||||
UPDATE channel_chat_session_binding
|
||||
SET last_message_id = $2,
|
||||
last_thread_id = $3
|
||||
WHERE chat_session_id = $1
|
||||
`
|
||||
|
||||
type UpdateChannelChatSessionBindingReplyTargetParams struct {
|
||||
ChatSessionID pgtype.UUID `json:"chat_session_id"`
|
||||
LastMessageID pgtype.Text `json:"last_message_id"`
|
||||
LastThreadID pgtype.Text `json:"last_thread_id"`
|
||||
}
|
||||
|
||||
// Records the most recent inbound trigger message + thread so the decoupled
|
||||
// outbound patcher can thread its reply back into the originating topic.
|
||||
func (q *Queries) UpdateChannelChatSessionBindingReplyTarget(ctx context.Context, arg UpdateChannelChatSessionBindingReplyTargetParams) error {
|
||||
_, err := q.db.Exec(ctx, updateChannelChatSessionBindingReplyTarget, arg.ChatSessionID, arg.LastMessageID, arg.LastThreadID)
|
||||
return err
|
||||
}
|
||||
|
||||
const updateChannelOutboundCardStatus = `-- name: UpdateChannelOutboundCardStatus :exec
|
||||
UPDATE channel_outbound_card_message
|
||||
SET status = $2,
|
||||
last_patched_at = now()
|
||||
WHERE id = $1
|
||||
`
|
||||
|
||||
type UpdateChannelOutboundCardStatusParams struct {
|
||||
ID pgtype.UUID `json:"id"`
|
||||
Status string `json:"status"`
|
||||
}
|
||||
|
||||
func (q *Queries) UpdateChannelOutboundCardStatus(ctx context.Context, arg UpdateChannelOutboundCardStatusParams) error {
|
||||
_, err := q.db.Exec(ctx, updateChannelOutboundCardStatus, arg.ID, arg.Status)
|
||||
return err
|
||||
}
|
||||
|
||||
const upsertChannelInstallation = `-- name: UpsertChannelInstallation :one
|
||||
|
||||
|
||||
INSERT INTO channel_installation (
|
||||
workspace_id, agent_id, channel_type, config, installer_user_id
|
||||
) VALUES (
|
||||
$1, $2, $3, $4, $5
|
||||
)
|
||||
ON CONFLICT (workspace_id, agent_id) DO UPDATE SET
|
||||
channel_type = EXCLUDED.channel_type,
|
||||
config = EXCLUDED.config,
|
||||
installer_user_id = EXCLUDED.installer_user_id,
|
||||
status = 'active',
|
||||
installed_at = now(),
|
||||
updated_at = now()
|
||||
RETURNING id, workspace_id, agent_id, channel_type, config, status, ws_lease_token, ws_lease_expires_at, installer_user_id, installed_at, created_at, updated_at
|
||||
`
|
||||
|
||||
type UpsertChannelInstallationParams struct {
|
||||
WorkspaceID pgtype.UUID `json:"workspace_id"`
|
||||
AgentID pgtype.UUID `json:"agent_id"`
|
||||
ChannelType string `json:"channel_type"`
|
||||
Config []byte `json:"config"`
|
||||
InstallerUserID pgtype.UUID `json:"installer_user_id"`
|
||||
}
|
||||
|
||||
// Platform-agnostic inbound channel queries (MUL-3515). These operate on
|
||||
// the channel_* tables created in migration 123. Each installation carries
|
||||
// a `channel_type` discriminator and a JSONB `config` blob for
|
||||
// platform-specific identifiers/credentials; the cross-platform columns
|
||||
// stay flat. The Go layer owns building/parsing config — these queries
|
||||
// treat it as opaque JSON except for the routing index on config->>'app_id'.
|
||||
//
|
||||
// No foreign keys exist on these tables (MUL-3515 §4): the integrity the
|
||||
// old composite FKs enforced (binding workspace matches installation;
|
||||
// binding dies with membership / chat_session) is maintained in the
|
||||
// application layer via the membership check in the inbound identity step
|
||||
// and the *DeleteChannel*BindingsBy* cleanup queries below.
|
||||
// =====================
|
||||
// channel_installation
|
||||
// =====================
|
||||
// Install / re-install path. `config` is the opaque per-channel JSONB the
|
||||
// Go layer assembles (for feishu: app_id, app_secret_encrypted, tenant_key,
|
||||
// bot_open_id, bot_union_id, region). Re-installing the same agent replaces
|
||||
// the whole config and forces status back to 'active'. The WS lease is
|
||||
// intentionally NOT reset here — the inbound hub owns lease lifecycle.
|
||||
func (q *Queries) UpsertChannelInstallation(ctx context.Context, arg UpsertChannelInstallationParams) (ChannelInstallation, error) {
|
||||
row := q.db.QueryRow(ctx, upsertChannelInstallation,
|
||||
arg.WorkspaceID,
|
||||
arg.AgentID,
|
||||
arg.ChannelType,
|
||||
arg.Config,
|
||||
arg.InstallerUserID,
|
||||
)
|
||||
var i ChannelInstallation
|
||||
err := row.Scan(
|
||||
&i.ID,
|
||||
&i.WorkspaceID,
|
||||
&i.AgentID,
|
||||
&i.ChannelType,
|
||||
&i.Config,
|
||||
&i.Status,
|
||||
&i.WsLeaseToken,
|
||||
&i.WsLeaseExpiresAt,
|
||||
&i.InstallerUserID,
|
||||
&i.InstalledAt,
|
||||
&i.CreatedAt,
|
||||
&i.UpdatedAt,
|
||||
)
|
||||
return i, err
|
||||
}
|
||||
@@ -177,6 +177,88 @@ type AutopilotTrigger struct {
|
||||
EventFilters []byte `json:"event_filters"`
|
||||
}
|
||||
|
||||
type ChannelBindingToken struct {
|
||||
TokenHash string `json:"token_hash"`
|
||||
WorkspaceID pgtype.UUID `json:"workspace_id"`
|
||||
InstallationID pgtype.UUID `json:"installation_id"`
|
||||
ChannelType string `json:"channel_type"`
|
||||
ChannelUserID string `json:"channel_user_id"`
|
||||
ExpiresAt pgtype.Timestamptz `json:"expires_at"`
|
||||
ConsumedAt pgtype.Timestamptz `json:"consumed_at"`
|
||||
CreatedAt pgtype.Timestamptz `json:"created_at"`
|
||||
}
|
||||
|
||||
type ChannelChatSessionBinding struct {
|
||||
ID pgtype.UUID `json:"id"`
|
||||
ChatSessionID pgtype.UUID `json:"chat_session_id"`
|
||||
InstallationID pgtype.UUID `json:"installation_id"`
|
||||
ChannelType string `json:"channel_type"`
|
||||
ChannelChatID string `json:"channel_chat_id"`
|
||||
ChatType string `json:"chat_type"`
|
||||
LastMessageID pgtype.Text `json:"last_message_id"`
|
||||
LastThreadID pgtype.Text `json:"last_thread_id"`
|
||||
Config []byte `json:"config"`
|
||||
CreatedAt pgtype.Timestamptz `json:"created_at"`
|
||||
}
|
||||
|
||||
type ChannelInboundAudit struct {
|
||||
ID pgtype.UUID `json:"id"`
|
||||
InstallationID pgtype.UUID `json:"installation_id"`
|
||||
ChannelType string `json:"channel_type"`
|
||||
ChannelChatID pgtype.Text `json:"channel_chat_id"`
|
||||
EventType string `json:"event_type"`
|
||||
ChannelEventID pgtype.Text `json:"channel_event_id"`
|
||||
ChannelMessageID pgtype.Text `json:"channel_message_id"`
|
||||
DropReason string `json:"drop_reason"`
|
||||
ReceivedAt pgtype.Timestamptz `json:"received_at"`
|
||||
}
|
||||
|
||||
type ChannelInboundMessageDedup struct {
|
||||
InstallationID pgtype.UUID `json:"installation_id"`
|
||||
MessageID string `json:"message_id"`
|
||||
ReceivedAt pgtype.Timestamptz `json:"received_at"`
|
||||
ProcessedAt pgtype.Timestamptz `json:"processed_at"`
|
||||
ClaimToken pgtype.UUID `json:"claim_token"`
|
||||
}
|
||||
|
||||
type ChannelInstallation struct {
|
||||
ID pgtype.UUID `json:"id"`
|
||||
WorkspaceID pgtype.UUID `json:"workspace_id"`
|
||||
AgentID pgtype.UUID `json:"agent_id"`
|
||||
ChannelType string `json:"channel_type"`
|
||||
Config []byte `json:"config"`
|
||||
Status string `json:"status"`
|
||||
WsLeaseToken pgtype.Text `json:"ws_lease_token"`
|
||||
WsLeaseExpiresAt pgtype.Timestamptz `json:"ws_lease_expires_at"`
|
||||
InstallerUserID pgtype.UUID `json:"installer_user_id"`
|
||||
InstalledAt pgtype.Timestamptz `json:"installed_at"`
|
||||
CreatedAt pgtype.Timestamptz `json:"created_at"`
|
||||
UpdatedAt pgtype.Timestamptz `json:"updated_at"`
|
||||
}
|
||||
|
||||
type ChannelOutboundCardMessage struct {
|
||||
ID pgtype.UUID `json:"id"`
|
||||
ChatSessionID pgtype.UUID `json:"chat_session_id"`
|
||||
TaskID pgtype.UUID `json:"task_id"`
|
||||
ChannelType string `json:"channel_type"`
|
||||
ChannelChatID string `json:"channel_chat_id"`
|
||||
ChannelCardMessageID string `json:"channel_card_message_id"`
|
||||
Status string `json:"status"`
|
||||
LastPatchedAt pgtype.Timestamptz `json:"last_patched_at"`
|
||||
CreatedAt pgtype.Timestamptz `json:"created_at"`
|
||||
}
|
||||
|
||||
type ChannelUserBinding struct {
|
||||
ID pgtype.UUID `json:"id"`
|
||||
WorkspaceID pgtype.UUID `json:"workspace_id"`
|
||||
MulticaUserID pgtype.UUID `json:"multica_user_id"`
|
||||
InstallationID pgtype.UUID `json:"installation_id"`
|
||||
ChannelType string `json:"channel_type"`
|
||||
ChannelUserID string `json:"channel_user_id"`
|
||||
Config []byte `json:"config"`
|
||||
BoundAt pgtype.Timestamptz `json:"bound_at"`
|
||||
}
|
||||
|
||||
type ChatMessage struct {
|
||||
ID pgtype.UUID `json:"id"`
|
||||
ChatSessionID pgtype.UUID `json:"chat_session_id"`
|
||||
|
||||
317
server/pkg/db/queries/channel.sql
Normal file
317
server/pkg/db/queries/channel.sql
Normal file
@@ -0,0 +1,317 @@
|
||||
-- Platform-agnostic inbound channel queries (MUL-3515). These operate on
|
||||
-- the channel_* tables created in migration 123. Each installation carries
|
||||
-- a `channel_type` discriminator and a JSONB `config` blob for
|
||||
-- platform-specific identifiers/credentials; the cross-platform columns
|
||||
-- stay flat. The Go layer owns building/parsing config — these queries
|
||||
-- treat it as opaque JSON except for the routing index on config->>'app_id'.
|
||||
--
|
||||
-- No foreign keys exist on these tables (MUL-3515 §4): the integrity the
|
||||
-- old composite FKs enforced (binding workspace matches installation;
|
||||
-- binding dies with membership / chat_session) is maintained in the
|
||||
-- application layer via the membership check in the inbound identity step
|
||||
-- and the *DeleteChannel*BindingsBy* cleanup queries below.
|
||||
|
||||
-- =====================
|
||||
-- channel_installation
|
||||
-- =====================
|
||||
|
||||
-- name: UpsertChannelInstallation :one
|
||||
-- Install / re-install path. `config` is the opaque per-channel JSONB the
|
||||
-- Go layer assembles (for feishu: app_id, app_secret_encrypted, tenant_key,
|
||||
-- bot_open_id, bot_union_id, region). Re-installing the same agent replaces
|
||||
-- the whole config and forces status back to 'active'. The WS lease is
|
||||
-- intentionally NOT reset here — the inbound hub owns lease lifecycle.
|
||||
INSERT INTO channel_installation (
|
||||
workspace_id, agent_id, channel_type, config, installer_user_id
|
||||
) VALUES (
|
||||
$1, $2, $3, $4, $5
|
||||
)
|
||||
ON CONFLICT (workspace_id, agent_id) DO UPDATE SET
|
||||
channel_type = EXCLUDED.channel_type,
|
||||
config = EXCLUDED.config,
|
||||
installer_user_id = EXCLUDED.installer_user_id,
|
||||
status = 'active',
|
||||
installed_at = now(),
|
||||
updated_at = now()
|
||||
RETURNING *;
|
||||
|
||||
-- name: GetChannelInstallation :one
|
||||
SELECT * FROM channel_installation WHERE id = $1;
|
||||
|
||||
-- name: GetChannelInstallationInWorkspace :one
|
||||
SELECT * FROM channel_installation
|
||||
WHERE id = $1 AND workspace_id = $2;
|
||||
|
||||
-- name: GetChannelInstallationByAppID :one
|
||||
-- Inbound routing. The platform event carries only the channel's app
|
||||
-- identifier (Feishu app_id); the dispatcher's installation resolver routes
|
||||
-- on (channel_type, config->>'app_id'). Backed by the functional unique
|
||||
-- index idx_channel_installation_type_appid.
|
||||
SELECT * FROM channel_installation
|
||||
WHERE channel_type = $1 AND config ->> 'app_id' = $2;
|
||||
|
||||
-- name: ListChannelInstallationsByWorkspace :many
|
||||
SELECT * FROM channel_installation
|
||||
WHERE workspace_id = $1
|
||||
ORDER BY created_at ASC;
|
||||
|
||||
-- name: ListActiveChannelInstallations :many
|
||||
-- Boot path for the inbound hub: every active installation, any channel
|
||||
-- type, so the hub can claim leases and open connections.
|
||||
SELECT * FROM channel_installation
|
||||
WHERE status = 'active'
|
||||
ORDER BY created_at ASC;
|
||||
|
||||
-- name: SetChannelInstallationStatus :exec
|
||||
UPDATE channel_installation
|
||||
SET status = $2, updated_at = now()
|
||||
WHERE id = $1;
|
||||
|
||||
-- name: SetChannelInstallationConfig :exec
|
||||
-- Replaces the whole config blob for one installation. Used by the
|
||||
-- operator backfills (e.g. setting a freshly-fetched bot_union_id) that
|
||||
-- read-modify-write the JSON in Go and persist it back atomically by id.
|
||||
UPDATE channel_installation
|
||||
SET config = $2, updated_at = now()
|
||||
WHERE id = $1;
|
||||
|
||||
-- name: BackfillChannelInstallationRegionToFeishuLark :execrows
|
||||
-- Operator repair, feishu-only: flip every feishu installation still
|
||||
-- carrying region='feishu' to 'lark'. Called only on deployments whose
|
||||
-- legacy global base-URL override pointed at Lark international. Idempotent.
|
||||
UPDATE channel_installation
|
||||
SET config = jsonb_set(config, '{region}', '"lark"'),
|
||||
updated_at = now()
|
||||
WHERE channel_type = 'feishu'
|
||||
AND config ->> 'region' = 'feishu';
|
||||
|
||||
-- name: AcquireChannelWSLease :one
|
||||
-- Atomically claims the WebSocket lease. CAS predicate accepts when no
|
||||
-- holder exists, the holder expired, or the holder is us (renewal).
|
||||
UPDATE channel_installation
|
||||
SET ws_lease_token = sqlc.arg('new_token'),
|
||||
ws_lease_expires_at = sqlc.arg('new_expires_at'),
|
||||
updated_at = now()
|
||||
WHERE id = sqlc.arg('id')
|
||||
AND status = 'active'
|
||||
AND (
|
||||
ws_lease_token IS NULL
|
||||
OR ws_lease_expires_at < now()
|
||||
OR ws_lease_token = sqlc.arg('new_token')
|
||||
)
|
||||
RETURNING *;
|
||||
|
||||
-- name: ReleaseChannelWSLease :exec
|
||||
-- Drops the lease iff we are still the holder.
|
||||
UPDATE channel_installation
|
||||
SET ws_lease_token = NULL,
|
||||
ws_lease_expires_at = NULL,
|
||||
updated_at = now()
|
||||
WHERE id = $1
|
||||
AND ws_lease_token = sqlc.arg('current_token');
|
||||
|
||||
-- =====================
|
||||
-- channel_user_binding
|
||||
-- =====================
|
||||
|
||||
-- name: CreateChannelUserBinding :one
|
||||
-- Records that a platform user id (per-installation; Feishu open_id) maps
|
||||
-- to a Multica user. The old composite member-FK is gone, so this no
|
||||
-- longer fails when the redeemer is not a workspace member — the caller
|
||||
-- (BindingTokenService.RedeemAndBind) validates membership explicitly
|
||||
-- before calling. ON CONFLICT DO UPDATE is still gated on multica_user_id
|
||||
-- matching, so a second redeemer cannot steal an already-bound user id;
|
||||
-- a cross-user conflict updates zero rows and the caller maps that to
|
||||
-- ErrBindingAlreadyAssigned. config carries secondary identity (union_id).
|
||||
INSERT INTO channel_user_binding (
|
||||
workspace_id, multica_user_id, installation_id,
|
||||
channel_type, channel_user_id, config
|
||||
) VALUES (
|
||||
$1, $2, $3, $4, $5, $6
|
||||
)
|
||||
ON CONFLICT (installation_id, channel_user_id) DO UPDATE SET
|
||||
config = channel_user_binding.config || EXCLUDED.config,
|
||||
bound_at = now()
|
||||
WHERE channel_user_binding.multica_user_id = EXCLUDED.multica_user_id
|
||||
RETURNING *;
|
||||
|
||||
-- name: GetChannelUserBindingByUserID :one
|
||||
-- The inbound identity lookup: does this platform user id map to a Multica
|
||||
-- user for this installation? With the member-FK removed, a row's
|
||||
-- existence no longer proves current workspace membership — the dispatcher
|
||||
-- re-checks membership after this lookup.
|
||||
SELECT * FROM channel_user_binding
|
||||
WHERE installation_id = $1 AND channel_user_id = $2;
|
||||
|
||||
-- name: DeleteChannelUserBindingsByWorkspaceMember :exec
|
||||
-- Application-layer integrity (replaces the old member-FK ON DELETE
|
||||
-- CASCADE): prune every binding for a user who has been removed from a
|
||||
-- workspace, across all installations in that workspace.
|
||||
DELETE FROM channel_user_binding
|
||||
WHERE workspace_id = $1 AND multica_user_id = $2;
|
||||
|
||||
-- =====================
|
||||
-- channel_chat_session_binding
|
||||
-- =====================
|
||||
|
||||
-- name: CreateChannelChatSessionBinding :one
|
||||
INSERT INTO channel_chat_session_binding (
|
||||
chat_session_id, installation_id, channel_type, channel_chat_id, chat_type
|
||||
) VALUES (
|
||||
$1, $2, $3, $4, $5
|
||||
)
|
||||
RETURNING *;
|
||||
|
||||
-- name: GetChannelChatSessionBinding :one
|
||||
-- Lookup-by-channel-chat: the inbound dispatcher finds the existing
|
||||
-- chat_session before deciding whether to create one.
|
||||
SELECT * FROM channel_chat_session_binding
|
||||
WHERE installation_id = $1 AND channel_chat_id = $2;
|
||||
|
||||
-- name: GetChannelChatSessionBindingBySession :one
|
||||
-- Reverse lookup for the outbound patcher: given a chat_session_id, find
|
||||
-- its channel binding to know which (installation, chat_id) to send to.
|
||||
SELECT * FROM channel_chat_session_binding
|
||||
WHERE chat_session_id = $1;
|
||||
|
||||
-- name: UpdateChannelChatSessionBindingReplyTarget :exec
|
||||
-- Records the most recent inbound trigger message + thread so the decoupled
|
||||
-- outbound patcher can thread its reply back into the originating topic.
|
||||
UPDATE channel_chat_session_binding
|
||||
SET last_message_id = sqlc.narg('last_message_id'),
|
||||
last_thread_id = sqlc.narg('last_thread_id')
|
||||
WHERE chat_session_id = $1;
|
||||
|
||||
-- name: DeleteChannelChatSessionBindingBySession :exec
|
||||
-- Application-layer integrity (replaces the old chat_session-FK ON DELETE
|
||||
-- CASCADE): drop the binding when its chat_session is deleted.
|
||||
DELETE FROM channel_chat_session_binding
|
||||
WHERE chat_session_id = $1;
|
||||
|
||||
-- =====================
|
||||
-- channel_inbound_message_dedup
|
||||
-- =====================
|
||||
|
||||
-- name: ClaimChannelInboundDedup :one
|
||||
-- Two-phase idempotency gate with owner fencing. Returns the row when a
|
||||
-- claim is acquired (fresh insert, or stale-reclaim of an in-flight claim
|
||||
-- older than 60s); returns no rows when terminal (processed) or actively
|
||||
-- in-flight. Every claim mints a fresh claim_token; Mark/Release are
|
||||
-- fenced on it. See the table comment in migration 123 / the lark
|
||||
-- predecessor for the full invariant set.
|
||||
INSERT INTO channel_inbound_message_dedup (installation_id, message_id, claim_token)
|
||||
VALUES ($1, $2, gen_random_uuid())
|
||||
ON CONFLICT (installation_id, message_id) DO UPDATE
|
||||
SET received_at = now(),
|
||||
claim_token = gen_random_uuid()
|
||||
WHERE channel_inbound_message_dedup.processed_at IS NULL
|
||||
AND channel_inbound_message_dedup.received_at < now() - INTERVAL '60 seconds'
|
||||
RETURNING installation_id, message_id, received_at, processed_at, claim_token;
|
||||
|
||||
-- name: MarkChannelInboundDedupProcessed :execrows
|
||||
-- Locks a claim in as permanently processed after a durable outcome.
|
||||
-- Invoked inside the chat_message tx (via qtx) on the ingest path so the
|
||||
-- durable write and the Mark commit atomically. Token mismatch returns
|
||||
-- zero rows (a reclaim happened); the caller rolls back its in-tx write.
|
||||
UPDATE channel_inbound_message_dedup
|
||||
SET processed_at = now()
|
||||
WHERE installation_id = $1
|
||||
AND message_id = $2
|
||||
AND claim_token = $3
|
||||
AND processed_at IS NULL;
|
||||
|
||||
-- name: ReleaseChannelInboundDedup :execrows
|
||||
-- Releases an in-flight claim when an infra error occurred before any
|
||||
-- durable side effect, so a retry can re-acquire immediately. Fenced on
|
||||
-- processed_at IS NULL and claim_token.
|
||||
DELETE FROM channel_inbound_message_dedup
|
||||
WHERE installation_id = $1
|
||||
AND message_id = $2
|
||||
AND claim_token = $3
|
||||
AND processed_at IS NULL;
|
||||
|
||||
-- name: PurgeChannelInboundDedup :exec
|
||||
-- Vacuum job: remove dedup rows older than the supplied cutoff (e.g. 24h).
|
||||
DELETE FROM channel_inbound_message_dedup
|
||||
WHERE received_at < $1;
|
||||
|
||||
-- =====================
|
||||
-- channel_inbound_audit
|
||||
-- =====================
|
||||
|
||||
-- name: RecordChannelInboundDrop :exec
|
||||
-- The only write path for dropped events. Deliberately carries no body
|
||||
-- column — only routing / identity / drop_reason / timestamp.
|
||||
INSERT INTO channel_inbound_audit (
|
||||
installation_id, channel_type, channel_chat_id, event_type,
|
||||
channel_event_id, channel_message_id, drop_reason
|
||||
) VALUES (
|
||||
sqlc.narg('installation_id'),
|
||||
$1,
|
||||
sqlc.narg('channel_chat_id'),
|
||||
$2,
|
||||
sqlc.narg('channel_event_id'),
|
||||
sqlc.narg('channel_message_id'),
|
||||
$3
|
||||
);
|
||||
|
||||
-- name: ListChannelInboundAuditByInstallation :many
|
||||
SELECT * FROM channel_inbound_audit
|
||||
WHERE installation_id = $1
|
||||
ORDER BY received_at DESC
|
||||
LIMIT $2 OFFSET $3;
|
||||
|
||||
-- =====================
|
||||
-- channel_outbound_card_message
|
||||
-- =====================
|
||||
|
||||
-- name: CreateChannelOutboundCardMessage :one
|
||||
INSERT INTO channel_outbound_card_message (
|
||||
chat_session_id, task_id, channel_type, channel_chat_id,
|
||||
channel_card_message_id, status
|
||||
) VALUES (
|
||||
$1, sqlc.narg('task_id'), $2, $3, $4, $5
|
||||
)
|
||||
RETURNING *;
|
||||
|
||||
-- name: GetChannelOutboundCardByTask :one
|
||||
-- The partial unique index on (task_id) WHERE task_id IS NOT NULL
|
||||
-- guarantees at most one row.
|
||||
SELECT * FROM channel_outbound_card_message
|
||||
WHERE task_id = $1;
|
||||
|
||||
-- name: UpdateChannelOutboundCardStatus :exec
|
||||
UPDATE channel_outbound_card_message
|
||||
SET status = $2,
|
||||
last_patched_at = now()
|
||||
WHERE id = $1;
|
||||
|
||||
-- =====================
|
||||
-- channel_binding_token
|
||||
-- =====================
|
||||
|
||||
-- name: CreateChannelBindingToken :one
|
||||
-- Mints a single-use binding token for an unbound platform user. TTL cap
|
||||
-- (15 min) enforced by the table CHECK in lockstep with
|
||||
-- channel.BindingTokenTTL. The HASH is stored, never the raw token.
|
||||
INSERT INTO channel_binding_token (
|
||||
token_hash, workspace_id, installation_id, channel_type,
|
||||
channel_user_id, expires_at
|
||||
) VALUES (
|
||||
$1, $2, $3, $4, $5, $6
|
||||
)
|
||||
RETURNING *;
|
||||
|
||||
-- name: ConsumeChannelBindingToken :one
|
||||
-- Atomic redemption: returns the row only if the hash exists, is
|
||||
-- unconsumed, and unexpired. Two simultaneous redemptions cannot both win.
|
||||
UPDATE channel_binding_token
|
||||
SET consumed_at = now()
|
||||
WHERE token_hash = $1
|
||||
AND consumed_at IS NULL
|
||||
AND expires_at > now()
|
||||
RETURNING *;
|
||||
|
||||
-- name: PurgeExpiredChannelBindingTokens :exec
|
||||
DELETE FROM channel_binding_token
|
||||
WHERE expires_at < $1;
|
||||
Reference in New Issue
Block a user