optimize another index attempt check

This commit is contained in:
Richard Kuo (Danswer)
2025-01-10 14:18:49 -08:00
parent cab7e60542
commit 4400a945e3
4 changed files with 50 additions and 12 deletions

View File

@@ -0,0 +1,36 @@
"""add indexes to index_attempt.status and index_attempt.time_created
Revision ID: 0f7ff6d75b57
Revises: 369644546676
Create Date: 2025-01-10 14:01:14.067144
"""
from alembic import op
# revision identifiers, used by Alembic.
revision = "0f7ff6d75b57"
down_revision = "369644546676"
branch_labels: None = None
depends_on: None = None
def upgrade() -> None:
    """Create single-column, non-unique indexes on index_attempt.

    Adds an index on ``status`` and one on ``time_created`` to speed up
    the status/recency filters used when checking for indexing attempts.
    """
    # Both indexes follow the same naming convention, so build them in a loop.
    for column in ("status", "time_created"):
        op.create_index(
            op.f(f"ix_index_attempt_{column}"),
            "index_attempt",
            [column],
            unique=False,
        )
def downgrade() -> None:
    """Drop the indexes added by upgrade(), in reverse creation order."""
    for index_name in ("ix_index_attempt_time_created", "ix_index_attempt_status"):
        op.drop_index(op.f(index_name), table_name="index_attempt")

View File

@@ -417,6 +417,15 @@ def check_for_indexing(self: Task, *, tenant_id: str | None) -> int | None:
unfenced_attempt_ids = get_unfenced_index_attempt_ids( unfenced_attempt_ids = get_unfenced_index_attempt_ids(
db_session, redis_client db_session, redis_client
) )
if tenant_id in debug_tenants:
ttl = redis_client.ttl(OnyxRedisLocks.CHECK_INDEXING_BEAT_LOCK)
task_logger.info(
f"check_for_indexing after get unfenced lock: "
f"tenant={tenant_id} "
f"ttl={ttl}"
)
for attempt_id in unfenced_attempt_ids: for attempt_id in unfenced_attempt_ids:
# debugging logic - remove after we're done # debugging logic - remove after we're done
if tenant_id in debug_tenants: if tenant_id in debug_tenants:

View File

@@ -9,7 +9,6 @@ from sqlalchemy import desc
from sqlalchemy import func from sqlalchemy import func
from sqlalchemy import select from sqlalchemy import select
from sqlalchemy import update from sqlalchemy import update
from sqlalchemy.orm import joinedload
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
from onyx.connectors.models import Document from onyx.connectors.models import Document
@@ -118,21 +117,14 @@ def get_in_progress_index_attempts(
def get_all_index_attempts_by_status( def get_all_index_attempts_by_status(
status: IndexingStatus, db_session: Session status: IndexingStatus, db_session: Session
) -> list[IndexAttempt]: ) -> list[IndexAttempt]:
"""This eagerly loads the connector and credential so that the db_session can be expired """Returns index attempts with the given status.
before running long-living indexing jobs, which causes increasing memory usage. Only recommend calling this with non-terminal states as the full list of
terminal statuses may be quite large.
Results are ordered by time_created (oldest to newest).""" Results are ordered by time_created (oldest to newest)."""
stmt = select(IndexAttempt) stmt = select(IndexAttempt)
stmt = stmt.where(IndexAttempt.status == status) stmt = stmt.where(IndexAttempt.status == status)
stmt = stmt.order_by(IndexAttempt.time_created) stmt = stmt.order_by(IndexAttempt.time_created)
stmt = stmt.options(
joinedload(IndexAttempt.connector_credential_pair).joinedload(
ConnectorCredentialPair.connector
),
joinedload(IndexAttempt.connector_credential_pair).joinedload(
ConnectorCredentialPair.credential
),
)
new_attempts = db_session.scalars(stmt) new_attempts = db_session.scalars(stmt)
return list(new_attempts.all()) return list(new_attempts.all())

View File

@@ -763,7 +763,7 @@ class IndexAttempt(Base):
# the run once API # the run once API
from_beginning: Mapped[bool] = mapped_column(Boolean) from_beginning: Mapped[bool] = mapped_column(Boolean)
status: Mapped[IndexingStatus] = mapped_column( status: Mapped[IndexingStatus] = mapped_column(
Enum(IndexingStatus, native_enum=False) Enum(IndexingStatus, native_enum=False, index=True)
) )
# The two below may be slightly out of sync if user switches Embedding Model # The two below may be slightly out of sync if user switches Embedding Model
new_docs_indexed: Mapped[int | None] = mapped_column(Integer, default=0) new_docs_indexed: Mapped[int | None] = mapped_column(Integer, default=0)
@@ -782,6 +782,7 @@ class IndexAttempt(Base):
time_created: Mapped[datetime.datetime] = mapped_column( time_created: Mapped[datetime.datetime] = mapped_column(
DateTime(timezone=True), DateTime(timezone=True),
server_default=func.now(), server_default=func.now(),
index=True,
) )
# when the actual indexing run began # when the actual indexing run began
# NOTE: will use the api_server clock rather than DB server clock # NOTE: will use the api_server clock rather than DB server clock