Fix experimental checkpointing + move check for disabled connector to the start of the batch (#703)

This commit is contained in:
Chris Weaver 2023-11-06 17:14:31 -08:00 committed by GitHub
parent 24b3b1fa9e
commit f5bf2e6374
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 34 additions and 23 deletions

View File

@ -13,7 +13,7 @@ def _2010_dt() -> datetime.datetime:
def _2020_dt() -> datetime.datetime:
return datetime.datetime(year=2010, month=1, day=1, tzinfo=datetime.timezone.utc)
return datetime.datetime(year=2020, month=1, day=1, tzinfo=datetime.timezone.utc)
def _default_end_time(
@ -34,7 +34,7 @@ def _default_end_time(
return _2010_dt()
if last_successful_run < _2020_dt():
return last_successful_run + datetime.timedelta(days=365 * 5)
return min(last_successful_run + datetime.timedelta(days=365 * 5), _2020_dt())
return last_successful_run + datetime.timedelta(days=180)

View File

@ -112,6 +112,7 @@ def _run_indexing(
net_doc_change = 0
document_count = 0
chunk_count = 0
run_end_dt = None
for ind, (window_start, window_end) in enumerate(
get_time_windows_for_index_attempt(
last_successful_run=datetime.fromtimestamp(
@ -129,6 +130,12 @@ def _run_indexing(
try:
for doc_batch in doc_batch_generator:
# check if connector is disabled mid run and stop if so
db_session.refresh(db_connector)
if db_connector.disabled:
# let the `except` block handle this
raise RuntimeError("Connector was disabled mid run")
logger.debug(
f"Indexing batch of documents: {[doc.to_short_descriptor() for doc in doc_batch]}"
)
@ -159,40 +166,40 @@ def _run_indexing(
new_docs_indexed=net_doc_change,
)
# check if connector is disabled mid run and stop if so
db_session.refresh(db_connector)
if db_connector.disabled:
# let the `except` block handle this
raise RuntimeError("Connector was disabled mid run")
run_end_dt = window_end
update_connector_credential_pair(
db_session=db_session,
connector_id=db_connector.id,
credential_id=db_credential.id,
attempt_status=IndexingStatus.IN_PROGRESS,
net_docs=net_doc_change,
run_dt=window_end,
run_dt=run_end_dt,
)
except Exception as e:
logger.info(
f"Failed connector elapsed time: {time.time() - start_time} seconds"
f"Connector run ran into exception after elapsed time: {time.time() - start_time} seconds"
)
# Only mark the attempt as a complete failure if this is the first indexing window.
# Otherwise, some progress was made - the next run will not start from the beginning.
# In this case, it is not accurate to mark it as a failure. When the next run begins,
# if that fails immediately, it will be marked as a failure
if ind == 0:
# if that fails immediately, it will be marked as a failure.
#
# NOTE: if the connector is manually disabled, we should mark it as a failure regardless
# to give better clarity in the UI, as the next run will never happen.
if ind == 0 or db_connector.disabled:
mark_attempt_failed(index_attempt, db_session, failure_reason=str(e))
update_connector_credential_pair(
db_session=db_session,
connector_id=index_attempt.connector.id,
credential_id=index_attempt.credential.id,
attempt_status=IndexingStatus.FAILED,
net_docs=net_doc_change,
)
raise e
update_connector_credential_pair(
db_session=db_session,
connector_id=index_attempt.connector.id,
credential_id=index_attempt.credential.id,
attempt_status=IndexingStatus.FAILED,
net_docs=net_doc_change,
run_dt=window_end,
)
raise e
# break => similar to success case. As mentioned above, if the next run fails for the same
# reason it will then be marked as a failure
break
mark_attempt_succeeded(index_attempt, db_session)
update_connector_credential_pair(
@ -201,7 +208,7 @@ def _run_indexing(
credential_id=db_credential.id,
attempt_status=IndexingStatus.SUCCESS,
net_docs=net_doc_change,
run_dt=window_end,
run_dt=run_end_dt,
)
logger.info(

View File

@ -86,7 +86,10 @@ def update_connector_credential_pair(
cc_pair.last_attempt_status = attempt_status
# simply don't update last_successful_index_time if run_dt is not specified
# at worst, this would result in re-indexing documents that were already indexed
if attempt_status == IndexingStatus.SUCCESS and run_dt is not None:
if (
attempt_status == IndexingStatus.SUCCESS
or attempt_status == IndexingStatus.IN_PROGRESS
) and run_dt is not None:
cc_pair.last_successful_index_time = run_dt
if net_docs is not None:
cc_pair.total_docs_indexed += net_docs

View File

@ -76,6 +76,7 @@ services:
- CONFLUENCE_CONNECTOR_LABELS_TO_SKIP=${CONFLUENCE_CONNECTOR_LABELS_TO_SKIP:-}
- GONG_CONNECTOR_START_TIME=${GONG_CONNECTOR_START_TIME:-}
- EXPERIMENTAL_SIMPLE_JOB_CLIENT_ENABLED=${EXPERIMENTAL_SIMPLE_JOB_CLIENT_ENABLED:-}
- EXPERIMENTAL_CHECKPOINTING_ENABLED=${EXPERIMENTAL_CHECKPOINTING_ENABLED:-}
# Danswer SlackBot Configs
- DANSWER_BOT_SLACK_APP_TOKEN=${DANSWER_BOT_SLACK_APP_TOKEN:-}
- DANSWER_BOT_SLACK_BOT_TOKEN=${DANSWER_BOT_SLACK_BOT_TOKEN:-}