From 583bd1d207c4597a54b647dfcad13753bb9fef95 Mon Sep 17 00:00:00 2001
From: rkuo-danswer
Date: Mon, 19 Aug 2024 22:15:44 -0700
Subject: [PATCH] add kombu message cleanup task (#2172)

* add kombu message cleanup task

* added some logging if we find an associated task (since tasks shouldn't be around for longer than 7 days)

* release the advisory lock when the task finishes and skip messages whose payload cannot be parsed
---
 .../danswer/background/celery/celery_app.py   | 166 ++++++++++++++++++
 backend/danswer/configs/constants.py          |   5 +
 2 files changed, 171 insertions(+)

diff --git a/backend/danswer/background/celery/celery_app.py b/backend/danswer/background/celery/celery_app.py
index 1a185b62e..ffd805c29 100644
--- a/backend/danswer/background/celery/celery_app.py
+++ b/backend/danswer/background/celery/celery_app.py
@@ -1,7 +1,12 @@
+import json
 from datetime import timedelta
+from typing import Any
 from typing import cast
 
 from celery import Celery  # type: ignore
+from celery.contrib.abortable import AbortableTask  # type: ignore
+from celery.exceptions import TaskRevokedError
+from sqlalchemy import text
 from sqlalchemy.orm import Session
 
 from danswer.background.celery.celery_utils import extract_ids_from_runnable_connector
@@ -16,6 +21,7 @@ from danswer.background.task_utils import name_cc_prune_task
 from danswer.background.task_utils import name_document_set_sync_task
 from danswer.configs.app_configs import JOB_TIMEOUT
 from danswer.configs.constants import POSTGRES_CELERY_APP_NAME
+from danswer.configs.constants import PostgresAdvisoryLocks
 from danswer.connectors.factory import instantiate_connector
 from danswer.connectors.models import InputType
 from danswer.db.connector_credential_pair import get_connector_credential_pair
@@ -291,6 +297,158 @@ def check_for_cc_pair_deletion_task() -> None:
                 )
 
 
+@celery_app.task(
+    name="kombu_message_cleanup_task",
+    soft_time_limit=JOB_TIMEOUT,
+    bind=True,
+    base=AbortableTask,
+)
+def kombu_message_cleanup_task(self: Any) -> int:
+    """Runs periodically to clean up the kombu_message table.
+
+    Only one run may be active at a time; this is enforced with a Postgres
+    session-level advisory lock that is released before the task returns.
+
+    Returns:
+        int: The number of messages deleted, or 0 if the lock was already taken.
+
+    Raises:
+        TaskRevokedError: If the task is aborted between pages.
+    """
+
+    # we will select messages older than this amount to clean up
+    KOMBU_MESSAGE_CLEANUP_AGE = 7  # days
+    KOMBU_MESSAGE_CLEANUP_PAGE_LIMIT = 1000
+
+    lock_params = {"id": PostgresAdvisoryLocks.KOMBU_MESSAGE_CLEANUP_LOCK_ID.value}
+
+    ctx = {
+        "last_processed_id": 0,
+        "deleted": 0,
+        "cleanup_age": KOMBU_MESSAGE_CLEANUP_AGE,
+        "page_limit": KOMBU_MESSAGE_CLEANUP_PAGE_LIMIT,
+    }
+
+    # A session-level advisory lock belongs to one database connection, so pin
+    # a single connection for the whole task. A plain Session may be handed a
+    # different pooled connection after each commit().
+    with get_sqlalchemy_engine().connect() as connection:
+        with Session(bind=connection) as db_session:
+            # Exit the task if we can't take the advisory lock
+            result = db_session.execute(
+                text("SELECT pg_try_advisory_lock(:id)"), lock_params
+            ).scalar()
+            if not result:
+                return 0
+
+            try:
+                while True:
+                    if self.is_aborted():
+                        raise TaskRevokedError(
+                            "kombu_message_cleanup_task was aborted."
+                        )
+
+                    b = kombu_message_cleanup_task_helper(ctx, db_session)
+                    if not b:
+                        break
+
+                    db_session.commit()
+            finally:
+                # Neither commit nor rollback releases a session-level lock, and
+                # the pooled connection outlives this task, so unlock explicitly.
+                # Roll back first so the unlock also runs after a failed page.
+                db_session.rollback()
+                db_session.execute(text("SELECT pg_advisory_unlock(:id)"), lock_params)
+
+    if ctx["deleted"] > 0:
+        logger.info(f"Deleted {ctx['deleted']} orphaned messages from kombu_message.")
+
+    return ctx["deleted"]
+
+
+def kombu_message_cleanup_task_helper(ctx: dict, db_session: Session) -> bool:
+    """
+    Helper function to clean up old messages from the `kombu_message` table that are no longer relevant.
+
+    This function retrieves messages from the `kombu_message` table that are no longer visible and
+    older than a specified interval. It checks if the corresponding task_id exists in the
+    `celery_taskmeta` table. If the task_id does not exist, the message is deleted.
+    Messages whose payload cannot be parsed are logged and skipped.
+
+    Args:
+        ctx (dict): A context dictionary containing configuration parameters such as:
+            - 'cleanup_age' (int): The age in days after which messages are considered old.
+            - 'page_limit' (int): The maximum number of messages to process in one batch.
+            - 'last_processed_id' (int): The ID of the last processed message to handle pagination.
+            - 'deleted' (int): A counter to track the number of deleted messages.
+        db_session (Session): The SQLAlchemy database session for executing queries.
+
+    Returns:
+        bool: Returns True if there are more rows to process, False if not.
+    """
+
+    query = text(
+        """
+    SELECT id, timestamp, payload
+    FROM kombu_message WHERE visible = 'false'
+    AND timestamp < CURRENT_TIMESTAMP - INTERVAL :interval_days
+    AND id > :last_processed_id
+    ORDER BY id
+    LIMIT :page_limit
+"""
+    )
+    kombu_messages = db_session.execute(
+        query,
+        {
+            "interval_days": f"{ctx['cleanup_age']} days",
+            "page_limit": ctx["page_limit"],
+            "last_processed_id": ctx["last_processed_id"],
+        },
+    ).fetchall()
+
+    if not kombu_messages:
+        return False
+
+    for msg in kombu_messages:
+        # Advance the pagination cursor up front so that a message skipped below
+        # is never selected again within this run.
+        ctx["last_processed_id"] = msg[0]
+
+        # One malformed payload must not abort the whole cleanup (it would fail
+        # again on every later run), so log it and move on.
+        try:
+            headers = json.loads(msg[2])["headers"]
+            task_id = headers["id"]
+        except (TypeError, ValueError, KeyError):
+            logger.warning(
+                f"Skipping kombu_message with unreadable payload. id={msg[0]}"
+            )
+            continue
+
+        # Check if task_id exists in celery_taskmeta
+        task_exists = db_session.execute(
+            text("SELECT 1 FROM celery_taskmeta WHERE task_id = :task_id"),
+            {"task_id": task_id},
+        ).fetchone()
+
+        # If task_id does not exist, delete the message
+        if not task_exists:
+            result = db_session.execute(
+                text("DELETE FROM kombu_message WHERE id = :message_id"),
+                {"message_id": msg[0]},
+            )
+            if result.rowcount > 0:  # type: ignore
+                ctx["deleted"] += 1
+        else:
+            task_name = headers.get("task")
+            logger.warning(
+                f"Message found for task older than {ctx['cleanup_age']} days. "
+                f"id={task_id} name={task_name}"
+            )
+
+    return True
+
+
 @celery_app.task(
     name="check_for_prune_task",
     soft_time_limit=JOB_TIMEOUT,
@@ -341,3 +499,11 @@ celery_app.conf.beat_schedule.update(
         },
     }
 )
+celery_app.conf.beat_schedule.update(
+    {
+        "kombu-message-cleanup": {
+            "task": "kombu_message_cleanup_task",
+            "schedule": timedelta(seconds=3600),
+        },
+    }
+)
diff --git a/backend/danswer/configs/constants.py b/backend/danswer/configs/constants.py
index d5f2b63b5..64c162d7b 100644
--- a/backend/danswer/configs/constants.py
+++ b/backend/danswer/configs/constants.py
@@ -1,3 +1,4 @@
+from enum import auto
 from enum import Enum
 
 SOURCE_TYPE = "source_type"
@@ -161,3 +162,7 @@ class FileOrigin(str, Enum):
     CONNECTOR = "connector"
     GENERATED_REPORT = "generated_report"
     OTHER = "other"
+
+
+class PostgresAdvisoryLocks(Enum):
+    KOMBU_MESSAGE_CLEANUP_LOCK_ID = auto()