From ae37f01f62def88ec32868a716d42972f2add448 Mon Sep 17 00:00:00 2001 From: rkuo-danswer Date: Fri, 7 Feb 2025 14:53:51 -0800 Subject: [PATCH] event driven indexing/docset/usergroup triggers (#3918) * WIP * trigger indexing immediately when the ccpair is created * add some logging and indexing trigger to the mock-credential endpoint * better comments * fix integration test --------- Co-authored-by: Richard Kuo (Danswer) --- backend/onyx/configs/constants.py | 5 ++ backend/onyx/server/documents/cc_pair.py | 28 ++++++++++ backend/onyx/server/documents/connector.py | 22 ++++++++ .../onyx/server/features/document_set/api.py | 26 +++++++++ backend/onyx/server/manage/administrative.py | 5 ++ .../test_index_attempt_pagination.py | 54 +++++++++++++++---- 6 files changed, 129 insertions(+), 11 deletions(-) diff --git a/backend/onyx/configs/constants.py b/backend/onyx/configs/constants.py index 96b7d628b..0bf7085e1 100644 --- a/backend/onyx/configs/constants.py +++ b/backend/onyx/configs/constants.py @@ -263,6 +263,11 @@ class PostgresAdvisoryLocks(Enum): class OnyxCeleryQueues: + # "celery" is the default queue defined by celery and also the queue + # we are running in the primary worker to run system tasks + # Tasks running in this queue should be designed specifically to run quickly + PRIMARY = "celery" + # Light queue VESPA_METADATA_SYNC = "vespa_metadata_sync" DOC_PERMISSIONS_UPSERT = "doc_permissions_upsert" diff --git a/backend/onyx/server/documents/cc_pair.py b/backend/onyx/server/documents/cc_pair.py index 3ba5984df..7c2d77529 100644 --- a/backend/onyx/server/documents/cc_pair.py +++ b/backend/onyx/server/documents/cc_pair.py @@ -22,6 +22,8 @@ from onyx.background.celery.tasks.pruning.tasks import ( try_creating_prune_generator_task, ) from onyx.background.celery.versioned_apps.primary import app as primary_app +from onyx.configs.constants import OnyxCeleryPriority +from onyx.configs.constants import OnyxCeleryTask from onyx.db.connector_credential_pair import add_credential_to_connector from onyx.db.connector_credential_pair import ( get_connector_credential_pair_from_id_for_user, @@ -228,6 +230,13 @@ def update_cc_pair_status( db_session.commit() + # this speeds up the start of indexing by firing the check immediately + primary_app.send_task( + OnyxCeleryTask.CHECK_FOR_INDEXING, + kwargs=dict(tenant_id=tenant_id), + priority=OnyxCeleryPriority.HIGH, + ) + return JSONResponse( status_code=HTTPStatus.OK, content={"message": str(HTTPStatus.OK)} ) @@ -540,7 +549,14 @@ def associate_credential_to_connector( metadata: ConnectorCredentialPairMetadata, user: User | None = Depends(current_curator_or_admin_user), db_session: Session = Depends(get_session), + tenant_id: str = Depends(get_current_tenant_id), ) -> StatusResponse[int]: + """NOTE(rkuo): internally discussed and the consensus is this endpoint + and create_connector_with_mock_credential should be combined. + + The intent of this endpoint is to handle connectors that actually need credentials. + """ + fetch_ee_implementation_or_noop( "onyx.db.user_group", "validate_object_creation_for_user", None )( @@ -563,6 +579,18 @@ def associate_credential_to_connector( groups=metadata.groups, ) + # trigger indexing immediately + primary_app.send_task( + OnyxCeleryTask.CHECK_FOR_INDEXING, + priority=OnyxCeleryPriority.HIGH, + kwargs={"tenant_id": tenant_id}, + ) + + logger.info( + f"associate_credential_to_connector - running check_for_indexing: " + f"cc_pair={response.data}" + ) + return response except IntegrityError as e: logger.error(f"IntegrityError: {e}") diff --git a/backend/onyx/server/documents/connector.py b/backend/onyx/server/documents/connector.py index 21c5c586b..fbdc384a8 100644 --- a/backend/onyx/server/documents/connector.py +++ b/backend/onyx/server/documents/connector.py @@ -804,6 +804,14 @@ def create_connector_with_mock_credential( db_session: Session = Depends(get_session), tenant_id: str = Depends(get_current_tenant_id), ) -> StatusResponse: + """NOTE(rkuo): internally discussed and the consensus is this endpoint + and associate_credential_to_connector should be combined. + + The intent of this endpoint is to handle connectors that don't need credentials, + AKA web, file, etc ... but there isn't any reason a single endpoint couldn't + server this purpose. + """ + fetch_ee_implementation_or_noop( "onyx.db.user_group", "validate_object_creation_for_user", None )( @@ -841,6 +849,18 @@ def create_connector_with_mock_credential( groups=connector_data.groups, ) + # trigger indexing immediately + primary_app.send_task( + OnyxCeleryTask.CHECK_FOR_INDEXING, + priority=OnyxCeleryPriority.HIGH, + kwargs={"tenant_id": tenant_id}, + ) + + logger.info( + f"create_connector_with_mock_credential - running check_for_indexing: " + f"cc_pair={response.data}" + ) + create_milestone_and_report( user=user, distinct_id=user.email if user else tenant_id or "N/A", @@ -1005,6 +1025,8 @@ def connector_run_once( kwargs={"tenant_id": tenant_id}, ) + logger.info("connector_run_once - running check_for_indexing") + msg = f"Marked {num_triggers} index attempts with indexing triggers." return StatusResponse( success=True, diff --git a/backend/onyx/server/features/document_set/api.py b/backend/onyx/server/features/document_set/api.py index 0ba20ff0d..ed4c66156 100644 --- a/backend/onyx/server/features/document_set/api.py +++ b/backend/onyx/server/features/document_set/api.py @@ -6,11 +6,15 @@ from sqlalchemy.orm import Session from onyx.auth.users import current_curator_or_admin_user from onyx.auth.users import current_user +from onyx.background.celery.versioned_apps.primary import app as primary_app +from onyx.configs.constants import OnyxCeleryPriority +from onyx.configs.constants import OnyxCeleryTask from onyx.db.document_set import check_document_sets_are_public from onyx.db.document_set import fetch_all_document_sets_for_user from onyx.db.document_set import insert_document_set from onyx.db.document_set import mark_document_set_as_to_be_deleted from onyx.db.document_set import update_document_set +from onyx.db.engine import get_current_tenant_id from onyx.db.engine import get_session from onyx.db.models import User from onyx.server.features.document_set.models import CheckDocSetPublicRequest @@ -29,6 +33,7 @@ def create_document_set( document_set_creation_request: DocumentSetCreationRequest, user: User = Depends(current_curator_or_admin_user), db_session: Session = Depends(get_session), + tenant_id: str = Depends(get_current_tenant_id), ) -> int: fetch_ee_implementation_or_noop( "onyx.db.user_group", "validate_object_creation_for_user", None @@ -46,6 +51,13 @@ def create_document_set( ) except Exception as e: raise HTTPException(status_code=400, detail=str(e)) + + primary_app.send_task( + OnyxCeleryTask.CHECK_FOR_VESPA_SYNC_TASK, + kwargs={"tenant_id": tenant_id}, + priority=OnyxCeleryPriority.HIGH, + ) + return document_set_db_model.id @@ -54,6 +66,7 @@ def patch_document_set( document_set_update_request: DocumentSetUpdateRequest, user: User = Depends(current_curator_or_admin_user), db_session: Session = Depends(get_session), + tenant_id: str = Depends(get_current_tenant_id), ) -> None: fetch_ee_implementation_or_noop( "onyx.db.user_group", "validate_object_creation_for_user", None @@ -72,12 +85,19 @@ def patch_document_set( except Exception as e: raise HTTPException(status_code=400, detail=str(e)) + primary_app.send_task( + OnyxCeleryTask.CHECK_FOR_VESPA_SYNC_TASK, + kwargs={"tenant_id": tenant_id}, + priority=OnyxCeleryPriority.HIGH, + ) + @router.delete("/admin/document-set/{document_set_id}") def delete_document_set( document_set_id: int, user: User = Depends(current_curator_or_admin_user), db_session: Session = Depends(get_session), + tenant_id: str = Depends(get_current_tenant_id), ) -> None: try: mark_document_set_as_to_be_deleted( @@ -88,6 +108,12 @@ def delete_document_set( except Exception as e: raise HTTPException(status_code=400, detail=str(e)) + primary_app.send_task( + OnyxCeleryTask.CHECK_FOR_VESPA_SYNC_TASK, + kwargs={"tenant_id": tenant_id}, + priority=OnyxCeleryPriority.HIGH, + ) + """Endpoints for non-admins""" diff --git a/backend/onyx/server/manage/administrative.py b/backend/onyx/server/manage/administrative.py index 687d0c4ad..c5ae083e2 100644 --- a/backend/onyx/server/manage/administrative.py +++ b/backend/onyx/server/manage/administrative.py @@ -197,6 +197,11 @@ def create_deletion_attempt_for_connector_id( kwargs={"tenant_id": tenant_id}, ) + logger.info( + f"create_deletion_attempt_for_connector_id - running check_for_connector_deletion: " + f"cc_pair={cc_pair.id}" + ) + if cc_pair.connector.source == DocumentSource.FILE: connector = cc_pair.connector file_store = get_default_file_store(db_session) diff --git a/backend/tests/integration/tests/index_attempt/test_index_attempt_pagination.py b/backend/tests/integration/tests/index_attempt/test_index_attempt_pagination.py index c4ce83499..85ca64c18 100644 --- a/backend/tests/integration/tests/index_attempt/test_index_attempt_pagination.py +++ b/backend/tests/integration/tests/index_attempt/test_index_attempt_pagination.py @@ -1,23 +1,23 @@ +import time from datetime import datetime from onyx.db.models import IndexingStatus from tests.integration.common_utils.managers.cc_pair import CCPairManager from tests.integration.common_utils.managers.index_attempt import IndexAttemptManager from tests.integration.common_utils.managers.user import UserManager -from tests.integration.common_utils.test_models import DATestIndexAttempt from tests.integration.common_utils.test_models import DATestUser def _verify_index_attempt_pagination( cc_pair_id: int, - index_attempts: list[DATestIndexAttempt], + index_attempt_ids: list[int], page_size: int = 5, user_performing_action: DATestUser | None = None, ) -> None: retrieved_attempts: list[int] = [] last_time_started = None # Track the last time_started seen - for i in range(0, len(index_attempts), page_size): + for i in range(0, len(index_attempt_ids), page_size): paginated_result = IndexAttemptManager.get_index_attempt_page( cc_pair_id=cc_pair_id, page=(i // page_size), @@ -26,9 +26,9 @@ def _verify_index_attempt_pagination( ) # Verify that the total items is equal to the length of the index attempts list - assert paginated_result.total_items == len(index_attempts) + assert paginated_result.total_items == len(index_attempt_ids) # Verify that the number of items in the page is equal to the page size - assert len(paginated_result.items) == min(page_size, len(index_attempts) - i) + assert len(paginated_result.items) == min(page_size, len(index_attempt_ids) - i) # Verify time ordering within the page (descending order) for attempt in paginated_result.items: @@ -42,7 +42,7 @@ def _verify_index_attempt_pagination( retrieved_attempts.extend([attempt.id for attempt in paginated_result.items]) # Create a set of all the expected index attempt IDs - all_expected_attempts = set(attempt.id for attempt in index_attempts) + all_expected_attempts = set(index_attempt_ids) # Create a set of all the retrieved index attempt IDs all_retrieved_attempts = set(retrieved_attempts) @@ -51,6 +51,9 @@ def _verify_index_attempt_pagination( def test_index_attempt_pagination(reset: None) -> None: + MAX_WAIT = 60 + all_attempt_ids: list[int] = [] + # Create an admin user to perform actions user_performing_action: DATestUser = UserManager.create( name="admin_performing_action", @@ -62,20 +65,49 @@ def test_index_attempt_pagination(reset: None) -> None: user_performing_action=user_performing_action, ) - # Create 300 successful index attempts + # Creating a CC pair will create an index attempt as well. wait for it. + start = time.monotonic() + while True: + paginated_result = IndexAttemptManager.get_index_attempt_page( + cc_pair_id=cc_pair.id, + page=0, + page_size=5, + user_performing_action=user_performing_action, + ) + + if paginated_result.total_items == 1: + all_attempt_ids.append(paginated_result.items[0].id) + print("Initial index attempt from cc_pair creation detected. Continuing...") + break + + elapsed = time.monotonic() - start + if elapsed > MAX_WAIT: + raise TimeoutError( + f"Initial index attempt: Not detected within {MAX_WAIT} seconds." + ) + + print( + f"Waiting for initial index attempt: elapsed={elapsed:.2f} timeout={MAX_WAIT}" + ) + time.sleep(1) + + # Create 299 successful index attempts (for 300 total) base_time = datetime.now() - all_attempts = IndexAttemptManager.create_test_index_attempts( - num_attempts=300, + generated_attempts = IndexAttemptManager.create_test_index_attempts( + num_attempts=299, cc_pair_id=cc_pair.id, status=IndexingStatus.SUCCESS, base_time=base_time, ) + for attempt in generated_attempts: + all_attempt_ids.append(attempt.id) + # Verify basic pagination with different page sizes print("Verifying basic pagination with page size 5") _verify_index_attempt_pagination( cc_pair_id=cc_pair.id, - index_attempts=all_attempts, + index_attempt_ids=all_attempt_ids, page_size=5, user_performing_action=user_performing_action, ) @@ -84,7 +116,7 @@ def test_index_attempt_pagination(reset: None) -> None: print("Verifying pagination with page size 100") _verify_index_attempt_pagination( cc_pair_id=cc_pair.id, - index_attempts=all_attempts, + index_attempt_ids=all_attempt_ids, page_size=100, user_performing_action=user_performing_action, )