mirror of
https://github.com/multica-ai/multica.git
synced 2026-07-05 21:39:54 +02:00
fix(slack): address review must-fixes — connector leak, team-keyed install, /issue copy (MUL-3666)
Three fixes from Niko's review: 1. AppConnector.connectOnce leaked the Socket Mode goroutine/connection on a handler error: it ran sm.RunContext on the long-lived ctx and returned the error without cancelling it, so a transient DB/router error left the old connection alive (consuming events into an unread channel) while Run opened a second one. Each connection now runs under its own cancellable context and a deferred cancel + join tears it down on every exit path before reconnect. 2. Slack re-install collided with the (channel_type, app_id) unique index: connecting the same Slack team to a different agent failed because the upsert conflict key was (workspace_id, agent_id, channel_type). Add a team-keyed UpsertChannelInstallationByAppID (ON CONFLICT on the (channel_type, app_id) index, updating agent_id) and use it for the Slack OAuth install, so re-connecting a workspace moves the bot to the chosen agent instead of erroring. Feishu's per-agent upsert is unchanged. 3. /issue clarified: it is not a registered Slack slash command (no `commands` scope), so Slack never routes one to us. Issue creation runs through the message path — `@bot /issue <title>` in a channel or `/issue <title>` in a DM — which the engine parser handles. Documented in the connector and the user-facing copy (en + zh). Verified: go build ./..., go vet, gofmt, go test ./internal/integrations/..., make sqlc, plus pnpm typecheck (6/6) and pnpm lint (0 errors). Co-authored-by: multica-agent <github@multica.ai>
This commit is contained in:
@@ -302,7 +302,7 @@
|
||||
},
|
||||
"slack": {
|
||||
"section_title": "Slack",
|
||||
"page_description": "Connect each Multica Agent to a Slack bot. A workspace admin installs it with one click via OAuth; members can then DM the bot, @mention it in channels, and type /issue to spin up a new Multica issue.",
|
||||
"page_description": "Connect each Multica Agent to a Slack bot. A workspace admin installs it with one click via OAuth; members can then DM the bot or @mention it in a channel, and start a message with /issue (e.g. \"@bot /issue Fix the login bug\") to spin up a new Multica issue.",
|
||||
"not_enabled_title": "Slack integration not enabled",
|
||||
"not_enabled_description_prefix": "Set",
|
||||
"not_enabled_description_suffix": "on the server to enable Slack bot installations.",
|
||||
|
||||
@@ -302,7 +302,7 @@
|
||||
},
|
||||
"slack": {
|
||||
"section_title": "Slack",
|
||||
"page_description": "把每个 Multica Agent 连接到一个 Slack 机器人。工作区管理员通过 OAuth 一键安装;成员之后即可私聊机器人、在频道中 @ 它,并输入 /issue 创建新的 Multica issue。",
|
||||
"page_description": "把每个 Multica Agent 连接到一个 Slack 机器人。工作区管理员通过 OAuth 一键安装;成员之后即可私聊机器人,或在频道中 @ 它,并以 /issue 开头发消息(例如「@机器人 /issue 修复登录问题」)来创建新的 Multica issue。",
|
||||
"not_enabled_title": "Slack 集成未启用",
|
||||
"not_enabled_description_prefix": "在服务器上设置",
|
||||
"not_enabled_description_suffix": "以启用 Slack 机器人安装。",
|
||||
|
||||
@@ -162,8 +162,24 @@ func (c *AppConnector) connectOnce(ctx context.Context) error {
|
||||
api := slack.New("", slack.OptionAppLevelToken(c.appToken))
|
||||
sm := socketmode.New(api)
|
||||
|
||||
runErr := make(chan error, 1)
|
||||
go func() { runErr <- sm.RunContext(ctx) }()
|
||||
// Each connection runs under its OWN cancellable context, not the parent
|
||||
// ctx directly. Every exit path (handler error, event-stream close, ctx
|
||||
// cancellation) cancels runCtx and waits for the run goroutine to observe it
|
||||
// and exit — so a transient handler error tears the live connection down
|
||||
// before Run reconnects. Without this, the old Socket Mode goroutine would
|
||||
// keep running on the long-lived ctx, leaking the connection/goroutine and
|
||||
// consuming events into an unread channel while a second connection opens.
|
||||
runCtx, runCancel := context.WithCancel(ctx)
|
||||
runErr := make(chan error, 1) // buffered: the goroutine sends once and exits even if nobody reads
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
runErr <- sm.RunContext(runCtx)
|
||||
close(done)
|
||||
}()
|
||||
defer func() {
|
||||
runCancel()
|
||||
<-done
|
||||
}()
|
||||
|
||||
for {
|
||||
select {
|
||||
@@ -216,8 +232,16 @@ func (c *AppConnector) handleSocketEvent(ctx context.Context, sm *socketmode.Cli
|
||||
case socketmode.EventTypeIncomingError, socketmode.EventTypeErrorBadMessage:
|
||||
c.logger.WarnContext(ctx, "slack: socket mode error", "event", evt.Type)
|
||||
default:
|
||||
// Interactive / slash-command / other events are out of scope; ACK so
|
||||
// Slack does not retry, then ignore.
|
||||
// Interactive / slash-command / other envelopes are intentionally
|
||||
// ignored (ACK so Slack does not retry). In particular, /issue is NOT a
|
||||
// registered Slack slash command — the hosted app requests no `commands`
|
||||
// scope, so Slack never routes a slash-command envelope here. Issue
|
||||
// creation runs through the normal message path instead: `@bot /issue
|
||||
// <title>` in a channel (the mention is stripped, leaving "/issue …") or
|
||||
// `/issue <title>` in a DM with the bot, which the engine's
|
||||
// ParseIssueCommand picks up. Adding native slash-command support
|
||||
// (scope + registration + response_url handling) is a possible later
|
||||
// enhancement, not required for the message-driven /issue flow.
|
||||
if evt.Request != nil {
|
||||
_ = sm.Ack(*evt.Request)
|
||||
}
|
||||
|
||||
@@ -76,7 +76,7 @@ func (c OAuthConfig) supported() bool {
|
||||
// installQueries is the slice of generated queries InstallService needs.
|
||||
// *db.Queries satisfies it.
|
||||
type installQueries interface {
|
||||
UpsertChannelInstallation(ctx context.Context, arg db.UpsertChannelInstallationParams) (db.ChannelInstallation, error)
|
||||
UpsertChannelInstallationByAppID(ctx context.Context, arg db.UpsertChannelInstallationByAppIDParams) (db.ChannelInstallation, error)
|
||||
CreateChannelUserBinding(ctx context.Context, arg db.CreateChannelUserBindingParams) (db.ChannelUserBinding, error)
|
||||
ListChannelInstallationsByWorkspace(ctx context.Context, arg db.ListChannelInstallationsByWorkspaceParams) ([]db.ChannelInstallation, error)
|
||||
GetChannelInstallationInWorkspace(ctx context.Context, arg db.GetChannelInstallationInWorkspaceParams) (db.ChannelInstallation, error)
|
||||
@@ -217,7 +217,11 @@ func (s *InstallService) Complete(ctx context.Context, code, rawState string) (C
|
||||
if err != nil {
|
||||
return CompletedInstall{}, fmt.Errorf("encode slack installation config: %w", err)
|
||||
}
|
||||
inst, err := s.q.UpsertChannelInstallation(ctx, db.UpsertChannelInstallationParams{
|
||||
// Team-keyed upsert: a Slack workspace (team_id) is one installation. Re-
|
||||
// connecting the same team — including to represent a different agent —
|
||||
// updates the existing row rather than colliding with the (channel_type,
|
||||
// app_id) unique index (Niko review must-fix #3).
|
||||
inst, err := s.q.UpsertChannelInstallationByAppID(ctx, db.UpsertChannelInstallationByAppIDParams{
|
||||
WorkspaceID: wsID,
|
||||
AgentID: agentID,
|
||||
ChannelType: string(TypeSlack),
|
||||
|
||||
@@ -40,13 +40,13 @@ func mustUUID(t *testing.T, s string) pgtype.UUID {
|
||||
}
|
||||
|
||||
type fakeInstallQueries struct {
|
||||
upsertParams db.UpsertChannelInstallationParams
|
||||
upsertParams db.UpsertChannelInstallationByAppIDParams
|
||||
bindParams db.CreateChannelUserBindingParams
|
||||
bindCalled bool
|
||||
rowID pgtype.UUID
|
||||
}
|
||||
|
||||
func (f *fakeInstallQueries) UpsertChannelInstallation(_ context.Context, arg db.UpsertChannelInstallationParams) (db.ChannelInstallation, error) {
|
||||
func (f *fakeInstallQueries) UpsertChannelInstallationByAppID(_ context.Context, arg db.UpsertChannelInstallationByAppIDParams) (db.ChannelInstallation, error) {
|
||||
f.upsertParams = arg
|
||||
return db.ChannelInstallation{
|
||||
ID: f.rowID,
|
||||
|
||||
@@ -1085,3 +1085,66 @@ func (q *Queries) UpsertChannelInstallation(ctx context.Context, arg UpsertChann
|
||||
)
|
||||
return i, err
|
||||
}
|
||||
|
||||
const upsertChannelInstallationByAppID = `-- name: UpsertChannelInstallationByAppID :one
|
||||
INSERT INTO channel_installation (
|
||||
workspace_id, agent_id, channel_type, config, installer_user_id
|
||||
) VALUES (
|
||||
$1, $2, $3, $4, $5
|
||||
)
|
||||
ON CONFLICT (channel_type, (config ->> 'app_id')) DO UPDATE SET
|
||||
workspace_id = EXCLUDED.workspace_id,
|
||||
agent_id = EXCLUDED.agent_id,
|
||||
config = EXCLUDED.config,
|
||||
installer_user_id = EXCLUDED.installer_user_id,
|
||||
status = 'active',
|
||||
installed_at = now(),
|
||||
updated_at = now()
|
||||
RETURNING id, workspace_id, agent_id, channel_type, config, status, ws_lease_token, ws_lease_expires_at, installer_user_id, installed_at, created_at, updated_at
|
||||
`
|
||||
|
||||
type UpsertChannelInstallationByAppIDParams struct {
|
||||
WorkspaceID pgtype.UUID `json:"workspace_id"`
|
||||
AgentID pgtype.UUID `json:"agent_id"`
|
||||
ChannelType string `json:"channel_type"`
|
||||
Config []byte `json:"config"`
|
||||
InstallerUserID pgtype.UUID `json:"installer_user_id"`
|
||||
}
|
||||
|
||||
// Team-keyed install / re-install for channels whose natural identity is the
|
||||
// platform workspace, not the (agent) pairing. Slack: one Slack workspace
|
||||
// (team_id, stored as config->>'app_id') maps to exactly one installation, so
|
||||
// re-connecting it — even to represent a DIFFERENT agent — UPDATES the existing
|
||||
// row (moving agent_id) instead of colliding with the (channel_type, app_id)
|
||||
// unique index. Contrast UpsertChannelInstallation, whose conflict key is
|
||||
// (workspace_id, agent_id, channel_type): right for Feishu (one app per agent),
|
||||
// wrong for Slack (a second agent connecting the same team would hit the
|
||||
// (channel_type, app_id) index). NOTE: a re-connect that would move the team to
|
||||
// an agent that already holds a different Slack install still trips the
|
||||
// (workspace_id, agent_id, channel_type) unique constraint — that genuine
|
||||
// conflict surfaces as an error the OAuth callback turns into a redirect.
|
||||
func (q *Queries) UpsertChannelInstallationByAppID(ctx context.Context, arg UpsertChannelInstallationByAppIDParams) (ChannelInstallation, error) {
|
||||
row := q.db.QueryRow(ctx, upsertChannelInstallationByAppID,
|
||||
arg.WorkspaceID,
|
||||
arg.AgentID,
|
||||
arg.ChannelType,
|
||||
arg.Config,
|
||||
arg.InstallerUserID,
|
||||
)
|
||||
var i ChannelInstallation
|
||||
err := row.Scan(
|
||||
&i.ID,
|
||||
&i.WorkspaceID,
|
||||
&i.AgentID,
|
||||
&i.ChannelType,
|
||||
&i.Config,
|
||||
&i.Status,
|
||||
&i.WsLeaseToken,
|
||||
&i.WsLeaseExpiresAt,
|
||||
&i.InstallerUserID,
|
||||
&i.InstalledAt,
|
||||
&i.CreatedAt,
|
||||
&i.UpdatedAt,
|
||||
)
|
||||
return i, err
|
||||
}
|
||||
|
||||
@@ -38,6 +38,34 @@ ON CONFLICT (workspace_id, agent_id, channel_type) DO UPDATE SET
|
||||
updated_at = now()
|
||||
RETURNING *;
|
||||
|
||||
-- name: UpsertChannelInstallationByAppID :one
|
||||
-- Team-keyed install / re-install for channels whose natural identity is the
|
||||
-- platform workspace, not the (agent) pairing. Slack: one Slack workspace
|
||||
-- (team_id, stored as config->>'app_id') maps to exactly one installation, so
|
||||
-- re-connecting it — even to represent a DIFFERENT agent — UPDATES the existing
|
||||
-- row (moving agent_id) instead of colliding with the (channel_type, app_id)
|
||||
-- unique index. Contrast UpsertChannelInstallation, whose conflict key is
|
||||
-- (workspace_id, agent_id, channel_type): right for Feishu (one app per agent),
|
||||
-- wrong for Slack (a second agent connecting the same team would hit the
|
||||
-- (channel_type, app_id) index). NOTE: a re-connect that would move the team to
|
||||
-- an agent that already holds a different Slack install still trips the
|
||||
-- (workspace_id, agent_id, channel_type) unique constraint — that genuine
|
||||
-- conflict surfaces as an error the OAuth callback turns into a redirect.
|
||||
INSERT INTO channel_installation (
|
||||
workspace_id, agent_id, channel_type, config, installer_user_id
|
||||
) VALUES (
|
||||
$1, $2, $3, $4, $5
|
||||
)
|
||||
ON CONFLICT (channel_type, (config ->> 'app_id')) DO UPDATE SET
|
||||
workspace_id = EXCLUDED.workspace_id,
|
||||
agent_id = EXCLUDED.agent_id,
|
||||
config = EXCLUDED.config,
|
||||
installer_user_id = EXCLUDED.installer_user_id,
|
||||
status = 'active',
|
||||
installed_at = now(),
|
||||
updated_at = now()
|
||||
RETURNING *;
|
||||
|
||||
-- name: GetChannelInstallation :one
|
||||
-- Scoped by channel_type: a per-channel caller (e.g. the Feishu store)
|
||||
-- must never resolve another channel's installation by guessing its UUID.
|
||||
|
||||
Reference in New Issue
Block a user