From 4400a945e3349d02184ff646093f2209409b129a Mon Sep 17 00:00:00 2001 From: "Richard Kuo (Danswer)" Date: Fri, 10 Jan 2025 14:18:49 -0800 Subject: [PATCH] optimize another index attempt check --- ...add_index_to_index_attempt_time_created.py | 36 +++++++++++++++++++ .../background/celery/tasks/indexing/tasks.py | 9 +++++ backend/onyx/db/index_attempt.py | 14 ++------ backend/onyx/db/models.py | 3 +- 4 files changed, 50 insertions(+), 12 deletions(-) create mode 100644 backend/alembic/versions/0f7ff6d75b57_add_index_to_index_attempt_time_created.py diff --git a/backend/alembic/versions/0f7ff6d75b57_add_index_to_index_attempt_time_created.py b/backend/alembic/versions/0f7ff6d75b57_add_index_to_index_attempt_time_created.py new file mode 100644 index 00000000000..23db56bd61e --- /dev/null +++ b/backend/alembic/versions/0f7ff6d75b57_add_index_to_index_attempt_time_created.py @@ -0,0 +1,36 @@ +"""add indexes to index_attempt.status and index_attempt.time_created + +Revision ID: 0f7ff6d75b57 +Revises: 369644546676 +Create Date: 2025-01-10 14:01:14.067144 + +""" +from alembic import op + +# revision identifiers, used by Alembic. 
+revision = "0f7ff6d75b57" +down_revision = "369644546676" +branch_labels: None = None +depends_on: None = None + + +def upgrade() -> None: + op.create_index( + op.f("ix_index_attempt_status"), + "index_attempt", + ["status"], + unique=False, + ) + + op.create_index( + op.f("ix_index_attempt_time_created"), + "index_attempt", + ["time_created"], + unique=False, + ) + + +def downgrade() -> None: + op.drop_index(op.f("ix_index_attempt_time_created"), table_name="index_attempt") + + op.drop_index(op.f("ix_index_attempt_status"), table_name="index_attempt") diff --git a/backend/onyx/background/celery/tasks/indexing/tasks.py b/backend/onyx/background/celery/tasks/indexing/tasks.py index 9fd73972d0e..bbc64d2420e 100644 --- a/backend/onyx/background/celery/tasks/indexing/tasks.py +++ b/backend/onyx/background/celery/tasks/indexing/tasks.py @@ -417,6 +417,15 @@ def check_for_indexing(self: Task, *, tenant_id: str | None) -> int | None: unfenced_attempt_ids = get_unfenced_index_attempt_ids( db_session, redis_client ) + + if tenant_id in debug_tenants: + ttl = redis_client.ttl(OnyxRedisLocks.CHECK_INDEXING_BEAT_LOCK) + task_logger.info( + f"check_for_indexing after get unfenced lock: " + f"tenant={tenant_id} " + f"ttl={ttl}" + ) + for attempt_id in unfenced_attempt_ids: # debugging logic - remove after we're done if tenant_id in debug_tenants: diff --git a/backend/onyx/db/index_attempt.py b/backend/onyx/db/index_attempt.py index 20e8521d207..692a27976f5 100644 --- a/backend/onyx/db/index_attempt.py +++ b/backend/onyx/db/index_attempt.py @@ -9,7 +9,6 @@ from sqlalchemy import desc from sqlalchemy import func from sqlalchemy import select from sqlalchemy import update -from sqlalchemy.orm import joinedload from sqlalchemy.orm import Session from onyx.connectors.models import Document @@ -118,21 +117,14 @@ def get_in_progress_index_attempts( def get_all_index_attempts_by_status( status: IndexingStatus, db_session: Session ) -> list[IndexAttempt]: - """This eagerly loads the 
connector and credential so that the db_session can be expired - before running long-living indexing jobs, which causes increasing memory usage. + """Returns index attempts with the given status. + Only recommend calling this with non-terminal states as the full list of + terminal statuses may be quite large. Results are ordered by time_created (oldest to newest).""" stmt = select(IndexAttempt) stmt = stmt.where(IndexAttempt.status == status) stmt = stmt.order_by(IndexAttempt.time_created) - stmt = stmt.options( - joinedload(IndexAttempt.connector_credential_pair).joinedload( - ConnectorCredentialPair.connector - ), - joinedload(IndexAttempt.connector_credential_pair).joinedload( - ConnectorCredentialPair.credential - ), - ) new_attempts = db_session.scalars(stmt) return list(new_attempts.all()) diff --git a/backend/onyx/db/models.py b/backend/onyx/db/models.py index ff1c98d13d8..a1f7967641e 100644 --- a/backend/onyx/db/models.py +++ b/backend/onyx/db/models.py @@ -763,7 +763,7 @@ class IndexAttempt(Base): # the run once API from_beginning: Mapped[bool] = mapped_column(Boolean) status: Mapped[IndexingStatus] = mapped_column( - Enum(IndexingStatus, native_enum=False) + Enum(IndexingStatus, native_enum=False), index=True ) # The two below may be slightly out of sync if user switches Embedding Model new_docs_indexed: Mapped[int | None] = mapped_column(Integer, default=0) @@ -782,6 +782,7 @@ class IndexAttempt(Base): time_created: Mapped[datetime.datetime] = mapped_column( DateTime(timezone=True), server_default=func.now(), + index=True, ) # when the actual indexing run began # NOTE: will use the api_server clock rather than DB server clock