mirror of
https://github.com/danswer-ai/danswer.git
synced 2025-06-29 01:10:58 +02:00
enforce index attempt deduping on secondary indexing. (#2054)
* enforce index attempt deduping on secondary indexing. * black fix * typo fixes --------- Co-authored-by: Richard Kuo <rkuo@rkuo.com>
This commit is contained in:
parent
a8a4ad9546
commit
cc3856ef6d
@ -103,7 +103,7 @@ def cleanup_connector_credential_pair_task(
|
|||||||
@build_celery_task_wrapper(name_cc_prune_task)
|
@build_celery_task_wrapper(name_cc_prune_task)
|
||||||
@celery_app.task(soft_time_limit=JOB_TIMEOUT)
|
@celery_app.task(soft_time_limit=JOB_TIMEOUT)
|
||||||
def prune_documents_task(connector_id: int, credential_id: int) -> None:
|
def prune_documents_task(connector_id: int, credential_id: int) -> None:
|
||||||
"""connector pruning task. For a cc pair, this task pulls all docuement IDs from the source
|
"""connector pruning task. For a cc pair, this task pulls all document IDs from the source
|
||||||
and compares those IDs to locally stored documents and deletes all locally stored IDs missing
|
and compares those IDs to locally stored documents and deletes all locally stored IDs missing
|
||||||
from the most recently pulled document ID list"""
|
from the most recently pulled document ID list"""
|
||||||
with Session(get_sqlalchemy_engine()) as db_session:
|
with Session(get_sqlalchemy_engine()) as db_session:
|
||||||
|
@ -72,10 +72,18 @@ def _should_create_new_indexing(
|
|||||||
# When switching over models, always index at least once
|
# When switching over models, always index at least once
|
||||||
if model.status == IndexModelStatus.FUTURE:
|
if model.status == IndexModelStatus.FUTURE:
|
||||||
if last_index:
|
if last_index:
|
||||||
# secondary indexes should not index again after success
|
# No new index if the last index attempt succeeded
|
||||||
# or else the model will never be able to swap
|
# Once is enough. The model will never be able to swap otherwise.
|
||||||
if last_index.status == IndexingStatus.SUCCESS:
|
if last_index.status == IndexingStatus.SUCCESS:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
# No new index if the last index attempt is waiting to start
|
||||||
|
if last_index.status == IndexingStatus.NOT_STARTED:
|
||||||
|
return False
|
||||||
|
|
||||||
|
# No new index if the last index attempt is running
|
||||||
|
if last_index.status == IndexingStatus.IN_PROGRESS:
|
||||||
|
return False
|
||||||
else:
|
else:
|
||||||
if connector.id == 0: # Ingestion API
|
if connector.id == 0: # Ingestion API
|
||||||
return False
|
return False
|
||||||
|
@ -277,7 +277,7 @@ def mark_cc_pair__document_set_relationships_to_be_deleted__no_commit(
|
|||||||
`cc_pair_id` as not current and returns the list of all document set IDs
|
`cc_pair_id` as not current and returns the list of all document set IDs
|
||||||
affected.
|
affected.
|
||||||
|
|
||||||
NOTE: rases a `ValueError` if any of the document sets are currently syncing
|
NOTE: raises a `ValueError` if any of the document sets are currently syncing
|
||||||
to avoid getting into a bad state."""
|
to avoid getting into a bad state."""
|
||||||
document_set__cc_pair_relationships = db_session.scalars(
|
document_set__cc_pair_relationships = db_session.scalars(
|
||||||
select(DocumentSet__ConnectorCredentialPair).where(
|
select(DocumentSet__ConnectorCredentialPair).where(
|
||||||
|
Loading…
x
Reference in New Issue
Block a user