From 1d64ea4ba616e42fc85de41719bed59c3bd0dd29 Mon Sep 17 00:00:00 2001 From: Jiayuan Zhang Date: Mon, 13 Apr 2026 14:46:34 +0800 Subject: [PATCH] feat: add online status indicator on agent & member avatars (#821) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat: add online status indicator dot on agent & member avatars Backend: - Track member presence via WebSocket connections in the Hub - Broadcast member:online/offline events when users connect/disconnect - Add GET /api/workspaces/{id}/members/online endpoint - Add member:online and member:offline event type constants Frontend: - Add isOnline prop to ActorAvatar with a status dot at top-right corner - Green dot = online, gray dot = offline, no dot = status unknown - Fetch online member list via new query, update optimistically on WS events - Derive agent online status from existing agent.status field - Wire online status through ActorAvatar views wrapper (enabled by default) * fix: address code review — fix hub tests and avatar rounding 1. Hub tests: consume the member:online presence event from the first connection before asserting on broadcast messages. 2. ActorAvatar: use rounded-[inherit] on the inner wrapper so callers can override rounding (e.g. rounded-lg for agent list items). * fix: consume member:online presence event in integration test Same fix as the hub unit tests — read and discard the member:online event before asserting on issue:created in TestWebSocketIntegration. --- packages/core/api/client.ts | 4 ++ packages/core/realtime/use-realtime-sync.ts | 33 ++++++++++ packages/core/types/events.ts | 10 +++ packages/core/workspace/hooks.ts | 18 +++++- packages/core/workspace/queries.ts | 11 ++++ .../ui/components/common/actor-avatar.tsx | 42 +++++++++---- packages/views/common/actor-avatar.tsx | 6 +- server/cmd/server/integration_test.go | 8 +++ server/cmd/server/router.go | 1 + server/internal/handler/workspace.go | 7 +++ server/internal/realtime/hub.go | 63 +++++++++++++++++++ server/internal/realtime/hub_test.go | 27 +++++++- server/pkg/protocol/events.go | 2 + 13 files changed, 215 insertions(+), 17 deletions(-) diff --git a/packages/core/api/client.ts b/packages/core/api/client.ts index 7eb25920d..192fe151e 100644 --- a/packages/core/api/client.ts +++ b/packages/core/api/client.ts @@ -525,6 +525,10 @@ export class ApiClient { return this.fetch(`/api/workspaces/${workspaceId}/members`); } + async getOnlineMembers(workspaceId: string): Promise<{ user_ids: string[] }> { + return this.fetch(`/api/workspaces/${workspaceId}/members/online`); + } + async createMember(workspaceId: string, data: CreateMemberRequest): Promise { return this.fetch(`/api/workspaces/${workspaceId}/members`, { method: "POST", diff --git a/packages/core/realtime/use-realtime-sync.ts b/packages/core/realtime/use-realtime-sync.ts index 5a79d955d..e440717dc 100644 --- a/packages/core/realtime/use-realtime-sync.ts +++ b/packages/core/realtime/use-realtime-sync.ts @@ -25,6 +25,8 @@ import type { MemberAddedPayload, WorkspaceDeletedPayload, MemberRemovedPayload, + MemberOnlinePayload, + MemberOfflinePayload, IssueUpdatedPayload, IssueCreatedPayload, IssueDeletedPayload, @@ -133,6 +135,7 @@ export function useRealtimeSync( "issue_reaction:added", "issue_reaction:removed", "subscriber:added", "subscriber:removed", "daemon:heartbeat", + "member:online", "member:offline", ]); const unsubAny = ws.onAny((msg) => { @@ -242,6 +245,33 @@ export function useRealtimeSync( if (issue_id) qc.invalidateQueries({ queryKey: issueKeys.subscribers(issue_id) }); }); + // --- Member presence handlers (optimistic cache updates) --- + + const unsubMemberOnline = ws.on("member:online", (p) => { + const { user_id } = p as MemberOnlinePayload; + if (!user_id) return; + const wsId = workspaceStore.getState().workspace?.id; + if (wsId) { + qc.setQueryData(workspaceKeys.onlineMembers(wsId), (old) => { + if (!old) return [user_id]; + if (old.includes(user_id)) return old; + return [...old, user_id]; + }); + } + }); + + const unsubMemberOffline = ws.on("member:offline", (p) => { + const { user_id } = p as MemberOfflinePayload; + if (!user_id) return; + const wsId = workspaceStore.getState().workspace?.id; + if (wsId) { + qc.setQueryData(workspaceKeys.onlineMembers(wsId), (old) => { + if (!old) return old; + return old.filter((id) => id !== user_id); + }); + } + }); + // --- Side-effect handlers (toast, navigation) --- const unsubWsDeleted = ws.on("workspace:deleted", (p) => { @@ -302,6 +332,8 @@ export function useRealtimeSync( unsubWsDeleted(); unsubMemberRemoved(); unsubMemberAdded(); + unsubMemberOnline(); + unsubMemberOffline(); timers.forEach(clearTimeout); timers.clear(); }; @@ -320,6 +352,7 @@ export function useRealtimeSync( qc.invalidateQueries({ queryKey: inboxKeys.all(wsId) }); qc.invalidateQueries({ queryKey: workspaceKeys.agents(wsId) }); qc.invalidateQueries({ queryKey: workspaceKeys.members(wsId) }); + qc.invalidateQueries({ queryKey: workspaceKeys.onlineMembers(wsId) }); qc.invalidateQueries({ queryKey: workspaceKeys.skills(wsId) }); qc.invalidateQueries({ queryKey: projectKeys.all(wsId) }); qc.invalidateQueries({ queryKey: runtimeKeys.all(wsId) }); diff --git a/packages/core/types/events.ts b/packages/core/types/events.ts index 2f21527fa..dc3011bb3 100644 --- a/packages/core/types/events.ts +++ b/packages/core/types/events.ts @@ -34,6 +34,8 @@ export type WSEventType = | "member:added" | "member:updated" | "member:removed" + | "member:online" + | "member:offline" | "daemon:heartbeat" | "daemon:register" | "skill:created" @@ -149,6 +151,14 @@ export interface MemberRemovedPayload { workspace_id: string; } +export interface MemberOnlinePayload { + user_id: string; +} + +export interface MemberOfflinePayload { + user_id: string; +} + export interface SubscriberAddedPayload { issue_id: string; user_type: string; diff --git a/packages/core/workspace/hooks.ts b/packages/core/workspace/hooks.ts index ecfad9742..257c23160 100644 --- a/packages/core/workspace/hooks.ts +++ b/packages/core/workspace/hooks.ts @@ -2,12 +2,13 @@ import { useQuery } from "@tanstack/react-query"; import { useWorkspaceId } from "../hooks"; -import { memberListOptions, agentListOptions } from "./queries"; +import { memberListOptions, agentListOptions, onlineMembersOptions } from "./queries"; export function useActorName() { const wsId = useWorkspaceId(); const { data: members = [] } = useQuery(memberListOptions(wsId)); const { data: agents = [] } = useQuery(agentListOptions(wsId)); + const { data: onlineUserIds } = useQuery(onlineMembersOptions(wsId)); const getMemberName = (userId: string) => { const m = members.find((m) => m.user_id === userId); @@ -41,5 +42,18 @@ export function useActorName() { return null; }; - return { getMemberName, getAgentName, getActorName, getActorInitials, getActorAvatarUrl }; + const getActorOnlineStatus = (type: string, id: string): boolean | undefined => { + if (type === "agent") { + const agent = agents.find((a) => a.id === id); + if (!agent) return undefined; + return agent.status !== "offline"; + } + if (type === "member") { + if (!onlineUserIds) return undefined; + return onlineUserIds.includes(id); + } + return undefined; + }; + + return { getMemberName, getAgentName, getActorName, getActorInitials, getActorAvatarUrl, getActorOnlineStatus }; } diff --git a/packages/core/workspace/queries.ts b/packages/core/workspace/queries.ts index 1924e7a27..028f3dcb9 100644 --- a/packages/core/workspace/queries.ts +++ b/packages/core/workspace/queries.ts @@ -5,6 +5,7 @@ export const workspaceKeys = { all: (wsId: string) => ["workspaces", wsId] as const, list: () => ["workspaces", "list"] as const, members: (wsId: string) => ["workspaces", wsId, "members"] as const, + onlineMembers: (wsId: string) => ["workspaces", wsId, "online-members"] as const, agents: (wsId: string) => ["workspaces", wsId, "agents"] as const, skills: (wsId: string) => ["workspaces", wsId, "skills"] as const, assigneeFrequency: (wsId: string) => ["workspaces", wsId, "assignee-frequency"] as const, @@ -24,6 +25,16 @@ export function memberListOptions(wsId: string) { }); } +export function onlineMembersOptions(wsId: string) { + return queryOptions({ + queryKey: workspaceKeys.onlineMembers(wsId), + queryFn: async () => { + const res = await api.getOnlineMembers(wsId); + return res.user_ids; + }, + }); +} + export function agentListOptions(wsId: string) { return queryOptions({ queryKey: workspaceKeys.agents(wsId), diff --git a/packages/ui/components/common/actor-avatar.tsx b/packages/ui/components/common/actor-avatar.tsx index 1629dc1fc..de835f96e 100644 --- a/packages/ui/components/common/actor-avatar.tsx +++ b/packages/ui/components/common/actor-avatar.tsx @@ -9,6 +9,7 @@ interface ActorAvatarProps { initials: string; avatarUrl?: string | null; isAgent?: boolean; + isOnline?: boolean; size?: number; className?: string; } @@ -18,6 +19,7 @@ function ActorAvatar({ initials, avatarUrl, isAgent, + isOnline, size = 20, className, }: ActorAvatarProps) { @@ -28,28 +30,46 @@ function ActorAvatar({ setImgError(false); }, [avatarUrl]); + // Status dot size scales with avatar size + const dotSize = Math.max(6, Math.round(size * 0.3)); + return (
- {avatarUrl && !imgError ? ( - {name} setImgError(true)} +
+ {avatarUrl && !imgError ? ( + {name} setImgError(true)} + /> + ) : isAgent ? ( +
+ +
+ ) : ( +
+ {initials} +
+ )} +
+ {isOnline !== undefined && ( + - ) : isAgent ? ( - - ) : ( - initials )}
); diff --git a/packages/views/common/actor-avatar.tsx b/packages/views/common/actor-avatar.tsx index 036074118..af3b3045f 100644 --- a/packages/views/common/actor-avatar.tsx +++ b/packages/views/common/actor-avatar.tsx @@ -8,16 +8,18 @@ interface ActorAvatarProps { actorId: string; size?: number; className?: string; + showOnlineStatus?: boolean; } -export function ActorAvatar({ actorType, actorId, size, className }: ActorAvatarProps) { - const { getActorName, getActorInitials, getActorAvatarUrl } = useActorName(); +export function ActorAvatar({ actorType, actorId, size, className, showOnlineStatus = true }: ActorAvatarProps) { + const { getActorName, getActorInitials, getActorAvatarUrl, getActorOnlineStatus } = useActorName(); return ( diff --git a/server/cmd/server/integration_test.go b/server/cmd/server/integration_test.go index 5bd1549d0..826081a60 100644 --- a/server/cmd/server/integration_test.go +++ b/server/cmd/server/integration_test.go @@ -758,6 +758,14 @@ func TestWebSocketIntegration(t *testing.T) { // Allow Hub goroutine to process the register and add client to room time.Sleep(100 * time.Millisecond) + // Consume the member:online presence event sent on first connection + conn.SetReadDeadline(time.Now().Add(2 * time.Second)) + _, _, err = conn.ReadMessage() + if err != nil { + t.Fatalf("failed to read presence message: %v", err) + } + conn.SetReadDeadline(time.Time{}) + // Create an issue — this should trigger a WebSocket broadcast resp := authRequest(t, "POST", "/api/issues?workspace_id="+testWorkspaceID, map[string]any{ "title": "WebSocket test issue", diff --git a/server/cmd/server/router.go b/server/cmd/server/router.go index e9bc9ecd1..d46c790ff 100644 --- a/server/cmd/server/router.go +++ b/server/cmd/server/router.go @@ -162,6 +162,7 @@ func NewRouter(pool *pgxpool.Pool, hub *realtime.Hub, bus *events.Bus) chi.Route r.Use(middleware.RequireWorkspaceMemberFromURL(queries, "id")) r.Get("/", h.GetWorkspace) r.Get("/members", h.ListMembersWithUser) + r.Get("/members/online", h.GetOnlineMembers) r.Post("/leave", h.LeaveWorkspace) }) // Admin-level access diff --git a/server/internal/handler/workspace.go b/server/internal/handler/workspace.go index 5e4b42987..1bfae190d 100644 --- a/server/internal/handler/workspace.go +++ b/server/internal/handler/workspace.go @@ -317,6 +317,13 @@ func (h *Handler) ListMembersWithUser(w http.ResponseWriter, r *http.Request) { writeJSON(w, http.StatusOK, resp) } +// GetOnlineMembers returns the user IDs of members currently connected via WebSocket. +func (h *Handler) GetOnlineMembers(w http.ResponseWriter, r *http.Request) { + workspaceID := workspaceIDFromURL(r, "id") + userIDs := h.Hub.OnlineUserIDs(workspaceID) + writeJSON(w, http.StatusOK, map[string]any{"user_ids": userIDs}) +} + type CreateMemberRequest struct { Email string `json:"email"` Role string `json:"role"` diff --git a/server/internal/realtime/hub.go b/server/internal/realtime/hub.go index 30d864882..a74d06bc6 100644 --- a/server/internal/realtime/hub.go +++ b/server/internal/realtime/hub.go @@ -2,6 +2,7 @@ package realtime import ( "context" + "encoding/json" "log/slog" "net/http" "os" @@ -120,6 +121,14 @@ func (h *Hub) Run() { if h.rooms[room] == nil { h.rooms[room] = make(map[*Client]bool) } + // Check if user was already online in this workspace + wasOnline := false + for c := range h.rooms[room] { + if c.userID == client.userID { + wasOnline = true + break + } + } h.rooms[room][client] = true total := 0 for _, r := range h.rooms { @@ -128,6 +137,10 @@ func (h *Hub) Run() { h.mu.Unlock() slog.Info("ws client connected", "workspace_id", room, "total_clients", total) + if !wasOnline { + h.broadcastPresence(room, client.userID, true) + } + case client := <-h.unregister: h.mu.Lock() room := client.workspaceID @@ -140,6 +153,16 @@ func (h *Hub) Run() { } } } + // Check if user still has connections in this workspace + stillOnline := false + if clients, ok := h.rooms[room]; ok { + for c := range clients { + if c.userID == client.userID { + stillOnline = true + break + } + } + } total := 0 for _, r := range h.rooms { total += len(r) @@ -147,6 +170,10 @@ func (h *Hub) Run() { h.mu.Unlock() slog.Info("ws client disconnected", "workspace_id", room, "total_clients", total) + if !stillOnline { + h.broadcastPresence(room, client.userID, false) + } + case message := <-h.broadcast: // Global broadcast for daemon events (no workspace filtering) h.mu.RLock() @@ -272,6 +299,42 @@ func (h *Hub) Broadcast(message []byte) { h.broadcast <- message } +// OnlineUserIDs returns the unique user IDs with active WebSocket connections +// in the given workspace. +func (h *Hub) OnlineUserIDs(workspaceID string) []string { + h.mu.RLock() + defer h.mu.RUnlock() + + seen := make(map[string]bool) + if clients, ok := h.rooms[workspaceID]; ok { + for c := range clients { + seen[c.userID] = true + } + } + + ids := make([]string, 0, len(seen)) + for id := range seen { + ids = append(ids, id) + } + return ids +} + +// broadcastPresence sends a member:online or member:offline event to the workspace. +func (h *Hub) broadcastPresence(workspaceID, userID string, online bool) { + eventType := "member:offline" + if online { + eventType = "member:online" + } + data, err := json.Marshal(map[string]any{ + "type": eventType, + "payload": map[string]any{"user_id": userID}, + }) + if err != nil { + return + } + h.BroadcastToWorkspace(workspaceID, data) +} + // HandleWebSocket upgrades an HTTP connection to WebSocket with JWT, PAT, or cookie auth. func HandleWebSocket(hub *Hub, mc MembershipChecker, pr PATResolver, w http.ResponseWriter, r *http.Request) { workspaceID := r.URL.Query().Get("workspace_id") diff --git a/server/internal/realtime/hub_test.go b/server/internal/realtime/hub_test.go index 2187f8d58..535923364 100644 --- a/server/internal/realtime/hub_test.go +++ b/server/internal/realtime/hub_test.go @@ -60,6 +60,19 @@ func connectWS(t *testing.T, server *httptest.Server) *websocket.Conn { return conn } +// readOneMessage reads and discards exactly one message from the connection. +// Used to consume the member:online presence event sent when the first client +// for a user registers. +func readOneMessage(t *testing.T, conn *websocket.Conn) { + t.Helper() + conn.SetReadDeadline(time.Now().Add(2 * time.Second)) + _, _, err := conn.ReadMessage() + if err != nil { + t.Fatalf("expected presence message but got error: %v", err) + } + conn.SetReadDeadline(time.Time{}) +} + // totalClients counts all clients across all rooms. func totalClients(hub *Hub) int { hub.mu.RLock() @@ -92,9 +105,12 @@ func TestHub_Broadcast(t *testing.T) { conn1 := connectWS(t, server) defer conn1.Close() + // First connection triggers member:online; consume it before proceeding. + time.Sleep(50 * time.Millisecond) + readOneMessage(t, conn1) + conn2 := connectWS(t, server) defer conn2.Close() - time.Sleep(50 * time.Millisecond) msg := []byte(`{"type":"issue:created","data":"test"}`) @@ -147,7 +163,13 @@ func TestHub_BroadcastToMultipleClients(t *testing.T) { const numClients = 5 conns := make([]*websocket.Conn, numClients) - for i := 0; i < numClients; i++ { + conns[0] = connectWS(t, server) + defer conns[0].Close() + // First connection triggers member:online; consume it before adding more clients. + time.Sleep(50 * time.Millisecond) + readOneMessage(t, conns[0]) + + for i := 1; i < numClients; i++ { conns[i] = connectWS(t, server) defer conns[i].Close() } @@ -182,6 +204,7 @@ func TestHub_MultipleBroadcasts(t *testing.T) { defer conn.Close() time.Sleep(50 * time.Millisecond) + readOneMessage(t, conn) // consume member:online presence event messages := []string{ `{"type":"issue:created"}`, diff --git a/server/pkg/protocol/events.go b/server/pkg/protocol/events.go index b7997ca91..0237acd06 100644 --- a/server/pkg/protocol/events.go +++ b/server/pkg/protocol/events.go @@ -45,6 +45,8 @@ const ( EventMemberAdded = "member:added" EventMemberUpdated = "member:updated" EventMemberRemoved = "member:removed" + EventMemberOnline = "member:online" + EventMemberOffline = "member:offline" // Subscriber events EventSubscriberAdded = "subscriber:added"