order list of new attempts from oldest to newest to prevent connector starvation (#1918)

This commit is contained in:
rkuo-danswer 2024-07-24 14:02:20 -07:00 committed by GitHub
parent 4fb6a88f1e
commit 5afddacbb2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 6 additions and 1 deletion

View File

@@ -271,6 +271,8 @@ def kickoff_indexing_jobs(
# Don't include jobs waiting in the Dask queue that just haven't started running
# Also (rarely) don't include for jobs that started but haven't updated the indexing tables yet
with Session(engine) as db_session:
# get_not_started_index_attempts orders its returned results from oldest to newest
# we must process attempts in a FIFO manner to prevent connector starvation
new_indexing_attempts = [
(attempt, attempt.embedding_model)
for attempt in get_not_started_index_attempts(db_session)

View File

@@ -65,9 +65,12 @@ def get_inprogress_index_attempts(
def get_not_started_index_attempts(db_session: Session) -> list[IndexAttempt]:
"""This eagerly loads the connector and credential so that the db_session can be expired
before running long-living indexing jobs, which causes increasing memory usage"""
before running long-living indexing jobs, which causes increasing memory usage.
Results are ordered by time_created (oldest to newest)."""
stmt = select(IndexAttempt)
stmt = stmt.where(IndexAttempt.status == IndexingStatus.NOT_STARTED)
stmt = stmt.order_by(IndexAttempt.time_created)
stmt = stmt.options(
joinedload(IndexAttempt.connector), joinedload(IndexAttempt.credential)
)