diff --git a/backend/danswer/background/update.py b/backend/danswer/background/update.py
index 9c8317734..59e2de6b6 100755
--- a/backend/danswer/background/update.py
+++ b/backend/danswer/background/update.py
@@ -271,6 +271,8 @@ def kickoff_indexing_jobs(
     # Don't include jobs waiting in the Dask queue that just haven't started running
     # Also (rarely) don't include for jobs that started but haven't updated the indexing tables yet
     with Session(engine) as db_session:
+        # get_not_started_index_attempts orders its returned results from oldest to newest
+        # we must process attempts in a FIFO manner to prevent connector starvation
         new_indexing_attempts = [
            (attempt, attempt.embedding_model)
            for attempt in get_not_started_index_attempts(db_session)
diff --git a/backend/danswer/db/index_attempt.py b/backend/danswer/db/index_attempt.py
index 91b13cfd6..5a86a3e53 100644
--- a/backend/danswer/db/index_attempt.py
+++ b/backend/danswer/db/index_attempt.py
@@ -65,9 +65,12 @@ def get_inprogress_index_attempts(
 
 def get_not_started_index_attempts(db_session: Session) -> list[IndexAttempt]:
     """This eagerly loads the connector and credential so that the db_session can be expired
-    before running long-living indexing jobs, which causes increasing memory usage"""
+    before running long-living indexing jobs, which causes increasing memory usage.
+
+    Results are ordered by time_created (oldest to newest)."""
     stmt = select(IndexAttempt)
     stmt = stmt.where(IndexAttempt.status == IndexingStatus.NOT_STARTED)
+    stmt = stmt.order_by(IndexAttempt.time_created)
     stmt = stmt.options(
         joinedload(IndexAttempt.connector), joinedload(IndexAttempt.credential)
     )
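
For context, here is a minimal, self-contained sketch of the ordered-query pattern this patch introduces. The `IndexAttempt` and `IndexingStatus` classes below are simplified stand-ins (the real models in Danswer have many more columns and relationships), the in-memory SQLite engine and staggered timestamps are illustrative assumptions, and the `joinedload` options from the real function are omitted since the stand-in model has no relationships. It demonstrates the FIFO ordering, not Danswer's actual schema.

```python
# Minimal sketch of the ordered-query pattern from this diff.
# IndexAttempt/IndexingStatus here are simplified stand-ins, not Danswer's models.
import enum
from datetime import datetime, timedelta, timezone

from sqlalchemy import Column, DateTime, Enum, Integer, create_engine, select
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class IndexingStatus(str, enum.Enum):
    NOT_STARTED = "not_started"
    IN_PROGRESS = "in_progress"


class IndexAttempt(Base):
    __tablename__ = "index_attempt"
    id = Column(Integer, primary_key=True)
    status = Column(Enum(IndexingStatus), default=IndexingStatus.NOT_STARTED)
    time_created = Column(DateTime(timezone=True))


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as db_session:
    # Enqueue three attempts with staggered creation times.
    now = datetime.now(timezone.utc)
    db_session.add_all(
        IndexAttempt(time_created=now + timedelta(seconds=i)) for i in range(3)
    )
    db_session.commit()

    # Mirrors the patched query: without the order_by, the database is free to
    # return NOT_STARTED rows in any order, so a connector that keeps enqueueing
    # new attempts could starve older ones; with it, the scheduler drains the
    # backlog oldest-first.
    stmt = (
        select(IndexAttempt)
        .where(IndexAttempt.status == IndexingStatus.NOT_STARTED)
        .order_by(IndexAttempt.time_created)
    )
    for attempt in db_session.scalars(stmt):
        print(attempt.id, attempt.time_created)  # ids 1, 2, 3 in FIFO order
```

Keeping the ordering inside `get_not_started_index_attempts` (rather than sorting in `kickoff_indexing_jobs`) means every caller of the query gets the FIFO guarantee, which is why the docstring change documents it as part of the function's contract.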