Remove Nested Session (#78)

This commit is contained in:
Yuhong Sun
2024-04-22 23:27:20 -07:00
committed by Chris Weaver
parent 6c6e33e001
commit f55a4ef9bd
3 changed files with 24 additions and 22 deletions

View File

@@ -37,7 +37,7 @@ def sync_user_group_task(user_group_id: int) -> None:
error_msg = str(e)
logger.exception(f"Failed to sync user group - {error_msg}")
# need a new session so this can be committed (previous transaction may have
# Need a new session so this can be committed (previous transaction may have
# been rolled back due to the exception)
with Session(engine) as db_session:
mark_task_finished(task_name, db_session, success=error_msg is None)

View File

@@ -18,7 +18,7 @@ from ee.danswer.server.api_key.models import APIKeyArgs
def is_api_key_email_address(email: str) -> bool:
return email.endswith(f"@{DANSWER_API_KEY_DUMMY_EMAIL_DOMAIN}")
return email.endswith(f"{DANSWER_API_KEY_DUMMY_EMAIL_DOMAIN}")
def fetch_api_keys(db_session: Session) -> list[ApiKeyDescriptor]:

View File

@@ -4,7 +4,6 @@ from danswer.access.access import get_access_for_documents
from danswer.db.document import prepare_to_modify_documents
from danswer.db.embedding_model import get_current_db_embedding_model
from danswer.db.embedding_model import get_secondary_db_embedding_model
from danswer.db.engine import get_sqlalchemy_engine
from danswer.document_index.factory import get_default_document_index
from danswer.document_index.interfaces import DocumentIndex
from danswer.document_index.interfaces import UpdateRequest
@@ -21,29 +20,31 @@ _SYNC_BATCH_SIZE = 1000
def _sync_user_group_batch(
document_ids: list[str], document_index: DocumentIndex
document_ids: list[str], document_index: DocumentIndex, db_session: Session
) -> None:
logger.debug(f"Syncing document sets for: {document_ids}")
# begin a transaction, release lock at the end
with Session(get_sqlalchemy_engine()) as db_session:
# acquires a lock on the documents so that no other process can modify them
prepare_to_modify_documents(db_session=db_session, document_ids=document_ids)
# get current state of document sets for these documents
document_id_to_access = get_access_for_documents(
document_ids=document_ids, db_session=db_session
)
# Acquires a lock on the documents so that no other process can modify them
prepare_to_modify_documents(db_session=db_session, document_ids=document_ids)
# update Vespa
document_index.update(
update_requests=[
UpdateRequest(
document_ids=[document_id],
access=document_id_to_access[document_id],
)
for document_id in document_ids
]
)
# get current state of document sets for these documents
document_id_to_access = get_access_for_documents(
document_ids=document_ids, db_session=db_session
)
# update Vespa
document_index.update(
update_requests=[
UpdateRequest(
document_ids=[document_id],
access=document_id_to_access[document_id],
)
for document_id in document_ids
]
)
# Finish the transaction and release the locks
db_session.commit()
def sync_user_groups(user_group_id: int, db_session: Session) -> None:
@@ -70,6 +71,7 @@ def sync_user_groups(user_group_id: int, db_session: Session) -> None:
_sync_user_group_batch(
document_ids=[document.id for document in document_batch],
document_index=document_index,
db_session=db_session,
)
if user_group.is_up_for_deletion: