Files
danswer/web/src/lib/search/streamingUtils.ts
2023-09-01 20:32:22 -07:00

48 lines
1.4 KiB
TypeScript

/** Loose shape of a decoded stream message: any object with string keys. */
type NonEmptyObject = Record<string, any>;

/**
 * Tries to decode one section of the stream as JSON, prefixed by any text
 * buffered from earlier sections that has not yet formed valid JSON.
 *
 * @param chunk - the newly received section of text.
 * @param currPartialChunk - text buffered from earlier sections, or `null`.
 * @returns `[parsed, null]` when the combined text parses as JSON; otherwise
 *   `[null, combinedText]` so the caller can keep buffering it.
 */
const processSingleChunk = <T extends NonEmptyObject>(
  chunk: string,
  currPartialChunk: string | null
): [T | null, string | null] => {
  const candidate = currPartialChunk ? `${currPartialChunk}${chunk}` : chunk;
  let parsed: T | null = null;
  try {
    parsed = JSON.parse(candidate);
  } catch {
    // Text that does not parse is assumed to be the front part of a message
    // whose remainder has not arrived yet, so hand it back for buffering.
    return [null, candidate];
  }
  return [parsed, null];
};
/**
 * Splits a raw piece of a newline-delimited JSON stream into parsed objects.
 *
 * This is required because, in practice, nginx does not send over each chunk
 * one at a time even with buffering turned off. Instead, a single read may
 * contain several messages batched together, or only part of one. Text that
 * does not yet parse is carried over so a later read can complete it.
 *
 * @param rawChunkString - newly received text from the stream.
 * @param previousPartialChunk - unparsed text carried over from the previous
 *   call, or `null` if nothing is pending.
 * @returns a tuple of the objects that parsed successfully (in stream order)
 *   and the unparsed remainder to pass back as `previousPartialChunk` on the
 *   next call (`null` when nothing is pending).
 */
export const processRawChunkString = <T extends NonEmptyObject>(
  rawChunkString: string,
  previousPartialChunk: string | null
): [T[], string | null] => {
  if (!rawChunkString) {
    // An empty read carries no new data. Keep whatever was already buffered
    // instead of discarding it, otherwise an in-flight message is lost.
    return [[], previousPartialChunk];
  }

  const parsedChunkSections: T[] = [];
  let currPartialChunk = previousPartialChunk;
  for (const chunk of rawChunkString.split("\n")) {
    if (chunk.length === 0) {
      // Blank lines (including the one after a trailing newline) carry nothing.
      continue;
    }
    const [processedChunk, partialChunk] = processSingleChunk<T>(
      chunk,
      currPartialChunk
    );
    if (processedChunk) {
      parsedChunkSections.push(processedChunk);
      currPartialChunk = null;
    } else {
      // Not parseable yet: keep accumulating until later text completes it.
      currPartialChunk = partialChunk;
    }
  }
  return [parsedChunkSections, currPartialChunk];
};