fix(lark): publish lark_installation:created at row-commit, not on status poll

The agent Integrations tab's "已连接到飞书" connection badge only updated after
a manual page refresh. lark_installation:created had a single emit site — the
status-poll handler GetLarkInstallStatus — so it only fired while a browser was
actively polling the install dialog to success. Every other surface (a second
admin, the inspector sidebar, the Settings panel, or the installer whose dialog
closed before the success poll) never received the invalidation frame, and under
the QueryClient defaults (staleTime: Infinity) the installations cache stayed
stale until a full page refresh.

Publish the event from RegistrationService.finishSuccess at the row-commit point,
mirroring the already-correct revoke path, so every workspace client refreshes
the moment the install lands. Wire the bus via an optional SetEventBus (keeps the
constructor and its validation tests untouched, nil-safe) and remove the now-
redundant poll-handler emit.

MUL-3059

Co-authored-by: multica-agent <github@multica.ai>
This commit is contained in:
J
2026-06-04 19:08:06 +08:00
parent 5b69331ad2
commit a4e7c6f93e
4 changed files with 124 additions and 20 deletions

View File

@@ -319,6 +319,10 @@ func NewRouterWithOptions(pool *pgxpool.Pool, hub *realtime.Hub, bus *events.Bus
if rerr != nil {
slog.Error("lark: RegistrationService init failed; install disabled", "error", rerr)
} else {
// Publish lark_installation:created at row-commit time so the
// connection badge refreshes on every workspace client, not just
// the tab that polls the install status to success.
regSvc.SetEventBus(bus)
h.LarkRegistration = regSvc
slog.Info("lark device-flow install enabled")
}

View File

@@ -342,16 +342,11 @@ func (h *Handler) GetLarkInstallStatus(w http.ResponseWriter, r *http.Request) {
}
if state.InstallationID.Valid {
resp.InstallationID = uuidToString(state.InstallationID)
// Successful install — emit the lark_installation:created
// event so listeners (Settings tab, agent detail page) refresh
// without waiting for the frontend's cache invalidation.
// Idempotent: the event handler keys on installation_id, and
// a duplicate emit is a no-op refresh.
if state.Status == lark.RegistrationStatusSuccess {
h.publish(protocol.EventLarkInstallationCreated, uuidToString(wsUUID), "system", "", map[string]any{
"installation_id": resp.InstallationID,
})
}
// The lark_installation:created event is published by the
// RegistrationService at the row-commit point (see
// registration_service.go finishSuccess), not here — that keeps
// the connection-badge refresh independent of whether any browser
// polls this status endpoint to success.
}
writeJSON(w, http.StatusOK, resp)
}

View File

