LinYushen 91424752ac feat(realtime): phase 0 — extract Broadcaster interface + add metrics (MUL-1138) (#1429)
* feat(realtime): phase 0 — extract Broadcaster interface + add metrics

Phase 0 of the WebSocket horizontal-scaling plan tracked in MUL-1138.
This change is intentionally behavior-preserving: it sets up the seams
needed for later phases (subscribe/unsubscribe protocol, scope-level
fanout, Redis Streams relay) without altering any wire protocol or
producer call sites.

What changed
- New realtime.Broadcaster interface covering the three fanout methods
  producers already use on *Hub (BroadcastToWorkspace, SendToUser,
  Broadcast). *Hub continues to satisfy it; a future Redis-backed
  implementation can be dropped in without touching listeners.
- registerListeners now depends on realtime.Broadcaster instead of
  *realtime.Hub, isolating the bus → realtime fanout layer behind an
  interface.
- New realtime.Metrics singleton with atomic counters: connects,
  disconnects, active connections, slow-client evictions, total
  messages sent/dropped, and per-event-type send counters. Wired into
  Hub register/unregister/broadcast paths and into every listener.
- New GET /health/realtime endpoint returning a JSON snapshot of the
  metrics so we can observe baseline fanout pressure before phase 1.
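A minimal sketch of the phase 0 seam, assuming simplified signatures. The three method names come from the list above, but the `(id string, msg []byte)` parameter lists, `NewMetrics`, `CountSend`, `MetricsSnapshot`, `recordingHub` and `onEvent` are hypothetical and only illustrate the pattern. The listener depends only on the interface, scalar counters are atomics, and the per-event-type map sits behind a mutex.

```go
package main

import (
	"encoding/json"
	"fmt"
	"sync"
	"sync/atomic"
)

// Broadcaster lists the three fanout methods named above. The parameter
// lists are assumptions; the real realtime package may use other types.
type Broadcaster interface {
	BroadcastToWorkspace(workspaceID string, msg []byte)
	SendToUser(userID string, msg []byte)
	Broadcast(msg []byte)
}

// Metrics is a hypothetical stand-in for realtime.Metrics: the scalar counter
// is a lock-free atomic, and the per-event-type map is guarded by a mutex.
type Metrics struct {
	MessagesSent atomic.Int64

	mu      sync.Mutex
	perType map[string]int64
}

func NewMetrics() *Metrics { return &Metrics{perType: make(map[string]int64)} }

// CountSend records one outbound message of the given event type.
func (m *Metrics) CountSend(eventType string) {
	m.MessagesSent.Add(1)
	m.mu.Lock()
	m.perType[eventType]++
	m.mu.Unlock()
}

// MetricsSnapshot is the kind of JSON a /health/realtime handler could return.
type MetricsSnapshot struct {
	MessagesSent int64            `json:"messages_sent"`
	PerType      map[string]int64 `json:"per_type"`
}

// Snapshot copies the counters so callers never share the live map.
func (m *Metrics) Snapshot() MetricsSnapshot {
	m.mu.Lock()
	defer m.mu.Unlock()
	per := make(map[string]int64, len(m.perType))
	for k, v := range m.perType {
		per[k] = v
	}
	return MetricsSnapshot{MessagesSent: m.MessagesSent.Load(), PerType: per}
}

// recordingHub is a toy Broadcaster that remembers which workspaces it was
// asked to reach; a real Hub would write to WebSocket connections.
type recordingHub struct{ workspaces []string }

var _ Broadcaster = (*recordingHub)(nil) // compile-time interface check

func (h *recordingHub) BroadcastToWorkspace(id string, _ []byte) { h.workspaces = append(h.workspaces, id) }
func (h *recordingHub) SendToUser(string, []byte)                {}
func (h *recordingHub) Broadcast([]byte)                         {}

// onEvent mimics a listener that depends only on the Broadcaster interface.
func onEvent(b Broadcaster, m *Metrics, eventType, workspaceID string, msg []byte) {
	b.BroadcastToWorkspace(workspaceID, msg)
	m.CountSend(eventType)
}

func main() {
	m := NewMetrics()
	hub := &recordingHub{}
	onEvent(hub, m, "issue:created", "ws-1", []byte(`{}`))
	onEvent(hub, m, "issue:created", "ws-1", []byte(`{}`))
	onEvent(hub, m, "inbox:new", "ws-2", []byte(`{}`))
	out, _ := json.Marshal(m.Snapshot())
	fmt.Println(string(out))
	// {"messages_sent":3,"per_type":{"inbox:new":1,"issue:created":2}}
}
```

Because `onEvent` only sees the interface, any other type with the same method set (for example a Redis-backed one) could be passed in without touching the listener.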

Why phase 0 first
GPT-Boy's only-Redis plan and CC-Girl's review both call out the same
prerequisite: get a Broadcaster seam and visibility in place before
introducing scope-level subscriptions or a Redis relay. Doing this as
a standalone step keeps each later PR focused and trivially revertible.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* feat(realtime): only-Redis fanout — scopes, subscribe protocol, Redis Streams relay (MUL-1138)

Implements the final-version plan agreed in MUL-1138 on top of phase 0:

* Hub: 4 scope types (workspace/user/task/chat), per-client subscription
  set, subscribe/unsubscribe WS frames, ScopeAuthorizer hook for
  task/chat scope auth, first/last-subscriber callbacks for the relay,
  workspace+user auto-subscribe on connect.
* RedisRelay: Broadcaster impl that XADDs every event into
  ws:scope:{type}:{id}:stream and XREADGROUPs only the scopes for which
  this node has live subscribers. Per-node consumer group, heartbeat,
  stale-consumer sweeper, MAXLEN cap, lag/disconnect metrics.
* Listeners: route task:* events to ScopeTask, chat:* events to
  ScopeChat; workspace remains the default for everything else.
* events.Event: optional TaskID / ChatSessionID hints so the listener
  layer can pick the right scope without re-parsing payloads.
* Handler: publishTask / publishChat helpers; chat + task message
  publishers updated to use them.
* main.go: when REDIS_URL is set, wrap the hub with NewRedisRelay and
  pass the relay (instead of the hub) to registerListeners. A
  db-backed ScopeAuthorizer enforces that task/chat subscribes belong
  to the caller's workspace.
* Metrics: per-scope subscribe/deny counters, redis connect state, node
  id, lag/dropped counters surfaced via /health/realtime.
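A sketch of the listener routing and stream naming described in the list above, assuming hypothetical Go names (`ScopeType`, `routeScope`, `streamKey`). Only the four scope names, the `task:*` / `chat:*` prefixes, the TaskID / ChatSessionID hints and the `ws:scope:{type}:{id}:stream` key format come from the text. Falling back to the workspace when a hint is empty is an assumption. Note that the review fix further down keeps task/chat events on workspace fanout for now, so this shows the routing as proposed here, not the live behavior after the squash.

```go
package main

import (
	"fmt"
	"strings"
)

// ScopeType values mirror the four scopes listed above; the Go names are
// assumptions made for this sketch.
type ScopeType string

const (
	ScopeWorkspace ScopeType = "workspace"
	ScopeUser      ScopeType = "user"
	ScopeTask      ScopeType = "task"
	ScopeChat      ScopeType = "chat"
)

// Event carries only the fields of events.Event that routing needs.
type Event struct {
	Type          string
	WorkspaceID   string
	TaskID        string
	ChatSessionID string
}

// routeScope picks the narrowest scope the event's hints allow. Falling back
// to the workspace when a hint is empty is an assumption of this sketch.
func routeScope(e Event) (ScopeType, string) {
	switch {
	case strings.HasPrefix(e.Type, "task:") && e.TaskID != "":
		return ScopeTask, e.TaskID
	case strings.HasPrefix(e.Type, "chat:") && e.ChatSessionID != "":
		return ScopeChat, e.ChatSessionID
	default:
		return ScopeWorkspace, e.WorkspaceID
	}
}

// streamKey builds the Redis Streams key format quoted in the commit message.
func streamKey(t ScopeType, id string) string {
	return fmt.Sprintf("ws:scope:%s:%s:stream", t, id)
}

func main() {
	for _, e := range []Event{
		{Type: "task:message", WorkspaceID: "w1", TaskID: "t9"},
		{Type: "chat:done", WorkspaceID: "w1", ChatSessionID: "c3"},
		{Type: "issue:created", WorkspaceID: "w1"},
	} {
		t, id := routeScope(e)
		fmt.Println(e.Type, "->", streamKey(t, id))
	}
	// task:message -> ws:scope:task:t9:stream
	// chat:done -> ws:scope:chat:c3:stream
	// issue:created -> ws:scope:workspace:w1:stream
}
```

Carrying the hints on the event keeps this decision a cheap string check instead of a Payload decode per event.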

Behavior in single-node mode (REDIS_URL unset) is unchanged.
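The REDIS_URL switch from the main.go bullet is a decorator choice: listeners receive either the hub itself or a relay wrapping it. The sketch below assumes a one-method Broadcaster and hypothetical `localHub`, `redisRelay` and `pickBroadcaster` types. The relay only prints where a real one would XADD to Redis, so no Redis client is involved.

```go
package main

import (
	"fmt"
	"os"
)

// Broadcaster is reduced to one method to keep the sketch short.
type Broadcaster interface {
	BroadcastToWorkspace(workspaceID string, msg []byte)
}

// localHub stands in for *realtime.Hub: it fans out only on this node.
type localHub struct{}

func (localHub) BroadcastToWorkspace(id string, msg []byte) {
	fmt.Printf("local fanout to %s: %s\n", id, msg)
}

// redisRelay is a hypothetical decorator in the spirit of NewRedisRelay.
type redisRelay struct {
	url   string
	local Broadcaster
}

func (r *redisRelay) BroadcastToWorkspace(id string, msg []byte) {
	// A real relay would XADD here and let each node's consumer group
	// deliver to its own hub; this sketch just forwards locally.
	fmt.Printf("relay via %s -> ", r.url)
	r.local.BroadcastToWorkspace(id, msg)
}

// pickBroadcaster wraps the hub only when a Redis URL is configured.
func pickBroadcaster(hub Broadcaster, redisURL string) Broadcaster {
	if redisURL == "" {
		return hub // single-node mode: listeners talk to the hub directly
	}
	return &redisRelay{url: redisURL, local: hub}
}

func main() {
	b := pickBroadcaster(localHub{}, os.Getenv("REDIS_URL"))
	b.BroadcastToWorkspace("ws-1", []byte(`{"type":"issue:created"}`))
}
```

With REDIS_URL unset the hub is returned as-is, which is why single-node behavior does not change.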

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* fix(realtime): address PR #1429 review must-fix items (MUL-1138)

- listeners: keep task/chat events on workspace fanout until the WS
  client supports scope-subscribe + reconnect-replay. Routing them
  through BroadcastToScope today (without any client subscriber) would
  silently drop every chat / task message and break the live timeline,
  chat unread badges, and pending-task UI. The server-side scope infra
  (Hub subscribe/unsubscribe, ScopeAuthorizer, Redis Streams relay)
  stays in place so flipping the switch in the client follow-up PR is
  a one-line change.

- scope_authorizer: ScopeChat now enforces CreatorID == userID, mirroring
  the HTTP layer (handler/chat.go: GetChatSession / SendChatMessage /
  MarkChatSessionRead). Without this, any workspace member who learned a
  session_id could subscribe to chat:message / chat:done /
  chat:session_read for a peer's private chat. The same creator-only
  check is applied to ScopeTask when the task is a chat task
  (task.ChatSessionID set). Issue tasks remain workspace-scoped.

- Refactor scope authorizer to depend on a narrow scopeAuthQuerier
  interface so its decisions can be unit-tested without a live DB.

- Add tests:
  * listeners_scope_test.go pins the workspace-fanout fallback for
    task:message / task:progress / chat:message / chat:done /
    chat:session_read.
  * scope_authorizer_test.go covers chat creator-only access, chat-task
    creator-only access, and issue-task workspace-only access (creator
    allowed, peer denied, cross-workspace denied, missing session
    denied, empty userID denied).

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

---------

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Co-authored-by: CC-Girl <cc-girl@multica.ai>
2026-04-23 13:36:55 +08:00


package events

import (
	"log/slog"
	"sync"
)

// Event represents a domain event published by handlers or services.
type Event struct {
	Type        string // e.g. "issue:created", "inbox:new"
	WorkspaceID string // routes to correct Hub room
	ActorType   string // "member", "agent", or "system"
	ActorID     string
	Payload     any // JSON-serializable, same shape as current WS payloads

	// Optional scope hints used by the realtime fanout layer to route the
	// event to a more specific scope than `workspace:{WorkspaceID}`. When set,
	// these tell the listener which Redis stream / Hub room to publish on
	// without re-deserializing Payload. See MUL-1138 phase 1.
	TaskID        string
	ChatSessionID string
}

// Handler is a function that processes an event.
type Handler func(Event)

// Bus is an in-process synchronous pub/sub event bus.
type Bus struct {
	mu             sync.RWMutex
	listeners      map[string][]Handler
	globalHandlers []Handler
}

// New creates a new event bus.
func New() *Bus {
	return &Bus{
		listeners: make(map[string][]Handler),
	}
}

// Subscribe registers a handler for a given event type.
// Handlers are called synchronously in registration order.
func (b *Bus) Subscribe(eventType string, h Handler) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.listeners[eventType] = append(b.listeners[eventType], h)
}

// SubscribeAll registers a handler that receives ALL events regardless of type.
// Global handlers are called after type-specific handlers.
func (b *Bus) SubscribeAll(h Handler) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.globalHandlers = append(b.globalHandlers, h)
}

// Publish dispatches an event to all registered handlers for that event type.
// Type-specific handlers run first, then global (SubscribeAll) handlers.
// Each handler is called synchronously. Panics in individual handlers are
// recovered so one failing handler does not prevent others from executing.
func (b *Bus) Publish(e Event) {
	b.mu.RLock()
	handlers := b.listeners[e.Type]
	globals := b.globalHandlers
	b.mu.RUnlock()

	for _, h := range handlers {
		func() {
			defer func() {
				if r := recover(); r != nil {
					slog.Error("panic in event listener", "event_type", e.Type, "recovered", r)
				}
			}()
			h(e)
		}()
	}

	for _, h := range globals {
		func() {
			defer func() {
				if r := recover(); r != nil {
					slog.Error("panic in global event listener", "event_type", e.Type, "recovered", r)
				}
			}()
			h(e)
		}()
	}
}