diff --git a/server/cmd/server/router.go b/server/cmd/server/router.go index 2a07b552e..92d15cb22 100644 --- a/server/cmd/server/router.go +++ b/server/cmd/server/router.go @@ -319,6 +319,10 @@ func NewRouterWithOptions(pool *pgxpool.Pool, hub *realtime.Hub, bus *events.Bus if rerr != nil { slog.Error("lark: RegistrationService init failed; install disabled", "error", rerr) } else { + // Publish lark_installation:created at row-commit time so the + // connection badge refreshes on every workspace client, not just + // the tab that polls the install status to success. + regSvc.SetEventBus(bus) h.LarkRegistration = regSvc slog.Info("lark device-flow install enabled") } diff --git a/server/internal/handler/lark.go b/server/internal/handler/lark.go index ec5b047bc..d6810d565 100644 --- a/server/internal/handler/lark.go +++ b/server/internal/handler/lark.go @@ -342,16 +342,11 @@ func (h *Handler) GetLarkInstallStatus(w http.ResponseWriter, r *http.Request) { } if state.InstallationID.Valid { resp.InstallationID = uuidToString(state.InstallationID) - // Successful install — emit the lark_installation:created - // event so listeners (Settings tab, agent detail page) refresh - // without waiting for the frontend's cache invalidation. - // Idempotent: the event handler keys on installation_id, and - // a duplicate emit is a no-op refresh. - if state.Status == lark.RegistrationStatusSuccess { - h.publish(protocol.EventLarkInstallationCreated, uuidToString(wsUUID), "system", "", map[string]any{ - "installation_id": resp.InstallationID, - }) - } + // The lark_installation:created event is published by the + // RegistrationService at the row-commit point (see + // registration_service.go finishSuccess), not here — that keeps + // the connection-badge refresh independent of whether any browser + // polls this status endpoint to success. } writeJSON(w, http.StatusOK, resp) } diff --git a/server/internal/integrations/lark/registration_service.go b/server/internal/integrations/lark/registration_service.go index 4efae7029..7b81617c6 100644 --- a/server/internal/integrations/lark/registration_service.go +++ b/server/internal/integrations/lark/registration_service.go @@ -12,7 +12,9 @@ import ( "time" "github.com/jackc/pgx/v5/pgtype" + "github.com/multica-ai/multica/server/internal/events" db "github.com/multica-ai/multica/server/pkg/db/generated" + "github.com/multica-ai/multica/server/pkg/protocol" ) // RegistrationSessionStatus is the discriminated state a `begin` @@ -101,15 +103,22 @@ func (c RegistrationServiceConfig) withDefaults() RegistrationServiceConfig { // into Postgres would add a migration + GC sweep without delivering any // product capability the user can re-use across server restarts. type RegistrationService struct { - cfg RegistrationServiceConfig - client *RegistrationClient - api APIClient - queries *db.Queries - tx TxStarter - installs *InstallationService - binder InstallerBinder + cfg RegistrationServiceConfig + client *RegistrationClient + api APIClient + queries *db.Queries + tx TxStarter + installs *InstallationService + binder InstallerBinder authQueries authQueriesAdapter + // bus is optional. When wired (SetEventBus), a successful install + // publishes lark_installation:created the moment the row commits, so + // every workspace client refreshes its connection badge without + // waiting for a browser to poll the status endpoint to success. Nil + // is valid — install still works, it just won't push the WS frame. + bus *events.Bus + mu sync.Mutex sessions map[string]*registrationSession } @@ -167,6 +176,37 @@ func NewRegistrationService( }, nil } +// SetEventBus wires the optional event bus AFTER construction so the +// six positional constructor-validation cases stay untouched and the +// bus remains nil-safe. With it set, finishSuccess publishes +// lark_installation:created at the row-commit point — the authoritative +// moment of truth — instead of relying on the HTTP status-poll handler +// to emit it only when a browser happens to poll to success. +func (s *RegistrationService) SetEventBus(bus *events.Bus) { + s.bus = bus +} + +// publishInstalled emits lark_installation:created on the optional bus. +// Mirrors the revoke path (RevokeLarkInstallation publishes +// lark_installation:revoked from its handler): both events broadcast to +// the whole workspace via the SubscribeAll fanout, and the frontend +// invalidates larkKeys.installations on the lark_installation prefix, so +// every mounted surface (agent Integrations tab, inspector, Settings) +// refreshes its connection badge with no page reload. Covers fresh +// installs and revoked→active re-installs alike — both ride the same +// UpsertLarkInstallation write. Nil-safe. +func (s *RegistrationService) publishInstalled(workspaceID, installationID pgtype.UUID) { + if s.bus == nil { + return + } + s.bus.Publish(events.Event{ + Type: protocol.EventLarkInstallationCreated, + WorkspaceID: uuidString(workspaceID), + ActorType: "system", + Payload: map[string]any{"installation_id": uuidString(installationID)}, + }) +} + // registrationSession is the in-memory state for one in-flight install. type registrationSession struct { id string @@ -248,9 +288,9 @@ type BeginInstallParams struct { // poll status; we deliberately do NOT echo the device_code or the // polling interval (which is internal scheduling state). type BeginInstallResult struct { - SessionID string - QRCodeURL string - ExpiresInSeconds int + SessionID string + QRCodeURL string + ExpiresInSeconds int PollIntervalSeconds int } @@ -505,6 +545,10 @@ func (s *RegistrationService) finishSuccess(ctx context.Context, sess *registrat return } sess.markSuccess(inst.ID, s.gcDeadline()) + // Publish at the commit point so the connection badge updates on every + // workspace client without a page refresh — not only on the tab that + // happens to poll the status endpoint to success. + s.publishInstalled(sess.workspaceID, inst.ID) s.cfg.Logger.Info("lark registration: install complete", "session_id", sess.id, "workspace_id", uuidString(sess.workspaceID), diff --git a/server/internal/integrations/lark/registration_service_test.go b/server/internal/integrations/lark/registration_service_test.go index d9d88914a..58ea340c7 100644 --- a/server/internal/integrations/lark/registration_service_test.go +++ b/server/internal/integrations/lark/registration_service_test.go @@ -10,7 +10,9 @@ import ( "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgtype" + "github.com/multica-ai/multica/server/internal/events" db "github.com/multica-ai/multica/server/pkg/db/generated" + "github.com/multica-ai/multica/server/pkg/protocol" ) // These tests cover the pure-Go halves of RegistrationService — @@ -222,6 +224,65 @@ func TestRandomSessionIDUnique(t *testing.T) { } } +// TestRegistrationServicePublishInstalledEmitsCreatedEvent pins the +// MUL-3059 fix: a completed install must publish lark_installation:created +// at the row-write point so every workspace client refreshes its +// connection badge without a page reload. The bug was that this event only +// fired from the HTTP status-poll handler, so any surface that wasn't the +// polling install dialog stayed stale until a manual refresh. The exact +// shape (type, workspace, system actor, installation_id payload) is what +// the SubscribeAll fanout and the frontend lark_installation-prefix +// invalidation depend on. +func TestRegistrationServicePublishInstalledEmitsCreatedEvent(t *testing.T) { + bus := events.New() + var caught []events.Event + bus.Subscribe(protocol.EventLarkInstallationCreated, func(e events.Event) { + caught = append(caught, e) + }) + + svc := newRegistrationServiceForTest(t) + svc.SetEventBus(bus) + + ws := uuidFromStringSvc(t, "11111111-1111-1111-1111-111111111111") + inst := uuidFromStringSvc(t, "22222222-2222-2222-2222-222222222222") + svc.publishInstalled(ws, inst) + + // Exactly one — guards against a future re-introduction of the + // now-removed second emit in the status-poll handler. + if len(caught) != 1 { + t.Fatalf("expected exactly 1 lark_installation:created event, got %d", len(caught)) + } + got := caught[0] + if got.Type != protocol.EventLarkInstallationCreated { + t.Errorf("type = %q, want %q", got.Type, protocol.EventLarkInstallationCreated) + } + if got.WorkspaceID != uuidString(ws) { + t.Errorf("workspace_id = %q, want %q", got.WorkspaceID, uuidString(ws)) + } + if got.ActorType != "system" { + t.Errorf("actor_type = %q, want \"system\"", got.ActorType) + } + payload, ok := got.Payload.(map[string]any) + if !ok { + t.Fatalf("payload type = %T, want map[string]any", got.Payload) + } + if payload["installation_id"] != uuidString(inst) { + t.Errorf("installation_id = %v, want %q", payload["installation_id"], uuidString(inst)) + } +} + +// TestRegistrationServicePublishInstalledNilBusIsNoOp pins that an install +// still completes when no bus is wired — the bus is optional (SetEventBus +// is never called in self-host builds that disable realtime), so the +// publish must be a silent no-op rather than a nil-deref panic. +func TestRegistrationServicePublishInstalledNilBusIsNoOp(t *testing.T) { + svc := newRegistrationServiceForTest(t) // no SetEventBus + svc.publishInstalled( + uuidFromStringSvc(t, "33333333-3333-3333-3333-333333333333"), + uuidFromStringSvc(t, "44444444-4444-4444-4444-444444444444"), + ) +} + // fakeInstallerBinder records BindInstallerTx calls for tests that // need to assert the bind happened. type fakeInstallerBinder struct {