mirror of
https://github.com/danswer-ai/danswer.git
synced 2025-04-08 11:58:34 +02:00
Hardening deletion when cc pair relationships are left over (#3154)
* more logs * this fence should be set to None * type hinting * reset deletion attempt if conditions are inconsistent * always clean up in db if we reach reconciliation * add reset method * more logging * harden up error checking
This commit is contained in:
parent
d69180aeb8
commit
6e698ac84a
@ -192,7 +192,8 @@ def check_for_indexing(self: Task, *, tenant_id: str | None) -> int | None:
|
||||
)
|
||||
if attempt_id:
|
||||
task_logger.info(
|
||||
f"Indexing queued: index_attempt={attempt_id} "
|
||||
f"Connector indexing queued: "
|
||||
f"index_attempt={attempt_id} "
|
||||
f"cc_pair={cc_pair.id} "
|
||||
f"search_settings={search_settings_instance.id} "
|
||||
)
|
||||
@ -383,7 +384,6 @@ def try_creating_indexing_task(
|
||||
payload.index_attempt_id = index_attempt_id
|
||||
payload.celery_task_id = result.id
|
||||
redis_connector_index.set_fence(payload)
|
||||
|
||||
except Exception:
|
||||
redis_connector_index.set_fence(None)
|
||||
task_logger.exception(
|
||||
@ -516,7 +516,8 @@ def connector_indexing_task(
|
||||
logger.debug("Sentry DSN not provided, skipping Sentry initialization")
|
||||
|
||||
logger.info(
|
||||
f"Indexing spawned task starting: attempt={index_attempt_id} "
|
||||
f"Indexing spawned task starting: "
|
||||
f"attempt={index_attempt_id} "
|
||||
f"tenant={tenant_id} "
|
||||
f"cc_pair={cc_pair_id} "
|
||||
f"search_settings={search_settings_id}"
|
||||
|
@ -177,7 +177,17 @@ def document_by_cc_pair_cleanup_task(
|
||||
f"Max celery task retries reached. Marking doc as dirty for reconciliation: "
|
||||
f"tenant={tenant_id} doc={document_id}"
|
||||
)
|
||||
with get_session_with_tenant(tenant_id):
|
||||
with get_session_with_tenant(tenant_id) as db_session:
|
||||
# delete the cc pair relationship now and let reconciliation clean it up
|
||||
# in vespa
|
||||
delete_document_by_connector_credential_pair__no_commit(
|
||||
db_session=db_session,
|
||||
document_id=document_id,
|
||||
connector_credential_pair_identifier=ConnectorCredentialPairIdentifier(
|
||||
connector_id=connector_id,
|
||||
credential_id=credential_id,
|
||||
),
|
||||
)
|
||||
mark_document_as_modified(document_id, db_session)
|
||||
return False
|
||||
|
||||
|
@ -444,11 +444,22 @@ def monitor_connector_deletion_taskset(
|
||||
db_session, cc_pair.connector_id, cc_pair.credential_id
|
||||
)
|
||||
if len(doc_ids) > 0:
|
||||
# if this happens, documents somehow got added while deletion was in progress. Likely a bug
|
||||
# gating off pruning and indexing work before deletion starts
|
||||
# NOTE(rkuo): if this happens, documents somehow got added while
|
||||
# deletion was in progress. Likely a bug gating off pruning and indexing
|
||||
# work before deletion starts.
|
||||
task_logger.warning(
|
||||
f"Connector deletion - documents still found after taskset completion: "
|
||||
f"cc_pair={cc_pair_id} num={len(doc_ids)}"
|
||||
"Connector deletion - documents still found after taskset completion. "
|
||||
"Clearing the current deletion attempt and allowing deletion to restart: "
|
||||
f"cc_pair={cc_pair_id} "
|
||||
f"docs_deleted={fence_data.num_tasks} "
|
||||
f"docs_remaining={len(doc_ids)}"
|
||||
)
|
||||
|
||||
# We don't want to waive off why we get into this state, but resetting
|
||||
# our attempt and letting the deletion restart is a good way to recover
|
||||
redis_connector.delete.reset()
|
||||
raise RuntimeError(
|
||||
"Connector deletion - documents still found after taskset completion"
|
||||
)
|
||||
|
||||
# clean up the rest of the related Postgres entities
|
||||
@ -512,8 +523,7 @@ def monitor_connector_deletion_taskset(
|
||||
f"docs_deleted={fence_data.num_tasks}"
|
||||
)
|
||||
|
||||
redis_connector.delete.taskset_clear()
|
||||
redis_connector.delete.set_fence(None)
|
||||
redis_connector.delete.reset()
|
||||
|
||||
|
||||
def monitor_ccpair_pruning_taskset(
|
||||
@ -645,26 +655,34 @@ def monitor_ccpair_indexing_taskset(
|
||||
result_state = result.state
|
||||
|
||||
status_int = redis_connector_index.get_completion()
|
||||
if status_int is None:
|
||||
if status_int is None: # completion signal not set ... check for errors
|
||||
# If we get here, and then the task both sets the completion signal and finishes,
|
||||
# we will incorrectly abort the task. We must check result state, then check
|
||||
# get_completion again to avoid the race condition.
|
||||
if result_state in READY_STATES:
|
||||
# IF the task state is READY, THEN generator_complete should be set
|
||||
# if it isn't, then the worker crashed
|
||||
task_logger.info(
|
||||
f"Connector indexing aborted: "
|
||||
f"cc_pair={cc_pair_id} "
|
||||
f"search_settings={search_settings_id} "
|
||||
f"elapsed_submitted={elapsed_submitted.total_seconds():.2f}"
|
||||
)
|
||||
|
||||
index_attempt = get_index_attempt(db_session, payload.index_attempt_id)
|
||||
if index_attempt:
|
||||
mark_attempt_failed(
|
||||
index_attempt_id=payload.index_attempt_id,
|
||||
db_session=db_session,
|
||||
failure_reason="Connector indexing aborted or exceptioned.",
|
||||
if redis_connector_index.get_completion() is None:
|
||||
# IF the task state is READY, THEN generator_complete should be set
|
||||
# if it isn't, then the worker crashed
|
||||
msg = (
|
||||
f"Connector indexing aborted or exceptioned: "
|
||||
f"attempt={payload.index_attempt_id} "
|
||||
f"celery_task={payload.celery_task_id} "
|
||||
f"result_state={result_state} "
|
||||
f"cc_pair={cc_pair_id} "
|
||||
f"search_settings={search_settings_id} "
|
||||
f"elapsed_submitted={elapsed_submitted.total_seconds():.2f}"
|
||||
)
|
||||
task_logger.warning(msg)
|
||||
|
||||
redis_connector_index.reset()
|
||||
index_attempt = get_index_attempt(db_session, payload.index_attempt_id)
|
||||
if index_attempt:
|
||||
mark_attempt_failed(
|
||||
index_attempt_id=payload.index_attempt_id,
|
||||
db_session=db_session,
|
||||
failure_reason=msg,
|
||||
)
|
||||
|
||||
redis_connector_index.reset()
|
||||
return
|
||||
|
||||
status_enum = HTTPStatus(status_int)
|
||||
|
@ -14,6 +14,7 @@ from danswer.configs.constants import DanswerCeleryPriority
|
||||
from danswer.configs.constants import DanswerCeleryQueues
|
||||
from danswer.db.connector_credential_pair import get_connector_credential_pair_from_id
|
||||
from danswer.db.document import construct_document_select_for_connector_credential_pair
|
||||
from danswer.db.models import Document as DbDocument
|
||||
|
||||
|
||||
class RedisConnectorDeletionFenceData(BaseModel):
|
||||
@ -98,7 +99,8 @@ class RedisConnectorDelete:
|
||||
stmt = construct_document_select_for_connector_credential_pair(
|
||||
cc_pair.connector_id, cc_pair.credential_id
|
||||
)
|
||||
for doc in db_session.scalars(stmt).yield_per(1):
|
||||
for doc_temp in db_session.scalars(stmt).yield_per(1):
|
||||
doc: DbDocument = doc_temp
|
||||
current_time = time.monotonic()
|
||||
if current_time - last_lock_time >= (
|
||||
CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT / 4
|
||||
@ -130,6 +132,10 @@ class RedisConnectorDelete:
|
||||
|
||||
return len(async_results)
|
||||
|
||||
def reset(self) -> None:
|
||||
self.redis.delete(self.taskset_key)
|
||||
self.redis.delete(self.fence_key)
|
||||
|
||||
@staticmethod
|
||||
def remove_from_taskset(id: int, task_id: str, r: redis.Redis) -> None:
|
||||
taskset_key = f"{RedisConnectorDelete.TASKSET_PREFIX}_{id}"
|
||||
|
@ -865,21 +865,31 @@ def connector_run_once(
|
||||
)
|
||||
if attempt_id:
|
||||
logger.info(
|
||||
f"try_creating_indexing_task succeeded: cc_pair={cc_pair.id} attempt_id={attempt_id}"
|
||||
f"connector_run_once - try_creating_indexing_task succeeded: "
|
||||
f"connector={run_info.connector_id} "
|
||||
f"cc_pair={cc_pair.id} "
|
||||
f"attempt={attempt_id} "
|
||||
)
|
||||
index_attempt_ids.append(attempt_id)
|
||||
else:
|
||||
logger.info(f"try_creating_indexing_task failed: cc_pair={cc_pair.id}")
|
||||
logger.info(
|
||||
f"connector_run_once - try_creating_indexing_task failed: "
|
||||
f"connector={run_info.connector_id} "
|
||||
f"cc_pair={cc_pair.id}"
|
||||
)
|
||||
|
||||
if not index_attempt_ids:
|
||||
msg = "No new indexing attempts created, indexing jobs are queued or running."
|
||||
logger.info(msg)
|
||||
raise HTTPException(
|
||||
status_code=400,
|
||||
detail="No new indexing attempts created, indexing jobs are queued or running.",
|
||||
detail=msg,
|
||||
)
|
||||
|
||||
msg = f"Successfully created {len(index_attempt_ids)} index attempts. {index_attempt_ids}"
|
||||
return StatusResponse(
|
||||
success=True,
|
||||
message=f"Successfully created {len(index_attempt_ids)} index attempts",
|
||||
message=msg,
|
||||
data=index_attempt_ids,
|
||||
)
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user