mirror of
https://github.com/danswer-ai/danswer.git
synced 2025-09-19 20:24:32 +02:00
Update SSE handling to accommodate slow networks (#2180)
This commit is contained in:
@@ -257,41 +257,6 @@ export const AIMessage = ({
|
||||
/>
|
||||
<div className="w-full">
|
||||
<div className="max-w-message-max break-words">
|
||||
{(!toolCall || toolCall.tool_name === SEARCH_TOOL_NAME) &&
|
||||
danswerSearchToolEnabledForPersona && (
|
||||
<>
|
||||
{query !== undefined &&
|
||||
handleShowRetrieved !== undefined &&
|
||||
isCurrentlyShowingRetrieved !== undefined &&
|
||||
!retrievalDisabled && (
|
||||
<div className="my-1">
|
||||
<SearchSummary
|
||||
query={query}
|
||||
hasDocs={hasDocs || false}
|
||||
messageId={messageId}
|
||||
finished={toolCall?.tool_result != undefined}
|
||||
isCurrentlyShowingRetrieved={
|
||||
isCurrentlyShowingRetrieved
|
||||
}
|
||||
handleShowRetrieved={handleShowRetrieved}
|
||||
handleSearchQueryEdit={handleSearchQueryEdit}
|
||||
/>
|
||||
</div>
|
||||
)}
|
||||
{handleForceSearch &&
|
||||
content &&
|
||||
query === undefined &&
|
||||
!hasDocs &&
|
||||
!retrievalDisabled && (
|
||||
<div className="my-1">
|
||||
<SkippedSearch
|
||||
handleForceSearch={handleForceSearch}
|
||||
/>
|
||||
</div>
|
||||
)}
|
||||
</>
|
||||
)}
|
||||
|
||||
<div className="w-full ml-4">
|
||||
<div className="max-w-message-max break-words">
|
||||
{(!toolCall || toolCall.tool_name === SEARCH_TOOL_NAME) && (
|
||||
|
@@ -83,6 +83,7 @@ export async function* handleSSEStream<T extends PacketType>(
|
||||
): AsyncGenerator<T, void, unknown> {
|
||||
const reader = streamingResponse.body?.getReader();
|
||||
const decoder = new TextDecoder();
|
||||
let buffer = "";
|
||||
|
||||
while (true) {
|
||||
const rawChunk = await reader?.read();
|
||||
@@ -94,16 +95,43 @@ export async function* handleSSEStream<T extends PacketType>(
|
||||
break;
|
||||
}
|
||||
|
||||
const chunk = decoder.decode(value);
|
||||
const lines = chunk.split("\n").filter((line) => line.trim() !== "");
|
||||
buffer += decoder.decode(value, { stream: true });
|
||||
const lines = buffer.split("\n");
|
||||
buffer = lines.pop() || "";
|
||||
|
||||
for (const line of lines) {
|
||||
if (line.trim() === "") continue;
|
||||
|
||||
try {
|
||||
const data = JSON.parse(line) as T;
|
||||
yield data;
|
||||
} catch (error) {
|
||||
console.error("Error parsing SSE data:", error);
|
||||
|
||||
// Detect JSON objects (ie. check if parseable json has been accumulated)
|
||||
const jsonObjects = line.match(/\{[^{}]*\}/g);
|
||||
if (jsonObjects) {
|
||||
for (const jsonObj of jsonObjects) {
|
||||
try {
|
||||
const data = JSON.parse(jsonObj) as T;
|
||||
yield data;
|
||||
} catch (innerError) {
|
||||
console.error("Error parsing extracted JSON:", innerError);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Process any remaining data in the buffer
|
||||
if (buffer.trim() !== "") {
|
||||
try {
|
||||
const data = JSON.parse(buffer) as T;
|
||||
yield data;
|
||||
} catch (error) {
|
||||
console.log("Problematic remaining buffer:", buffer);
|
||||
console.error("Error parsing remaining buffer:", error);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user