Fix Secondary Index Polling (#1020)

This commit is contained in:
Yuhong Sun 2024-01-29 19:34:25 -08:00 committed by GitHub
parent 0a9b854667
commit ce12dd4a5a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 30 additions and 9 deletions

View File

@ -136,6 +136,7 @@ def _run_indexing(
last_successful_index_time = get_last_successful_attempt_time(
connector_id=db_connector.id,
credential_id=db_credential.id,
embedding_model=index_attempt.embedding_model,
db_session=db_session,
)

View File

@ -56,20 +56,40 @@ def get_connector_credential_pair_from_id(
def get_last_successful_attempt_time(
    connector_id: int,
    credential_id: int,
    embedding_model: EmbeddingModel,
    db_session: Session,
) -> float:
    """Return the time of the last successful index run as a POSIX timestamp.

    Where the value comes from depends on the status of ``embedding_model``:

    * ``IndexModelStatus.PRESENT``: read ``last_successful_index_time`` from
      the connector/credential pair row.
    * any other status: no cached value is kept, so look up the most recently
      started ``IndexAttempt`` with ``IndexingStatus.SUCCESS`` for this
      connector, credential and embedding model, and use its ``time_started``.

    Args:
        connector_id: ID of the connector to look up.
        credential_id: ID of the credential paired with the connector.
        embedding_model: Model whose index is being polled; its ``status``
            selects the lookup path and its ``id`` filters index attempts.
        db_session: Database session used for the lookups.

    Returns:
        The ``.timestamp()`` of the last successful run, or ``0.0`` when no
        such run (or no recorded time for it) exists.
    """
    if embedding_model.status == IndexModelStatus.PRESENT:
        connector_credential_pair = get_connector_credential_pair(
            connector_id, credential_id, db_session
        )
        if (
            connector_credential_pair is None
            or connector_credential_pair.last_successful_index_time is None
        ):
            return 0.0

        return connector_credential_pair.last_successful_index_time.timestamp()

    # For Secondary Index we don't keep track of the latest success on the
    # CC Pair row, so it has to be calculated live from the index attempts.
    attempt = (
        db_session.query(IndexAttempt)
        .filter(
            IndexAttempt.connector_id == connector_id,
            IndexAttempt.credential_id == credential_id,
            IndexAttempt.embedding_model_id == embedding_model.id,
            IndexAttempt.status == IndexingStatus.SUCCESS,
        )
        .order_by(IndexAttempt.time_started.desc())
        .first()
    )

    if not attempt or not attempt.time_started:
        return 0.0

    return attempt.time_started.timestamp()
def update_connector_credential_pair(
@ -262,7 +282,7 @@ def resync_cc_pair(
if only_include_success:
query = query.filter(IndexAttempt.status == IndexingStatus.SUCCESS)
latest_index_attempt = query.order_by(desc(IndexAttempt.time_updated)).first()
latest_index_attempt = query.order_by(desc(IndexAttempt.time_started)).first()
return latest_index_attempt
@ -274,7 +294,7 @@ def resync_cc_pair(
)
cc_pair.last_successful_index_time = (
last_success.time_updated if last_success else None
last_success.time_started if last_success else None
)
last_run = find_latest_index_attempt(