From a4e7c6f93e44759cee547bb4517b842139540b8a Mon Sep 17 00:00:00 2001 From: J Date: Thu, 4 Jun 2026 19:08:06 +0800 Subject: [PATCH] fix(lark): publish lark_installation:created at row-commit, not on status poll MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The agent Integrations tab's "已连接到飞书" connection badge only updated after a manual page refresh. lark_installation:created had a single emit site — the status-poll handler GetLarkInstallStatus — so it only fired while a browser was actively polling the install dialog to success. Every other surface (a second admin, the inspector sidebar, the Settings panel, or the installer whose dialog closed before the success poll) never received the invalidation frame, and under the QueryClient defaults (staleTime: Infinity) the installations cache stayed stale until a full page refresh. Publish the event from RegistrationService.finishSuccess at the row-commit point, mirroring the already-correct revoke path, so every workspace client refreshes the moment the install lands. Wire the bus via an optional SetEventBus (keeps the constructor and its validation tests untouched, nil-safe) and remove the now- redundant poll-handler emit. MUL-3059 Co-authored-by: multica-agent --- server/cmd/server/router.go | 4 ++ server/internal/handler/lark.go | 15 ++--- .../integrations/lark/registration_service.go | 64 ++++++++++++++++--- .../lark/registration_service_test.go | 61 ++++++++++++++++++ 4 files changed, 124 insertions(+), 20 deletions(-) diff --git a/server/cmd/server/router.go b/server/cmd/server/router.go index 2a07b552e..92d15cb22 100644 --- a/server/cmd/server/router.go +++ b/server/cmd/server/router.go @@ -319,6 +319,10 @@ func NewRouterWithOptions(pool *pgxpool.Pool, hub *realtime.Hub, bus *events.Bus if rerr != nil { slog.Error("lark: RegistrationService init failed; install disabled", "error", rerr) } else { + // Publish lark_installation:created at row-commit time so the + // connection badge refreshes on every workspace client, not just + // the tab that polls the install status to success. + regSvc.SetEventBus(bus) h.LarkRegistration = regSvc slog.Info("lark device-flow install enabled") } diff --git a/server/internal/handler/lark.go b/server/internal/handler/lark.go index ec5b047bc..d6810d565 100644 --- a/server/internal/handler/lark.go +++ b/server/internal/handler/lark.go @@ -342,16 +342,11 @@ func (h *Handler) GetLarkInstallStatus(w http.ResponseWriter, r *http.Request) { } if state.InstallationID.Valid { resp.InstallationID = uuidToString(state.InstallationID) - // Successful install — emit the lark_installation:created - // event so listeners (Settings tab, agent detail page) refresh - // without waiting for the frontend's cache invalidation. - // Idempotent: the event handler keys on installation_id, and - // a duplicate emit is a no-op refresh. - if state.Status == lark.RegistrationStatusSuccess { - h.publish(protocol.EventLarkInstallationCreated, uuidToString(wsUUID), "system", "", map[string]any{ - "installation_id": resp.InstallationID, - }) - } + // The lark_installation:created event is published by the + // RegistrationService at the row-commit point (see + // registration_service.go finishSuccess), not here — that keeps + // the connection-badge refresh independent of whether any browser + // polls this status endpoint to success. } writeJSON(w, http.StatusOK, resp) } diff --git a/server/internal/integrations/lark/registration_service.go b/server/internal/integrations/lark/registration_service.go index 4efae7029..7b81617c6 100644 --- a/server/internal/integrations/lark/registration_service.go +++ b/server/internal/integrations/lark/registration_service.go @@ -12,7 +12,9 @@ import ( "time" "github.com/jackc/pgx/v5/pgtype" + "github.com/multica-ai/multica/server/internal/events" db "github.com/multica-ai/multica/server/pkg/db/generated" + "github.com/multica-ai/multica/server/pkg/protocol" ) // RegistrationSessionStatus is the discriminated state a `begin` @@ -101,15 +103,22 @@ func (c RegistrationServiceConfig) withDefaults() RegistrationServiceConfig { // into Postgres would add a migration + GC sweep without delivering any // product capability the user can re-use across server restarts. type RegistrationService struct { - cfg RegistrationServiceConfig - client *RegistrationClient - api APIClient - queries *db.Queries - tx TxStarter - installs *InstallationService - binder InstallerBinder + cfg RegistrationServiceConfig + client *RegistrationClient + api APIClient + queries *db.Queries + tx TxStarter + installs *InstallationService + binder InstallerBinder authQueries authQueriesAdapter + // bus is optional. When wired (SetEventBus), a successful install + // publishes lark_installation:created the moment the row commits, so + // every workspace client refreshes its connection badge without + // waiting for a browser to poll the status endpoint to success. Nil + // is valid — install still works, it just won't push the WS frame. + bus *events.Bus + mu sync.Mutex sessions map[string]*registrationSession } @@ -167,6 +176,37 @@ func NewRegistrationService( }, nil } +// SetEventBus wires the optional event bus AFTER construction so the +// six positional constructor-validation cases stay untouched and the +// bus remains nil-safe. With it set, finishSuccess publishes +// lark_installation:created at the row-commit point — the authoritative +// moment of truth — instead of relying on the HTTP status-poll handler +// to emit it only when a browser happens to poll to success. +func (s *RegistrationService) SetEventBus(bus *events.Bus) { + s.bus = bus +} + +// publishInstalled emits lark_installation:created on the optional bus. +// Mirrors the revoke path (RevokeLarkInstallation publishes +// lark_installation:revoked from its handler): both events broadcast to +// the whole workspace via the SubscribeAll fanout, and the frontend +// invalidates larkKeys.installations on the lark_installation prefix, so +// every mounted surface (agent Integrations tab, inspector, Settings) +// refreshes its connection badge with no page reload. Covers fresh +// installs and revoked→active re-installs alike — both ride the same +// UpsertLarkInstallation write. Nil-safe. +func (s *RegistrationService) publishInstalled(workspaceID, installationID pgtype.UUID) { + if s.bus == nil { + return + } + s.bus.Publish(events.Event{ + Type: protocol.EventLarkInstallationCreated, + WorkspaceID: uuidString(workspaceID), + ActorType: "system", + Payload: map[string]any{"installation_id": uuidString(installationID)}, + }) +} + // registrationSession is the in-memory state for one in-flight install. type registrationSession struct { id string @@ -248,9 +288,9 @@ type BeginInstallParams struct { // poll status; we deliberately do NOT echo the device_code or the // polling interval (which is internal scheduling state). type BeginInstallResult struct { - SessionID string - QRCodeURL string - ExpiresInSeconds int + SessionID string + QRCodeURL string + ExpiresInSeconds int PollIntervalSeconds int } @@ -505,6 +545,10 @@ func (s *RegistrationService) finishSuccess(ctx context.Context, sess *registrat return } sess.markSuccess(inst.ID, s.gcDeadline()) + // Publish at the commit point so the connection badge updates on every + // workspace client without a page refresh — not only on the tab that + // happens to poll the status endpoint to success. + s.publishInstalled(sess.workspaceID, inst.ID) s.cfg.Logger.Info("lark registration: install complete", "session_id", sess.id, "workspace_id", uuidString(sess.workspaceID), diff --git a/server/internal/integrations/lark/registration_service_test.go b/server/internal/integrations/lark/registration_service_test.go index d9d88914a..58ea340c7 100644 --- a/server/internal/integrations/lark/registration_service_test.go +++ b/server/internal/integrations/lark/registration_service_test.go @@ -10,7 +10,9 @@ import ( "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgtype" + "github.com/multica-ai/multica/server/internal/events" db "github.com/multica-ai/multica/server/pkg/db/generated" + "github.com/multica-ai/multica/server/pkg/protocol" ) // These tests cover the pure-Go halves of RegistrationService — @@ -222,6 +224,65 @@ func TestRandomSessionIDUnique(t *testing.T) { } } +// TestRegistrationServicePublishInstalledEmitsCreatedEvent pins the +// MUL-3059 fix: a completed install must publish lark_installation:created +// at the row-write point so every workspace client refreshes its +// connection badge without a page reload. The bug was that this event only +// fired from the HTTP status-poll handler, so any surface that wasn't the +// polling install dialog stayed stale until a manual refresh. The exact +// shape (type, workspace, system actor, installation_id payload) is what +// the SubscribeAll fanout and the frontend lark_installation-prefix +// invalidation depend on. +func TestRegistrationServicePublishInstalledEmitsCreatedEvent(t *testing.T) { + bus := events.New() + var caught []events.Event + bus.Subscribe(protocol.EventLarkInstallationCreated, func(e events.Event) { + caught = append(caught, e) + }) + + svc := newRegistrationServiceForTest(t) + svc.SetEventBus(bus) + + ws := uuidFromStringSvc(t, "11111111-1111-1111-1111-111111111111") + inst := uuidFromStringSvc(t, "22222222-2222-2222-2222-222222222222") + svc.publishInstalled(ws, inst) + + // Exactly one — guards against a future re-introduction of the + // now-removed second emit in the status-poll handler. + if len(caught) != 1 { + t.Fatalf("expected exactly 1 lark_installation:created event, got %d", len(caught)) + } + got := caught[0] + if got.Type != protocol.EventLarkInstallationCreated { + t.Errorf("type = %q, want %q", got.Type, protocol.EventLarkInstallationCreated) + } + if got.WorkspaceID != uuidString(ws) { + t.Errorf("workspace_id = %q, want %q", got.WorkspaceID, uuidString(ws)) + } + if got.ActorType != "system" { + t.Errorf("actor_type = %q, want \"system\"", got.ActorType) + } + payload, ok := got.Payload.(map[string]any) + if !ok { + t.Fatalf("payload type = %T, want map[string]any", got.Payload) + } + if payload["installation_id"] != uuidString(inst) { + t.Errorf("installation_id = %v, want %q", payload["installation_id"], uuidString(inst)) + } +} + +// TestRegistrationServicePublishInstalledNilBusIsNoOp pins that an install +// still completes when no bus is wired — the bus is optional (SetEventBus +// is never called in self-host builds that disable realtime), so the +// publish must be a silent no-op rather than a nil-deref panic. +func TestRegistrationServicePublishInstalledNilBusIsNoOp(t *testing.T) { + svc := newRegistrationServiceForTest(t) // no SetEventBus + svc.publishInstalled( + uuidFromStringSvc(t, "33333333-3333-3333-3333-333333333333"), + uuidFromStringSvc(t, "44444444-4444-4444-4444-444444444444"), + ) +} + // fakeInstallerBinder records BindInstallerTx calls for tests that // need to assert the bind happened. type fakeInstallerBinder struct {