Fix count of docs for connector failure

Weves
2023-08-13 17:24:05 -07:00
committed by Chris Weaver
parent be318433e3
commit e0cbd087f7
4 changed files with 14 additions and 6 deletions

View File

@@ -8,10 +8,7 @@ from pydantic import BaseModel
 from danswer.chunking.models import EmbeddedIndexChunk
 from danswer.chunking.models import IndexChunk
 from danswer.chunking.models import InferenceChunk
-from danswer.configs.constants import ALLOWED_GROUPS
-from danswer.configs.constants import ALLOWED_USERS
 from danswer.configs.constants import PUBLIC_DOC_PAT
-from danswer.connectors.models import Document
 from danswer.connectors.models import IndexAttemptMetadata
@@ -55,7 +52,7 @@ T = TypeVar("T")
 def _add_if_not_exists(l: list[T], item: T) -> list[T]:
-    if item not in l:
+    if item in l:
         return l
     return l + [item]
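
The original condition was inverted: the helper returned the list unchanged when the item was missing and appended a duplicate when it was already present. A small standalone sketch of the corrected helper and its behavior; the asserts are illustrative, not the project's tests:

    from typing import TypeVar

    T = TypeVar("T")

    def _add_if_not_exists(l: list[T], item: T) -> list[T]:
        # corrected logic: bail out when the item is already present
        if item in l:
            return l
        return l + [item]

    assert _add_if_not_exists(["a"], "a") == ["a"]       # no duplicate appended
    assert _add_if_not_exists(["a"], "b") == ["a", "b"]  # new item appended
    # the old `if item not in l: return l` branch returned ["a"] unchanged
    # for "b" and produced ["a", "a"] for "a"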

View File

@@ -58,6 +58,9 @@ def _get_net_new_documents(
     net_new_documents = 0
     seen_documents: set[str] = set()
     for insertion_record in insertion_records:
+        if insertion_record.already_existed:
+            continue
         if insertion_record.document_id not in seen_documents:
             net_new_documents += 1
             seen_documents.add(insertion_record.document_id)
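
With the new already_existed check, documents that were already in the store before this run no longer inflate the net-new count; only first-time insertions are tallied, and each document at most once. A minimal sketch of the counting logic, using a stand-in InsertionRecord named tuple rather than the project's actual record class:

    from typing import NamedTuple

    class InsertionRecord(NamedTuple):  # stand-in for the real insertion record model
        document_id: str
        already_existed: bool

    def count_net_new(insertion_records: list[InsertionRecord]) -> int:
        net_new_documents = 0
        seen_documents: set[str] = set()
        for insertion_record in insertion_records:
            if insertion_record.already_existed:
                continue  # re-indexed documents are not net new
            if insertion_record.document_id not in seen_documents:
                net_new_documents += 1
                seen_documents.add(insertion_record.document_id)
        return net_new_documents

    records = [
        InsertionRecord("doc-1", already_existed=True),   # existed before this run
        InsertionRecord("doc-2", already_existed=False),  # new document, chunk 1
        InsertionRecord("doc-2", already_existed=False),  # new document, chunk 2
    ]
    assert count_net_new(records) == 1  # doc-1 excluded, doc-2 counted once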
@@ -81,6 +84,7 @@ def _indexing_pipeline(
     keyword_store_insertion_records = keyword_index.index(
         chunks=chunks, index_attempt_metadata=index_attempt_metadata
     )
+    logger.debug(f"Keyword store insertion records: {keyword_store_insertion_records}")
     _upsert_insertion_records(
         insertion_records=keyword_store_insertion_records,
         index_attempt_metadata=index_attempt_metadata,
@@ -94,6 +98,7 @@ def _indexing_pipeline(
     vector_store_insertion_records = vector_index.index(
         chunks=chunks_with_embeddings, index_attempt_metadata=index_attempt_metadata
     )
logger.debug(f"Vector store insertion records: {keyword_store_insertion_records}")
     _upsert_insertion_records(
         insertion_records=vector_store_insertion_records,
         index_attempt_metadata=index_attempt_metadata,

View File

@@ -111,6 +111,8 @@ def index_qdrant_chunks(
     cross_connector_document_metadata_map: dict[
         str, CrossConnectorDocumentMetadata
     ] = {}
+    # document ids of documents that existed BEFORE this indexing
+    already_existing_documents: set[str] = set()
     for chunk in chunks:
         document = chunk.source_document
         (
@@ -130,6 +132,7 @@
         if should_delete_doc:
             # Processing the first chunk of the doc and the doc exists
             delete_qdrant_doc_chunks(document.id, collection, q_client)
+            already_existing_documents.add(document.id)

         for minichunk_ind, embedding in enumerate(chunk.embeddings):
             qdrant_id = str(get_uuid_from_chunk(chunk, minichunk_ind))
@@ -137,7 +140,7 @@
                 ChunkInsertionRecord(
                     document_id=document.id,
                     store_id=qdrant_id,
-                    already_existed=should_delete_doc,
+                    already_existed=document.id in already_existing_documents,
                 )
             )
             point_structs.append(
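
should_delete_doc is only true while the first chunk of an already-indexed document is being processed, so copying it into already_existed left every later chunk of that document marked as new and overstated the net-new count. Remembering the document id in already_existing_documents keeps the flag consistent across all of the document's chunks. A simplified illustration of that bookkeeping, with a hypothetical Chunk stand-in rather than the real qdrant chunk model:

    from dataclasses import dataclass

    @dataclass
    class Chunk:  # hypothetical stand-in for the real chunk model
        document_id: str
        is_first_chunk_of_existing_doc: bool  # plays the role of should_delete_doc

    def mark_already_existed(chunks: list[Chunk]) -> list[bool]:
        already_existing_documents: set[str] = set()
        flags: list[bool] = []
        for chunk in chunks:
            if chunk.is_first_chunk_of_existing_doc:
                already_existing_documents.add(chunk.document_id)
            # every chunk of a pre-existing document is flagged, not just the first
            flags.append(chunk.document_id in already_existing_documents)
        return flags

    chunks = [
        Chunk("doc-1", True),   # first chunk of an existing doc triggers the delete
        Chunk("doc-1", False),  # later chunk: should_delete_doc is already False here
    ]
    assert mark_already_existed(chunks) == [True, True]

The typesense change in the last file applies the same pattern.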

View File

@@ -128,6 +128,8 @@ def index_typesense_chunks(
     cross_connector_document_metadata_map: dict[
         str, CrossConnectorDocumentMetadata
     ] = {}
+    # document ids of documents that existed BEFORE this indexing
+    already_existing_documents: set[str] = set()
     for chunk in chunks:
         document = chunk.source_document
         (
@@ -147,13 +149,14 @@ def index_typesense_chunks(
         if should_delete_doc:
             # Processing the first chunk of the doc and the doc exists
             delete_typesense_doc_chunks(document.id, collection, ts_client)
+            already_existing_documents.add(document.id)

         typesense_id = str(get_uuid_from_chunk(chunk))
         insertion_records.append(
             ChunkInsertionRecord(
                 document_id=document.id,
                 store_id=typesense_id,
-                already_existed=should_delete_doc,
+                already_existed=document.id in already_existing_documents,
             )
         )
         new_documents.append(