diff --git a/backend/ee/danswer/background/celery/celery.py b/backend/ee/danswer/background/celery/celery.py
index e46d0a5e7..0a742bd86 100644
--- a/backend/ee/danswer/background/celery/celery.py
+++ b/backend/ee/danswer/background/celery/celery.py
@@ -1,7 +1,75 @@
+from datetime import timedelta
+
+from sqlalchemy.orm import Session
+
 from danswer.background.celery.celery import celery_app
+from danswer.configs.app_configs import JOB_TIMEOUT
+from danswer.db.engine import get_sqlalchemy_engine
+from danswer.db.tasks import check_live_task_not_timed_out
+from danswer.db.tasks import get_latest_task
+from danswer.db.tasks import mark_task_finished
+from danswer.db.tasks import mark_task_start
+from danswer.db.tasks import register_task
+from danswer.utils.logger import setup_logger
+from ee.danswer.background.user_group_sync import name_user_group_sync_task
+from ee.danswer.db.user_group import fetch_user_groups
 from ee.danswer.user_groups.sync import sync_user_groups
 
+logger = setup_logger()
 
-@celery_app.task(soft_time_limit=60 * 60 * 6)  # 6 hour time limit
+
+@celery_app.task(soft_time_limit=JOB_TIMEOUT)
 def sync_user_group_task(user_group_id: int) -> None:
-    sync_user_groups(user_group_id=user_group_id)
+    with Session(get_sqlalchemy_engine()) as db_session:
+        task_name = name_user_group_sync_task(user_group_id)
+        mark_task_start(task_name, db_session)
+
+        # actual sync logic
+        sync_user_groups(user_group_id=user_group_id, db_session=db_session)
+
+        mark_task_finished(task_name, db_session)
+
+
+#####
+# Periodic Tasks
+#####
+@celery_app.task(
+    name="check_for_user_groups_sync_task",
+    soft_time_limit=JOB_TIMEOUT,
+)
+def check_for_user_groups_sync_task() -> None:
+    """Runs periodically to check if any user groups are out of sync
+    Creates a task to sync the user group if needed"""
+    with Session(get_sqlalchemy_engine()) as db_session:
+        # check if any user groups are not synced
+        user_groups = fetch_user_groups(db_session=db_session, only_current=False)
+        for user_group in user_groups:
+            if not user_group.is_up_to_date:
+                task_name = name_user_group_sync_task(user_group.id)
+                latest_sync = get_latest_task(task_name, db_session)
+
+                if latest_sync and check_live_task_not_timed_out(
+                    latest_sync, db_session
+                ):
+                    logger.info(
+                        f"User Group '{user_group.id}' is already syncing. Skipping."
+                    )
+                    continue
+
+                logger.info(f"User Group {user_group.id} is not synced. Syncing now!")
Syncing now!") + task = sync_user_group_task.apply_async( + kwargs=dict(user_group_id=user_group.id), + ) + register_task(task.id, task_name, db_session) + + +##### +# Celery Beat (Periodic Tasks) Settings +##### +celery_app.conf.beat_schedule = { + "check-for-user-group-sync": { + "task": "check_for_user_groups_sync_task", + "schedule": timedelta(seconds=5), + }, + **(celery_app.conf.beat_schedule or {}), +} diff --git a/backend/ee/danswer/background/user_group_sync.py b/backend/ee/danswer/background/user_group_sync.py new file mode 100644 index 000000000..ce824c471 --- /dev/null +++ b/backend/ee/danswer/background/user_group_sync.py @@ -0,0 +1,2 @@ +def name_user_group_sync_task(user_group_id: int) -> str: + return f"user_group_sync_task__{user_group_id}" diff --git a/backend/ee/danswer/user_groups/sync.py b/backend/ee/danswer/user_groups/sync.py index 9fc3c68ef..91de86431 100644 --- a/backend/ee/danswer/user_groups/sync.py +++ b/backend/ee/danswer/user_groups/sync.py @@ -44,27 +44,25 @@ def _sync_user_group_batch( ) -def sync_user_groups(user_group_id: int) -> None: +def sync_user_groups(user_group_id: int, db_session: Session) -> None: """Sync the status of Postgres for the specified user group""" document_index = get_default_document_index() - with Session(get_sqlalchemy_engine()) as db_session: - user_group = fetch_user_group( - db_session=db_session, user_group_id=user_group_id - ) - if user_group is None: - raise ValueError(f"User group '{user_group_id}' does not exist") - documents_to_update = fetch_documents_for_user_group( - db_session=db_session, - user_group_id=user_group_id, - ) - for document_batch in batch_generator(documents_to_update, _SYNC_BATCH_SIZE): - _sync_user_group_batch( - document_ids=[document.id for document in document_batch], - document_index=document_index, - ) + user_group = fetch_user_group(db_session=db_session, user_group_id=user_group_id) + if user_group is None: + raise ValueError(f"User group '{user_group_id}' does not exist") - if user_group.is_up_for_deletion: - delete_user_group(db_session=db_session, user_group=user_group) - else: - mark_user_group_as_synced(db_session=db_session, user_group=user_group) + documents_to_update = fetch_documents_for_user_group( + db_session=db_session, + user_group_id=user_group_id, + ) + for document_batch in batch_generator(documents_to_update, _SYNC_BATCH_SIZE): + _sync_user_group_batch( + document_ids=[document.id for document in document_batch], + document_index=document_index, + ) + + if user_group.is_up_for_deletion: + delete_user_group(db_session=db_session, user_group=user_group) + else: + mark_user_group_as_synced(db_session=db_session, user_group=user_group)