Prevent Scheduling Multiple Queued Indexings (#997)

Yuhong Sun, 2024-01-24 16:31:29 -08:00 (committed by GitHub)
parent 50086526e2
commit 92628357df
4 changed files with 46 additions and 13 deletions

@@ -40,8 +40,13 @@ def _default_end_time(
 def find_end_time_for_indexing_attempt(
-    last_successful_run: datetime.datetime | None, source_type: DocumentSource
+    last_successful_run: datetime.datetime | None,
+    # source_type can be used to override the default for certain connectors, currently unused
+    source_type: DocumentSource,
 ) -> datetime.datetime | None:
     """Is the current time unless the connector is run over a large period, in which case it is
     split up into large time segments that become smaller as it approaches the present
     """
-    # NOTE: source_type can be used to override the default for certain connectors
     end_of_window = _default_end_time(last_successful_run)
     now = datetime.datetime.now(tz=datetime.timezone.utc)
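
A minimal usage sketch of the updated signature. The DocumentSource import location and the SLACK value are assumptions for illustration; the module that defines find_end_time_for_indexing_attempt is not named in this extract, so its import is left implicit.

import datetime

from danswer.configs.constants import DocumentSource  # assumed location of the enum

# find_end_time_for_indexing_attempt comes from the module patched above;
# its path is not shown in this diff.
end = find_end_time_for_indexing_attempt(
    last_successful_run=datetime.datetime(2023, 1, 1, tzinfo=datetime.timezone.utc),
    source_type=DocumentSource.SLACK,  # currently unused, per the new comment
)
# Per the docstring: roughly "now" for a recently run connector, but a smaller
# window boundary when the last successful run is far in the past.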

@@ -63,7 +63,9 @@ def _should_create_new_indexing(
     if not last_index:
         return True
-    # only one scheduled job per connector at a time
+    # Only one scheduled job per connector at a time
+    # Can schedule another one if the current one is already running however,
+    # because the currently running one will not index up to the latest time
     if last_index.status == IndexingStatus.NOT_STARTED:
         return False
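
The rule those comments encode, restated as a standalone sketch. The helper name and the enum stub below are illustrative only; the repo's _should_create_new_indexing contains additional checks not shown in this hunk.

from enum import Enum


class IndexingStatus(str, Enum):  # illustrative stub of danswer.db.models.IndexingStatus
    NOT_STARTED = "not_started"
    IN_PROGRESS = "in_progress"
    SUCCESS = "success"
    FAILED = "failed"


def may_schedule_new_attempt(last_status: IndexingStatus | None) -> bool:
    """Hypothetical helper mirroring the comments above, not the repo's function."""
    if last_status is None:
        # No prior attempt for this connector: schedule one.
        return True
    if last_status == IndexingStatus.NOT_STARTED:
        # An attempt is already queued; queuing a second adds nothing.
        return False
    # A running or finished attempt does not cover the latest documents,
    # so one new attempt may be queued behind it.
    return True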

@@ -166,20 +166,24 @@ def get_latest_index_attempts(
 def get_index_attempts_for_cc_pair(
-    db_session: Session, cc_pair_identifier: ConnectorCredentialPairIdentifier
+    db_session: Session,
+    cc_pair_identifier: ConnectorCredentialPairIdentifier,
+    disinclude_finished: bool = False,
 ) -> Sequence[IndexAttempt]:
-    stmt = (
-        select(IndexAttempt)
-        .where(
-            and_(
-                IndexAttempt.connector_id == cc_pair_identifier.connector_id,
-                IndexAttempt.credential_id == cc_pair_identifier.credential_id,
-            )
-        )
-        .order_by(
-            IndexAttempt.time_created.desc(),
-        )
-    )
+    stmt = select(IndexAttempt).where(
+        and_(
+            IndexAttempt.connector_id == cc_pair_identifier.connector_id,
+            IndexAttempt.credential_id == cc_pair_identifier.credential_id,
+        )
+    )
+    if disinclude_finished:
+        stmt = stmt.where(
+            IndexAttempt.status.in_(
+                [IndexingStatus.NOT_STARTED, IndexingStatus.IN_PROGRESS]
+            )
+        )
+    stmt = stmt.order_by(IndexAttempt.time_created.desc())
     return db_session.execute(stmt).scalars().all()
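
A usage sketch for the new flag, checking whether a connector/credential pair still has an unfinished attempt before creating another one. Session construction is elided and the ids are placeholders; only the function and model names come from this diff.

unfinished_attempts = get_index_attempts_for_cc_pair(
    db_session=db_session,  # an active SQLAlchemy Session, construction elided
    cc_pair_identifier=ConnectorCredentialPairIdentifier(
        connector_id=1,  # placeholder ids
        credential_id=2,
    ),
    disinclude_finished=True,  # only NOT_STARTED / IN_PROGRESS attempts
)
if unfinished_attempts:
    # Something is already queued or running; do not create another attempt.
    pass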

@@ -56,6 +56,7 @@ from danswer.db.deletion_attempt import check_deletion_attempt_is_allowed
 from danswer.db.document import get_document_cnts_for_cc_pairs
 from danswer.db.engine import get_session
 from danswer.db.index_attempt import create_index_attempt
+from danswer.db.index_attempt import get_index_attempts_for_cc_pair
 from danswer.db.index_attempt import get_latest_index_attempts
 from danswer.db.models import User
 from danswer.dynamic_configs.interface import ConfigNotFoundError
@@ -512,10 +513,31 @@ def connector_run_once(
             detail="Connector has no valid credentials, cannot create index attempts.",
         )
+    skipped_credentials = [
+        credential_id
+        for credential_id in credential_ids
+        if get_index_attempts_for_cc_pair(
+            cc_pair_identifier=ConnectorCredentialPairIdentifier(
+                connector_id=run_info.connector_id,
+                credential_id=credential_id,
+            ),
+            disinclude_finished=True,
+            db_session=db_session,
+        )
+    ]
     index_attempt_ids = [
         create_index_attempt(run_info.connector_id, credential_id, db_session)
         for credential_id in credential_ids
+        if credential_id not in skipped_credentials
     ]
+    if not index_attempt_ids:
+        raise HTTPException(
+            status_code=400,
+            detail="No new indexing attempts created, indexing jobs are queued or running.",
+        )
     return StatusResponse(
         success=True,
         message=f"Successfully created {len(index_attempt_ids)} index attempts",
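
Net effect on the run-once endpoint, sketched as a client call. The URL, port, and request body shape below are assumptions based on the handler name; they are not part of this diff.

import requests

payload = {"connector_id": 1, "credential_ids": [2], "from_beginning": False}
url = "http://localhost:8080/manage/admin/connector/run-once"  # assumed route

first = requests.post(url, json=payload)
print(first.status_code)   # 200: index attempts created for idle credentials

second = requests.post(url, json=payload)
print(second.status_code)  # 400: attempts for these credentials are still queued or running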