Move user group syncing to Celery Beat

Weves 2023-10-21 18:55:54 -07:00 committed by Chris Weaver
parent 7503f8f37b
commit 50170cc97e
3 changed files with 90 additions and 22 deletions


@@ -1,7 +1,75 @@
+from datetime import timedelta
+
+from sqlalchemy.orm import Session
+
 from danswer.background.celery.celery import celery_app
+from danswer.configs.app_configs import JOB_TIMEOUT
+from danswer.db.engine import get_sqlalchemy_engine
+from danswer.db.tasks import check_live_task_not_timed_out
+from danswer.db.tasks import get_latest_task
+from danswer.db.tasks import mark_task_finished
+from danswer.db.tasks import mark_task_start
+from danswer.db.tasks import register_task
+from danswer.utils.logger import setup_logger
+from ee.danswer.background.user_group_sync import name_user_group_sync_task
+from ee.danswer.db.user_group import fetch_user_groups
 from ee.danswer.user_groups.sync import sync_user_groups
+
+logger = setup_logger()


-@celery_app.task(soft_time_limit=60 * 60 * 6)  # 6 hour time limit
+@celery_app.task(soft_time_limit=JOB_TIMEOUT)
 def sync_user_group_task(user_group_id: int) -> None:
-    sync_user_groups(user_group_id=user_group_id)
+    with Session(get_sqlalchemy_engine()) as db_session:
+        task_name = name_user_group_sync_task(user_group_id)
+        mark_task_start(task_name, db_session)
+
+        # actual sync logic
+        sync_user_groups(user_group_id=user_group_id, db_session=db_session)
+
+        mark_task_finished(task_name, db_session)
+
+
+#####
+# Periodic Tasks
+#####
+@celery_app.task(
+    name="check_for_user_groups_sync_task",
+    soft_time_limit=JOB_TIMEOUT,
+)
+def check_for_user_groups_sync_task() -> None:
+    """Runs periodically to check if any user groups are out of sync
+    Creates a task to sync the user group if needed"""
+    with Session(get_sqlalchemy_engine()) as db_session:
+        # check if any user groups are not synced
+        user_groups = fetch_user_groups(db_session=db_session, only_current=False)
+        for user_group in user_groups:
+            if not user_group.is_up_to_date:
+                task_name = name_user_group_sync_task(user_group.id)
+                latest_sync = get_latest_task(task_name, db_session)
+
+                if latest_sync and check_live_task_not_timed_out(
+                    latest_sync, db_session
+                ):
+                    logger.info(
+                        f"User Group '{user_group.id}' is already syncing. Skipping."
+                    )
+                    continue
+
+                logger.info(f"User Group {user_group.id} is not synced. Syncing now!")
+                task = sync_user_group_task.apply_async(
+                    kwargs=dict(user_group_id=user_group.id),
+                )
+                register_task(task.id, task_name, db_session)
+
+
+#####
+# Celery Beat (Periodic Tasks) Settings
+#####
+celery_app.conf.beat_schedule = {
+    "check-for-user-group-sync": {
+        "task": "check_for_user_groups_sync_task",
+        "schedule": timedelta(seconds=5),
+    },
+    **(celery_app.conf.beat_schedule or {}),
+}
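A note on the schedule block above: spreading the existing celery_app.conf.beat_schedule back into the new dict keeps any entries registered elsewhere from being clobbered, and the "or {}" covers the case where no schedule has been set yet. Entries only fire when a Celery beat scheduler process runs alongside the worker. Below is a minimal standalone sketch of the merge pattern, not part of the commit; the Celery("example") app, "some-existing-entry", and "existing_task" are stand-ins.

from datetime import timedelta

from celery import Celery

celery_app = Celery("example")  # stand-in for the shared danswer celery_app

# Pretend another module already registered a periodic task.
celery_app.conf.beat_schedule = {
    "some-existing-entry": {"task": "existing_task", "schedule": timedelta(minutes=1)},
}

# Same merge pattern as the commit: new entry first, then spread the old schedule.
celery_app.conf.beat_schedule = {
    "check-for-user-group-sync": {
        "task": "check_for_user_groups_sync_task",
        "schedule": timedelta(seconds=5),
    },
    **(celery_app.conf.beat_schedule or {}),
}

assert "some-existing-entry" in celery_app.conf.beat_schedule  # earlier entries survive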


@@ -0,0 +1,2 @@
+def name_user_group_sync_task(user_group_id: int) -> str:
+    return f"user_group_sync_task__{user_group_id}"
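For reference, the helper added above produces a deterministic per-group task name; the periodic check uses the same name to look up the latest recorded run for that group. A quick illustration (group id 42 is just an example):

from ee.danswer.background.user_group_sync import name_user_group_sync_task

assert name_user_group_sync_task(42) == "user_group_sync_task__42"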


@@ -44,27 +44,25 @@ def _sync_user_group_batch(
 )


-def sync_user_groups(user_group_id: int) -> None:
+def sync_user_groups(user_group_id: int, db_session: Session) -> None:
     """Sync the status of Postgres for the specified user group"""
     document_index = get_default_document_index()
-    with Session(get_sqlalchemy_engine()) as db_session:
-        user_group = fetch_user_group(
-            db_session=db_session, user_group_id=user_group_id
-        )
-        if user_group is None:
-            raise ValueError(f"User group '{user_group_id}' does not exist")
+
+    user_group = fetch_user_group(db_session=db_session, user_group_id=user_group_id)
+    if user_group is None:
+        raise ValueError(f"User group '{user_group_id}' does not exist")

-        documents_to_update = fetch_documents_for_user_group(
-            db_session=db_session,
-            user_group_id=user_group_id,
-        )
-        for document_batch in batch_generator(documents_to_update, _SYNC_BATCH_SIZE):
-            _sync_user_group_batch(
-                document_ids=[document.id for document in document_batch],
-                document_index=document_index,
-            )
+    documents_to_update = fetch_documents_for_user_group(
+        db_session=db_session,
+        user_group_id=user_group_id,
+    )
+    for document_batch in batch_generator(documents_to_update, _SYNC_BATCH_SIZE):
+        _sync_user_group_batch(
+            document_ids=[document.id for document in document_batch],
+            document_index=document_index,
+        )

-        if user_group.is_up_for_deletion:
-            delete_user_group(db_session=db_session, user_group=user_group)
-        else:
-            mark_user_group_as_synced(db_session=db_session, user_group=user_group)
+    if user_group.is_up_for_deletion:
+        delete_user_group(db_session=db_session, user_group=user_group)
+    else:
+        mark_user_group_as_synced(db_session=db_session, user_group=user_group)
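With the session now owned by the caller, sync_user_groups no longer opens its own connection; the Celery task above wraps it in a Session block. A one-off manual sync would look roughly like the sketch below; this is an illustration rather than part of the commit, and group id 42 is a placeholder.

from sqlalchemy.orm import Session

from danswer.db.engine import get_sqlalchemy_engine
from ee.danswer.user_groups.sync import sync_user_groups

# Caller owns the session/transaction; the function just uses it.
with Session(get_sqlalchemy_engine()) as db_session:
    sync_user_groups(user_group_id=42, db_session=db_session)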