mirror of
https://github.com/multica-ai/multica.git
synced 2026-06-17 03:38:32 +02:00
* feat(realtime): phase 0 - extract Broadcaster interface + add metrics

  Phase 0 of the WebSocket horizontal-scaling plan tracked in MUL-1138. This change is intentionally behavior-preserving: it sets up the seams needed for later phases (subscribe/unsubscribe protocol, scope-level fanout, Redis Streams relay) without altering any wire protocol or producer call sites.

  What changed
  - New realtime.Broadcaster interface covering the three fanout methods producers already use on *Hub (BroadcastToWorkspace, SendToUser, Broadcast). *Hub continues to satisfy it; a future Redis-backed implementation can be dropped in without touching listeners.
  - registerListeners now depends on realtime.Broadcaster instead of *realtime.Hub, isolating the bus → realtime fanout layer behind an interface.
  - New realtime.Metrics singleton with atomic counters: connects, disconnects, active connections, slow-client evictions, total messages sent/dropped, and per-event-type send counters. Wired into Hub register/unregister/broadcast paths and into every listener.
  - New GET /health/realtime endpoint returning a JSON snapshot of the metrics so we can observe baseline fanout pressure before phase 1.

  Why phase 0 first
  GPT-Boy's only-Redis plan and CC-Girl's review both call out the same prerequisite: get a Broadcaster seam and visibility in place before introducing scope-level subscriptions or a Redis relay. Doing this as a standalone step keeps each later PR focused and trivially revertable.

  Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* feat(realtime): only-Redis fanout - scopes, subscribe protocol, Redis Streams relay (MUL-1138)

  Implements the final-version plan agreed in MUL-1138 on top of phase 0:
  * Hub: 4 scope types (workspace/user/task/chat), per-client subscription set, subscribe/unsubscribe WS frames, ScopeAuthorizer hook for task/chat scope auth, first/last-subscriber callbacks for the relay, workspace+user auto-subscribe on connect.
  * RedisRelay: Broadcaster impl that XADDs every event into ws:scope:{type}:{id}:stream and XREADGROUPs only the scopes for which this node has live subscribers. Per-node consumer group, heartbeat, stale-consumer sweeper, MAXLEN cap, lag/disconnect metrics.
  * Listeners: route task:* events to ScopeTask, chat:* events to ScopeChat; workspace remains the default for everything else.
  * events.Event: optional TaskID / ChatSessionID hints so the listener layer can pick the right scope without re-parsing payloads.
  * Handler: publishTask / publishChat helpers; chat + task message publishers updated to use them.
  * main.go: when REDIS_URL is set, wrap the hub with NewRedisRelay and pass the relay (instead of the hub) to registerListeners. A db-backed ScopeAuthorizer enforces that task/chat subscribes belong to the caller's workspace.
  * Metrics: per-scope subscribe/deny counters, redis connect state, node id, lag/dropped counters surfaced via /health/realtime.

  Behavior in single-node mode (REDIS_URL unset) is unchanged.

  Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* fix(realtime): address PR #1429 review must-fix items (MUL-1138)

  - listeners: keep task/chat events on workspace fanout until the WS client supports scope-subscribe + reconnect-replay. Routing them through BroadcastToScope today (without any client subscriber) would silently drop every chat / task message and break the live timeline, chat unread badges, and pending-task UI. The server-side scope infra (Hub subscribe/unsubscribe, ScopeAuthorizer, Redis Streams relay) stays in place so flipping the switch in the client follow-up PR is a one-line change.
  - scope_authorizer: ScopeChat now enforces CreatorID == userID, mirroring the HTTP layer (handler/chat.go: GetChatSession / SendChatMessage / MarkChatSessionRead). Without this, any workspace member who learned a session_id could subscribe to chat:message / chat:done / chat:session_read for a peer's private chat. The same creator-only check is applied to ScopeTask when the task is a chat task (task.ChatSessionID set). Issue tasks remain workspace-scoped.
  - Refactor scope authorizer to depend on a narrow scopeAuthQuerier interface so its decisions can be unit-tested without a live DB.
  - Add tests:
    * listeners_scope_test.go pins the workspace-fanout fallback for task:message / task:progress / chat:message / chat:done / chat:session_read.
    * scope_authorizer_test.go covers chat creator-only access, chat-task creator-only access, and issue-task workspace-only access (creator allowed, peer denied, cross-workspace denied, missing session denied, empty userID denied).

  Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

---------

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Co-authored-by: CC-Girl <cc-girl@multica.ai>
89 lines
2.4 KiB
Go
package events

import (
	"log/slog"
	"sync"
)

// Event represents a domain event published by handlers or services.
type Event struct {
	Type        string // e.g. "issue:created", "inbox:new"
	WorkspaceID string // routes to correct Hub room
	ActorType   string // "member", "agent", or "system"
	ActorID     string
	Payload     any // JSON-serializable, same shape as current WS payloads

	// Optional scope hints used by the realtime fanout layer to route the
	// event to a more specific scope than `workspace:{WorkspaceID}`. When set
	// these tell the listener which Redis stream / Hub room to publish on
	// without re-deserializing Payload. See MUL-1138 phase 1.
	TaskID        string
	ChatSessionID string
}

// Handler is a function that processes an event.
type Handler func(Event)

// Bus is an in-process synchronous pub/sub event bus.
type Bus struct {
	mu             sync.RWMutex
	listeners      map[string][]Handler
	globalHandlers []Handler
}

// New creates a new event bus.
func New() *Bus {
	return &Bus{
		listeners: make(map[string][]Handler),
	}
}

// Subscribe registers a handler for a given event type.
// Handlers are called synchronously in registration order.
func (b *Bus) Subscribe(eventType string, h Handler) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.listeners[eventType] = append(b.listeners[eventType], h)
}

// SubscribeAll registers a handler that receives ALL events regardless of type.
// Global handlers are called after type-specific handlers.
func (b *Bus) SubscribeAll(h Handler) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.globalHandlers = append(b.globalHandlers, h)
}

// Publish dispatches an event to all registered handlers for that event type.
// Type-specific handlers run first, then global (SubscribeAll) handlers.
// Each handler is called synchronously. Panics in individual handlers are
// recovered so one failing handler does not prevent others from executing.
func (b *Bus) Publish(e Event) {
	b.mu.RLock()
	handlers := b.listeners[e.Type]
	globals := b.globalHandlers
	b.mu.RUnlock()

	for _, h := range handlers {
		func() {
			defer func() {
				if r := recover(); r != nil {
					slog.Error("panic in event listener", "event_type", e.Type, "recovered", r)
				}
			}()
			h(e)
		}()
	}

	for _, h := range globals {
		func() {
			defer func() {
				if r := recover(); r != nil {
					slog.Error("panic in global event listener", "event_type", e.Type, "recovered", r)
				}
			}()
			h(e)
		}()
	}
}
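The dispatch guarantees documented on Publish (type-specific handlers in registration order, then global handlers, with panics recovered per handler) can be illustrated with a small standalone program. This is only a sketch: the Event and Bus types are trimmed copies of the ones above, inlined so the snippet compiles on its own, and `runDemo`, the handler labels, and the `"ws-1"` workspace ID are hypothetical names introduced for illustration. The two dispatch loops are also merged into one here, so the log message is the same for both kinds of handler.

```go
package main

import (
	"fmt"
	"log/slog"
	"sync"
)

// Event, Handler and Bus are trimmed copies of the types in this file,
// inlined so the sketch is self-contained.
type Event struct {
	Type        string
	WorkspaceID string
	Payload     any
}

type Handler func(Event)

type Bus struct {
	mu             sync.RWMutex
	listeners      map[string][]Handler
	globalHandlers []Handler
}

func New() *Bus { return &Bus{listeners: make(map[string][]Handler)} }

func (b *Bus) Subscribe(eventType string, h Handler) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.listeners[eventType] = append(b.listeners[eventType], h)
}

func (b *Bus) SubscribeAll(h Handler) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.globalHandlers = append(b.globalHandlers, h)
}

func (b *Bus) Publish(e Event) {
	b.mu.RLock()
	handlers, globals := b.listeners[e.Type], b.globalHandlers
	b.mu.RUnlock()

	// Type-specific handlers first, then globals; copy into a fresh slice
	// so the registered slices are never modified.
	all := append(append([]Handler{}, handlers...), globals...)
	for _, h := range all {
		func() {
			defer func() {
				if r := recover(); r != nil {
					slog.Error("panic in event listener", "event_type", e.Type, "recovered", r)
				}
			}()
			h(e)
		}()
	}
}

// runDemo (hypothetical helper) records the order in which handlers fire.
func runDemo() []string {
	var calls []string
	bus := New()

	// Registered first, but global handlers still run last.
	bus.SubscribeAll(func(e Event) { calls = append(calls, "global") })
	bus.Subscribe("issue:created", func(e Event) { calls = append(calls, "typed-1") })
	bus.Subscribe("issue:created", func(e Event) {
		calls = append(calls, "panicking")
		panic("boom") // recovered inside Publish; later handlers still run
	})
	bus.Subscribe("issue:created", func(e Event) { calls = append(calls, "typed-2") })

	bus.Publish(Event{Type: "issue:created", WorkspaceID: "ws-1"})
	return calls
}

func main() {
	fmt.Println(runDemo()) // prints: [typed-1 panicking typed-2 global]
}
```

Because Publish is synchronous, a slow handler delays every handler after it as well as the caller. A handler that does I/O may want to hand the event off to its own goroutine or queue.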