mirror of
https://github.com/multica-ai/multica.git
synced 2026-06-17 11:48:42 +02:00
feat: add online status indicator on agent & member avatars (#821)
* feat: add online status indicator dot on agent & member avatars
Backend:
- Track member presence via WebSocket connections in the Hub
- Broadcast member:online/offline events when users connect/disconnect
- Add GET /api/workspaces/{id}/members/online endpoint
- Add member:online and member:offline event type constants
Frontend:
- Add isOnline prop to ActorAvatar with a status dot at top-right corner
- Green dot = online, gray dot = offline, no dot = status unknown
- Fetch online member list via new query, update optimistically on WS events
- Derive agent online status from existing agent.status field
- Wire online status through ActorAvatar views wrapper (enabled by default)
* fix: address code review — fix hub tests and avatar rounding
1. Hub tests: consume the member:online presence event from the first
connection before asserting on broadcast messages.
2. ActorAvatar: use rounded-[inherit] on the inner wrapper so callers
can override rounding (e.g. rounded-lg for agent list items).
* fix: consume member:online presence event in integration test
Same fix as the hub unit tests — read and discard the member:online
event before asserting on issue:created in TestWebSocketIntegration.
This commit is contained in:
@@ -525,6 +525,10 @@ export class ApiClient {
|
||||
return this.fetch(`/api/workspaces/${workspaceId}/members`);
|
||||
}
|
||||
|
||||
async getOnlineMembers(workspaceId: string): Promise<{ user_ids: string[] }> {
|
||||
return this.fetch(`/api/workspaces/${workspaceId}/members/online`);
|
||||
}
|
||||
|
||||
async createMember(workspaceId: string, data: CreateMemberRequest): Promise<MemberWithUser> {
|
||||
return this.fetch(`/api/workspaces/${workspaceId}/members`, {
|
||||
method: "POST",
|
||||
|
||||
@@ -25,6 +25,8 @@ import type {
|
||||
MemberAddedPayload,
|
||||
WorkspaceDeletedPayload,
|
||||
MemberRemovedPayload,
|
||||
MemberOnlinePayload,
|
||||
MemberOfflinePayload,
|
||||
IssueUpdatedPayload,
|
||||
IssueCreatedPayload,
|
||||
IssueDeletedPayload,
|
||||
@@ -133,6 +135,7 @@ export function useRealtimeSync(
|
||||
"issue_reaction:added", "issue_reaction:removed",
|
||||
"subscriber:added", "subscriber:removed",
|
||||
"daemon:heartbeat",
|
||||
"member:online", "member:offline",
|
||||
]);
|
||||
|
||||
const unsubAny = ws.onAny((msg) => {
|
||||
@@ -242,6 +245,33 @@ export function useRealtimeSync(
|
||||
if (issue_id) qc.invalidateQueries({ queryKey: issueKeys.subscribers(issue_id) });
|
||||
});
|
||||
|
||||
// --- Member presence handlers (optimistic cache updates) ---
|
||||
|
||||
const unsubMemberOnline = ws.on("member:online", (p) => {
|
||||
const { user_id } = p as MemberOnlinePayload;
|
||||
if (!user_id) return;
|
||||
const wsId = workspaceStore.getState().workspace?.id;
|
||||
if (wsId) {
|
||||
qc.setQueryData<string[]>(workspaceKeys.onlineMembers(wsId), (old) => {
|
||||
if (!old) return [user_id];
|
||||
if (old.includes(user_id)) return old;
|
||||
return [...old, user_id];
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
const unsubMemberOffline = ws.on("member:offline", (p) => {
|
||||
const { user_id } = p as MemberOfflinePayload;
|
||||
if (!user_id) return;
|
||||
const wsId = workspaceStore.getState().workspace?.id;
|
||||
if (wsId) {
|
||||
qc.setQueryData<string[]>(workspaceKeys.onlineMembers(wsId), (old) => {
|
||||
if (!old) return old;
|
||||
return old.filter((id) => id !== user_id);
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
// --- Side-effect handlers (toast, navigation) ---
|
||||
|
||||
const unsubWsDeleted = ws.on("workspace:deleted", (p) => {
|
||||
@@ -302,6 +332,8 @@ export function useRealtimeSync(
|
||||
unsubWsDeleted();
|
||||
unsubMemberRemoved();
|
||||
unsubMemberAdded();
|
||||
unsubMemberOnline();
|
||||
unsubMemberOffline();
|
||||
timers.forEach(clearTimeout);
|
||||
timers.clear();
|
||||
};
|
||||
@@ -320,6 +352,7 @@ export function useRealtimeSync(
|
||||
qc.invalidateQueries({ queryKey: inboxKeys.all(wsId) });
|
||||
qc.invalidateQueries({ queryKey: workspaceKeys.agents(wsId) });
|
||||
qc.invalidateQueries({ queryKey: workspaceKeys.members(wsId) });
|
||||
qc.invalidateQueries({ queryKey: workspaceKeys.onlineMembers(wsId) });
|
||||
qc.invalidateQueries({ queryKey: workspaceKeys.skills(wsId) });
|
||||
qc.invalidateQueries({ queryKey: projectKeys.all(wsId) });
|
||||
qc.invalidateQueries({ queryKey: runtimeKeys.all(wsId) });
|
||||
|
||||
@@ -34,6 +34,8 @@ export type WSEventType =
|
||||
| "member:added"
|
||||
| "member:updated"
|
||||
| "member:removed"
|
||||
| "member:online"
|
||||
| "member:offline"
|
||||
| "daemon:heartbeat"
|
||||
| "daemon:register"
|
||||
| "skill:created"
|
||||
@@ -149,6 +151,14 @@ export interface MemberRemovedPayload {
|
||||
workspace_id: string;
|
||||
}
|
||||
|
||||
export interface MemberOnlinePayload {
|
||||
user_id: string;
|
||||
}
|
||||
|
||||
export interface MemberOfflinePayload {
|
||||
user_id: string;
|
||||
}
|
||||
|
||||
export interface SubscriberAddedPayload {
|
||||
issue_id: string;
|
||||
user_type: string;
|
||||
|
||||
@@ -2,12 +2,13 @@
|
||||
|
||||
import { useQuery } from "@tanstack/react-query";
|
||||
import { useWorkspaceId } from "../hooks";
|
||||
import { memberListOptions, agentListOptions } from "./queries";
|
||||
import { memberListOptions, agentListOptions, onlineMembersOptions } from "./queries";
|
||||
|
||||
export function useActorName() {
|
||||
const wsId = useWorkspaceId();
|
||||
const { data: members = [] } = useQuery(memberListOptions(wsId));
|
||||
const { data: agents = [] } = useQuery(agentListOptions(wsId));
|
||||
const { data: onlineUserIds } = useQuery(onlineMembersOptions(wsId));
|
||||
|
||||
const getMemberName = (userId: string) => {
|
||||
const m = members.find((m) => m.user_id === userId);
|
||||
@@ -41,5 +42,18 @@ export function useActorName() {
|
||||
return null;
|
||||
};
|
||||
|
||||
return { getMemberName, getAgentName, getActorName, getActorInitials, getActorAvatarUrl };
|
||||
const getActorOnlineStatus = (type: string, id: string): boolean | undefined => {
|
||||
if (type === "agent") {
|
||||
const agent = agents.find((a) => a.id === id);
|
||||
if (!agent) return undefined;
|
||||
return agent.status !== "offline";
|
||||
}
|
||||
if (type === "member") {
|
||||
if (!onlineUserIds) return undefined;
|
||||
return onlineUserIds.includes(id);
|
||||
}
|
||||
return undefined;
|
||||
};
|
||||
|
||||
return { getMemberName, getAgentName, getActorName, getActorInitials, getActorAvatarUrl, getActorOnlineStatus };
|
||||
}
|
||||
|
||||
@@ -5,6 +5,7 @@ export const workspaceKeys = {
|
||||
all: (wsId: string) => ["workspaces", wsId] as const,
|
||||
list: () => ["workspaces", "list"] as const,
|
||||
members: (wsId: string) => ["workspaces", wsId, "members"] as const,
|
||||
onlineMembers: (wsId: string) => ["workspaces", wsId, "online-members"] as const,
|
||||
agents: (wsId: string) => ["workspaces", wsId, "agents"] as const,
|
||||
skills: (wsId: string) => ["workspaces", wsId, "skills"] as const,
|
||||
assigneeFrequency: (wsId: string) => ["workspaces", wsId, "assignee-frequency"] as const,
|
||||
@@ -24,6 +25,16 @@ export function memberListOptions(wsId: string) {
|
||||
});
|
||||
}
|
||||
|
||||
export function onlineMembersOptions(wsId: string) {
|
||||
return queryOptions({
|
||||
queryKey: workspaceKeys.onlineMembers(wsId),
|
||||
queryFn: async () => {
|
||||
const res = await api.getOnlineMembers(wsId);
|
||||
return res.user_ids;
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
export function agentListOptions(wsId: string) {
|
||||
return queryOptions({
|
||||
queryKey: workspaceKeys.agents(wsId),
|
||||
|
||||
@@ -9,6 +9,7 @@ interface ActorAvatarProps {
|
||||
initials: string;
|
||||
avatarUrl?: string | null;
|
||||
isAgent?: boolean;
|
||||
isOnline?: boolean;
|
||||
size?: number;
|
||||
className?: string;
|
||||
}
|
||||
@@ -18,6 +19,7 @@ function ActorAvatar({
|
||||
initials,
|
||||
avatarUrl,
|
||||
isAgent,
|
||||
isOnline,
|
||||
size = 20,
|
||||
className,
|
||||
}: ActorAvatarProps) {
|
||||
@@ -28,28 +30,46 @@ function ActorAvatar({
|
||||
setImgError(false);
|
||||
}, [avatarUrl]);
|
||||
|
||||
// Status dot size scales with avatar size
|
||||
const dotSize = Math.max(6, Math.round(size * 0.3));
|
||||
|
||||
return (
|
||||
<div
|
||||
data-slot="avatar"
|
||||
className={cn(
|
||||
"inline-flex shrink-0 items-center justify-center rounded-full font-medium overflow-hidden",
|
||||
"relative inline-flex shrink-0 items-center justify-center rounded-full font-medium overflow-visible",
|
||||
"bg-muted text-muted-foreground",
|
||||
className
|
||||
)}
|
||||
style={{ width: size, height: size, fontSize: size * 0.45 }}
|
||||
title={name}
|
||||
>
|
||||
{avatarUrl && !imgError ? (
|
||||
<img
|
||||
src={avatarUrl}
|
||||
alt={name}
|
||||
className="h-full w-full object-cover"
|
||||
onError={() => setImgError(true)}
|
||||
<div className="h-full w-full overflow-hidden rounded-[inherit]">
|
||||
{avatarUrl && !imgError ? (
|
||||
<img
|
||||
src={avatarUrl}
|
||||
alt={name}
|
||||
className="h-full w-full object-cover"
|
||||
onError={() => setImgError(true)}
|
||||
/>
|
||||
) : isAgent ? (
|
||||
<div className="flex h-full w-full items-center justify-center">
|
||||
<Bot style={{ width: size * 0.55, height: size * 0.55 }} />
|
||||
</div>
|
||||
) : (
|
||||
<div className="flex h-full w-full items-center justify-center">
|
||||
{initials}
|
||||
</div>
|
||||
)}
|
||||
</div>
|
||||
{isOnline !== undefined && (
|
||||
<span
|
||||
className={cn(
|
||||
"absolute right-0 top-0 z-10 rounded-full ring-2 ring-background",
|
||||
isOnline ? "bg-success" : "bg-muted-foreground/40"
|
||||
)}
|
||||
style={{ width: dotSize, height: dotSize, transform: "translate(25%, -25%)" }}
|
||||
/>
|
||||
) : isAgent ? (
|
||||
<Bot style={{ width: size * 0.55, height: size * 0.55 }} />
|
||||
) : (
|
||||
initials
|
||||
)}
|
||||
</div>
|
||||
);
|
||||
|
||||
@@ -8,16 +8,18 @@ interface ActorAvatarProps {
|
||||
actorId: string;
|
||||
size?: number;
|
||||
className?: string;
|
||||
showOnlineStatus?: boolean;
|
||||
}
|
||||
|
||||
export function ActorAvatar({ actorType, actorId, size, className }: ActorAvatarProps) {
|
||||
const { getActorName, getActorInitials, getActorAvatarUrl } = useActorName();
|
||||
export function ActorAvatar({ actorType, actorId, size, className, showOnlineStatus = true }: ActorAvatarProps) {
|
||||
const { getActorName, getActorInitials, getActorAvatarUrl, getActorOnlineStatus } = useActorName();
|
||||
return (
|
||||
<ActorAvatarBase
|
||||
name={getActorName(actorType, actorId)}
|
||||
initials={getActorInitials(actorType, actorId)}
|
||||
avatarUrl={getActorAvatarUrl(actorType, actorId)}
|
||||
isAgent={actorType === "agent"}
|
||||
isOnline={showOnlineStatus ? getActorOnlineStatus(actorType, actorId) : undefined}
|
||||
size={size}
|
||||
className={className}
|
||||
/>
|
||||
|
||||
@@ -758,6 +758,14 @@ func TestWebSocketIntegration(t *testing.T) {
|
||||
// Allow Hub goroutine to process the register and add client to room
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
// Consume the member:online presence event sent on first connection
|
||||
conn.SetReadDeadline(time.Now().Add(2 * time.Second))
|
||||
_, _, err = conn.ReadMessage()
|
||||
if err != nil {
|
||||
t.Fatalf("failed to read presence message: %v", err)
|
||||
}
|
||||
conn.SetReadDeadline(time.Time{})
|
||||
|
||||
// Create an issue — this should trigger a WebSocket broadcast
|
||||
resp := authRequest(t, "POST", "/api/issues?workspace_id="+testWorkspaceID, map[string]any{
|
||||
"title": "WebSocket test issue",
|
||||
|
||||
@@ -162,6 +162,7 @@ func NewRouter(pool *pgxpool.Pool, hub *realtime.Hub, bus *events.Bus) chi.Route
|
||||
r.Use(middleware.RequireWorkspaceMemberFromURL(queries, "id"))
|
||||
r.Get("/", h.GetWorkspace)
|
||||
r.Get("/members", h.ListMembersWithUser)
|
||||
r.Get("/members/online", h.GetOnlineMembers)
|
||||
r.Post("/leave", h.LeaveWorkspace)
|
||||
})
|
||||
// Admin-level access
|
||||
|
||||
@@ -317,6 +317,13 @@ func (h *Handler) ListMembersWithUser(w http.ResponseWriter, r *http.Request) {
|
||||
writeJSON(w, http.StatusOK, resp)
|
||||
}
|
||||
|
||||
// GetOnlineMembers returns the user IDs of members currently connected via WebSocket.
|
||||
func (h *Handler) GetOnlineMembers(w http.ResponseWriter, r *http.Request) {
|
||||
workspaceID := workspaceIDFromURL(r, "id")
|
||||
userIDs := h.Hub.OnlineUserIDs(workspaceID)
|
||||
writeJSON(w, http.StatusOK, map[string]any{"user_ids": userIDs})
|
||||
}
|
||||
|
||||
type CreateMemberRequest struct {
|
||||
Email string `json:"email"`
|
||||
Role string `json:"role"`
|
||||
|
||||
@@ -2,6 +2,7 @@ package realtime
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"log/slog"
|
||||
"net/http"
|
||||
"os"
|
||||
@@ -120,6 +121,14 @@ func (h *Hub) Run() {
|
||||
if h.rooms[room] == nil {
|
||||
h.rooms[room] = make(map[*Client]bool)
|
||||
}
|
||||
// Check if user was already online in this workspace
|
||||
wasOnline := false
|
||||
for c := range h.rooms[room] {
|
||||
if c.userID == client.userID {
|
||||
wasOnline = true
|
||||
break
|
||||
}
|
||||
}
|
||||
h.rooms[room][client] = true
|
||||
total := 0
|
||||
for _, r := range h.rooms {
|
||||
@@ -128,6 +137,10 @@ func (h *Hub) Run() {
|
||||
h.mu.Unlock()
|
||||
slog.Info("ws client connected", "workspace_id", room, "total_clients", total)
|
||||
|
||||
if !wasOnline {
|
||||
h.broadcastPresence(room, client.userID, true)
|
||||
}
|
||||
|
||||
case client := <-h.unregister:
|
||||
h.mu.Lock()
|
||||
room := client.workspaceID
|
||||
@@ -140,6 +153,16 @@ func (h *Hub) Run() {
|
||||
}
|
||||
}
|
||||
}
|
||||
// Check if user still has connections in this workspace
|
||||
stillOnline := false
|
||||
if clients, ok := h.rooms[room]; ok {
|
||||
for c := range clients {
|
||||
if c.userID == client.userID {
|
||||
stillOnline = true
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
total := 0
|
||||
for _, r := range h.rooms {
|
||||
total += len(r)
|
||||
@@ -147,6 +170,10 @@ func (h *Hub) Run() {
|
||||
h.mu.Unlock()
|
||||
slog.Info("ws client disconnected", "workspace_id", room, "total_clients", total)
|
||||
|
||||
if !stillOnline {
|
||||
h.broadcastPresence(room, client.userID, false)
|
||||
}
|
||||
|
||||
case message := <-h.broadcast:
|
||||
// Global broadcast for daemon events (no workspace filtering)
|
||||
h.mu.RLock()
|
||||
@@ -272,6 +299,42 @@ func (h *Hub) Broadcast(message []byte) {
|
||||
h.broadcast <- message
|
||||
}
|
||||
|
||||
// OnlineUserIDs returns the unique user IDs with active WebSocket connections
|
||||
// in the given workspace.
|
||||
func (h *Hub) OnlineUserIDs(workspaceID string) []string {
|
||||
h.mu.RLock()
|
||||
defer h.mu.RUnlock()
|
||||
|
||||
seen := make(map[string]bool)
|
||||
if clients, ok := h.rooms[workspaceID]; ok {
|
||||
for c := range clients {
|
||||
seen[c.userID] = true
|
||||
}
|
||||
}
|
||||
|
||||
ids := make([]string, 0, len(seen))
|
||||
for id := range seen {
|
||||
ids = append(ids, id)
|
||||
}
|
||||
return ids
|
||||
}
|
||||
|
||||
// broadcastPresence sends a member:online or member:offline event to the workspace.
|
||||
func (h *Hub) broadcastPresence(workspaceID, userID string, online bool) {
|
||||
eventType := "member:offline"
|
||||
if online {
|
||||
eventType = "member:online"
|
||||
}
|
||||
data, err := json.Marshal(map[string]any{
|
||||
"type": eventType,
|
||||
"payload": map[string]any{"user_id": userID},
|
||||
})
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
h.BroadcastToWorkspace(workspaceID, data)
|
||||
}
|
||||
|
||||
// HandleWebSocket upgrades an HTTP connection to WebSocket with JWT, PAT, or cookie auth.
|
||||
func HandleWebSocket(hub *Hub, mc MembershipChecker, pr PATResolver, w http.ResponseWriter, r *http.Request) {
|
||||
workspaceID := r.URL.Query().Get("workspace_id")
|
||||
|
||||
@@ -60,6 +60,19 @@ func connectWS(t *testing.T, server *httptest.Server) *websocket.Conn {
|
||||
return conn
|
||||
}
|
||||
|
||||
// readOneMessage reads and discards exactly one message from the connection.
|
||||
// Used to consume the member:online presence event sent when the first client
|
||||
// for a user registers.
|
||||
func readOneMessage(t *testing.T, conn *websocket.Conn) {
|
||||
t.Helper()
|
||||
conn.SetReadDeadline(time.Now().Add(2 * time.Second))
|
||||
_, _, err := conn.ReadMessage()
|
||||
if err != nil {
|
||||
t.Fatalf("expected presence message but got error: %v", err)
|
||||
}
|
||||
conn.SetReadDeadline(time.Time{})
|
||||
}
|
||||
|
||||
// totalClients counts all clients across all rooms.
|
||||
func totalClients(hub *Hub) int {
|
||||
hub.mu.RLock()
|
||||
@@ -92,9 +105,12 @@ func TestHub_Broadcast(t *testing.T) {
|
||||
|
||||
conn1 := connectWS(t, server)
|
||||
defer conn1.Close()
|
||||
// First connection triggers member:online; consume it before proceeding.
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
readOneMessage(t, conn1)
|
||||
|
||||
conn2 := connectWS(t, server)
|
||||
defer conn2.Close()
|
||||
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
|
||||
msg := []byte(`{"type":"issue:created","data":"test"}`)
|
||||
@@ -147,7 +163,13 @@ func TestHub_BroadcastToMultipleClients(t *testing.T) {
|
||||
|
||||
const numClients = 5
|
||||
conns := make([]*websocket.Conn, numClients)
|
||||
for i := 0; i < numClients; i++ {
|
||||
conns[0] = connectWS(t, server)
|
||||
defer conns[0].Close()
|
||||
// First connection triggers member:online; consume it before adding more clients.
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
readOneMessage(t, conns[0])
|
||||
|
||||
for i := 1; i < numClients; i++ {
|
||||
conns[i] = connectWS(t, server)
|
||||
defer conns[i].Close()
|
||||
}
|
||||
@@ -182,6 +204,7 @@ func TestHub_MultipleBroadcasts(t *testing.T) {
|
||||
defer conn.Close()
|
||||
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
readOneMessage(t, conn) // consume member:online presence event
|
||||
|
||||
messages := []string{
|
||||
`{"type":"issue:created"}`,
|
||||
|
||||
@@ -45,6 +45,8 @@ const (
|
||||
EventMemberAdded = "member:added"
|
||||
EventMemberUpdated = "member:updated"
|
||||
EventMemberRemoved = "member:removed"
|
||||
EventMemberOnline = "member:online"
|
||||
EventMemberOffline = "member:offline"
|
||||
|
||||
// Subscriber events
|
||||
EventSubscriberAdded = "subscriber:added"
|
||||
|
||||
Reference in New Issue
Block a user