feat: add ChatSessionManager with RxJS patterns

Introduce a production-quality session management system for AI chat:

Architecture:
- ChatSessionManager singleton manages all active chat sessions
- Sessions keyed by conversationId (not window) for multi-window support
- Multiple windows viewing same conversation share streaming state
- Reference counting with delayed cleanup for quick tab switching

Features:
- RxJS BehaviorSubject for reactive session state
- Subject streams for events (streaming, messages, errors)
- Automatic usage and cost tracking per session
- Resume functionality when generation is interrupted
- Proper abort handling with partial message saving

React Integration:
- useChatSession(conversationId) - main hook combining Dexie + session state
- useChatActions() - stable action functions
- useConversations() - reactive conversation list
- useStreamingContent() - lightweight streaming-only subscription

Migrated AIViewer to use new hooks, significantly simplifying the
component by removing direct DB operations and complex state management.

https://claude.ai/code/session_01HqtD9R33oqfB14Gu1V5wHC
This commit is contained in:
Claude
2026-01-31 09:51:49 +00:00
parent 93b359fd69
commit 75fdce994a
4 changed files with 1049 additions and 136 deletions

View File

@@ -3,6 +3,7 @@
*
* Chat interface for OpenAI-compatible AI providers.
* Uses sidebar pattern for conversation history.
* Powered by ChatSessionManager for multi-window support.
*/
import { useState, useEffect, useCallback, useRef, memo } from "react";
@@ -17,6 +18,7 @@ import {
Settings2,
MessageSquare,
RefreshCw,
Play,
} from "lucide-react";
import { cn } from "@/lib/utils";
import { Button } from "@/components/ui/button";
@@ -30,13 +32,12 @@ import {
SelectValue,
} from "@/components/ui/select";
import * as VisuallyHidden from "@radix-ui/react-visually-hidden";
import { useLLMProviders, useLLMModels } from "@/hooks/useLLM";
import {
useLLMProviders,
useLLMModels,
useLLMConversations,
useLLMConversation,
useLLMChat,
} from "@/hooks/useLLM";
useChatSession,
useChatActions,
useConversations,
} from "@/hooks/useChatSession";
import { formatTimestamp } from "@/hooks/useLocale";
import { useGrimoire } from "@/core/state";
import { AIProvidersViewer } from "./AIProvidersViewer";
@@ -159,7 +160,7 @@ const ConversationItem = memo(function ConversationItem({
});
// ─────────────────────────────────────────────────────────────
// Chat Panel
// Chat Panel (uses ChatSessionManager)
// ─────────────────────────────────────────────────────────────
function ChatPanel({
@@ -173,56 +174,54 @@ function ChatPanel({
modelId: string;
onConversationCreated: (id: string) => void;
}) {
const { conversation } = useLLMConversation(conversationId);
const { createConversation } = useLLMConversations();
const { isGenerating, sendMessage, cancel } = useLLMChat();
// Session manager hooks
const { messages, isLoading, streamingContent, error, canResume } =
useChatSession(conversationId, { providerInstanceId, modelId });
const { sendMessage, createConversation, stopGeneration, resumeGeneration } =
useChatActions();
// Local UI state
const [input, setInput] = useState("");
const [streamingContent, setStreamingContent] = useState("");
const [pendingUserMessage, setPendingUserMessage] = useState<string | null>(
null,
);
const [isWaitingForResponse, setIsWaitingForResponse] = useState(false);
const messagesEndRef = useRef<HTMLDivElement>(null);
const textareaRef = useRef<HTMLTextAreaElement>(null);
// Reset local state when switching conversations
// Reset input when switching conversations
useEffect(() => {
setStreamingContent("");
setPendingUserMessage(null);
setIsWaitingForResponse(false);
setInput("");
setPendingUserMessage(null);
textareaRef.current?.focus();
}, [conversationId]);
// Auto-scroll on new messages or streaming
useEffect(() => {
messagesEndRef.current?.scrollIntoView({ behavior: "smooth" });
}, [conversation?.messages, streamingContent, pendingUserMessage]);
}, [messages, streamingContent, pendingUserMessage]);
// Clear pending message once it appears in the conversation
useEffect(() => {
if (
pendingUserMessage &&
conversation?.messages.some(
messages.some(
(m) => m.role === "user" && m.content === pendingUserMessage,
)
) {
setPendingUserMessage(null);
}
}, [conversation?.messages, pendingUserMessage]);
}, [messages, pendingUserMessage]);
const handleSend = async () => {
if (!input.trim() || isGenerating || isWaitingForResponse) return;
if (!input.trim() || isLoading) return;
const userContent = input.trim();
setInput("");
setPendingUserMessage(userContent);
setIsWaitingForResponse(true);
// Import db for direct operations (avoids stale closure issues)
const db = (await import("@/services/db")).default;
try {
// Create or use existing conversation
// Create conversation if needed
let activeConversationId = conversationId;
if (!activeConversationId) {
activeConversationId = await createConversation(
@@ -232,103 +231,11 @@ function ChatPanel({
onConversationCreated(activeConversationId);
}
// Add user message directly to DB (not through hook - avoids stale closure)
const userMessage: LLMMessage = {
id: crypto.randomUUID(),
role: "user",
content: userContent,
timestamp: Date.now(),
};
const currentConv = await db.llmConversations.get(activeConversationId);
if (!currentConv) {
setPendingUserMessage(null);
setIsWaitingForResponse(false);
return;
}
const isFirstMessage = currentConv.messages.length === 0;
await db.llmConversations.update(activeConversationId, {
messages: [...currentConv.messages, userMessage],
updatedAt: Date.now(),
// Auto-title from first user message
title: isFirstMessage
? userContent.slice(0, 50) + (userContent.length > 50 ? "..." : "")
: currentConv.title,
});
// Add empty assistant message placeholder
const assistantMessage: LLMMessage = {
id: crypto.randomUUID(),
role: "assistant",
content: "",
timestamp: Date.now(),
};
const updatedConv = await db.llmConversations.get(activeConversationId);
if (!updatedConv) {
setIsWaitingForResponse(false);
return;
}
await db.llmConversations.update(activeConversationId, {
messages: [...updatedConv.messages, assistantMessage],
updatedAt: Date.now(),
});
setStreamingContent("");
// Get messages for API call (includes user message, excludes empty assistant)
const messagesForApi = updatedConv.messages;
let fullContent = "";
await sendMessage(
providerInstanceId,
modelId,
messagesForApi,
(token) => {
setIsWaitingForResponse(false);
fullContent += token;
setStreamingContent(fullContent);
},
async () => {
// Update assistant message with final content
const finalConv = await db.llmConversations.get(activeConversationId);
if (finalConv && finalConv.messages.length > 0) {
const messages = [...finalConv.messages];
messages[messages.length - 1] = {
...messages[messages.length - 1],
content: fullContent,
};
await db.llmConversations.update(activeConversationId, {
messages,
updatedAt: Date.now(),
});
}
setStreamingContent("");
setIsWaitingForResponse(false);
},
async (error) => {
// Update assistant message with error
const finalConv = await db.llmConversations.get(activeConversationId);
if (finalConv && finalConv.messages.length > 0) {
const messages = [...finalConv.messages];
messages[messages.length - 1] = {
...messages[messages.length - 1],
content: `Error: ${error}`,
};
await db.llmConversations.update(activeConversationId, {
messages,
updatedAt: Date.now(),
});
}
setStreamingContent("");
setIsWaitingForResponse(false);
},
);
} catch {
// Send via session manager (handles everything)
await sendMessage(activeConversationId, userContent);
} catch (err) {
console.error("Failed to send message:", err);
setPendingUserMessage(null);
setIsWaitingForResponse(false);
}
};
@@ -339,16 +246,44 @@ function ChatPanel({
}
};
const messages = conversation?.messages ?? [];
const handleStop = () => {
if (conversationId) {
stopGeneration(conversationId);
}
};
let displayMessages =
streamingContent && messages.length > 0
? [
...messages.slice(0, -1),
{ ...messages[messages.length - 1], content: streamingContent },
]
: messages;
const handleResume = () => {
if (conversationId) {
resumeGeneration(conversationId);
}
};
// Build display messages with streaming content overlay
let displayMessages = [...messages];
// If streaming, overlay streaming content on last assistant message
if (streamingContent && messages.length > 0) {
const lastMsg = messages[messages.length - 1];
if (lastMsg.role === "assistant") {
displayMessages = [
...messages.slice(0, -1),
{ ...lastMsg, content: streamingContent },
];
} else {
// Streaming but last message is user - add streaming as new message
displayMessages = [
...messages,
{
id: "streaming",
role: "assistant" as const,
content: streamingContent,
timestamp: Date.now(),
},
];
}
}
// Show pending user message optimistically
if (
pendingUserMessage &&
!messages.some((m) => m.role === "user" && m.content === pendingUserMessage)
@@ -364,9 +299,7 @@ function ChatPanel({
];
}
const showThinking =
isWaitingForResponse || (isGenerating && !streamingContent);
const isBusy = isGenerating || isWaitingForResponse;
const showThinking = isLoading && !streamingContent;
return (
<>
@@ -384,6 +317,29 @@ function ChatPanel({
{showThinking && <ThinkingIndicator />}
</>
)}
{/* Error display */}
{error && !isLoading && (
<div className="text-sm text-destructive bg-destructive/10 rounded-lg px-3 py-2">
{error}
</div>
)}
{/* Resume button */}
{canResume && conversationId && (
<div className="flex justify-center">
<Button
variant="outline"
size="sm"
onClick={handleResume}
className="gap-2"
>
<Play className="h-3 w-3" />
Resume
</Button>
</div>
)}
<div ref={messagesEndRef} />
</div>
</div>
@@ -398,13 +354,13 @@ function ChatPanel({
placeholder="Type a message..."
className="flex-1 min-h-[38px] max-h-[120px] resize-none text-sm"
rows={1}
disabled={isBusy}
disabled={isLoading}
/>
{isBusy ? (
{isLoading ? (
<Button
variant="outline"
size="icon"
onClick={cancel}
onClick={handleStop}
className="flex-shrink-0 h-[38px] w-[38px]"
>
<Square className="h-4 w-4" />
@@ -448,7 +404,8 @@ export function AIViewer({ subcommand }: AIViewerProps) {
loading: modelsLoading,
refresh: refreshModels,
} = useLLMModels(activeInstanceId);
const { conversations, deleteConversation } = useLLMConversations();
const { conversations } = useConversations();
const { deleteConversation } = useChatActions();
const [selectedConversationId, setSelectedConversationId] = useState<
string | null

297
src/hooks/useChatSession.ts Normal file
View File

@@ -0,0 +1,297 @@
/**
* React hooks for ChatSessionManager
*
* Provides reactive access to chat sessions with proper cleanup.
* Combines session state (streaming) with conversation data (Dexie).
*/
import { useEffect, useMemo, useCallback } from "react";
import { useLiveQuery } from "dexie-react-hooks";
import { map, distinctUntilChanged } from "rxjs/operators";
import { use$ } from "applesauce-react/hooks";
import { sessionManager } from "@/services/llm/session-manager";
import db from "@/services/db";
import type { ChatSessionState, LLMConversation } from "@/types/llm";
// ─────────────────────────────────────────────────────────────
// Main Session Hook
// ─────────────────────────────────────────────────────────────
interface UseChatSessionOptions {
  /** Provider instance ID (required for new sessions) */
  providerInstanceId?: string;
  /** Model ID (required for new sessions) */
  modelId?: string;
}

interface UseChatSessionResult {
  // Conversation data (from Dexie — persistent, shared across windows)
  /** Full conversation record; undefined while loading or with no selection. */
  conversation: LLMConversation | undefined;
  /** Messages of the conversation; empty array while loading. */
  messages: LLMConversation["messages"];
  /** Conversation title; falls back to "New conversation". */
  title: string;

  // Session state (from SessionManager — transient, in-memory)
  /** Raw session snapshot; undefined when no session is registered. */
  session: ChatSessionState | undefined;
  /** True while a generation is in flight. */
  isLoading: boolean;
  /** Accumulated streamed content of the in-flight generation ("" when idle). */
  streamingContent: string;
  /** Last generation error message, if any. */
  error: string | undefined;

  // Usage and cost
  /** Token usage reported by the provider for the last completion. */
  usage: ChatSessionState["usage"];
  /** Accumulated session cost in USD (0 when unknown). */
  cost: number;

  // Resume state
  /** True when an interrupted generation can be resumed. */
  canResume: boolean;
  /** Finish reason of the last generation (null = interrupted). */
  finishReason: ChatSessionState["finishReason"];
}
/**
* Hook to access and manage a chat session.
* Automatically registers/unregisters with SessionManager on mount/unmount.
*
* @param conversationId - The conversation ID (null for no selection)
* @param options - Provider and model configuration
*/
export function useChatSession(
  conversationId: string | null,
  options: UseChatSessionOptions = {},
): UseChatSessionResult {
  const { providerInstanceId, modelId } = options;

  // Register with the SessionManager while mounted; closeSession decrements
  // the subscriber count so the manager can delay-cleanup the session.
  useEffect(() => {
    if (!conversationId || !providerInstanceId || !modelId) return;
    sessionManager.openSession(conversationId, providerInstanceId, modelId);
    return () => {
      sessionManager.closeSession(conversationId);
    };
  }, [conversationId, providerInstanceId, modelId]);

  // Transient session state (streaming/loading) from the SessionManager.
  // Session objects are replaced immutably on every update, so reference
  // equality in distinctUntilChanged is sufficient to filter updates that
  // only concern other conversations.
  const session = use$(
    () =>
      sessionManager.sessions$.pipe(
        map((sessions) =>
          conversationId ? sessions.get(conversationId) : undefined,
        ),
        distinctUntilChanged(),
      ),
    [conversationId],
  );

  // Persistent conversation data from Dexie (reactive via useLiveQuery).
  const conversation = useLiveQuery(
    () =>
      conversationId ? db.llmConversations.get(conversationId) : undefined,
    [conversationId],
  );

  // Derive computed values
  const result = useMemo(
    (): UseChatSessionResult => ({
      // Conversation data
      conversation,
      messages: conversation?.messages ?? [],
      title: conversation?.title ?? "New conversation",
      // Session state
      session,
      isLoading: session?.isLoading ?? false,
      streamingContent: session?.streamingContent ?? "",
      error: session?.lastError,
      // Usage and cost
      usage: session?.usage,
      cost: session?.sessionCost ?? 0,
      // Resume state: only an interrupted generation is resumable.
      // finishReason === null is set on stop/abort; "length" means the
      // provider truncated output. A fresh session has finishReason
      // undefined and must NOT offer resume (previously this checked
      // `!== "stop" && !== "error"`, which wrongly showed Resume for
      // sessions that never generated anything).
      canResume: Boolean(
        session &&
          !session.isLoading &&
          (session.finishReason === null || session.finishReason === "length"),
      ),
      finishReason: session?.finishReason,
    }),
    [conversation, session],
  );

  return result;
}
// ─────────────────────────────────────────────────────────────
// Actions Hook
// ─────────────────────────────────────────────────────────────
interface UseChatActionsResult {
  /** Send a user message to a conversation and start generation. */
  sendMessage: (conversationId: string, content: string) => Promise<void>;
  /** Create a new conversation; resolves to the new conversation ID. */
  createConversation: (
    providerInstanceId: string,
    modelId: string,
    title?: string,
  ) => Promise<string>;
  /** Delete a conversation and tear down its session. */
  deleteConversation: (conversationId: string) => Promise<void>;
  /** Abort in-flight generation for a conversation (partial content is saved). */
  stopGeneration: (conversationId: string) => void;
  /** Resume generation for an interrupted conversation. */
  resumeGeneration: (conversationId: string) => Promise<void>;
}
/**
* Hook providing chat actions.
* These are stable functions that don't change between renders.
*/
/**
 * Hook providing chat actions.
 *
 * Every action delegates to the module-level `sessionManager` singleton and
 * captures nothing from render scope, so the whole result object can be
 * memoized once with no dependencies. (The previous version wrapped each
 * action in its own empty-deps `useCallback` and then memoized the
 * collection again — two redundant layers for the same stable identity.)
 */
export function useChatActions(): UseChatActionsResult {
  return useMemo(
    () => ({
      sendMessage: (conversationId: string, content: string) =>
        sessionManager.sendMessage(conversationId, content),
      createConversation: (
        providerInstanceId: string,
        modelId: string,
        title?: string,
      ) => sessionManager.createConversation(providerInstanceId, modelId, title),
      deleteConversation: (conversationId: string) =>
        sessionManager.deleteConversation(conversationId),
      stopGeneration: (conversationId: string) =>
        sessionManager.stopGeneration(conversationId),
      resumeGeneration: (conversationId: string) =>
        sessionManager.startGeneration(conversationId),
    }),
    [],
  );
}
// ─────────────────────────────────────────────────────────────
// Conversations List Hook
// ─────────────────────────────────────────────────────────────
interface UseConversationsResult {
  /** All conversations, sorted by updatedAt descending. */
  conversations: LLMConversation[];
  /** True until the first Dexie query resolves. */
  isLoading: boolean;
}
/**
* Hook to get all conversations from Dexie.
* Automatically updates when conversations change.
*/
/**
 * Reactive list of all conversations from Dexie, newest first.
 * Re-renders automatically whenever conversations change.
 */
export function useConversations(): UseConversationsResult {
  // useLiveQuery yields undefined while the first query is in flight.
  const rows = useLiveQuery(
    () => db.llmConversations.orderBy("updatedAt").reverse().toArray(),
    [],
  );

  return useMemo(() => {
    const loaded = rows !== undefined;
    return {
      conversations: loaded ? rows : [],
      isLoading: !loaded,
    };
  }, [rows]);
}
// ─────────────────────────────────────────────────────────────
// Streaming Events Hook
// ─────────────────────────────────────────────────────────────
/**
* Hook to subscribe to streaming updates for a specific conversation.
* Useful for components that only need streaming content, not full session state.
*/
/**
 * Subscribe to just the streaming content of one conversation.
 * Lighter than useChatSession for components that only render the stream.
 */
export function useStreamingContent(conversationId: string | null): string {
  const content = use$(
    () =>
      sessionManager.sessions$.pipe(
        map((sessions) =>
          conversationId
            ? (sessions.get(conversationId)?.streamingContent ?? "")
            : "",
        ),
        // Skip emissions where this conversation's content is unchanged.
        distinctUntilChanged(),
      ),
    [conversationId],
  );
  return content ?? "";
}
// ─────────────────────────────────────────────────────────────
// Loading State Hook
// ─────────────────────────────────────────────────────────────
/**
* Hook to check if a conversation is currently loading.
*/
/**
 * Subscribe to just the loading flag of one conversation's session.
 */
export function useIsLoading(conversationId: string | null): boolean {
  const loading = use$(
    () =>
      sessionManager.sessions$.pipe(
        map((sessions) =>
          conversationId
            ? (sessions.get(conversationId)?.isLoading ?? false)
            : false,
        ),
        // Only re-emit when the boolean actually flips.
        distinctUntilChanged(),
      ),
    [conversationId],
  );
  return loading ?? false;
}
// ─────────────────────────────────────────────────────────────
// Active Sessions Hook
// ─────────────────────────────────────────────────────────────
/**
* Hook to get all active session IDs.
* Useful for debugging or status displays.
*/
export function useActiveSessions(): string[] {
const sessionIds = use$(
() =>
sessionManager.sessions$.pipe(
map((sessions) => Array.from(sessions.keys())),
),
[],
);
return sessionIds ?? [];
}

View File

@@ -0,0 +1,584 @@
/**
* Chat Session Manager
*
* Manages active chat sessions with RxJS patterns.
* Supports multiple concurrent sessions (one per conversation).
* Multiple windows can view the same conversation and share streaming state.
*
* Architecture:
* - Conversation data (messages) → Dexie (persistent, shared)
* - Session state (streaming) → Memory (transient, reactive)
*/
import { BehaviorSubject, Subject } from "rxjs";
import db from "@/services/db";
import { providerManager } from "./provider-manager";
import type {
ChatSessionState,
StreamingUpdateEvent,
MessageAddedEvent,
LoadingChangedEvent,
SessionErrorEvent,
LLMMessage,
LLMConversation,
} from "@/types/llm";
// Session cleanup delay (ms) - wait before cleaning up after last subscriber leaves
const CLEANUP_DELAY = 5000;
class ChatSessionManager {
  // ─────────────────────────────────────────────────────────────
  // Reactive State
  // ─────────────────────────────────────────────────────────────

  /**
   * All active sessions, keyed by conversationId.
   * Multiple sessions can be active simultaneously.
   * Sessions are immutable snapshots: every update replaces both the Map
   * and the session object, so subscribers may rely on reference equality.
   */
  sessions$ = new BehaviorSubject<Map<string, ChatSessionState>>(new Map());

  // ─────────────────────────────────────────────────────────────
  // Event Streams
  // ─────────────────────────────────────────────────────────────

  /** Emitted during streaming with the accumulated content so far */
  streamingUpdate$ = new Subject<StreamingUpdateEvent>();

  /** Emitted when a message is persisted to a conversation */
  messageAdded$ = new Subject<MessageAddedEvent>();

  /** Emitted when loading state changes */
  loadingChanged$ = new Subject<LoadingChangedEvent>();

  /** Emitted on errors */
  error$ = new Subject<SessionErrorEvent>();

  // ─────────────────────────────────────────────────────────────
  // Internal State
  // ─────────────────────────────────────────────────────────────

  /** Delayed-cleanup timers for sessions with no subscribers */
  private cleanupTimers = new Map<string, ReturnType<typeof setTimeout>>();

  // ─────────────────────────────────────────────────────────────
  // Session Lifecycle
  // ─────────────────────────────────────────────────────────────

  /**
   * Open a session for a conversation.
   * If session already exists, increments subscriber count.
   * Call this when a window starts viewing a conversation.
   */
  openSession(
    conversationId: string,
    providerInstanceId: string,
    modelId: string,
  ): ChatSessionState {
    // Cancel any pending cleanup (user switched back quickly)
    const cleanupTimer = this.cleanupTimers.get(conversationId);
    if (cleanupTimer) {
      clearTimeout(cleanupTimer);
      this.cleanupTimers.delete(conversationId);
    }

    const sessions = this.sessions$.value;
    const existing = sessions.get(conversationId);
    if (existing) {
      // Increment subscriber count
      const updated: ChatSessionState = {
        ...existing,
        subscriberCount: existing.subscriberCount + 1,
        // Update provider/model if changed (e.g. user switched model)
        providerInstanceId,
        modelId,
      };
      this.updateSession(conversationId, updated);
      return updated;
    }

    // Create new session. finishReason is intentionally left undefined:
    // undefined = never generated, null = interrupted (resumable).
    const session: ChatSessionState = {
      conversationId,
      providerInstanceId,
      modelId,
      isLoading: false,
      streamingContent: "",
      sessionCost: 0,
      subscriberCount: 1,
      lastActivity: Date.now(),
    };
    this.updateSession(conversationId, session);
    return session;
  }

  /**
   * Close a session subscription.
   * Decrements subscriber count; schedules cleanup when count reaches 0.
   * Call this when a window stops viewing a conversation.
   */
  closeSession(conversationId: string): void {
    const sessions = this.sessions$.value;
    const session = sessions.get(conversationId);
    if (!session) return;

    const updatedCount = session.subscriberCount - 1;
    if (updatedCount <= 0) {
      // Schedule cleanup after delay (in case user switches back quickly)
      const timer = setTimeout(() => {
        this.cleanupSession(conversationId);
        this.cleanupTimers.delete(conversationId);
      }, CLEANUP_DELAY);
      this.cleanupTimers.set(conversationId, timer);
      this.updateSession(conversationId, {
        ...session,
        subscriberCount: 0,
      });
    } else {
      this.updateSession(conversationId, {
        ...session,
        subscriberCount: updatedCount,
      });
    }
  }

  /**
   * Clean up a session completely.
   * Aborts any in-progress generation and removes from state.
   */
  private cleanupSession(conversationId: string): void {
    const sessions = this.sessions$.value;
    const session = sessions.get(conversationId);
    if (!session) return;

    // Don't cleanup if subscribers came back
    if (session.subscriberCount > 0) return;

    // Abort any in-progress generation
    session.abortController?.abort("Session closed");

    const newSessions = new Map(sessions);
    newSessions.delete(conversationId);
    this.sessions$.next(newSessions);
  }

  /**
   * Get a session by conversation ID.
   */
  getSession(conversationId: string): ChatSessionState | undefined {
    return this.sessions$.value.get(conversationId);
  }

  /**
   * Replace a session in the state map (immutable update).
   */
  private updateSession(
    conversationId: string,
    session: ChatSessionState,
  ): void {
    const newSessions = new Map(this.sessions$.value);
    newSessions.set(conversationId, session);
    this.sessions$.next(newSessions);
  }

  // ─────────────────────────────────────────────────────────────
  // Conversation Management
  // ─────────────────────────────────────────────────────────────

  /**
   * Create a new conversation and return its ID.
   */
  async createConversation(
    providerInstanceId: string,
    modelId: string,
    title?: string,
  ): Promise<string> {
    const id = crypto.randomUUID();
    const now = Date.now();
    const conversation: LLMConversation = {
      id,
      title: title || "New conversation",
      providerInstanceId,
      modelId,
      messages: [],
      createdAt: now,
      updatedAt: now,
    };
    await db.llmConversations.add(conversation);
    return id;
  }

  /**
   * Delete a conversation and its session.
   */
  async deleteConversation(conversationId: string): Promise<void> {
    // Tear down the in-memory session first
    const session = this.getSession(conversationId);
    if (session) {
      session.abortController?.abort("Conversation deleted");
      const newSessions = new Map(this.sessions$.value);
      newSessions.delete(conversationId);
      this.sessions$.next(newSessions);
    }

    // Cancel any pending cleanup
    const timer = this.cleanupTimers.get(conversationId);
    if (timer) {
      clearTimeout(timer);
      this.cleanupTimers.delete(conversationId);
    }

    // Delete from Dexie
    await db.llmConversations.delete(conversationId);
  }

  // ─────────────────────────────────────────────────────────────
  // Message Handling
  // ─────────────────────────────────────────────────────────────

  /**
   * Send a message and stream the response.
   * This is the main entry point for chat interactions.
   *
   * @throws if the session is missing or already generating.
   */
  async sendMessage(conversationId: string, content: string): Promise<void> {
    const session = this.getSession(conversationId);
    if (!session) {
      throw new Error(`No session found for conversation ${conversationId}`);
    }
    if (session.isLoading) {
      throw new Error("Session is already generating a response");
    }

    const conversation = await db.llmConversations.get(conversationId);
    if (!conversation) {
      throw new Error(`Conversation ${conversationId} not found`);
    }

    const userMessage: LLMMessage = {
      id: crypto.randomUUID(),
      role: "user",
      content,
      timestamp: Date.now(),
    };

    // Persist the user message; auto-title from the first message
    const isFirstMessage = conversation.messages.length === 0;
    await db.llmConversations.update(conversationId, {
      messages: [...conversation.messages, userMessage],
      updatedAt: Date.now(),
      title: isFirstMessage
        ? content.slice(0, 50) + (content.length > 50 ? "..." : "")
        : conversation.title,
    });
    this.messageAdded$.next({ conversationId, message: userMessage });

    // Start generation
    await this.startGeneration(conversationId);
  }

  /**
   * Start or resume AI generation for a conversation.
   *
   * Streams tokens into the session's streamingContent, then persists the
   * final assistant message. Aborts are detected via the AbortSignal rather
   * than the thrown error's type, because abort(reason) rejects with the
   * raw reason value (a string at our call sites), not a DOMException.
   */
  async startGeneration(conversationId: string): Promise<void> {
    const session = this.getSession(conversationId);
    if (!session) {
      throw new Error(`No session found for conversation ${conversationId}`);
    }
    if (session.isLoading) {
      return; // Already generating
    }

    const conversation = await db.llmConversations.get(conversationId);
    if (!conversation || conversation.messages.length === 0) {
      throw new Error("No messages in conversation");
    }

    const abortController = new AbortController();

    // Enter loading state; finishReason: null marks an in-progress /
    // interruptible run.
    this.updateSession(conversationId, {
      ...session,
      isLoading: true,
      streamingContent: "",
      abortController,
      lastError: undefined,
      finishReason: null,
      lastActivity: Date.now(),
    });
    this.loadingChanged$.next({ conversationId, isLoading: true });

    try {
      let fullContent = "";
      let usage: ChatSessionState["usage"];

      const chatGenerator = providerManager.chat(
        session.providerInstanceId,
        session.modelId,
        conversation.messages,
        { signal: abortController.signal },
      );

      for await (const chunk of chatGenerator) {
        // Bail out if the session was stopped or cleaned up mid-stream
        const currentSession = this.getSession(conversationId);
        if (!currentSession?.isLoading) {
          break;
        }

        if (chunk.type === "token" && chunk.content) {
          fullContent += chunk.content;
          this.updateSession(conversationId, {
            ...currentSession,
            streamingContent: fullContent,
            lastActivity: Date.now(),
          });
          this.streamingUpdate$.next({
            conversationId,
            content: fullContent,
          });
        } else if (chunk.type === "done") {
          usage = chunk.usage;
        } else if (chunk.type === "error") {
          throw new Error(chunk.error || "Unknown error");
        }
      }

      // If generation was stopped (or the session removed) while streaming,
      // stopGeneration() has already saved any partial content and updated
      // state — persisting another assistant message here would duplicate
      // it. (Previously this path double-saved when the loop exited via
      // `break` instead of an abort throw.)
      const postStream = this.getSession(conversationId);
      if (!postStream?.isLoading) {
        return;
      }

      // Persist the completed assistant message
      const assistantMessage: LLMMessage = {
        id: crypto.randomUUID(),
        role: "assistant",
        content: fullContent,
        timestamp: Date.now(),
      };
      const updatedConv = await db.llmConversations.get(conversationId);
      if (updatedConv) {
        await db.llmConversations.update(conversationId, {
          messages: [...updatedConv.messages, assistantMessage],
          updatedAt: Date.now(),
        });
      }
      this.messageAdded$.next({ conversationId, message: assistantMessage });

      // Track cost when the provider reported token usage
      let cost = 0;
      if (usage) {
        cost = await this.calculateCost(
          session.providerInstanceId,
          session.modelId,
          usage.promptTokens,
          usage.completionTokens,
        );
      }

      // Mark session completed
      const finalSession = this.getSession(conversationId);
      if (finalSession) {
        this.updateSession(conversationId, {
          ...finalSession,
          isLoading: false,
          streamingContent: "",
          abortController: undefined,
          usage,
          sessionCost: finalSession.sessionCost + cost,
          finishReason: "stop",
          lastActivity: Date.now(),
        });
      }
      this.loadingChanged$.next({ conversationId, isLoading: false });
    } catch (error) {
      // Abort path: check the signal, not the error type. Our abort sites
      // call abort("<string reason>"), which makes awaited operations
      // reject with that string — the old `instanceof DOMException` check
      // never matched, so user stops were misreported as errors.
      if (
        abortController.signal.aborted ||
        (error instanceof DOMException && error.name === "AbortError")
      ) {
        const currentSession = this.getSession(conversationId);
        if (currentSession) {
          this.updateSession(conversationId, {
            ...currentSession,
            isLoading: false,
            abortController: undefined,
            finishReason: null, // interrupted → can resume
            lastActivity: Date.now(),
          });
        }
        this.loadingChanged$.next({ conversationId, isLoading: false });
        return;
      }

      // Genuine error path
      const errorMessage =
        error instanceof Error ? error.message : "Unknown error";
      const currentSession = this.getSession(conversationId);
      if (currentSession) {
        this.updateSession(conversationId, {
          ...currentSession,
          isLoading: false,
          streamingContent: "",
          abortController: undefined,
          lastError: errorMessage,
          finishReason: "error",
          lastActivity: Date.now(),
        });
      }
      this.error$.next({ conversationId, error: errorMessage });
      this.loadingChanged$.next({ conversationId, isLoading: false });
    }
  }

  /**
   * Stop generation for a conversation.
   * Any streamed-so-far content is persisted as a partial assistant message.
   */
  stopGeneration(conversationId: string): void {
    const session = this.getSession(conversationId);
    if (!session) return;

    session.abortController?.abort("User stopped generation");

    // Save partial content (fire-and-forget; Dexie writes are async and
    // this method is intentionally synchronous for UI responsiveness)
    if (session.streamingContent) {
      void this.savePartialMessage(conversationId, session.streamingContent);
    }

    this.updateSession(conversationId, {
      ...session,
      isLoading: false,
      streamingContent: "",
      abortController: undefined,
      finishReason: null, // Can resume
      lastActivity: Date.now(),
    });
    this.loadingChanged$.next({ conversationId, isLoading: false });
  }

  /**
   * Save partial streaming content as a message (when stopped mid-stream).
   */
  private async savePartialMessage(
    conversationId: string,
    content: string,
  ): Promise<void> {
    if (!content.trim()) return;

    const conversation = await db.llmConversations.get(conversationId);
    if (!conversation) return;

    const assistantMessage: LLMMessage = {
      id: crypto.randomUUID(),
      role: "assistant",
      content: content + "\n\n_(generation stopped)_",
      timestamp: Date.now(),
    };
    await db.llmConversations.update(conversationId, {
      messages: [...conversation.messages, assistantMessage],
      updatedAt: Date.now(),
    });
    this.messageAdded$.next({ conversationId, message: assistantMessage });
  }

  // ─────────────────────────────────────────────────────────────
  // Cost Calculation
  // ─────────────────────────────────────────────────────────────

  /**
   * Calculate USD cost for a completion based on model pricing.
   * Returns 0 when pricing is unavailable or lookup fails (best-effort).
   */
  private async calculateCost(
    providerInstanceId: string,
    modelId: string,
    promptTokens: number,
    completionTokens: number,
  ): Promise<number> {
    try {
      const models = await providerManager.listModels(providerInstanceId);
      const model = models.find((m) => m.id === modelId);
      if (!model?.pricing) {
        return 0;
      }
      const inputCost =
        (promptTokens / 1_000_000) * (model.pricing.inputPerMillion ?? 0);
      const outputCost =
        (completionTokens / 1_000_000) * (model.pricing.outputPerMillion ?? 0);
      return inputCost + outputCost;
    } catch {
      return 0;
    }
  }

  // ─────────────────────────────────────────────────────────────
  // Utility Methods
  // ─────────────────────────────────────────────────────────────

  /**
   * Check if a conversation can be resumed (was interrupted).
   * finishReason === null marks an interrupted run; "length" means the
   * provider truncated output. A fresh session (finishReason undefined)
   * has nothing to resume — the previous `!== "stop" && !== "error"`
   * check wrongly reported fresh sessions as resumable.
   */
  canResume(conversationId: string): boolean {
    const session = this.getSession(conversationId);
    return Boolean(
      session &&
        !session.isLoading &&
        (session.finishReason === null || session.finishReason === "length"),
    );
  }

  /**
   * Get all active session IDs.
   */
  getActiveSessionIds(): string[] {
    return Array.from(this.sessions$.value.keys());
  }

  /**
   * Clear all sessions (for cleanup/logout).
   */
  clearAllSessions(): void {
    // Abort all in-progress generations
    for (const session of this.sessions$.value.values()) {
      session.abortController?.abort("Clearing all sessions");
    }
    // Clear all cleanup timers
    for (const timer of this.cleanupTimers.values()) {
      clearTimeout(timer);
    }
    this.cleanupTimers.clear();
    // Clear sessions
    this.sessions$.next(new Map());
  }
}

// Singleton instance shared by all hooks/windows
export const sessionManager = new ChatSessionManager();

View File

@@ -90,6 +90,81 @@ export interface ChatOptions {
signal?: AbortSignal;
}
// ─────────────────────────────────────────────────────────────
// Session State (for ChatSessionManager)
// ─────────────────────────────────────────────────────────────
/**
 * Transient state for an active chat session.
 * Multiple windows can view the same conversation and share this state.
 * Messages are stored in Dexie; this tracks streaming/loading state.
 */
export interface ChatSessionState {
  conversationId: string;
  providerInstanceId: string;
  modelId: string;
  // Streaming state (shared across all windows viewing this conversation)
  isLoading: boolean;
  streamingContent: string;
  // Present only while a generation is in flight; used to cancel it
  abortController?: AbortController;
  // Usage from last completion (absent until the provider reports it)
  usage?: {
    promptTokens: number;
    completionTokens: number;
  };
  // Cost tracking (USD), accumulated across completions in this session
  sessionCost: number;
  // For resume functionality:
  //   undefined → no generation has run in this session
  //   null      → generation was interrupted (resumable)
  //   "stop" / "length" / "error" → terminal outcome of the last run
  finishReason?: "stop" | "length" | "error" | null;
  // Message of the last generation error, if any
  lastError?: string;
  // Reference counting - how many windows have this session open
  subscriberCount: number;
  // Timestamp (ms) of the last state change, for staleness checks
  lastActivity: number;
}
/**
 * Event emitted during streaming updates.
 * `content` is the full accumulated text so far, not a single token delta.
 */
export interface StreamingUpdateEvent {
  conversationId: string;
  content: string;
  // Token usage, when the provider includes it mid-stream
  usage?: {
    promptTokens: number;
    completionTokens: number;
  };
}

/**
 * Event emitted when a message is persisted to a conversation.
 */
export interface MessageAddedEvent {
  conversationId: string;
  message: LLMMessage;
}

/**
 * Event emitted when session loading state changes.
 */
export interface LoadingChangedEvent {
  conversationId: string;
  isLoading: boolean;
}

/**
 * Event emitted on session error (human-readable message).
 */
export interface SessionErrorEvent {
  conversationId: string;
  error: string;
}
// ─────────────────────────────────────────────────────────────
// Legacy types (for DB migration compatibility)
// ─────────────────────────────────────────────────────────────