Fix Json Output Issue and Fix miscount of new docs per Index Attempt (#641)

Yuhong Sun 2023-10-29 00:34:28 -07:00 committed by GitHub
parent 26b491fb0c
commit 57ecab0098
13 changed files with 75 additions and 19 deletions


@@ -0,0 +1,32 @@
"""Add Total Docs for Index Attempt

Revision ID: d61e513bef0a
Revises: 46625e4745d4
Create Date: 2023-10-27 23:02:43.369964

"""
from alembic import op
import sqlalchemy as sa

# revision identifiers, used by Alembic.
revision = "d61e513bef0a"
down_revision = "46625e4745d4"
branch_labels = None
depends_on = None


def upgrade() -> None:
    op.add_column(
        "index_attempt",
        sa.Column("new_docs_indexed", sa.Integer(), nullable=True),
    )
    op.alter_column(
        "index_attempt", "num_docs_indexed", new_column_name="total_docs_indexed"
    )


def downgrade() -> None:
    op.alter_column(
        "index_attempt", "total_docs_indexed", new_column_name="num_docs_indexed"
    )
    op.drop_column("index_attempt", "new_docs_indexed")
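
For reference, a minimal sketch of the DDL this migration boils down to on Postgres, runnable against a scratch database (the DSN is hypothetical, not part of this commit):

from sqlalchemy import create_engine, text

engine = create_engine("postgresql://localhost/scratch")  # hypothetical DSN

with engine.begin() as conn:
    # Matches op.add_column(...) above: the new counter is NULL for pre-existing rows.
    conn.execute(text("ALTER TABLE index_attempt ADD COLUMN new_docs_indexed INTEGER"))
    # Matches op.alter_column(...): a pure rename, existing values are preserved.
    conn.execute(
        text("ALTER TABLE index_attempt RENAME COLUMN num_docs_indexed TO total_docs_indexed")
    )

Note that old attempts keep NULL in new_docs_indexed; the API layer below papers over this with `or 0`.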


@@ -300,15 +300,18 @@ def _run_indexing(
        document_count += len(doc_batch)

        # commit transaction so that the `update` below begins
-       # with a brand new tracsaction. Postgres uses the start
+       # with a brand new transaction. Postgres uses the start
        # of the transactions when computing `NOW()`, so if we have
        # a long running transaction, the `time_updated` field will
        # be inaccurate
        db_session.commit()

+       # This new value is updated every batch, so UI can refresh per batch update
        update_docs_indexed(
            db_session=db_session,
            index_attempt=attempt,
-           num_docs_indexed=document_count,
+           total_docs_indexed=document_count,
+           new_docs_indexed=net_doc_change,
        )

        # check if connector is disabled mid run and stop if so
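
The comment about `NOW()` is the motivation for the per-batch commit: Postgres pins `now()` to the start of the current transaction, so a long-lived ingest transaction would stamp every update with the same stale timestamp. A standalone sketch (hypothetical DSN, not from this commit) demonstrating the behavior:

import time

from sqlalchemy import create_engine, text

engine = create_engine("postgresql://localhost/scratch")  # hypothetical DSN

with engine.begin() as conn:
    first = conn.execute(text("SELECT now()")).scalar_one()
    time.sleep(2)
    second = conn.execute(text("SELECT now()")).scalar_one()
    # Same transaction, same timestamp: now() does not advance until commit.
    assert first == second
    # statement_timestamp() does advance, which is why committing each batch
    # keeps the time_updated field honest.
    assert conn.execute(text("SELECT statement_timestamp()")).scalar_one() > first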


@@ -90,9 +90,14 @@ def mark_attempt_failed(
def update_docs_indexed(
-   db_session: Session, index_attempt: IndexAttempt, num_docs_indexed: int
+   db_session: Session,
+   index_attempt: IndexAttempt,
+   total_docs_indexed: int,
+   new_docs_indexed: int,
) -> None:
-   index_attempt.num_docs_indexed = num_docs_indexed
+   index_attempt.total_docs_indexed = total_docs_indexed
+   index_attempt.new_docs_indexed = new_docs_indexed

    db_session.add(index_attempt)
    db_session.commit()
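
A worked toy example of the two counters this fix separates (the batch numbers are made up; `net_doc_change` is the caller's name from the hunk above): total_docs_indexed grows by every processed document, while new_docs_indexed only counts documents that did not exist before the attempt, which is exactly the miscount the PR title refers to.

# Hypothetical batches: each pair is (docs in batch, docs in batch not seen before).
batches = [(8, 8), (8, 3), (4, 0)]

document_count = 0  # becomes total_docs_indexed
net_doc_change = 0  # becomes new_docs_indexed

for batch_size, newly_seen in batches:
    document_count += batch_size
    net_doc_change += newly_seen
    # update_docs_indexed(...) would be called here, once per batch

assert (document_count, net_doc_change) == (20, 11)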


@@ -267,7 +267,8 @@ class IndexAttempt(Base):
        nullable=True,
    )
    status: Mapped[IndexingStatus] = mapped_column(Enum(IndexingStatus))
-   num_docs_indexed: Mapped[int | None] = mapped_column(Integer, default=0)
+   new_docs_indexed: Mapped[int | None] = mapped_column(Integer, default=0)
+   total_docs_indexed: Mapped[int | None] = mapped_column(Integer, default=0)
    error_msg: Mapped[str | None] = mapped_column(
        Text, default=None
    )  # only filled if status = "failed"


@@ -3,6 +3,7 @@ import math
import re
from collections.abc import Generator
from collections.abc import Iterator
+from json.decoder import JSONDecodeError
from typing import cast
from typing import Optional
from typing import Tuple
@@ -92,11 +93,18 @@ def separate_answer_quotes(
    try:
        model_raw_json = json.loads(answer_raw, strict=False)
        return extract_answer_quotes_json(model_raw_json)
-   except ValueError:
-       if is_json_prompt:
-           logger.error("Model did not output in json format as expected.")
-           raise
-       return extract_answer_quotes_freeform(answer_raw)
+   except JSONDecodeError:
+       # LLMs get confused when handling the list in the json. Sometimes the model
+       # doesn't attend enough to the previous { token, so it just ends the list of
+       # quotes and stops there. Here, we add logic to try to fix this LLM error.
+       try:
+           model_raw_json = json.loads(answer_raw + "}", strict=False)
+           return extract_answer_quotes_json(model_raw_json)
+       except JSONDecodeError:
+           if is_json_prompt:
+               logger.error("Model did not output in json format as expected.")
+               raise
+           return extract_answer_quotes_freeform(answer_raw)


def match_quotes_to_docs(
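
The single appended "}" covers the common truncation, but the same idea generalizes. A hedged sketch (not part of this commit, repair order is a guess) that tries a few plausible closers before giving up:

import json
from json.decoder import JSONDecodeError
from typing import Any


def load_truncated_json(raw: str) -> Any:
    """Try to parse model output, appending likely missing closers on failure."""
    for suffix in ("", "}", "]}", '"]}'):  # hypothetical repair order
        try:
            return json.loads(raw + suffix, strict=False)
        except JSONDecodeError:
            continue
    raise JSONDecodeError("unrepairable model output", raw, 0)


# e.g. a model that stopped mid-list: "]}" closes both the list and the object
assert load_truncated_json('{"answer": "42", "quotes": ["a", "b"')["answer"] == "42"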


@@ -277,6 +277,13 @@ def _index_vespa_chunks(
        if chunk_already_existed:
            already_existing_documents.add(chunk.source_document.id)

+       # In the logic below, we check if the chunk comes from a doc that has already
+       # been added to already_existing_documents. This works because the chunks are
+       # ordered and because a Document's chunks are not separated into different
+       # batches. The first chunk is processed first, and if it already exists, its
+       # entire document is marked as already existing. So if the document grows and
+       # new chunks are added, they come last in processing and the doc is already in
+       # already_existing_documents.
        insertion_records.add(
            DocumentInsertionRecord(
                document_id=chunk.source_document.id,
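
A toy sketch of the ordering argument in that comment (names mirror the snippet; the chunk stream is made up): because every chunk of a document arrives consecutively and its first chunk is seen first, one membership check against the set is enough to classify all later chunks of the same document.

already_existing_documents: set[str] = set()
insertion_records: set[tuple[str, bool]] = set()  # (document_id, already_existed)

# (doc_id, chunk_already_existed) in processing order; doc "a" existed before this
# run and grew by one chunk, so its second chunk does not exist in the index yet.
chunk_stream = [("a", True), ("a", False), ("b", False), ("b", False)]

for doc_id, chunk_already_existed in chunk_stream:
    if chunk_already_existed:
        already_existing_documents.add(doc_id)
    insertion_records.add((doc_id, doc_id in already_existing_documents))

# Doc "a" is counted as pre-existing despite its brand-new second chunk.
assert insertion_records == {("a", True), ("b", False)}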


@@ -23,7 +23,7 @@ class CCPairFullInfo(BaseModel):
        cc_pair_model: ConnectorCredentialPair,
        index_attempt_models: list[IndexAttempt],
        latest_deletion_attempt: DeletionAttemptSnapshot | None,
-       num_docs_indexed: int,  # not ideal, but this must be computed seperately
+       num_docs_indexed: int,  # not ideal, but this must be computed separately
    ) -> "CCPairFullInfo":
        return cls(
            id=cc_pair_model.id,


@@ -311,7 +311,7 @@ class IndexAttemptRequest(BaseModel):
class IndexAttemptSnapshot(BaseModel):
    id: int
    status: IndexingStatus | None
-   num_docs_indexed: int
+   new_docs_indexed: int
    error_msg: str | None
    time_started: str | None
    time_updated: str
@@ -323,7 +323,7 @@ class IndexAttemptSnapshot(BaseModel):
        return IndexAttemptSnapshot(
            id=index_attempt.id,
            status=index_attempt.status,
-           num_docs_indexed=index_attempt.num_docs_indexed or 0,
+           new_docs_indexed=index_attempt.new_docs_indexed or 0,
            error_msg=index_attempt.error_msg,
            time_started=index_attempt.time_started.isoformat()
            if index_attempt.time_started


@@ -46,7 +46,7 @@ export function IndexingAttemptsTable({ ccPair }: { ccPair: CCPairFullInfo }) {
            size="xs"
          />
        </TableCell>
-       <TableCell>{indexAttempt.num_docs_indexed}</TableCell>
+       <TableCell>{indexAttempt.new_docs_indexed}</TableCell>
        <TableCell>
          <Text className="flex flex-wrap whitespace-normal">
            {indexAttempt.error_msg || "-"}


@@ -44,12 +44,12 @@ function CCPairIndexingStatusDisplay({
          errorMsg={ccPairsIndexingStatus?.latest_index_attempt?.error_msg}
          size="xs"
        />
-       {ccPairsIndexingStatus?.latest_index_attempt?.num_docs_indexed &&
+       {ccPairsIndexingStatus?.latest_index_attempt?.new_docs_indexed &&
        ccPairsIndexingStatus?.latest_index_attempt?.status === "in_progress" ? (
          <div className="text-xs mt-0.5">
            <div>
              <i>Current Run:</i>{" "}
-             {ccPairsIndexingStatus.latest_index_attempt.num_docs_indexed} docs
+             {ccPairsIndexingStatus.latest_index_attempt.new_docs_indexed} docs
              indexed
            </div>
            <div>


@@ -7,7 +7,7 @@ export const getDocsProcessedPerMinute = (
    !indexAttempt ||
    !indexAttempt.time_started ||
    !indexAttempt.time_updated ||
-   indexAttempt.num_docs_indexed === 0
+   indexAttempt.new_docs_indexed === 0
  ) {
    return null;
  }
@@ -22,5 +22,5 @@ export const getDocsProcessedPerMinute = (
  if (seconds < 10) {
    return null;
  }
-  return (indexAttempt.num_docs_indexed / seconds) * 60;
+  return (indexAttempt.new_docs_indexed / seconds) * 60;
};


@@ -131,7 +131,7 @@ export interface GoogleSitesConfig {
export interface IndexAttemptSnapshot {
  id: number;
  status: ValidStatuses | null;
-  num_docs_indexed: number;
+  new_docs_indexed: number;
  error_msg: string | null;
  time_started: string | null;
  time_updated: string;