diff --git a/backend/ee/danswer/background/celery/celery.py b/backend/ee/danswer/background/celery/celery.py
index 16de00da24f..d190b18e799 100644
--- a/backend/ee/danswer/background/celery/celery.py
+++ b/backend/ee/danswer/background/celery/celery.py
@@ -24,7 +24,8 @@ global_version.set_ee()
 
 @celery_app.task(soft_time_limit=JOB_TIMEOUT)
 def sync_user_group_task(user_group_id: int) -> None:
-    with Session(get_sqlalchemy_engine()) as db_session:
+    engine = get_sqlalchemy_engine()
+    with Session(engine) as db_session:
         task_name = name_user_group_sync_task(user_group_id)
         mark_task_start(task_name, db_session)
 
@@ -36,6 +37,9 @@ def sync_user_group_task(user_group_id: int) -> None:
             error_msg = str(e)
             logger.exception(f"Failed to sync user group - {error_msg}")
 
+    # need a new session so this can be committed (previous transaction may have
+    # been rolled back due to the exception)
+    with Session(engine) as db_session:
         mark_task_finished(task_name, db_session, success=error_msg is None)
 
 
diff --git a/backend/ee/danswer/background/user_group_sync_script.py b/backend/ee/danswer/background/user_group_sync_script.py
deleted file mode 100644
index f8bfea087c4..00000000000
--- a/backend/ee/danswer/background/user_group_sync_script.py
+++ /dev/null
@@ -1,54 +0,0 @@
-import time
-from celery.result import AsyncResult
-from sqlalchemy.orm import Session
-
-from danswer.db.engine import get_sqlalchemy_engine
-from danswer.utils.logger import setup_logger
-from ee.danswer.background.celery.celery import sync_user_group_task
-from ee.danswer.db.user_group import fetch_user_groups
-
-logger = setup_logger()
-
-
-_ExistingTaskCache: dict[int, AsyncResult] = {}
-
-
-def _user_group_sync_loop() -> None:
-    # cleanup tasks
-    existing_tasks = list(_ExistingTaskCache.items())
-    for user_group_id, task in existing_tasks:
-        if task.ready():
-            logger.info(
-                f"User Group '{user_group_id}' is complete with status "
-                f"{task.status}. Cleaning up."
-            )
-            del _ExistingTaskCache[user_group_id]
-
-    # kick off new tasks
-    with Session(get_sqlalchemy_engine()) as db_session:
-        # check if any document sets are not synced
-        user_groups = fetch_user_groups(db_session=db_session, only_current=False)
-        for user_group in user_groups:
-            if not user_group.is_up_to_date:
-                if user_group.id in _ExistingTaskCache:
-                    logger.info(
-                        f"User Group '{user_group.id}' is already syncing. Skipping."
-                    )
-                    continue
-
-                logger.info(f"User Group {user_group.id} is not synced. Syncing now!")
-                task = sync_user_group_task.apply_async(
-                    kwargs=dict(user_group_id=user_group.id),
-                )
-                _ExistingTaskCache[user_group.id] = task
-
-
-if __name__ == "__main__":
-    while True:
-        start = time.monotonic()
-
-        _user_group_sync_loop()
-
-        sleep_time = 30 - (time.monotonic() - start)
-        if sleep_time > 0:
-            time.sleep(sleep_time)
diff --git a/backend/ee/danswer/db/user_group.py b/backend/ee/danswer/db/user_group.py
index 2a83235bcea..c41fc0d904b 100644
--- a/backend/ee/danswer/db/user_group.py
+++ b/backend/ee/danswer/db/user_group.py
@@ -294,5 +294,9 @@ def delete_user_group(db_session: Session, user_group: UserGroup) -> None:
         user_group_id=user_group.id,
         outdated_only=False,
     )
+
+    # need to flush so that we don't get a foreign key error when deleting the user group row
+    db_session.flush()
+
     db_session.delete(user_group)
     db_session.commit()
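A note on the `celery.py` hunk: once the `except` branch runs, the original session's transaction may already be rolled back, so further writes on it can fail; `mark_task_finished` therefore gets a session of its own. Below is a minimal sketch of the pattern, assuming a throwaway sqlite engine and `SELECT 1` placeholders instead of the real Danswer helpers:

```python
from sqlalchemy import create_engine, text
from sqlalchemy.orm import Session

# stand-in for get_sqlalchemy_engine(); any engine works for the pattern
engine = create_engine("sqlite://")


def task_with_status_bookkeeping() -> None:
    error_msg: str | None = None

    with Session(engine) as db_session:
        try:
            # placeholder for the real sync work
            db_session.execute(text("SELECT 1"))
            db_session.commit()
        except Exception as e:
            # from here on, the session's transaction may be in a
            # rolled-back state, so writing through db_session is unsafe
            error_msg = str(e)

    # a fresh session starts a clean transaction, so the success/failure
    # status can always be committed (placeholder for mark_task_finished)
    with Session(engine) as db_session:
        db_session.execute(text("SELECT 1"))
        db_session.commit()
```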
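Similarly for the `user_group.py` hunk: the rows referencing the user group are deleted by a helper earlier in `delete_user_group`, and the explicit `flush()` ensures those DELETEs reach the database before the DELETE of the user group row itself, instead of leaving the ordering to the unit of work at commit time. A sketch of the same pattern with illustrative `Group`/`GroupMember` models (not the real Danswer schema):

```python
from sqlalchemy import ForeignKey, create_engine, select
from sqlalchemy.orm import DeclarativeBase, Mapped, Session, mapped_column


class Base(DeclarativeBase):
    pass


class Group(Base):
    __tablename__ = "groups"
    id: Mapped[int] = mapped_column(primary_key=True)


class GroupMember(Base):
    __tablename__ = "group_members"
    id: Mapped[int] = mapped_column(primary_key=True)
    group_id: Mapped[int] = mapped_column(ForeignKey("groups.id"))


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as db_session:
    db_session.add_all([Group(id=1), GroupMember(id=1, group_id=1)])
    db_session.commit()

with Session(engine) as db_session:
    group = db_session.get(Group, 1)

    # stage deletion of every row that references the group (the real code
    # does this inside a helper, as in the hunk above)
    for member in db_session.scalars(
        select(GroupMember).where(GroupMember.group_id == 1)
    ):
        db_session.delete(member)

    # emit those DELETEs now so they are guaranteed to precede the parent
    # row's DELETE below; per the diff's own comment, skipping this can
    # surface as a foreign key error when the parent row is deleted
    db_session.flush()

    db_session.delete(group)
    db_session.commit()
```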