Split chunks up by newline

Weves 2023-05-13 16:09:56 -07:00 committed by Chris Weaver
parent b825b39763
commit c68220103d


@@ -5,6 +5,50 @@ import { SearchBar } from "./SearchBar";
 import { SearchResultsDisplay } from "./SearchResultsDisplay";
 import { Quote, Document } from "./types";
 
+const processSingleChunk = (
+  chunk: string,
+  currPartialChunk: string | null
+): [{ [key: string]: any } | null, string | null] => {
+  const completeChunk = (currPartialChunk || "") + chunk;
+  try {
+    // every complete chunk should be valid JSON
+    const chunkJson = JSON.parse(completeChunk);
+    return [chunkJson, null];
+  } catch (err) {
+    // if it's not valid JSON, then it's probably an incomplete chunk
+    return [null, completeChunk];
+  }
+};
+
+const processRawChunkString = (
+  rawChunkString: string,
+  previousPartialChunk: string | null
+): [any[], string | null] => {
+  /* This is required because, in practice, nginx does not send over each
+  chunk one at a time even with buffering turned off. Instead, chunks
+  sometimes arrive in batches or are incomplete */
+  if (!rawChunkString) {
+    return [[], null];
+  }
+  const chunkSections = rawChunkString
+    .split("\n")
+    .filter((chunk) => chunk.length > 0);
+  let parsedChunkSections: any[] = [];
+  let currPartialChunk = previousPartialChunk;
+  chunkSections.forEach((chunk) => {
+    const [processedChunk, partialChunk] = processSingleChunk(
+      chunk,
+      currPartialChunk
+    );
+    if (processedChunk) {
+      parsedChunkSections.push(processedChunk);
+    }
+    // a successful parse consumes the partial; a failed one carries it forward
+    currPartialChunk = partialChunk;
+  });
+  return [parsedChunkSections, currPartialChunk];
+};
+
 const searchRequestStreamed = async (
   query: string,
   updateCurrentAnswer: (val: string) => void,
@@ -24,6 +68,7 @@ const searchRequestStreamed = async (
 
     const reader = response.body?.getReader();
     const decoder = new TextDecoder("utf-8");
+    let previousPartialChunk: string | null = null;
     while (true) {
       const rawChunk = await reader?.read();
       if (!rawChunk) {
@@ -35,23 +80,31 @@ const searchRequestStreamed = async (
       }
       // Process each chunk as it arrives
-      const chunk = decoder.decode(value, { stream: true });
-      if (!chunk) {
-        break;
-      }
-      const chunkJson = JSON.parse(chunk);
-      const answerChunk = chunkJson.answer_data;
-      if (answerChunk) {
-        answer += answerChunk;
-        updateCurrentAnswer(answer);
-      } else {
-        const docs = chunkJson.top_documents as any[];
-        if (docs) {
-          updateDocs(docs.map((doc) => JSON.parse(doc)));
-        } else {
-          updateQuotes(chunkJson);
-        }
-      }
+      const [completedChunks, partialChunk] = processRawChunkString(
+        decoder.decode(value, { stream: true }),
+        previousPartialChunk
+      );
+      if (!completedChunks.length && !partialChunk) {
+        break;
+      }
+      // carry any trailing partial data into the next read
+      // (null when everything parsed cleanly)
+      previousPartialChunk = partialChunk;
+      completedChunks.forEach((chunk) => {
+        // TODO: clean up response / this logic
+        const answerChunk = chunk.answer_data;
+        if (answerChunk) {
+          answer += answerChunk;
+          updateCurrentAnswer(answer);
+        } else {
+          const docs = chunk.top_documents as any[];
+          if (docs) {
+            updateDocs(docs.map((doc) => JSON.parse(doc) as Document));
+          } else {
+            updateQuotes(chunk as Record<string, Quote>);
+          }
+        }
+      });
     }
   } catch (err) {
     console.error("Fetch error:", err);
@@ -76,6 +129,11 @@ export const SearchSection: React.FC<{}> = () => {
         searchRequestStreamed(query, setAnswer, setQuotes, setDocuments).then(
           () => {
             setIsFetching(false);
+            // if no quotes were given, set to empty object so that the SearchResultsDisplay
+            // component knows that the search was successful but no quotes were found
+            if (!quotes) {
+              setQuotes({});
+            }
           }
         );
       }}
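
For illustration, a minimal sketch of how the two new helpers reassemble newline-delimited JSON across reads. The payload shapes and the batching pattern below are hypothetical examples, not taken from this commit; only processRawChunkString as added above is assumed.

// Simulate nginx delivering two messages in one read, then splitting a
// third message across two reads (payloads are made-up examples).
const reads = [
  '{"answer_data": "Hello"}\n{"answer_data": " world"}\n',
  '{"top_documents"', // incomplete JSON, no trailing newline
  ': []}\n',
];

let partial: string | null = null;
for (const raw of reads) {
  const [complete, newPartial] = processRawChunkString(raw, partial);
  partial = newPartial;
  console.log(complete.length, "message(s), partial:", JSON.stringify(partial));
}
// Expected output:
//   2 message(s), partial: null
//   0 message(s), partial: "{\"top_documents\""
//   1 message(s), partial: null

The second read fails JSON.parse, so it is carried forward as a partial; the third read is prepended with that partial, parses cleanly, and the partial is cleared.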