diff --git a/backend/danswer/background/indexing/checkpointing.py b/backend/danswer/background/indexing/checkpointing.py index d1d4c8cf4..ec3ce5c8f 100644 --- a/backend/danswer/background/indexing/checkpointing.py +++ b/backend/danswer/background/indexing/checkpointing.py @@ -40,8 +40,13 @@ def _default_end_time( def find_end_time_for_indexing_attempt( - last_successful_run: datetime.datetime | None, source_type: DocumentSource + last_successful_run: datetime.datetime | None, + # source_type can be used to override the default for certain connectors, currently unused + source_type: DocumentSource, ) -> datetime.datetime | None: + """Is the current time unless the connector is run over a large period, in which case it is + split up into large time segments that become smaller as it approaches the present + """ # NOTE: source_type can be used to override the default for certain connectors end_of_window = _default_end_time(last_successful_run) now = datetime.datetime.now(tz=datetime.timezone.utc) diff --git a/backend/danswer/background/update.py b/backend/danswer/background/update.py index eb1b7943b..20c2c33a2 100755 --- a/backend/danswer/background/update.py +++ b/backend/danswer/background/update.py @@ -63,7 +63,9 @@ def _should_create_new_indexing( if not last_index: return True - # only one scheduled job per connector at a time + # Only one scheduled job per connector at a time + # Can schedule another one if the current one is already running however + # Because the currently running one will not be until the latest time if last_index.status == IndexingStatus.NOT_STARTED: return False diff --git a/backend/danswer/db/index_attempt.py b/backend/danswer/db/index_attempt.py index ed5eac0db..04f3c58a6 100644 --- a/backend/danswer/db/index_attempt.py +++ b/backend/danswer/db/index_attempt.py @@ -166,20 +166,24 @@ def get_latest_index_attempts( def get_index_attempts_for_cc_pair( - db_session: Session, cc_pair_identifier: ConnectorCredentialPairIdentifier + db_session: Session, + cc_pair_identifier: ConnectorCredentialPairIdentifier, + disinclude_finished: bool = False, ) -> Sequence[IndexAttempt]: - stmt = ( - select(IndexAttempt) - .where( - and_( - IndexAttempt.connector_id == cc_pair_identifier.connector_id, - IndexAttempt.credential_id == cc_pair_identifier.credential_id, - ) - ) - .order_by( - IndexAttempt.time_created.desc(), + stmt = select(IndexAttempt).where( + and_( + IndexAttempt.connector_id == cc_pair_identifier.connector_id, + IndexAttempt.credential_id == cc_pair_identifier.credential_id, ) ) + if disinclude_finished: + stmt = stmt.where( + IndexAttempt.status.in_( + [IndexingStatus.NOT_STARTED, IndexingStatus.IN_PROGRESS] + ) + ) + + stmt = stmt.order_by(IndexAttempt.time_created.desc()) return db_session.execute(stmt).scalars().all() diff --git a/backend/danswer/server/documents/connector.py b/backend/danswer/server/documents/connector.py index ca3dd325a..28bb8d503 100644 --- a/backend/danswer/server/documents/connector.py +++ b/backend/danswer/server/documents/connector.py @@ -56,6 +56,7 @@ from danswer.db.deletion_attempt import check_deletion_attempt_is_allowed from danswer.db.document import get_document_cnts_for_cc_pairs from danswer.db.engine import get_session from danswer.db.index_attempt import create_index_attempt +from danswer.db.index_attempt import get_index_attempts_for_cc_pair from danswer.db.index_attempt import get_latest_index_attempts from danswer.db.models import User from danswer.dynamic_configs.interface import ConfigNotFoundError @@ -512,10 +513,31 @@ def connector_run_once( detail="Connector has no valid credentials, cannot create index attempts.", ) + skipped_credentials = [ + credential_id + for credential_id in credential_ids + if get_index_attempts_for_cc_pair( + cc_pair_identifier=ConnectorCredentialPairIdentifier( + connector_id=run_info.connector_id, + credential_id=credential_id, + ), + disinclude_finished=True, + db_session=db_session, + ) + ] + index_attempt_ids = [ create_index_attempt(run_info.connector_id, credential_id, db_session) for credential_id in credential_ids + if credential_id not in skipped_credentials ] + + if not index_attempt_ids: + raise HTTPException( + status_code=400, + detail="No new indexing attempts created, indexing jobs are queued or running.", + ) + return StatusResponse( success=True, message=f"Successfully created {len(index_attempt_ids)} index attempts",