Mark incomplete Index Attempts as Failed on job restart (#371)

This commit is contained in:
Yuhong Sun
2023-08-31 17:43:03 -07:00
committed by GitHub
parent dac5aaea94
commit 43efa9da94
2 changed files with 22 additions and 0 deletions

View File

@@ -19,6 +19,7 @@ from danswer.datastores.indexing_pipeline import build_indexing_pipeline
from danswer.db.connector import disable_connector from danswer.db.connector import disable_connector
from danswer.db.connector import fetch_connectors from danswer.db.connector import fetch_connectors
from danswer.db.connector_credential_pair import get_last_successful_attempt_time from danswer.db.connector_credential_pair import get_last_successful_attempt_time
from danswer.db.connector_credential_pair import mark_all_in_progress_cc_pairs_failed
from danswer.db.connector_credential_pair import update_connector_credential_pair from danswer.db.connector_credential_pair import update_connector_credential_pair
from danswer.db.credentials import backend_update_credential_json from danswer.db.credentials import backend_update_credential_json
from danswer.db.engine import get_db_current_time from danswer.db.engine import get_db_current_time
@@ -410,6 +411,12 @@ def update_loop(delay: int = 10, num_workers: int = NUM_INDEXING_WORKERS) -> Non
client = Client(cluster) client = Client(cluster)
existing_jobs: dict[int, Future] = {} existing_jobs: dict[int, Future] = {}
engine = get_sqlalchemy_engine() engine = get_sqlalchemy_engine()
with Session(engine) as db_session:
# Previous version did not always clean up cc-pairs well leaving some connectors undeleteable
# This ensures that bad states get cleaned up
mark_all_in_progress_cc_pairs_failed(db_session)
while True: while True:
start = time.time() start = time.time()
start_time_utc = datetime.utcfromtimestamp(start).strftime("%Y-%m-%d %H:%M:%S") start_time_utc = datetime.utcfromtimestamp(start).strftime("%Y-%m-%d %H:%M:%S")

View File

@@ -3,6 +3,7 @@ from datetime import datetime
from fastapi import HTTPException from fastapi import HTTPException
from sqlalchemy import delete from sqlalchemy import delete
from sqlalchemy import select from sqlalchemy import select
from sqlalchemy import update
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
from danswer.db.connector import fetch_connector_by_id from danswer.db.connector import fetch_connector_by_id
@@ -92,6 +93,20 @@ def delete_connector_credential_pair(
db_session.execute(stmt) db_session.execute(stmt)
def mark_all_in_progress_cc_pairs_failed(
db_session: Session,
) -> None:
stmt = (
update(ConnectorCredentialPair)
.where(
ConnectorCredentialPair.last_attempt_status == IndexingStatus.IN_PROGRESS
)
.values(last_attempt_status=IndexingStatus.FAILED)
)
db_session.execute(stmt)
db_session.commit()
def add_credential_to_connector( def add_credential_to_connector(
connector_id: int, connector_id: int,
credential_id: int, credential_id: int,