mirror of
https://github.com/danswer-ai/danswer.git
synced 2025-06-04 20:20:37 +02:00
always finalize the serialized transaction so that it doesn't leak ou… (#2843)
* always finalize the serialized transaction so that it doesn't leak outside the function * re-raise the exception and log it
This commit is contained in:
parent
b169f78699
commit
4c2cf8b132
@ -387,30 +387,39 @@ def _prepare_index_attempt(
|
|||||||
# after the next commit:
|
# after the next commit:
|
||||||
# https://docs.sqlalchemy.org/en/20/orm/session_transaction.html#setting-isolation-for-individual-transactions
|
# https://docs.sqlalchemy.org/en/20/orm/session_transaction.html#setting-isolation-for-individual-transactions
|
||||||
db_session.connection(execution_options={"isolation_level": "SERIALIZABLE"}) # type: ignore
|
db_session.connection(execution_options={"isolation_level": "SERIALIZABLE"}) # type: ignore
|
||||||
if tenant_id is not None:
|
try:
|
||||||
# Explicitly set the search path for the given tenant
|
if tenant_id is not None:
|
||||||
db_session.execute(text(f'SET search_path TO "{tenant_id}"'))
|
# Explicitly set the search path for the given tenant
|
||||||
# Verify the search path was set correctly
|
db_session.execute(text(f'SET search_path TO "{tenant_id}"'))
|
||||||
result = db_session.execute(text("SHOW search_path"))
|
# Verify the search path was set correctly
|
||||||
current_search_path = result.scalar()
|
result = db_session.execute(text("SHOW search_path"))
|
||||||
logger.info(f"Current search path set to: {current_search_path}")
|
current_search_path = result.scalar()
|
||||||
|
logger.info(f"Current search path set to: {current_search_path}")
|
||||||
|
|
||||||
attempt = get_index_attempt(
|
attempt = get_index_attempt(
|
||||||
db_session=db_session,
|
db_session=db_session,
|
||||||
index_attempt_id=index_attempt_id,
|
index_attempt_id=index_attempt_id,
|
||||||
)
|
|
||||||
|
|
||||||
if attempt is None:
|
|
||||||
raise RuntimeError(f"Unable to find IndexAttempt for ID '{index_attempt_id}'")
|
|
||||||
|
|
||||||
if attempt.status != IndexingStatus.NOT_STARTED:
|
|
||||||
raise RuntimeError(
|
|
||||||
f"Indexing attempt with ID '{index_attempt_id}' is not in NOT_STARTED status. "
|
|
||||||
f"Current status is '{attempt.status}'."
|
|
||||||
)
|
)
|
||||||
|
|
||||||
# only commit once, to make sure this all happens in a single transaction
|
if attempt is None:
|
||||||
mark_attempt_in_progress(attempt, db_session)
|
raise RuntimeError(
|
||||||
|
f"Unable to find IndexAttempt for ID '{index_attempt_id}'"
|
||||||
|
)
|
||||||
|
|
||||||
|
if attempt.status != IndexingStatus.NOT_STARTED:
|
||||||
|
raise RuntimeError(
|
||||||
|
f"Indexing attempt with ID '{index_attempt_id}' is not in NOT_STARTED status. "
|
||||||
|
f"Current status is '{attempt.status}'."
|
||||||
|
)
|
||||||
|
|
||||||
|
mark_attempt_in_progress(attempt, db_session)
|
||||||
|
|
||||||
|
# only commit once, to make sure this all happens in a single transaction
|
||||||
|
db_session.commit()
|
||||||
|
except Exception:
|
||||||
|
db_session.rollback()
|
||||||
|
logger.exception("_prepare_index_attempt exceptioned.")
|
||||||
|
raise
|
||||||
|
|
||||||
return attempt
|
return attempt
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user