@@ -12,7 +12,9 @@ import (
"time"
"github.com/jackc/pgx/v5/pgtype"
"github.com/multica-ai/multica/server/internal/events"
db "github.com/multica-ai/multica/server/pkg/db/generated"
"github.com/multica-ai/multica/server/pkg/protocol"
)
// RegistrationSessionStatus is the discriminated state a `begin`
@@ -101,15 +103,22 @@ func (c RegistrationServiceConfig) withDefaults() RegistrationServiceConfig {
// into Postgres would add a migration + GC sweep without delivering any
// product capability the user can re-use across server restarts.
type RegistrationService struct {
cfg RegistrationServiceConfig
client *RegistrationClient
api APIClient
queries *db.Queries
tx TxStarter
installs *InstallationService
binder InstallerBinder
cfg RegistrationServiceConfig
client *RegistrationClient
api APIClient
queries *db.Queries
tx TxStarter
installs *InstallationService
binder InstallerBinder
authQueries authQueriesAdapter
// bus is optional. When wired (SetEventBus), a successful install
// publishes lark_installation:created the moment the row commits, so
// every workspace client refreshes its connection badge without
// waiting for a browser to poll the status endpoint to success. Nil
// is valid — install still works, it just won't push the WS frame.
bus *events.Bus
mu sync.Mutex
sessions map[string]*registrationSession
}
@@ -167,6 +176,37 @@ func NewRegistrationService(
}, nil
}
// SetEventBus wires the optional event bus AFTER construction so the
// six positional constructor-validation cases stay untouched and the
// bus remains nil-safe. With it set, finishSuccess publishes
// lark_installation:created at the row-commit point — the authoritative
// moment of truth — instead of relying on the HTTP status-poll handler
// to emit it only when a browser happens to poll to success.
func (s *RegistrationService) SetEventBus(bus *events.Bus) {
s.bus = bus
}
// publishInstalled emits lark_installation:created on the optional bus.
// Mirrors the revoke path (RevokeLarkInstallation publishes
// lark_installation:revoked from its handler): both events broadcast to
// the whole workspace via the SubscribeAll fanout, and the frontend
// invalidates larkKeys.installations on the lark_installation prefix, so
// every mounted surface (agent Integrations tab, inspector, Settings)
// refreshes its connection badge with no page reload. Covers fresh
// installs and revoked→active re-installs alike — both ride the same
// UpsertLarkInstallation write. Nil-safe.
func (s *RegistrationService) publishInstalled(workspaceID, installationID pgtype.UUID) {
if s.bus == nil {
return
}
s.bus.Publish(events.Event{
Type: protocol.EventLarkInstallationCreated,
WorkspaceID: uuidString(workspaceID),
ActorType: "system",
Payload: map[string]any{"installation_id": uuidString(installationID)},
})
}
// registrationSession is the in-memory state for one in-flight install.
type registrationSession struct {
id string
@@ -248,9 +288,9 @@ type BeginInstallParams struct {
// poll status; we deliberately do NOT echo the device_code or the
// polling interval (which is internal scheduling state).
type BeginInstallResult struct {
SessionID string
QRCodeURL string
ExpiresInSeconds int
SessionID string
QRCodeURL string
ExpiresInSeconds int
PollIntervalSeconds int
}
@@ -505,6 +545,10 @@ func (s *RegistrationService) finishSuccess(ctx context.Context, sess *registrat
return
}
sess.markSuccess(inst.ID, s.gcDeadline())
// Publish at the commit point so the connection badge updates on every
// workspace client without a page refresh — not only on the tab that
// happens to poll the status endpoint to success.
s.publishInstalled(sess.workspaceID, inst.ID)
s.cfg.Logger.Info("lark registration: install complete",
"session_id", sess.id,
"workspace_id", uuidString(sess.workspaceID),

View File

@@ -10,7 +10,9 @@ import (
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgtype"
"github.com/multica-ai/multica/server/internal/events"
db "github.com/multica-ai/multica/server/pkg/db/generated"
"github.com/multica-ai/multica/server/pkg/protocol"
)
// These tests cover the pure-Go halves of RegistrationService —
@@ -222,6 +224,65 @@ func TestRandomSessionIDUnique(t *testing.T) {
}
}
// TestRegistrationServicePublishInstalledEmitsCreatedEvent pins the
// MUL-3059 fix: a completed install must publish lark_installation:created
// at the row-write point so every workspace client refreshes its
// connection badge without a page reload. The bug was that this event only
// fired from the HTTP status-poll handler, so any surface that wasn't the
// polling install dialog stayed stale until a manual refresh. The exact
// shape (type, workspace, system actor, installation_id payload) is what
// the SubscribeAll fanout and the frontend lark_installation-prefix
// invalidation depend on.
func TestRegistrationServicePublishInstalledEmitsCreatedEvent(t *testing.T) {
bus := events.New()
var caught []events.Event
bus.Subscribe(protocol.EventLarkInstallationCreated, func(e events.Event) {
caught = append(caught, e)
})
svc := newRegistrationServiceForTest(t)
svc.SetEventBus(bus)
ws := uuidFromStringSvc(t, "11111111-1111-1111-1111-111111111111")
inst := uuidFromStringSvc(t, "22222222-2222-2222-2222-222222222222")
svc.publishInstalled(ws, inst)
// Exactly one — guards against a future re-introduction of the
// now-removed second emit in the status-poll handler.
if len(caught) != 1 {
t.Fatalf("expected exactly 1 lark_installation:created event, got %d", len(caught))
}
got := caught[0]
if got.Type != protocol.EventLarkInstallationCreated {
t.Errorf("type = %q, want %q", got.Type, protocol.EventLarkInstallationCreated)
}
if got.WorkspaceID != uuidString(ws) {
t.Errorf("workspace_id = %q, want %q", got.WorkspaceID, uuidString(ws))
}
if got.ActorType != "system" {
t.Errorf("actor_type = %q, want \"system\"", got.ActorType)
}
payload, ok := got.Payload.(map[string]any)
if !ok {
t.Fatalf("payload type = %T, want map[string]any", got.Payload)
}
if payload["installation_id"] != uuidString(inst) {
t.Errorf("installation_id = %v, want %q", payload["installation_id"], uuidString(inst))
}
}
// TestRegistrationServicePublishInstalledNilBusIsNoOp pins that an install
// still completes when no bus is wired — the bus is optional (SetEventBus
// is never called in self-host builds that disable realtime), so the
// publish must be a silent no-op rather than a nil-deref panic.
func TestRegistrationServicePublishInstalledNilBusIsNoOp(t *testing.T) {
svc := newRegistrationServiceForTest(t) // no SetEventBus
svc.publishInstalled(
uuidFromStringSvc(t, "33333333-3333-3333-3333-333333333333"),
uuidFromStringSvc(t, "44444444-4444-4444-4444-444444444444"),
)
}
// fakeInstallerBinder records BindInstallerTx calls for tests that
// need to assert the bind happened.
type fakeInstallerBinder struct {