From 4c2cf8b13224dce731969cf4399d0a18f2a0cad6 Mon Sep 17 00:00:00 2001 From: rkuo-danswer Date: Thu, 17 Oct 2024 16:13:57 -0700 Subject: [PATCH] =?UTF-8?q?always=20finalize=20the=20serialized=20transact?= =?UTF-8?q?ion=20so=20that=20it=20doesn't=20leak=20ou=E2=80=A6=20(#2843)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * always finalize the serialized transaction so that it doesn't leak outside the function * re-raise the exception and log it --- .../background/indexing/run_indexing.py | 51 +++++++++++-------- 1 file changed, 30 insertions(+), 21 deletions(-) diff --git a/backend/danswer/background/indexing/run_indexing.py b/backend/danswer/background/indexing/run_indexing.py index c48d07ffd..b4cfea97a 100644 --- a/backend/danswer/background/indexing/run_indexing.py +++ b/backend/danswer/background/indexing/run_indexing.py @@ -387,30 +387,39 @@ def _prepare_index_attempt( # after the next commit: # https://docs.sqlalchemy.org/en/20/orm/session_transaction.html#setting-isolation-for-individual-transactions db_session.connection(execution_options={"isolation_level": "SERIALIZABLE"}) # type: ignore - if tenant_id is not None: - # Explicitly set the search path for the given tenant - db_session.execute(text(f'SET search_path TO "{tenant_id}"')) - # Verify the search path was set correctly - result = db_session.execute(text("SHOW search_path")) - current_search_path = result.scalar() - logger.info(f"Current search path set to: {current_search_path}") + try: + if tenant_id is not None: + # Explicitly set the search path for the given tenant + db_session.execute(text(f'SET search_path TO "{tenant_id}"')) + # Verify the search path was set correctly + result = db_session.execute(text("SHOW search_path")) + current_search_path = result.scalar() + logger.info(f"Current search path set to: {current_search_path}") - attempt = get_index_attempt( - db_session=db_session, - index_attempt_id=index_attempt_id, - ) - - if attempt is None: - raise RuntimeError(f"Unable to find IndexAttempt for ID '{index_attempt_id}'") - - if attempt.status != IndexingStatus.NOT_STARTED: - raise RuntimeError( - f"Indexing attempt with ID '{index_attempt_id}' is not in NOT_STARTED status. " - f"Current status is '{attempt.status}'." + attempt = get_index_attempt( + db_session=db_session, + index_attempt_id=index_attempt_id, ) - # only commit once, to make sure this all happens in a single transaction - mark_attempt_in_progress(attempt, db_session) + if attempt is None: + raise RuntimeError( + f"Unable to find IndexAttempt for ID '{index_attempt_id}'" + ) + + if attempt.status != IndexingStatus.NOT_STARTED: + raise RuntimeError( + f"Indexing attempt with ID '{index_attempt_id}' is not in NOT_STARTED status. " + f"Current status is '{attempt.status}'." + ) + + mark_attempt_in_progress(attempt, db_session) + + # only commit once, to make sure this all happens in a single transaction + db_session.commit() + except Exception: + db_session.rollback() + logger.exception("_prepare_index_attempt exceptioned.") + raise return attempt