From c9a420ec4989c65dc19e9317150d855212b3e9b6 Mon Sep 17 00:00:00 2001
From: rkuo-danswer
Date: Wed, 15 Jan 2025 17:27:12 -0800
Subject: [PATCH] better logging and reduce long sessions (#3673)

* Testing some tweaks based on issues seen with Okteto

* Shorten session usage in indexing; still a couple of long-running sessions to clean up

* Merge sessions

* Fix detached-session issues

---------

Co-authored-by: Richard Kuo (Danswer)
---
 .../tasks/external_group_syncing/tasks.py     |   2 +-
 .../background/celery/tasks/indexing/tasks.py |   3 +-
 .../onyx/background/indexing/job_client.py    |   4 +-
 .../onyx/background/indexing/run_indexing.py  | 394 +++++++++++-------
 backend/onyx/db/index_attempt.py              |  29 +-
 5 files changed, 260 insertions(+), 172 deletions(-)

diff --git a/backend/onyx/background/celery/tasks/external_group_syncing/tasks.py b/backend/onyx/background/celery/tasks/external_group_syncing/tasks.py
index 9af9f38152..7ee701e07f 100644
--- a/backend/onyx/background/celery/tasks/external_group_syncing/tasks.py
+++ b/backend/onyx/background/celery/tasks/external_group_syncing/tasks.py
@@ -251,8 +251,8 @@ def connector_external_group_sync_generator_task(
 
     with get_session_with_tenant(tenant_id) as db_session:
         cc_pair = get_connector_credential_pair_from_id(
-            cc_pair_id=cc_pair_id,
             db_session=db_session,
+            cc_pair_id=cc_pair_id,
         )
         if cc_pair is None:
             raise ValueError(
diff --git a/backend/onyx/background/celery/tasks/indexing/tasks.py b/backend/onyx/background/celery/tasks/indexing/tasks.py
index dd3d8117d2..42e63e5296 100644
--- a/backend/onyx/background/celery/tasks/indexing/tasks.py
+++ b/backend/onyx/background/celery/tasks/indexing/tasks.py
@@ -637,7 +637,8 @@ def validate_indexing_fence(
             mark_attempt_failed(
                 payload.index_attempt_id,
                 db_session,
-                "validate_indexing_fence - Canceling index attempt due to missing celery tasks",
+                f"validate_indexing_fence - Canceling index attempt due to missing celery tasks: "
+                f"index_attempt={payload.index_attempt_id}",
             )
     except Exception:
         logger.exception(
diff --git a/backend/onyx/background/indexing/job_client.py b/backend/onyx/background/indexing/job_client.py
index a679eebe7f..c2208540c7 100644
--- a/backend/onyx/background/indexing/job_client.py
+++ b/backend/onyx/background/indexing/job_client.py
@@ -47,7 +47,9 @@ def _initializer(
     SqlEngine.set_app_name(POSTGRES_CELERY_WORKER_INDEXING_CHILD_APP_NAME)
 
     # Initialize a new engine with desired parameters
-    SqlEngine.init_engine(pool_size=4, max_overflow=12, pool_recycle=60)
+    SqlEngine.init_engine(
+        pool_size=4, max_overflow=12, pool_recycle=60, pool_pre_ping=True
+    )
 
     # Proceed with executing the target function
     return func(*args, **kwargs)
diff --git a/backend/onyx/background/indexing/run_indexing.py b/backend/onyx/background/indexing/run_indexing.py
index 623eb8edbe..d3548f2623 100644
--- a/backend/onyx/background/indexing/run_indexing.py
+++ b/backend/onyx/background/indexing/run_indexing.py
@@ -4,6 +4,7 @@ from datetime import datetime
 from datetime import timedelta
 from datetime import timezone
 
+from pydantic import BaseModel
 from sqlalchemy.orm import Session
 
 from onyx.background.indexing.checkpointing import get_time_windows_for_index_attempt
@@ -11,6 +12,7 @@ from onyx.background.indexing.tracer import OnyxTracer
 from onyx.configs.app_configs import INDEXING_SIZE_WARNING_THRESHOLD
 from onyx.configs.app_configs import INDEXING_TRACER_INTERVAL
 from onyx.configs.app_configs import POLL_CONNECTOR_OFFSET
+from onyx.configs.constants import DocumentSource
 from onyx.configs.constants import MilestoneRecordType
 from onyx.connectors.connector_runner import ConnectorRunner
 from onyx.connectors.factory import instantiate_connector
@@ -21,12 +23,14 @@ from onyx.db.connector_credential_pair import get_last_successful_attempt_time
 from onyx.db.connector_credential_pair import update_connector_credential_pair
 from onyx.db.engine import get_session_with_tenant
 from onyx.db.enums import ConnectorCredentialPairStatus
+from onyx.db.index_attempt import get_index_attempt
 from onyx.db.index_attempt import mark_attempt_canceled
 from onyx.db.index_attempt import mark_attempt_failed
 from onyx.db.index_attempt import mark_attempt_partially_succeeded
 from onyx.db.index_attempt import mark_attempt_succeeded
 from onyx.db.index_attempt import transition_attempt_to_in_progress
 from onyx.db.index_attempt import update_docs_indexed
+from onyx.db.models import ConnectorCredentialPair
 from onyx.db.models import IndexAttempt
 from onyx.db.models import IndexingStatus
 from onyx.db.models import IndexModelStatus
@@ -125,9 +129,21 @@ class ConnectorStopSignal(Exception):
     """A custom exception used to signal a stop in processing."""
 
 
+class RunIndexingContext(BaseModel):
+    index_name: str
+    cc_pair_id: int
+    connector_id: int
+    credential_id: int
+    source: DocumentSource
+    earliest_index_time: float
+    from_beginning: bool
+    is_primary: bool
+    search_settings_status: IndexModelStatus
+
+
 def _run_indexing(
     db_session: Session,
-    index_attempt: IndexAttempt,
+    index_attempt_id: int,
     tenant_id: str | None,
     callback: IndexingHeartbeatInterface | None = None,
 ) -> None:
@@ -141,61 +157,76 @@ def _run_indexing(
     """
     start_time = time.time()
 
-    if index_attempt.search_settings is None:
-        raise ValueError(
-            "Search settings must be set for indexing. This should not be possible."
-        )
+    with get_session_with_tenant(tenant_id) as db_session_temp:
+        index_attempt_start = get_index_attempt(db_session_temp, index_attempt_id)
+        if not index_attempt_start:
+            raise ValueError(
+                f"Index attempt {index_attempt_id} does not exist in DB. This should not be possible."
+            )
+
+        if index_attempt_start.search_settings is None:
+            raise ValueError(
+                "Search settings must be set for indexing. This should not be possible."
+            )
+
+        # search_settings = index_attempt_start.search_settings
+        db_connector = index_attempt_start.connector_credential_pair.connector
+        db_credential = index_attempt_start.connector_credential_pair.credential
+        ctx = RunIndexingContext(
+            index_name=index_attempt_start.search_settings.index_name,
+            cc_pair_id=index_attempt_start.connector_credential_pair.id,
+            connector_id=db_connector.id,
+            credential_id=db_credential.id,
+            source=db_connector.source,
+            earliest_index_time=(
+                db_connector.indexing_start.timestamp()
+                if db_connector.indexing_start
+                else 0
+            ),
+            from_beginning=index_attempt_start.from_beginning,
+            # Only update cc-pair status for primary index jobs
+            # Secondary index syncs at the end when swapping
+            is_primary=(
+                index_attempt_start.search_settings.status == IndexModelStatus.PRESENT
+            ),
+            search_settings_status=index_attempt_start.search_settings.status,
+        )
 
-    search_settings = index_attempt.search_settings
+        last_successful_index_time = (
+            ctx.earliest_index_time
+            if ctx.from_beginning
+            else get_last_successful_attempt_time(
+                connector_id=ctx.connector_id,
+                credential_id=ctx.credential_id,
+                earliest_index=ctx.earliest_index_time,
+                search_settings=index_attempt_start.search_settings,
+                db_session=db_session_temp,
+            )
+        )
 
-    index_name = search_settings.index_name
-
-    # Only update cc-pair status for primary index jobs
-    # Secondary index syncs at the end when swapping
-    is_primary = search_settings.status == IndexModelStatus.PRESENT
+        embedding_model = DefaultIndexingEmbedder.from_db_search_settings(
+            search_settings=index_attempt_start.search_settings,
+            callback=callback,
+        )
 
     # Indexing is only done into one index at a time
     document_index = get_default_document_index(
-        primary_index_name=index_name, secondary_index_name=None
-    )
-
-    embedding_model = DefaultIndexingEmbedder.from_db_search_settings(
-        search_settings=search_settings,
-        callback=callback,
+        primary_index_name=ctx.index_name, secondary_index_name=None
     )
 
     indexing_pipeline = build_indexing_pipeline(
-        attempt_id=index_attempt.id,
+        attempt_id=index_attempt_id,
         embedder=embedding_model,
         document_index=document_index,
         ignore_time_skip=(
-            index_attempt.from_beginning
-            or (search_settings.status == IndexModelStatus.FUTURE)
+            ctx.from_beginning
+            or (ctx.search_settings_status == IndexModelStatus.FUTURE)
         ),
         db_session=db_session,
         tenant_id=tenant_id,
         callback=callback,
     )
 
-    db_cc_pair = index_attempt.connector_credential_pair
-    db_connector = index_attempt.connector_credential_pair.connector
-    db_credential = index_attempt.connector_credential_pair.credential
-    earliest_index_time = (
-        db_connector.indexing_start.timestamp() if db_connector.indexing_start else 0
-    )
-
-    last_successful_index_time = (
-        earliest_index_time
-        if index_attempt.from_beginning
-        else get_last_successful_attempt_time(
-            connector_id=db_connector.id,
-            credential_id=db_credential.id,
-            earliest_index=earliest_index_time,
-            search_settings=index_attempt.search_settings,
-            db_session=db_session,
-        )
-    )
-
     if INDEXING_TRACER_INTERVAL > 0:
         logger.debug(f"Memory tracer starting: interval={INDEXING_TRACER_INTERVAL}")
         tracer = OnyxTracer()
@@ -203,8 +234,8 @@ def _run_indexing(
         tracer.snap()
 
     index_attempt_md = IndexAttemptMetadata(
-        connector_id=db_connector.id,
-        credential_id=db_credential.id,
+        connector_id=ctx.connector_id,
+        credential_id=ctx.credential_id,
     )
 
     batch_num = 0
@@ -220,19 +251,31 @@ def _run_indexing(
             source_type=db_connector.source,
         )
     ):
+        cc_pair_loop: ConnectorCredentialPair | None = None
+        index_attempt_loop: IndexAttempt | None = None
+
         try:
             window_start = max(
                 window_start - timedelta(minutes=POLL_CONNECTOR_OFFSET),
                 datetime(1970, 1, 1, tzinfo=timezone.utc),
             )
 
-            connector_runner = _get_connector_runner(
-                db_session=db_session,
-                attempt=index_attempt,
-                start_time=window_start,
-                end_time=window_end,
-                tenant_id=tenant_id,
-            )
+            with get_session_with_tenant(tenant_id) as db_session_temp:
+                index_attempt_loop_start = get_index_attempt(
+                    db_session_temp, index_attempt_id
+                )
+                if not index_attempt_loop_start:
+                    raise RuntimeError(
+                        f"Index attempt {index_attempt_id} not found in DB."
+                    )
+
+                connector_runner = _get_connector_runner(
+                    db_session=db_session_temp,
+                    attempt=index_attempt_loop_start,
+                    start_time=window_start,
+                    end_time=window_end,
+                    tenant_id=tenant_id,
+                )
 
             all_connector_doc_ids: set[str] = set()
@@ -249,24 +292,38 @@ def _run_indexing(
                         raise ConnectorStopSignal("Connector stop signal detected")
 
                 # TODO: should we move this into the above callback instead?
-                db_session.refresh(db_cc_pair)
-                if (
-                    (
-                        db_cc_pair.status == ConnectorCredentialPairStatus.PAUSED
-                        and search_settings.status != IndexModelStatus.FUTURE
-                    )
-                    # if it's deleting, we don't care if this is a secondary index
-                    or db_cc_pair.status == ConnectorCredentialPairStatus.DELETING
-                ):
-                    # let the `except` block handle this
-                    raise RuntimeError("Connector was disabled mid run")
-
-                db_session.refresh(index_attempt)
-                if index_attempt.status != IndexingStatus.IN_PROGRESS:
-                    # Likely due to user manually disabling it or model swap
-                    raise RuntimeError(
-                        f"Index Attempt was canceled, status is {index_attempt.status}"
-                    )
+                with get_session_with_tenant(tenant_id) as db_session_temp:
+                    cc_pair_loop = get_connector_credential_pair_from_id(
+                        db_session_temp,
+                        ctx.cc_pair_id,
+                    )
+                    if not cc_pair_loop:
+                        raise RuntimeError(f"CC pair {ctx.cc_pair_id} not found in DB.")
+
+                    if (
+                        (
+                            cc_pair_loop.status == ConnectorCredentialPairStatus.PAUSED
+                            and ctx.search_settings_status != IndexModelStatus.FUTURE
+                        )
+                        # if it's deleting, we don't care if this is a secondary index
+                        or cc_pair_loop.status == ConnectorCredentialPairStatus.DELETING
+                    ):
+                        # let the `except` block handle this
+                        raise RuntimeError("Connector was disabled mid run")
+
+                    index_attempt_loop = get_index_attempt(
+                        db_session_temp, index_attempt_id
+                    )
+                    if not index_attempt_loop:
+                        raise RuntimeError(
+                            f"Index attempt {index_attempt_id} not found in DB."
+                        )
+
+                    if index_attempt_loop.status != IndexingStatus.IN_PROGRESS:
+                        # Likely due to user manually disabling it or model swap
+                        raise RuntimeError(
+                            f"Index Attempt was canceled, status is {index_attempt_loop.status}"
+                        )
 
                 batch_description = []
@@ -312,13 +369,14 @@ def _run_indexing(
                     callback.progress("_run_indexing", len(doc_batch_cleaned))
 
                 # This new value is updated every batch, so UI can refresh per batch update
-                update_docs_indexed(
-                    db_session=db_session,
-                    index_attempt=index_attempt,
-                    total_docs_indexed=document_count,
-                    new_docs_indexed=net_doc_change,
-                    docs_removed_from_index=0,
-                )
+                with get_session_with_tenant(tenant_id) as db_session_temp:
+                    update_docs_indexed(
+                        db_session=db_session_temp,
+                        index_attempt_id=index_attempt_id,
+                        total_docs_indexed=document_count,
+                        new_docs_indexed=net_doc_change,
+                        docs_removed_from_index=0,
+                    )
 
                 tracer_counter += 1
                 if (
@@ -332,34 +390,36 @@ def _run_indexing(
                         tracer.log_previous_diff(INDEXING_TRACER_NUM_PRINT_ENTRIES)
 
             run_end_dt = window_end
-            if is_primary:
-                update_connector_credential_pair(
-                    db_session=db_session,
-                    connector_id=db_connector.id,
-                    credential_id=db_credential.id,
-                    net_docs=net_doc_change,
-                    run_dt=run_end_dt,
-                )
+            if ctx.is_primary:
+                with get_session_with_tenant(tenant_id) as db_session_temp:
+                    update_connector_credential_pair(
+                        db_session=db_session_temp,
+                        connector_id=ctx.connector_id,
+                        credential_id=ctx.credential_id,
+                        net_docs=net_doc_change,
+                        run_dt=run_end_dt,
+                    )
 
         except Exception as e:
             logger.exception(
                 f"Connector run exceptioned after elapsed time: {time.time() - start_time} seconds"
             )
             if isinstance(e, ConnectorStopSignal):
-                mark_attempt_canceled(
-                    index_attempt.id,
-                    db_session,
-                    reason=str(e),
-                )
-
-                if is_primary:
-                    update_connector_credential_pair(
-                        db_session=db_session,
-                        connector_id=db_connector.id,
-                        credential_id=db_credential.id,
-                        net_docs=net_doc_change,
-                    )
+                with get_session_with_tenant(tenant_id) as db_session_temp:
+                    mark_attempt_canceled(
+                        index_attempt_id,
+                        db_session_temp,
+                        reason=str(e),
+                    )
+
+                    if ctx.is_primary:
+                        update_connector_credential_pair(
+                            db_session=db_session_temp,
+                            connector_id=ctx.connector_id,
+                            credential_id=ctx.credential_id,
+                            net_docs=net_doc_change,
+                        )
 
                 if INDEXING_TRACER_INTERVAL > 0:
                     tracer.stop()
                 raise e
@@ -373,24 +433,30 @@ def _run_indexing(
             # to give better clarity in the UI, as the next run will never happen.
             if (
                 ind == 0
-                or not db_cc_pair.status.is_active()
-                or index_attempt.status != IndexingStatus.IN_PROGRESS
+                or (
+                    cc_pair_loop is not None and not cc_pair_loop.status.is_active()
+                )
+                or (
+                    index_attempt_loop is not None
+                    and index_attempt_loop.status != IndexingStatus.IN_PROGRESS
+                )
             ):
-                mark_attempt_failed(
-                    index_attempt.id,
-                    db_session,
-                    failure_reason=str(e),
-                    full_exception_trace=traceback.format_exc(),
-                )
-
-                if is_primary:
-                    update_connector_credential_pair(
-                        db_session=db_session,
-                        connector_id=db_connector.id,
-                        credential_id=db_credential.id,
-                        net_docs=net_doc_change,
-                    )
+                with get_session_with_tenant(tenant_id) as db_session_temp:
+                    mark_attempt_failed(
+                        index_attempt_id,
+                        db_session_temp,
+                        failure_reason=str(e),
+                        full_exception_trace=traceback.format_exc(),
+                    )
+
+                    if ctx.is_primary:
+                        update_connector_credential_pair(
+                            db_session=db_session_temp,
+                            connector_id=ctx.connector_id,
+                            credential_id=ctx.credential_id,
+                            net_docs=net_doc_change,
+                        )
 
                 if INDEXING_TRACER_INTERVAL > 0:
                     tracer.stop()
                 raise e
@@ -412,56 +478,58 @@ def _run_indexing(
         index_attempt_md.num_exceptions > 0
         and index_attempt_md.num_exceptions >= batch_num
     ):
-        mark_attempt_failed(
-            index_attempt.id,
-            db_session,
-            failure_reason="All batches exceptioned.",
-        )
-        if is_primary:
-            update_connector_credential_pair(
-                db_session=db_session,
-                connector_id=index_attempt.connector_credential_pair.connector.id,
-                credential_id=index_attempt.connector_credential_pair.credential.id,
-            )
-        raise Exception(
-            f"Connector failed - All batches exceptioned: batches={batch_num}"
-        )
+        with get_session_with_tenant(tenant_id) as db_session_temp:
+            mark_attempt_failed(
+                index_attempt_id,
+                db_session_temp,
+                failure_reason="All batches exceptioned.",
+            )
+            if ctx.is_primary:
+                update_connector_credential_pair(
+                    db_session=db_session_temp,
+                    connector_id=ctx.connector_id,
+                    credential_id=ctx.credential_id,
+                )
+            raise Exception(
+                f"Connector failed - All batches exceptioned: batches={batch_num}"
+            )
 
     elapsed_time = time.time() - start_time
 
-    if index_attempt_md.num_exceptions == 0:
-        mark_attempt_succeeded(index_attempt, db_session)
+    with get_session_with_tenant(tenant_id) as db_session_temp:
+        if index_attempt_md.num_exceptions == 0:
+            mark_attempt_succeeded(index_attempt_id, db_session_temp)
 
-        create_milestone_and_report(
-            user=None,
-            distinct_id=tenant_id or "N/A",
-            event_type=MilestoneRecordType.CONNECTOR_SUCCEEDED,
-            properties=None,
-            db_session=db_session,
-        )
+            create_milestone_and_report(
+                user=None,
+                distinct_id=tenant_id or "N/A",
+                event_type=MilestoneRecordType.CONNECTOR_SUCCEEDED,
+                properties=None,
+                db_session=db_session_temp,
+            )
 
-        logger.info(
-            f"Connector succeeded: "
-            f"docs={document_count} chunks={chunk_count} elapsed={elapsed_time:.2f}s"
-        )
-    else:
-        mark_attempt_partially_succeeded(index_attempt, db_session)
-        logger.info(
-            f"Connector completed with some errors: "
-            f"exceptions={index_attempt_md.num_exceptions} "
-            f"batches={batch_num} "
-            f"docs={document_count} "
-            f"chunks={chunk_count} "
-            f"elapsed={elapsed_time:.2f}s"
-        )
+            logger.info(
+                f"Connector succeeded: "
+                f"docs={document_count} chunks={chunk_count} elapsed={elapsed_time:.2f}s"
+            )
+        else:
+            mark_attempt_partially_succeeded(index_attempt_id, db_session_temp)
+            logger.info(
+                f"Connector completed with some errors: "
+                f"exceptions={index_attempt_md.num_exceptions} "
+                f"batches={batch_num} "
+                f"docs={document_count} "
+                f"chunks={chunk_count} "
+                f"elapsed={elapsed_time:.2f}s"
+            )
 
-    if is_primary:
-        update_connector_credential_pair(
-            db_session=db_session,
-            connector_id=db_connector.id,
-            credential_id=db_credential.id,
-            run_dt=run_end_dt,
-        )
+        if ctx.is_primary:
+            update_connector_credential_pair(
+                db_session=db_session_temp,
+                connector_id=ctx.connector_id,
+                credential_id=ctx.credential_id,
+                run_dt=run_end_dt,
+            )
 
 
 def run_indexing_entrypoint(
@@ -481,27 +549,35 @@ def run_indexing_entrypoint(
             index_attempt_id, connector_credential_pair_id
         )
         with get_session_with_tenant(tenant_id) as db_session:
+            # TODO: remove long running session entirely
             attempt = transition_attempt_to_in_progress(index_attempt_id, db_session)
 
             tenant_str = ""
             if tenant_id is not None:
                 tenant_str = f" for tenant {tenant_id}"
 
-            logger.info(
-                f"Indexing starting{tenant_str}: "
-                f"connector='{attempt.connector_credential_pair.connector.name}' "
-                f"config='{attempt.connector_credential_pair.connector.connector_specific_config}' "
-                f"credentials='{attempt.connector_credential_pair.connector_id}'"
+            connector_name = attempt.connector_credential_pair.connector.name
+            connector_config = (
+                attempt.connector_credential_pair.connector.connector_specific_config
             )
+            credential_id = attempt.connector_credential_pair.credential_id
 
-            _run_indexing(db_session, attempt, tenant_id, callback)
+        logger.info(
+            f"Indexing starting{tenant_str}: "
+            f"connector='{connector_name}' "
+            f"config='{connector_config}' "
+            f"credentials='{credential_id}'"
+        )
 
-            logger.info(
-                f"Indexing finished{tenant_str}: "
-                f"connector='{attempt.connector_credential_pair.connector.name}' "
-                f"config='{attempt.connector_credential_pair.connector.connector_specific_config}' "
-                f"credentials='{attempt.connector_credential_pair.connector_id}'"
-            )
+        with get_session_with_tenant(tenant_id) as db_session:
+            _run_indexing(db_session, index_attempt_id, tenant_id, callback)
+
+        logger.info(
+            f"Indexing finished{tenant_str}: "
+            f"connector='{connector_name}' "
+            f"config='{connector_config}' "
+            f"credentials='{credential_id}'"
+        )
     except Exception as e:
         logger.exception(
             f"Indexing job with ID '{index_attempt_id}' for tenant {tenant_id} failed due to {e}"
diff --git a/backend/onyx/db/index_attempt.py b/backend/onyx/db/index_attempt.py
index 692a27976f..93a7a6947b 100644
--- a/backend/onyx/db/index_attempt.py
+++ b/backend/onyx/db/index_attempt.py
@@ -182,13 +182,13 @@ def mark_attempt_in_progress(
 
 
 def mark_attempt_succeeded(
-    index_attempt: IndexAttempt,
+    index_attempt_id: int,
     db_session: Session,
 ) -> None:
     try:
         attempt = db_session.execute(
             select(IndexAttempt)
-            .where(IndexAttempt.id == index_attempt.id)
+            .where(IndexAttempt.id == index_attempt_id)
             .with_for_update()
         ).scalar_one()
 
@@ -200,13 +200,13 @@ def mark_attempt_succeeded(
 
 
 def mark_attempt_partially_succeeded(
-    index_attempt: IndexAttempt,
+    index_attempt_id: int,
     db_session: Session,
 ) -> None:
     try:
         attempt = db_session.execute(
             select(IndexAttempt)
-            .where(IndexAttempt.id == index_attempt.id)
+            .where(IndexAttempt.id == index_attempt_id)
             .with_for_update()
         ).scalar_one()
 
@@ -265,17 +265,26 @@ def mark_attempt_failed(
 
 def update_docs_indexed(
     db_session: Session,
-    index_attempt: IndexAttempt,
+    index_attempt_id: int,
     total_docs_indexed: int,
     new_docs_indexed: int,
     docs_removed_from_index: int,
 ) -> None:
-    index_attempt.total_docs_indexed = total_docs_indexed
-    index_attempt.new_docs_indexed = new_docs_indexed
-    index_attempt.docs_removed_from_index = docs_removed_from_index
+    try:
+        attempt = db_session.execute(
+            select(IndexAttempt)
+            .where(IndexAttempt.id == index_attempt_id)
+            .with_for_update()
+        ).scalar_one()
 
-    db_session.add(index_attempt)
-    db_session.commit()
+        attempt.total_docs_indexed = total_docs_indexed
+        attempt.new_docs_indexed = new_docs_indexed
+        attempt.docs_removed_from_index = docs_removed_from_index
+
+        db_session.commit()
+    except Exception:
+        db_session.rollback()
+        logger.exception("update_docs_indexed exceptioned.")
+        raise
 
 
 def get_last_attempt(
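
Notes:

The core pattern in this patch is to pass primitive IDs (index_attempt_id,
cc_pair_id) across the long-running indexing work and open a short-lived
session around each individual DB touch, instead of holding one session, and
the ORM instances bound to it, open for the whole run. A minimal sketch of
that pattern, reusing get_session_with_tenant() and get_index_attempt() from
the diff above (fetch_attempt_status itself is a hypothetical helper, not
something this patch adds):

    from onyx.db.engine import get_session_with_tenant
    from onyx.db.index_attempt import get_index_attempt


    # Hypothetical helper, for illustration only.
    def fetch_attempt_status(index_attempt_id: int, tenant_id: str | None) -> str:
        # Open a session only for the duration of this single read.
        with get_session_with_tenant(tenant_id) as db_session:
            attempt = get_index_attempt(db_session, index_attempt_id)
            if attempt is None:
                raise RuntimeError(f"Index attempt {index_attempt_id} not found in DB.")
            # Read what is needed while the session is open and return a
            # plain value; the ORM instance never outlives the session, so
            # it can never be touched in a detached state.
            return str(attempt.status)

Only plain values cross the session boundary, which is what eliminates the
detached-instance failures the commit message refers to.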
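The other half of the change is the locked read-modify-write shape now shared
by mark_attempt_succeeded(), mark_attempt_partially_succeeded(), and
update_docs_indexed(): re-fetch the row with SELECT ... FOR UPDATE inside the
same call that commits, so concurrent writers serialize on the row lock
instead of clobbering each other. A condensed sketch of that shape against
the IndexAttempt model (the set_docs_indexed wrapper is hypothetical, for
illustration only):

    from sqlalchemy import select
    from sqlalchemy.orm import Session

    from onyx.db.models import IndexAttempt


    # Hypothetical wrapper, for illustration only.
    def set_docs_indexed(db_session: Session, index_attempt_id: int, total: int) -> None:
        try:
            # .with_for_update() emits SELECT ... FOR UPDATE, taking a row
            # lock so concurrent updaters queue up behind one another.
            attempt = db_session.execute(
                select(IndexAttempt)
                .where(IndexAttempt.id == index_attempt_id)
                .with_for_update()
            ).scalar_one()

            attempt.total_docs_indexed = total
            db_session.commit()  # the commit also releases the row lock
        except Exception:
            # Roll back so the session remains usable for the caller.
            db_session.rollback()
            raise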