diff --git a/backend/danswer/background/indexing/checkpointing.py b/backend/danswer/background/indexing/checkpointing.py
index d9c87625aad..673540b0ea4 100644
--- a/backend/danswer/background/indexing/checkpointing.py
+++ b/backend/danswer/background/indexing/checkpointing.py
@@ -13,7 +13,7 @@ def _2010_dt() -> datetime.datetime:
 
 
 def _2020_dt() -> datetime.datetime:
-    return datetime.datetime(year=2010, month=1, day=1, tzinfo=datetime.timezone.utc)
+    return datetime.datetime(year=2020, month=1, day=1, tzinfo=datetime.timezone.utc)
 
 
 def _default_end_time(
@@ -34,7 +34,7 @@ def _default_end_time(
         return _2010_dt()
 
     if last_successful_run < _2020_dt():
-        return last_successful_run + datetime.timedelta(days=365 * 5)
+        return min(last_successful_run + datetime.timedelta(days=365 * 5), _2020_dt())
 
     return last_successful_run + datetime.timedelta(days=180)
 
diff --git a/backend/danswer/background/indexing/run_indexing.py b/backend/danswer/background/indexing/run_indexing.py
index 5c562cf3b10..cacc8f8c98b 100644
--- a/backend/danswer/background/indexing/run_indexing.py
+++ b/backend/danswer/background/indexing/run_indexing.py
@@ -112,6 +112,7 @@ def _run_indexing(
     net_doc_change = 0
     document_count = 0
     chunk_count = 0
+    run_end_dt = None
    for ind, (window_start, window_end) in enumerate(
         get_time_windows_for_index_attempt(
             last_successful_run=datetime.fromtimestamp(
@@ -129,6 +130,12 @@ def _run_indexing(
 
         try:
             for doc_batch in doc_batch_generator:
+                # check if connector is disabled mid run and stop if so
+                db_session.refresh(db_connector)
+                if db_connector.disabled:
+                    # let the `except` block handle this
+                    raise RuntimeError("Connector was disabled mid run")
+
                 logger.debug(
                     f"Indexing batch of documents: {[doc.to_short_descriptor() for doc in doc_batch]}"
                 )
@@ -159,40 +166,40 @@ def _run_indexing(
                     new_docs_indexed=net_doc_change,
                 )
 
-            # check if connector is disabled mid run and stop if so
-            db_session.refresh(db_connector)
-            if db_connector.disabled:
-                # let the `except` block handle this
-                raise RuntimeError("Connector was disabled mid run")
-
+            run_end_dt = window_end
             update_connector_credential_pair(
                 db_session=db_session,
                 connector_id=db_connector.id,
                 credential_id=db_credential.id,
                 attempt_status=IndexingStatus.IN_PROGRESS,
                 net_docs=net_doc_change,
-                run_dt=window_end,
+                run_dt=run_end_dt,
             )
         except Exception as e:
             logger.info(
-                f"Failed connector elapsed time: {time.time() - start_time} seconds"
+                f"Connector run ran into exception after elapsed time: {time.time() - start_time} seconds"
             )
             # Only mark the attempt as a complete failure if this is the first indexing window.
             # Otherwise, some progress was made - the next run will not start from the beginning.
             # In this case, it is not accurate to mark it as a failure. When the next run begins,
-            # if that fails immediately, it will be marked as a failure
-            if ind == 0:
+            # if that fails immediately, it will be marked as a failure.
+            #
+            # NOTE: if the connector is manually disabled, we should mark it as a failure regardless
+            # to give better clarity in the UI, as the next run will never happen.
+            if ind == 0 or db_connector.disabled:
                 mark_attempt_failed(index_attempt, db_session, failure_reason=str(e))
+                update_connector_credential_pair(
+                    db_session=db_session,
+                    connector_id=index_attempt.connector.id,
+                    credential_id=index_attempt.credential.id,
+                    attempt_status=IndexingStatus.FAILED,
+                    net_docs=net_doc_change,
+                )
+                raise e
 
-            update_connector_credential_pair(
-                db_session=db_session,
-                connector_id=index_attempt.connector.id,
-                credential_id=index_attempt.credential.id,
-                attempt_status=IndexingStatus.FAILED,
-                net_docs=net_doc_change,
-                run_dt=window_end,
-            )
-            raise e
+            # break => similar to success case. As mentioned above, if the next run fails for the same
+            # reason it will then be marked as a failure
+            break
 
     mark_attempt_succeeded(index_attempt, db_session)
     update_connector_credential_pair(
@@ -201,7 +208,7 @@ def _run_indexing(
         credential_id=db_credential.id,
         attempt_status=IndexingStatus.SUCCESS,
         net_docs=net_doc_change,
-        run_dt=window_end,
+        run_dt=run_end_dt,
     )
 
     logger.info(
diff --git a/backend/danswer/db/connector_credential_pair.py b/backend/danswer/db/connector_credential_pair.py
index d199a2acb1a..b451f70c425 100644
--- a/backend/danswer/db/connector_credential_pair.py
+++ b/backend/danswer/db/connector_credential_pair.py
@@ -86,7 +86,10 @@ def update_connector_credential_pair(
     cc_pair.last_attempt_status = attempt_status
     # simply don't update last_successful_index_time if run_dt is not specified
     # at worst, this would result in re-indexing documents that were already indexed
-    if attempt_status == IndexingStatus.SUCCESS and run_dt is not None:
+    if (
+        attempt_status == IndexingStatus.SUCCESS
+        or attempt_status == IndexingStatus.IN_PROGRESS
+    ) and run_dt is not None:
         cc_pair.last_successful_index_time = run_dt
     if net_docs is not None:
         cc_pair.total_docs_indexed += net_docs
diff --git a/deployment/docker_compose/docker-compose.dev.yml b/deployment/docker_compose/docker-compose.dev.yml
index 8f9f6b22185..3de57ec6133 100644
--- a/deployment/docker_compose/docker-compose.dev.yml
+++ b/deployment/docker_compose/docker-compose.dev.yml
@@ -76,6 +76,7 @@ services:
       - CONFLUENCE_CONNECTOR_LABELS_TO_SKIP=${CONFLUENCE_CONNECTOR_LABELS_TO_SKIP:-}
       - GONG_CONNECTOR_START_TIME=${GONG_CONNECTOR_START_TIME:-}
       - EXPERIMENTAL_SIMPLE_JOB_CLIENT_ENABLED=${EXPERIMENTAL_SIMPLE_JOB_CLIENT_ENABLED:-}
+      - EXPERIMENTAL_CHECKPOINTING_ENABLED=${EXPERIMENTAL_CHECKPOINTING_ENABLED:-}
       # Danswer SlackBot Configs
       - DANSWER_BOT_SLACK_APP_TOKEN=${DANSWER_BOT_SLACK_APP_TOKEN:-}
       - DANSWER_BOT_SLACK_BOT_TOKEN=${DANSWER_BOT_SLACK_BOT_TOKEN:-}
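
Reviewer note on the `checkpointing.py` hunks: the first fixes `_2020_dt()` returning a 2010 datetime, and the second clamps the pre-2020 branch of `_default_end_time` so a window end can never overshoot the 2020 boundary. A minimal standalone sketch of the fixed behavior, with the helper names copied from the diff and the rest of the module assumed away:

```python
import datetime


def _2020_dt() -> datetime.datetime:
    return datetime.datetime(year=2020, month=1, day=1, tzinfo=datetime.timezone.utc)


def default_end_time(last_successful_run: datetime.datetime) -> datetime.datetime:
    """Simplified restatement of the fixed branch, for illustration only."""
    if last_successful_run < _2020_dt():
        # clamp: a pre-2020 checkpoint must never yield an end time past
        # the 2020 boundary, which the unclamped version allowed
        return min(last_successful_run + datetime.timedelta(days=365 * 5), _2020_dt())
    return last_successful_run + datetime.timedelta(days=180)


# before the fix, a mid-2018 checkpoint produced an end time in 2023;
# with the clamp it stops exactly at the boundary
run_2018 = datetime.datetime(2018, 6, 1, tzinfo=datetime.timezone.utc)
assert default_end_time(run_2018) == _2020_dt()
```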
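Moving the disable check into the per-batch loop in `run_indexing.py` means a connector disabled from the UI is noticed within one batch rather than only at the end of a time window. A runnable toy of that pattern follows; the `Fake*` classes are stand-ins invented here for the SQLAlchemy session and connector ORM row, not names from the Danswer codebase:

```python
from dataclasses import dataclass


@dataclass
class FakeConnector:
    """Stand-in for the connector ORM row; only the flag matters here."""
    disabled: bool = False


class FakeSession:
    """Stand-in for a SQLAlchemy session; a real refresh() re-reads the row."""
    def refresh(self, obj: object) -> None:
        pass


def run_batches(db_session: FakeSession, db_connector: FakeConnector, batches) -> None:
    for ind, batch in enumerate(batches):
        # re-read the connector row so a disable issued mid-run is observed
        db_session.refresh(db_connector)
        if db_connector.disabled:
            # mirror the diff: raise and let a shared `except` block record
            # the attempt as FAILED, since no further run will ever happen
            raise RuntimeError("Connector was disabled mid run")
        print(f"indexing batch {ind}: {len(batch)} docs")


run_batches(FakeSession(), FakeConnector(), [["doc-a", "doc-b"], ["doc-c"]])
```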
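The `connector_credential_pair.py` hunk pairs with the new `run_end_dt` bookkeeping: `last_successful_index_time` now also advances on IN_PROGRESS updates, so a run that finishes some time windows and then fails resumes from the last completed window instead of the original start. A toy restatement of the post-diff condition, with the enum values assumed from the identifiers visible in the diff:

```python
import datetime
import enum
from typing import Optional


class IndexingStatus(str, enum.Enum):
    IN_PROGRESS = "in_progress"
    SUCCESS = "success"
    FAILED = "failed"


def should_advance_checkpoint(
    attempt_status: IndexingStatus, run_dt: Optional[datetime.datetime]
) -> bool:
    # post-diff condition: SUCCESS *or* IN_PROGRESS moves the checkpoint,
    # provided the end of the completed time window (run_dt) was supplied
    return (
        attempt_status in (IndexingStatus.SUCCESS, IndexingStatus.IN_PROGRESS)
        and run_dt is not None
    )


assert should_advance_checkpoint(IndexingStatus.IN_PROGRESS, datetime.datetime.now())
assert not should_advance_checkpoint(IndexingStatus.FAILED, datetime.datetime.now())
```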