From e3aab8e85ee48423c2be13fac70003273ab196af Mon Sep 17 00:00:00 2001 From: Chris Weaver <25087905+Weves@users.noreply.github.com> Date: Sun, 13 Apr 2025 15:57:47 -0700 Subject: [PATCH] Improve index attempt display (#4511) --- .../d961aca62eb3_update_status_length.py | 57 ++ .../background/celery/tasks/indexing/tasks.py | 52 +- .../background/celery/tasks/indexing/utils.py | 73 ++- backend/onyx/configs/app_configs.py | 8 - .../onyx/connectors/confluence/connector.py | 28 +- .../connectors/confluence/onyx_confluence.py | 8 +- backend/onyx/db/connector_credential_pair.py | 17 +- backend/onyx/db/enums.py | 14 +- backend/onyx/db/index_attempt.py | 20 + backend/onyx/db/models.py | 3 + backend/onyx/server/documents/connector.py | 1 + backend/onyx/server/documents/models.py | 34 +- .../common_utils/managers/cc_pair.py | 2 + .../common_utils/managers/connector.py | 2 + .../integration/tests/indexing/conftest.py | 18 + .../tests/indexing/test_checkpointing.py | 14 - .../indexing/test_repeated_error_state.py | 204 +++++++ .../docker_compose/docker-compose.dev.yml | 1 - .../docker_compose/docker-compose.gpu-dev.yml | 1 - .../docker-compose.multitenant-dev.yml | 1 - deployment/helm/charts/onyx/values.yaml | 4 + deployment/kubernetes/env-configmap.yaml | 5 +- .../connector/[ccPairId]/ConfigDisplay.tsx | 131 +++-- .../[ccPairId]/IndexingAttemptsTable.tsx | 16 +- .../[ccPairId]/ModifyStatusButtonCluster.tsx | 107 ---- .../connector/[ccPairId]/ReIndexButton.tsx | 135 ----- .../connector/[ccPairId]/ReIndexModal.tsx | 162 ++++++ .../connector/[ccPairId]/ReIndexPopup.tsx | 90 ++++ web/src/app/admin/connector/[ccPairId]/lib.ts | 26 +- .../app/admin/connector/[ccPairId]/page.tsx | 504 +++++++++++++----- .../app/admin/connector/[ccPairId]/types.ts | 9 + .../app/admin/connector/[ccPairId]/unused.txt | 0 .../connector/[ccPairId]/useStatusChange.tsx | 75 +++ .../status/CCPairIndexingStatusTable.tsx | 93 +--- web/src/components/Status.tsx | 50 +- .../credentials/CredentialSection.tsx | 80 ++- web/src/components/ui/button.tsx | 8 +- .../ui/dropdown-menu-with-tooltip.tsx | 57 ++ web/src/lib/connectors/credentials.ts | 1 + web/src/lib/types.ts | 1 + 40 files changed, 1479 insertions(+), 633 deletions(-) create mode 100644 backend/alembic/versions/d961aca62eb3_update_status_length.py create mode 100644 backend/tests/integration/tests/indexing/conftest.py create mode 100644 backend/tests/integration/tests/indexing/test_repeated_error_state.py delete mode 100644 web/src/app/admin/connector/[ccPairId]/ModifyStatusButtonCluster.tsx delete mode 100644 web/src/app/admin/connector/[ccPairId]/ReIndexButton.tsx create mode 100644 web/src/app/admin/connector/[ccPairId]/ReIndexModal.tsx create mode 100644 web/src/app/admin/connector/[ccPairId]/ReIndexPopup.tsx create mode 100644 web/src/app/admin/connector/[ccPairId]/unused.txt create mode 100644 web/src/app/admin/connector/[ccPairId]/useStatusChange.tsx create mode 100644 web/src/components/ui/dropdown-menu-with-tooltip.tsx diff --git a/backend/alembic/versions/d961aca62eb3_update_status_length.py b/backend/alembic/versions/d961aca62eb3_update_status_length.py new file mode 100644 index 000000000..58de115f7 --- /dev/null +++ b/backend/alembic/versions/d961aca62eb3_update_status_length.py @@ -0,0 +1,57 @@ +"""Update status length + +Revision ID: d961aca62eb3 +Revises: cf90764725d8 +Create Date: 2025-03-23 16:10:05.683965 + +""" + +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = "d961aca62eb3" +down_revision = "cf90764725d8" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + # Drop the existing enum type constraint + op.execute("ALTER TABLE connector_credential_pair ALTER COLUMN status TYPE varchar") + + # Create new enum type with all values + op.execute( + "ALTER TABLE connector_credential_pair ALTER COLUMN status TYPE VARCHAR(20) USING status::varchar(20)" + ) + + # Update the enum type to include all possible values + op.alter_column( + "connector_credential_pair", + "status", + type_=sa.Enum( + "SCHEDULED", + "INITIAL_INDEXING", + "ACTIVE", + "PAUSED", + "DELETING", + "INVALID", + name="connectorcredentialpairstatus", + native_enum=False, + ), + existing_type=sa.String(20), + nullable=False, + ) + + op.add_column( + "connector_credential_pair", + sa.Column( + "in_repeated_error_state", sa.Boolean, default=False, server_default="false" + ), + ) + + +def downgrade() -> None: + # no need to convert back to the old enum type, since we're not using it anymore + op.drop_column("connector_credential_pair", "in_repeated_error_state") diff --git a/backend/onyx/background/celery/tasks/indexing/tasks.py b/backend/onyx/background/celery/tasks/indexing/tasks.py index 2158b283b..acce48f20 100644 --- a/backend/onyx/background/celery/tasks/indexing/tasks.py +++ b/backend/onyx/background/celery/tasks/indexing/tasks.py @@ -26,6 +26,7 @@ from onyx.background.celery.celery_utils import httpx_init_vespa_pool from onyx.background.celery.memory_monitoring import emit_process_memory from onyx.background.celery.tasks.indexing.utils import get_unfenced_index_attempt_ids from onyx.background.celery.tasks.indexing.utils import IndexingCallback +from onyx.background.celery.tasks.indexing.utils import is_in_repeated_error_state from onyx.background.celery.tasks.indexing.utils import should_index from onyx.background.celery.tasks.indexing.utils import try_creating_indexing_task from onyx.background.celery.tasks.indexing.utils import validate_indexing_fences @@ -54,11 +55,12 @@ from onyx.connectors.exceptions import ConnectorValidationError from onyx.db.connector import mark_ccpair_with_indexing_trigger from onyx.db.connector_credential_pair import fetch_connector_credential_pairs from onyx.db.connector_credential_pair import get_connector_credential_pair_from_id +from onyx.db.connector_credential_pair import set_cc_pair_repeated_error_state from onyx.db.engine import get_session_with_current_tenant +from onyx.db.enums import ConnectorCredentialPairStatus from onyx.db.enums import IndexingMode from onyx.db.enums import IndexingStatus from onyx.db.index_attempt import get_index_attempt -from onyx.db.index_attempt import get_last_attempt_for_cc_pair from onyx.db.index_attempt import mark_attempt_canceled from onyx.db.index_attempt import mark_attempt_failed from onyx.db.search_settings import get_active_search_settings_list @@ -241,6 +243,16 @@ def monitor_ccpair_indexing_taskset( if not payload: return + # if the CC Pair is `SCHEDULED`, moved it to `INITIAL_INDEXING`. A CC Pair + # should only ever be `SCHEDULED` if it's a new connector. + cc_pair = get_connector_credential_pair_from_id(db_session, cc_pair_id) + if cc_pair is None: + raise RuntimeError(f"CC Pair {cc_pair_id} not found") + + if cc_pair.status == ConnectorCredentialPairStatus.SCHEDULED: + cc_pair.status = ConnectorCredentialPairStatus.INITIAL_INDEXING + db_session.commit() + elapsed_started_str = None if payload.started: elapsed_started = datetime.now(timezone.utc) - payload.started @@ -355,6 +367,22 @@ def monitor_ccpair_indexing_taskset( redis_connector_index.reset() + # mark the CC Pair as `ACTIVE` if it's not already + if ( + # it should never technically be in this state, but we'll handle it anyway + cc_pair.status == ConnectorCredentialPairStatus.SCHEDULED + or cc_pair.status == ConnectorCredentialPairStatus.INITIAL_INDEXING + ): + cc_pair.status = ConnectorCredentialPairStatus.ACTIVE + db_session.commit() + + # if the index attempt is successful, clear the repeated error state + if cc_pair.in_repeated_error_state: + index_attempt = get_index_attempt(db_session, payload.index_attempt_id) + if index_attempt and index_attempt.status.is_successful(): + cc_pair.in_repeated_error_state = False + db_session.commit() + @shared_task( name=OnyxCeleryTask.CHECK_FOR_INDEXING, @@ -441,6 +469,21 @@ def check_for_indexing(self: Task, *, tenant_id: str) -> int | None: for cc_pair_entry in cc_pairs: cc_pair_ids.append(cc_pair_entry.id) + # mark CC Pairs that are repeatedly failing as in repeated error state + with get_session_with_current_tenant() as db_session: + current_search_settings = get_current_search_settings(db_session) + for cc_pair_id in cc_pair_ids: + if is_in_repeated_error_state( + cc_pair_id=cc_pair_id, + search_settings_id=current_search_settings.id, + db_session=db_session, + ): + set_cc_pair_repeated_error_state( + db_session=db_session, + cc_pair_id=cc_pair_id, + in_repeated_error_state=True, + ) + # kick off index attempts for cc_pair_id in cc_pair_ids: lock_beat.reacquire() @@ -480,13 +523,8 @@ def check_for_indexing(self: Task, *, tenant_id: str) -> int | None: ) continue - last_attempt = get_last_attempt_for_cc_pair( - cc_pair.id, search_settings_instance.id, db_session - ) - if not should_index( cc_pair=cc_pair, - last_index=last_attempt, search_settings_instance=search_settings_instance, secondary_index_building=len(search_settings_list) > 1, db_session=db_session, @@ -494,7 +532,6 @@ def check_for_indexing(self: Task, *, tenant_id: str) -> int | None: task_logger.info( f"check_for_indexing - Not indexing cc_pair_id: {cc_pair_id} " f"search_settings={search_settings_instance.id}, " - f"last_attempt={last_attempt.id if last_attempt else None}, " f"secondary_index_building={len(search_settings_list) > 1}" ) continue @@ -502,7 +539,6 @@ def check_for_indexing(self: Task, *, tenant_id: str) -> int | None: task_logger.info( f"check_for_indexing - Will index cc_pair_id: {cc_pair_id} " f"search_settings={search_settings_instance.id}, " - f"last_attempt={last_attempt.id if last_attempt else None}, " f"secondary_index_building={len(search_settings_list) > 1}" ) diff --git a/backend/onyx/background/celery/tasks/indexing/utils.py b/backend/onyx/background/celery/tasks/indexing/utils.py index 0fd75db29..0430963ad 100644 --- a/backend/onyx/background/celery/tasks/indexing/utils.py +++ b/backend/onyx/background/celery/tasks/indexing/utils.py @@ -22,6 +22,7 @@ from onyx.configs.constants import OnyxCeleryPriority from onyx.configs.constants import OnyxCeleryQueues from onyx.configs.constants import OnyxCeleryTask from onyx.configs.constants import OnyxRedisConstants +from onyx.db.connector_credential_pair import get_connector_credential_pair_from_id from onyx.db.engine import get_db_current_time from onyx.db.engine import get_session_with_current_tenant from onyx.db.enums import ConnectorCredentialPairStatus @@ -31,6 +32,8 @@ from onyx.db.index_attempt import create_index_attempt from onyx.db.index_attempt import delete_index_attempt from onyx.db.index_attempt import get_all_index_attempts_by_status from onyx.db.index_attempt import get_index_attempt +from onyx.db.index_attempt import get_last_attempt_for_cc_pair +from onyx.db.index_attempt import get_recent_attempts_for_cc_pair from onyx.db.index_attempt import mark_attempt_failed from onyx.db.models import ConnectorCredentialPair from onyx.db.models import IndexAttempt @@ -44,6 +47,8 @@ from onyx.utils.logger import setup_logger logger = setup_logger() +NUM_REPEAT_ERRORS_BEFORE_REPEATED_ERROR_STATE = 5 + def get_unfenced_index_attempt_ids(db_session: Session, r: redis.Redis) -> list[int]: """Gets a list of unfenced index attempts. Should not be possible, so we'd typically @@ -346,9 +351,42 @@ def validate_indexing_fences( return +def is_in_repeated_error_state( + cc_pair_id: int, search_settings_id: int, db_session: Session +) -> bool: + """Checks if the cc pair / search setting combination is in a repeated error state.""" + cc_pair = get_connector_credential_pair_from_id( + db_session=db_session, + cc_pair_id=cc_pair_id, + ) + if not cc_pair: + raise RuntimeError( + f"is_in_repeated_error_state - could not find cc_pair with id={cc_pair_id}" + ) + + # if the connector doesn't have a refresh_freq, a single failed attempt is enough + number_of_failed_attempts_in_a_row_needed = ( + NUM_REPEAT_ERRORS_BEFORE_REPEATED_ERROR_STATE + if cc_pair.connector.refresh_freq is not None + else 1 + ) + + most_recent_index_attempts = get_recent_attempts_for_cc_pair( + cc_pair_id=cc_pair_id, + search_settings_id=search_settings_id, + limit=number_of_failed_attempts_in_a_row_needed, + db_session=db_session, + ) + return len( + most_recent_index_attempts + ) >= number_of_failed_attempts_in_a_row_needed and all( + attempt.status == IndexingStatus.FAILED + for attempt in most_recent_index_attempts + ) + + def should_index( cc_pair: ConnectorCredentialPair, - last_index: IndexAttempt | None, search_settings_instance: SearchSettings, secondary_index_building: bool, db_session: Session, @@ -362,6 +400,16 @@ def should_index( Return True if we should try to index, False if not. """ connector = cc_pair.connector + last_index_attempt = get_last_attempt_for_cc_pair( + cc_pair_id=cc_pair.id, + search_settings_id=search_settings_instance.id, + db_session=db_session, + ) + all_recent_errored = is_in_repeated_error_state( + cc_pair_id=cc_pair.id, + search_settings_id=search_settings_instance.id, + db_session=db_session, + ) # uncomment for debugging # task_logger.info(f"_should_index: " @@ -388,24 +436,24 @@ def should_index( # When switching over models, always index at least once if search_settings_instance.status == IndexModelStatus.FUTURE: - if last_index: + if last_index_attempt: # No new index if the last index attempt succeeded # Once is enough. The model will never be able to swap otherwise. - if last_index.status == IndexingStatus.SUCCESS: + if last_index_attempt.status == IndexingStatus.SUCCESS: # print( # f"Not indexing cc_pair={cc_pair.id}: FUTURE model with successful last index attempt={last_index.id}" # ) return False # No new index if the last index attempt is waiting to start - if last_index.status == IndexingStatus.NOT_STARTED: + if last_index_attempt.status == IndexingStatus.NOT_STARTED: # print( # f"Not indexing cc_pair={cc_pair.id}: FUTURE model with NOT_STARTED last index attempt={last_index.id}" # ) return False # No new index if the last index attempt is running - if last_index.status == IndexingStatus.IN_PROGRESS: + if last_index_attempt.status == IndexingStatus.IN_PROGRESS: # print( # f"Not indexing cc_pair={cc_pair.id}: FUTURE model with IN_PROGRESS last index attempt={last_index.id}" # ) @@ -439,18 +487,27 @@ def should_index( return True # if no attempt has ever occurred, we should index regardless of refresh_freq - if not last_index: + if not last_index_attempt: return True if connector.refresh_freq is None: # print(f"Not indexing cc_pair={cc_pair.id}: refresh_freq is None") return False + # if in the "initial" phase, we should always try and kick-off indexing + # as soon as possible if there is no ongoing attempt. In other words, + # no delay UNLESS we're repeatedly failing to index. + if ( + cc_pair.status == ConnectorCredentialPairStatus.INITIAL_INDEXING + and not all_recent_errored + ): + return True + current_db_time = get_db_current_time(db_session) - time_since_index = current_db_time - last_index.time_updated + time_since_index = current_db_time - last_index_attempt.time_updated if time_since_index.total_seconds() < connector.refresh_freq: # print( - # f"Not indexing cc_pair={cc_pair.id}: Last index attempt={last_index.id} " + # f"Not indexing cc_pair={cc_pair.id}: Last index attempt={last_index_attempt.id} " # f"too recent ({time_since_index.total_seconds()}s < {connector.refresh_freq}s)" # ) return False diff --git a/backend/onyx/configs/app_configs.py b/backend/onyx/configs/app_configs.py index a293df624..2d5a4fd94 100644 --- a/backend/onyx/configs/app_configs.py +++ b/backend/onyx/configs/app_configs.py @@ -483,14 +483,6 @@ CONTINUE_ON_CONNECTOR_FAILURE = os.environ.get( DISABLE_INDEX_UPDATE_ON_SWAP = ( os.environ.get("DISABLE_INDEX_UPDATE_ON_SWAP", "").lower() == "true" ) -# Controls how many worker processes we spin up to index documents in the -# background. This is useful for speeding up indexing, but does require a -# fairly large amount of memory in order to increase substantially, since -# each worker loads the embedding models into memory. -NUM_INDEXING_WORKERS = int(os.environ.get("NUM_INDEXING_WORKERS") or 1) -NUM_SECONDARY_INDEXING_WORKERS = int( - os.environ.get("NUM_SECONDARY_INDEXING_WORKERS") or NUM_INDEXING_WORKERS -) # More accurate results at the expense of indexing speed and index size (stores additional 4 MINI_CHUNK vectors) ENABLE_MULTIPASS_INDEXING = ( os.environ.get("ENABLE_MULTIPASS_INDEXING", "").lower() == "true" diff --git a/backend/onyx/connectors/confluence/connector.py b/backend/onyx/connectors/confluence/connector.py index b17c52fd1..debd6bc25 100644 --- a/backend/onyx/connectors/confluence/connector.py +++ b/backend/onyx/connectors/confluence/connector.py @@ -101,6 +101,7 @@ class ConfluenceConnector( self.labels_to_skip = labels_to_skip self.timezone_offset = timezone_offset self._confluence_client: OnyxConfluence | None = None + self._low_timeout_confluence_client: OnyxConfluence | None = None self._fetched_titles: set[str] = set() self.allow_images = False @@ -156,6 +157,12 @@ class ConfluenceConnector( raise ConnectorMissingCredentialError("Confluence") return self._confluence_client + @property + def low_timeout_confluence_client(self) -> OnyxConfluence: + if self._low_timeout_confluence_client is None: + raise ConnectorMissingCredentialError("Confluence") + return self._low_timeout_confluence_client + def set_credentials_provider( self, credentials_provider: CredentialsProviderInterface ) -> None: @@ -163,13 +170,27 @@ class ConfluenceConnector( # raises exception if there's a problem confluence_client = OnyxConfluence( - self.is_cloud, self.wiki_base, credentials_provider + is_cloud=self.is_cloud, + url=self.wiki_base, + credentials_provider=credentials_provider, ) confluence_client._probe_connection(**self.probe_kwargs) confluence_client._initialize_connection(**self.final_kwargs) self._confluence_client = confluence_client + # create a low timeout confluence client for sync flows + low_timeout_confluence_client = OnyxConfluence( + is_cloud=self.is_cloud, + url=self.wiki_base, + credentials_provider=credentials_provider, + timeout=3, + ) + low_timeout_confluence_client._probe_connection(**self.probe_kwargs) + low_timeout_confluence_client._initialize_connection(**self.final_kwargs) + + self._low_timeout_confluence_client = low_timeout_confluence_client + def load_credentials(self, credentials: dict[str, Any]) -> dict[str, Any] | None: raise NotImplementedError("Use set_credentials_provider with this connector.") @@ -521,11 +542,8 @@ class ConfluenceConnector( yield doc_metadata_list def validate_connector_settings(self) -> None: - if self._confluence_client is None: - raise ConnectorMissingCredentialError("Confluence credentials not loaded.") - try: - spaces = self._confluence_client.get_all_spaces(limit=1) + spaces = self.low_timeout_confluence_client.get_all_spaces(limit=1) except HTTPError as e: status_code = e.response.status_code if e.response else None if status_code == 401: diff --git a/backend/onyx/connectors/confluence/onyx_confluence.py b/backend/onyx/connectors/confluence/onyx_confluence.py index 9e7cbc34c..608adc150 100644 --- a/backend/onyx/connectors/confluence/onyx_confluence.py +++ b/backend/onyx/connectors/confluence/onyx_confluence.py @@ -72,12 +72,14 @@ class OnyxConfluence: CREDENTIAL_PREFIX = "connector:confluence:credential" CREDENTIAL_TTL = 300 # 5 min + PROBE_TIMEOUT = 5 # 5 seconds def __init__( self, is_cloud: bool, url: str, credentials_provider: CredentialsProviderInterface, + timeout: int | None = None, ) -> None: self._is_cloud = is_cloud self._url = url.rstrip("/") @@ -100,11 +102,13 @@ class OnyxConfluence: self._kwargs: Any = None - self.shared_base_kwargs = { + self.shared_base_kwargs: dict[str, str | int | bool] = { "api_version": "cloud" if is_cloud else "latest", "backoff_and_retry": True, "cloud": is_cloud, } + if timeout: + self.shared_base_kwargs["timeout"] = timeout def _renew_credentials(self) -> tuple[dict[str, Any], bool]: """credential_json - the current json credentials @@ -191,6 +195,8 @@ class OnyxConfluence: **kwargs: Any, ) -> None: merged_kwargs = {**self.shared_base_kwargs, **kwargs} + # add special timeout to make sure that we don't hang indefinitely + merged_kwargs["timeout"] = self.PROBE_TIMEOUT with self._credentials_provider: credentials, _ = self._renew_credentials() diff --git a/backend/onyx/db/connector_credential_pair.py b/backend/onyx/db/connector_credential_pair.py index 94881ab1b..b0e9e727e 100644 --- a/backend/onyx/db/connector_credential_pair.py +++ b/backend/onyx/db/connector_credential_pair.py @@ -7,6 +7,7 @@ from sqlalchemy import desc from sqlalchemy import exists from sqlalchemy import Select from sqlalchemy import select +from sqlalchemy import update from sqlalchemy.orm import aliased from sqlalchemy.orm import joinedload from sqlalchemy.orm import selectinload @@ -394,6 +395,20 @@ def update_connector_credential_pair( ) +def set_cc_pair_repeated_error_state( + db_session: Session, + cc_pair_id: int, + in_repeated_error_state: bool, +) -> None: + stmt = ( + update(ConnectorCredentialPair) + .where(ConnectorCredentialPair.id == cc_pair_id) + .values(in_repeated_error_state=in_repeated_error_state) + ) + db_session.execute(stmt) + db_session.commit() + + def delete_connector_credential_pair__no_commit( db_session: Session, connector_id: int, @@ -457,7 +472,7 @@ def add_credential_to_connector( access_type: AccessType, groups: list[int] | None, auto_sync_options: dict | None = None, - initial_status: ConnectorCredentialPairStatus = ConnectorCredentialPairStatus.ACTIVE, + initial_status: ConnectorCredentialPairStatus = ConnectorCredentialPairStatus.SCHEDULED, last_successful_index_time: datetime | None = None, seeding_flow: bool = False, is_user_file: bool = False, diff --git a/backend/onyx/db/enums.py b/backend/onyx/db/enums.py index c5a3ced2f..073009699 100644 --- a/backend/onyx/db/enums.py +++ b/backend/onyx/db/enums.py @@ -18,6 +18,12 @@ class IndexingStatus(str, PyEnum): } return self in terminal_states + def is_successful(self) -> bool: + return ( + self == IndexingStatus.SUCCESS + or self == IndexingStatus.COMPLETED_WITH_ERRORS + ) + class IndexingMode(str, PyEnum): UPDATE = "update" @@ -73,13 +79,19 @@ class ChatSessionSharedStatus(str, PyEnum): class ConnectorCredentialPairStatus(str, PyEnum): + SCHEDULED = "SCHEDULED" + INITIAL_INDEXING = "INITIAL_INDEXING" ACTIVE = "ACTIVE" PAUSED = "PAUSED" DELETING = "DELETING" INVALID = "INVALID" def is_active(self) -> bool: - return self == ConnectorCredentialPairStatus.ACTIVE + return ( + self == ConnectorCredentialPairStatus.ACTIVE + or self == ConnectorCredentialPairStatus.SCHEDULED + or self == ConnectorCredentialPairStatus.INITIAL_INDEXING + ) class AccessType(str, PyEnum): diff --git a/backend/onyx/db/index_attempt.py b/backend/onyx/db/index_attempt.py index b5c864ffb..43764b743 100644 --- a/backend/onyx/db/index_attempt.py +++ b/backend/onyx/db/index_attempt.py @@ -59,6 +59,7 @@ def get_recent_completed_attempts_for_cc_pair( limit: int, db_session: Session, ) -> list[IndexAttempt]: + """Most recent to least recent.""" return ( db_session.query(IndexAttempt) .filter( @@ -74,6 +75,25 @@ def get_recent_completed_attempts_for_cc_pair( ) +def get_recent_attempts_for_cc_pair( + cc_pair_id: int, + search_settings_id: int, + limit: int, + db_session: Session, +) -> list[IndexAttempt]: + """Most recent to least recent.""" + return ( + db_session.query(IndexAttempt) + .filter( + IndexAttempt.connector_credential_pair_id == cc_pair_id, + IndexAttempt.search_settings_id == search_settings_id, + ) + .order_by(IndexAttempt.time_updated.desc()) + .limit(limit) + .all() + ) + + def get_index_attempt( db_session: Session, index_attempt_id: int ) -> IndexAttempt | None: diff --git a/backend/onyx/db/models.py b/backend/onyx/db/models.py index bc55e73d2..f93ec2cc6 100644 --- a/backend/onyx/db/models.py +++ b/backend/onyx/db/models.py @@ -437,6 +437,9 @@ class ConnectorCredentialPair(Base): status: Mapped[ConnectorCredentialPairStatus] = mapped_column( Enum(ConnectorCredentialPairStatus, native_enum=False), nullable=False ) + # this is separate from the `status` above, since a connector can be `INITIAL_INDEXING`, `ACTIVE`, + # or `PAUSED` and still be in a repeated error state. + in_repeated_error_state: Mapped[bool] = mapped_column(Boolean, default=False) connector_id: Mapped[int] = mapped_column( ForeignKey("connector.id"), primary_key=True ) diff --git a/backend/onyx/server/documents/connector.py b/backend/onyx/server/documents/connector.py index fa8078457..14376599b 100644 --- a/backend/onyx/server/documents/connector.py +++ b/backend/onyx/server/documents/connector.py @@ -783,6 +783,7 @@ def get_connector_indexing_status( name=cc_pair.name, in_progress=in_progress, cc_pair_status=cc_pair.status, + in_repeated_error_state=cc_pair.in_repeated_error_state, connector=ConnectorSnapshot.from_connector_db_model( connector, connector_to_cc_pair_ids.get(connector.id, []) ), diff --git a/backend/onyx/server/documents/models.py b/backend/onyx/server/documents/models.py index 04cfa304f..8e1072b7e 100644 --- a/backend/onyx/server/documents/models.py +++ b/backend/onyx/server/documents/models.py @@ -1,4 +1,5 @@ from datetime import datetime +from datetime import timezone from typing import Any from typing import Generic from typing import TypeVar @@ -128,6 +129,7 @@ class CredentialBase(BaseModel): class CredentialSnapshot(CredentialBase): id: int user_id: UUID | None + user_email: str | None = None time_created: datetime time_updated: datetime @@ -141,6 +143,7 @@ class CredentialSnapshot(CredentialBase): else credential.credential_json ), user_id=credential.user_id, + user_email=credential.user.email if credential.user else None, admin_public=credential.admin_public, time_created=credential.time_created, time_updated=credential.time_updated, @@ -207,6 +210,7 @@ class CCPairFullInfo(BaseModel): id: int name: str status: ConnectorCredentialPairStatus + in_repeated_error_state: bool num_docs_indexed: int connector: ConnectorSnapshot credential: CredentialSnapshot @@ -220,6 +224,13 @@ class CCPairFullInfo(BaseModel): creator: UUID | None creator_email: str | None + # information on syncing/indexing + last_indexed: datetime | None + last_pruned: datetime | None + last_permission_sync: datetime | None + overall_indexing_speed: float | None + latest_checkpoint_description: str | None + @classmethod def from_models( cls, @@ -237,7 +248,8 @@ class CCPairFullInfo(BaseModel): # there is a mismatch between these two numbers which may confuse users. last_indexing_status = last_index_attempt.status if last_index_attempt else None if ( - last_indexing_status == IndexingStatus.SUCCESS + # only need to do this if the last indexing attempt is still in progress + last_indexing_status == IndexingStatus.IN_PROGRESS and number_of_index_attempts == 1 and last_index_attempt and last_index_attempt.new_docs_indexed @@ -246,10 +258,18 @@ class CCPairFullInfo(BaseModel): last_index_attempt.new_docs_indexed if last_index_attempt else 0 ) + overall_indexing_speed = num_docs_indexed / ( + ( + datetime.now(tz=timezone.utc) - cc_pair_model.connector.time_created + ).total_seconds() + / 60 + ) + return cls( id=cc_pair_model.id, name=cc_pair_model.name, status=cc_pair_model.status, + in_repeated_error_state=cc_pair_model.in_repeated_error_state, num_docs_indexed=num_docs_indexed, connector=ConnectorSnapshot.from_connector_db_model( cc_pair_model.connector @@ -268,6 +288,15 @@ class CCPairFullInfo(BaseModel): creator_email=( cc_pair_model.creator.email if cc_pair_model.creator else None ), + last_indexed=( + last_index_attempt.time_started if last_index_attempt else None + ), + last_pruned=cc_pair_model.last_pruned, + last_permission_sync=( + last_index_attempt.time_started if last_index_attempt else None + ), + overall_indexing_speed=overall_indexing_speed, + latest_checkpoint_description=None, ) @@ -308,6 +337,9 @@ class ConnectorIndexingStatus(ConnectorStatus): """Represents the full indexing status of a connector""" cc_pair_status: ConnectorCredentialPairStatus + # this is separate from the `status` above, since a connector can be `INITIAL_INDEXING`, `ACTIVE`, + # or `PAUSED` and still be in a repeated error state. + in_repeated_error_state: bool owner: str last_finished_status: IndexingStatus | None last_status: IndexingStatus | None diff --git a/backend/tests/integration/common_utils/managers/cc_pair.py b/backend/tests/integration/common_utils/managers/cc_pair.py index 4bc0760ab..903e730d6 100644 --- a/backend/tests/integration/common_utils/managers/cc_pair.py +++ b/backend/tests/integration/common_utils/managers/cc_pair.py @@ -69,6 +69,7 @@ class CCPairManager: connector_specific_config: dict[str, Any] | None = None, credential_json: dict[str, Any] | None = None, user_performing_action: DATestUser | None = None, + refresh_freq: int | None = None, ) -> DATestCCPair: connector = ConnectorManager.create( name=name, @@ -78,6 +79,7 @@ class CCPairManager: access_type=access_type, groups=groups, user_performing_action=user_performing_action, + refresh_freq=refresh_freq, ) credential = CredentialManager.create( credential_json=credential_json, diff --git a/backend/tests/integration/common_utils/managers/connector.py b/backend/tests/integration/common_utils/managers/connector.py index d26c0a3fb..2e0f9e517 100644 --- a/backend/tests/integration/common_utils/managers/connector.py +++ b/backend/tests/integration/common_utils/managers/connector.py @@ -23,6 +23,7 @@ class ConnectorManager: access_type: AccessType = AccessType.PUBLIC, groups: list[int] | None = None, user_performing_action: DATestUser | None = None, + refresh_freq: int | None = None, ) -> DATestConnector: name = f"{name}-connector" if name else f"test-connector-{uuid4()}" @@ -36,6 +37,7 @@ class ConnectorManager: ), access_type=access_type, groups=groups or [], + refresh_freq=refresh_freq, ) response = requests.post( diff --git a/backend/tests/integration/tests/indexing/conftest.py b/backend/tests/integration/tests/indexing/conftest.py new file mode 100644 index 000000000..ff34e566b --- /dev/null +++ b/backend/tests/integration/tests/indexing/conftest.py @@ -0,0 +1,18 @@ +import httpx +import pytest + +from tests.integration.common_utils.constants import MOCK_CONNECTOR_SERVER_HOST +from tests.integration.common_utils.constants import MOCK_CONNECTOR_SERVER_PORT + + +@pytest.fixture +def mock_server_client() -> httpx.Client: + print( + f"Initializing mock server client with host: " + f"{MOCK_CONNECTOR_SERVER_HOST} and port: " + f"{MOCK_CONNECTOR_SERVER_PORT}" + ) + return httpx.Client( + base_url=f"http://{MOCK_CONNECTOR_SERVER_HOST}:{MOCK_CONNECTOR_SERVER_PORT}", + timeout=5.0, + ) diff --git a/backend/tests/integration/tests/indexing/test_checkpointing.py b/backend/tests/integration/tests/indexing/test_checkpointing.py index f1766fd93..0bb6c5337 100644 --- a/backend/tests/integration/tests/indexing/test_checkpointing.py +++ b/backend/tests/integration/tests/indexing/test_checkpointing.py @@ -4,7 +4,6 @@ from datetime import timedelta from datetime import timezone import httpx -import pytest from onyx.configs.constants import DocumentSource from onyx.connectors.mock_connector.connector import MockConnectorCheckpoint @@ -26,19 +25,6 @@ from tests.integration.common_utils.test_models import DATestUser from tests.integration.common_utils.vespa import vespa_fixture -@pytest.fixture -def mock_server_client() -> httpx.Client: - print( - f"Initializing mock server client with host: " - f"{MOCK_CONNECTOR_SERVER_HOST} and port: " - f"{MOCK_CONNECTOR_SERVER_PORT}" - ) - return httpx.Client( - base_url=f"http://{MOCK_CONNECTOR_SERVER_HOST}:{MOCK_CONNECTOR_SERVER_PORT}", - timeout=5.0, - ) - - def test_mock_connector_basic_flow( mock_server_client: httpx.Client, vespa_client: vespa_fixture, diff --git a/backend/tests/integration/tests/indexing/test_repeated_error_state.py b/backend/tests/integration/tests/indexing/test_repeated_error_state.py new file mode 100644 index 000000000..2ddd79da8 --- /dev/null +++ b/backend/tests/integration/tests/indexing/test_repeated_error_state.py @@ -0,0 +1,204 @@ +import time +import uuid + +import httpx + +from onyx.background.celery.tasks.indexing.utils import ( + NUM_REPEAT_ERRORS_BEFORE_REPEATED_ERROR_STATE, +) +from onyx.configs.constants import DocumentSource +from onyx.connectors.mock_connector.connector import MockConnectorCheckpoint +from onyx.connectors.models import InputType +from onyx.db.connector_credential_pair import get_connector_credential_pair_from_id +from onyx.db.engine import get_session_context_manager +from onyx.db.enums import IndexingStatus +from tests.integration.common_utils.constants import MOCK_CONNECTOR_SERVER_HOST +from tests.integration.common_utils.constants import MOCK_CONNECTOR_SERVER_PORT +from tests.integration.common_utils.managers.cc_pair import CCPairManager +from tests.integration.common_utils.managers.document import DocumentManager +from tests.integration.common_utils.managers.index_attempt import IndexAttemptManager +from tests.integration.common_utils.test_document_utils import create_test_document +from tests.integration.common_utils.test_models import DATestUser +from tests.integration.common_utils.vespa import vespa_fixture + + +def test_repeated_error_state_detection_and_recovery( + mock_server_client: httpx.Client, + vespa_client: vespa_fixture, + admin_user: DATestUser, +) -> None: + """Test that a connector is marked as in a repeated error state after + NUM_REPEAT_ERRORS_BEFORE_REPEATED_ERROR_STATE consecutive failures, and + that it recovers after a successful indexing. + + This test ensures we properly wait for the required number of indexing attempts + to fail before checking that the connector is in a repeated error state.""" + + # Create test document for successful response + test_doc = create_test_document() + + # First, set up the mock server to consistently fail + error_response = { + "documents": [], + "checkpoint": MockConnectorCheckpoint(has_more=False).model_dump(mode="json"), + "failures": [], + "unhandled_exception": "Simulated unhandled error for testing repeated errors", + } + + # Create a list of failure responses with at least the same length + # as NUM_REPEAT_ERRORS_BEFORE_REPEATED_ERROR_STATE + failure_behaviors = [error_response] * ( + 5 * NUM_REPEAT_ERRORS_BEFORE_REPEATED_ERROR_STATE + ) + + response = mock_server_client.post( + "/set-behavior", + json=failure_behaviors, + ) + assert response.status_code == 200 + + # Create a new CC pair for testing + cc_pair = CCPairManager.create_from_scratch( + name=f"mock-repeated-error-{uuid.uuid4()}", + source=DocumentSource.MOCK_CONNECTOR, + input_type=InputType.POLL, + connector_specific_config={ + "mock_server_host": MOCK_CONNECTOR_SERVER_HOST, + "mock_server_port": MOCK_CONNECTOR_SERVER_PORT, + }, + user_performing_action=admin_user, + refresh_freq=60 * 60, # a very long time + ) + + # Wait for the required number of failed indexing attempts + # This shouldn't take long, since we keep retrying while we haven't + # succeeded yet + start_time = time.monotonic() + while True: + index_attempts_page = IndexAttemptManager.get_index_attempt_page( + cc_pair_id=cc_pair.id, + page=0, + page_size=100, + user_performing_action=admin_user, + ) + index_attempts = [ + ia + for ia in index_attempts_page.items + if ia.status and ia.status.is_terminal() + ] + if len(index_attempts) == NUM_REPEAT_ERRORS_BEFORE_REPEATED_ERROR_STATE: + break + + if time.monotonic() - start_time > 180: + raise TimeoutError( + "Did not get required number of failed attempts within 180 seconds" + ) + + # make sure that we don't mark the connector as in repeated error state + # before we have the required number of failed attempts + with get_session_context_manager() as db_session: + cc_pair_obj = get_connector_credential_pair_from_id( + db_session=db_session, + cc_pair_id=cc_pair.id, + ) + assert cc_pair_obj is not None + assert not cc_pair_obj.in_repeated_error_state + + time.sleep(2) + + # Verify we have the correct number of failed attempts + assert len(index_attempts) == NUM_REPEAT_ERRORS_BEFORE_REPEATED_ERROR_STATE + for attempt in index_attempts: + assert attempt.status == IndexingStatus.FAILED + + # Check if the connector is in a repeated error state + start_time = time.monotonic() + while True: + with get_session_context_manager() as db_session: + cc_pair_obj = get_connector_credential_pair_from_id( + db_session=db_session, + cc_pair_id=cc_pair.id, + ) + assert cc_pair_obj is not None + if cc_pair_obj.in_repeated_error_state: + break + + if time.monotonic() - start_time > 30: + assert False, "CC pair did not enter repeated error state within 30 seconds" + + time.sleep(2) + + # Reset the mock server state + response = mock_server_client.post("/reset") + assert response.status_code == 200 + + # Now set up the mock server to succeed + success_response = { + "documents": [test_doc.model_dump(mode="json")], + "checkpoint": MockConnectorCheckpoint(has_more=False).model_dump(mode="json"), + "failures": [], + } + + response = mock_server_client.post( + "/set-behavior", + json=[success_response], + ) + assert response.status_code == 200 + + # Run another indexing attempt that should succeed + CCPairManager.run_once( + cc_pair, from_beginning=True, user_performing_action=admin_user + ) + + recovery_index_attempt = IndexAttemptManager.wait_for_index_attempt_start( + cc_pair_id=cc_pair.id, + index_attempts_to_ignore=[index_attempt.id for index_attempt in index_attempts], + user_performing_action=admin_user, + ) + + IndexAttemptManager.wait_for_index_attempt_completion( + index_attempt_id=recovery_index_attempt.id, + cc_pair_id=cc_pair.id, + user_performing_action=admin_user, + ) + + # Validate the indexing succeeded + finished_recovery_attempt = IndexAttemptManager.get_index_attempt_by_id( + index_attempt_id=recovery_index_attempt.id, + cc_pair_id=cc_pair.id, + user_performing_action=admin_user, + ) + assert finished_recovery_attempt.status == IndexingStatus.SUCCESS + + # Verify the document was indexed + with get_session_context_manager() as db_session: + documents = DocumentManager.fetch_documents_for_cc_pair( + cc_pair_id=cc_pair.id, + db_session=db_session, + vespa_client=vespa_client, + ) + assert len(documents) == 1 + assert documents[0].id == test_doc.id + + # Verify the CC pair is no longer in a repeated error state + start = time.monotonic() + while True: + with get_session_context_manager() as db_session: + cc_pair_obj = get_connector_credential_pair_from_id( + db_session=db_session, + cc_pair_id=cc_pair.id, + ) + assert cc_pair_obj is not None + if not cc_pair_obj.in_repeated_error_state: + break + + elapsed = time.monotonic() - start + if elapsed > 30: + raise TimeoutError( + "CC pair did not exit repeated error state within 30 seconds" + ) + + print( + f"Waiting for CC pair to exit repeated error state. elapsed={elapsed:.2f}" + ) + time.sleep(1) diff --git a/deployment/docker_compose/docker-compose.dev.yml b/deployment/docker_compose/docker-compose.dev.yml index 65fda9143..c9e0f7e65 100644 --- a/deployment/docker_compose/docker-compose.dev.yml +++ b/deployment/docker_compose/docker-compose.dev.yml @@ -199,7 +199,6 @@ services: - INDEXING_MODEL_SERVER_HOST=${INDEXING_MODEL_SERVER_HOST:-indexing_model_server} # Indexing Configs - VESPA_SEARCHER_THREADS=${VESPA_SEARCHER_THREADS:-} - - NUM_INDEXING_WORKERS=${NUM_INDEXING_WORKERS:-} - ENABLED_CONNECTOR_TYPES=${ENABLED_CONNECTOR_TYPES:-} - DISABLE_INDEX_UPDATE_ON_SWAP=${DISABLE_INDEX_UPDATE_ON_SWAP:-} - DASK_JOB_CLIENT_ENABLED=${DASK_JOB_CLIENT_ENABLED:-} diff --git a/deployment/docker_compose/docker-compose.gpu-dev.yml b/deployment/docker_compose/docker-compose.gpu-dev.yml index 022aac0af..5e9210372 100644 --- a/deployment/docker_compose/docker-compose.gpu-dev.yml +++ b/deployment/docker_compose/docker-compose.gpu-dev.yml @@ -163,7 +163,6 @@ services: - INDEXING_MODEL_SERVER_HOST=${INDEXING_MODEL_SERVER_HOST:-indexing_model_server} # Indexing Configs - VESPA_SEARCHER_THREADS=${VESPA_SEARCHER_THREADS:-} - - NUM_INDEXING_WORKERS=${NUM_INDEXING_WORKERS:-} - ENABLED_CONNECTOR_TYPES=${ENABLED_CONNECTOR_TYPES:-} - DISABLE_INDEX_UPDATE_ON_SWAP=${DISABLE_INDEX_UPDATE_ON_SWAP:-} - DASK_JOB_CLIENT_ENABLED=${DASK_JOB_CLIENT_ENABLED:-} diff --git a/deployment/docker_compose/docker-compose.multitenant-dev.yml b/deployment/docker_compose/docker-compose.multitenant-dev.yml index cba620ba0..53273dc40 100644 --- a/deployment/docker_compose/docker-compose.multitenant-dev.yml +++ b/deployment/docker_compose/docker-compose.multitenant-dev.yml @@ -182,7 +182,6 @@ services: - INDEXING_MODEL_SERVER_HOST=${INDEXING_MODEL_SERVER_HOST:-indexing_model_server} # Indexing Configs - VESPA_SEARCHER_THREADS=${VESPA_SEARCHER_THREADS:-} - - NUM_INDEXING_WORKERS=${NUM_INDEXING_WORKERS:-} - ENABLED_CONNECTOR_TYPES=${ENABLED_CONNECTOR_TYPES:-} - DISABLE_INDEX_UPDATE_ON_SWAP=${DISABLE_INDEX_UPDATE_ON_SWAP:-} - DASK_JOB_CLIENT_ENABLED=${DASK_JOB_CLIENT_ENABLED:-} diff --git a/deployment/helm/charts/onyx/values.yaml b/deployment/helm/charts/onyx/values.yaml index 44bbc6dca..31d83fd9d 100644 --- a/deployment/helm/charts/onyx/values.yaml +++ b/deployment/helm/charts/onyx/values.yaml @@ -468,6 +468,10 @@ configMap: JIRA_API_VERSION: "" GONG_CONNECTOR_START_TIME: "" NOTION_CONNECTOR_ENABLE_RECURSIVE_PAGE_LOOKUP: "" + # Worker Parallelism + CELERY_WORKER_INDEXING_CONCURRENCY: "" + CELERY_WORKER_LIGHT_CONCURRENCY: "" + CELERY_WORKER_LIGHT_PREFETCH_MULTIPLIER: "" # OnyxBot SlackBot Configs DANSWER_BOT_DISABLE_DOCS_ONLY_ANSWER: "" DANSWER_BOT_DISPLAY_ERROR_MSGS: "" diff --git a/deployment/kubernetes/env-configmap.yaml b/deployment/kubernetes/env-configmap.yaml index 0ce70ec29..85047ba11 100644 --- a/deployment/kubernetes/env-configmap.yaml +++ b/deployment/kubernetes/env-configmap.yaml @@ -49,7 +49,6 @@ data: MIN_THREADS_ML_MODELS: "" # Indexing Configs VESPA_SEARCHER_THREADS: "" - NUM_INDEXING_WORKERS: "" ENABLED_CONNECTOR_TYPES: "" DISABLE_INDEX_UPDATE_ON_SWAP: "" DASK_JOB_CLIENT_ENABLED: "" @@ -62,6 +61,10 @@ data: NOTION_CONNECTOR_ENABLE_RECURSIVE_PAGE_LOOKUP: "" MAX_DOCUMENT_CHARS: "" MAX_FILE_SIZE_BYTES: "" + # Worker Parallelism + CELERY_WORKER_INDEXING_CONCURRENCY: "" + CELERY_WORKER_LIGHT_CONCURRENCY: "" + CELERY_WORKER_LIGHT_PREFETCH_MULTIPLIER: "" # OnyxBot SlackBot Configs DANSWER_BOT_DISABLE_DOCS_ONLY_ANSWER: "" DANSWER_BOT_DISPLAY_ERROR_MSGS: "" diff --git a/web/src/app/admin/connector/[ccPairId]/ConfigDisplay.tsx b/web/src/app/admin/connector/[ccPairId]/ConfigDisplay.tsx index b205cb4c5..35e770334 100644 --- a/web/src/app/admin/connector/[ccPairId]/ConfigDisplay.tsx +++ b/web/src/app/admin/connector/[ccPairId]/ConfigDisplay.tsx @@ -1,7 +1,5 @@ -import CardSection from "@/components/admin/CardSection"; import { getNameFromPath } from "@/lib/fileUtils"; import { ValidSources } from "@/lib/types"; -import Title from "@/components/ui/title"; import { EditIcon } from "@/components/icons/icons"; import { useState } from "react"; @@ -44,7 +42,15 @@ function buildConfigEntries( return obj; } -function ConfigItem({ label, value }: { label: string; value: any }) { +function ConfigItem({ + label, + value, + onEdit, +}: { + label: string; + value: any; + onEdit?: () => void; +}) { const [isExpanded, setIsExpanded] = useState(false); const isExpandable = Array.isArray(value) && value.length > 5; @@ -52,11 +58,11 @@ function ConfigItem({ label, value }: { label: string; value: any }) { if (Array.isArray(value)) { const displayedItems = isExpanded ? value : value.slice(0, 5); return ( -