mirror of
https://github.com/danswer-ai/danswer.git
synced 2025-10-09 12:47:13 +02:00
add kombu message cleanup task (#2172)
* Add kombu message cleanup task.
* Add logging when an associated task is still found for an old message (tasks shouldn't be around for longer than 7 days).
This commit is contained in:
@@ -1,7 +1,12 @@
|
|||||||
|
import json
|
||||||
from datetime import timedelta
|
from datetime import timedelta
|
||||||
|
from typing import Any
|
||||||
from typing import cast
|
from typing import cast
|
||||||
|
|
||||||
from celery import Celery # type: ignore
|
from celery import Celery # type: ignore
|
||||||
|
from celery.contrib.abortable import AbortableTask # type: ignore
|
||||||
|
from celery.exceptions import TaskRevokedError
|
||||||
|
from sqlalchemy import text
|
||||||
from sqlalchemy.orm import Session
|
from sqlalchemy.orm import Session
|
||||||
|
|
||||||
from danswer.background.celery.celery_utils import extract_ids_from_runnable_connector
|
from danswer.background.celery.celery_utils import extract_ids_from_runnable_connector
|
||||||
@@ -16,6 +21,7 @@ from danswer.background.task_utils import name_cc_prune_task
|
|||||||
from danswer.background.task_utils import name_document_set_sync_task
|
from danswer.background.task_utils import name_document_set_sync_task
|
||||||
from danswer.configs.app_configs import JOB_TIMEOUT
|
from danswer.configs.app_configs import JOB_TIMEOUT
|
||||||
from danswer.configs.constants import POSTGRES_CELERY_APP_NAME
|
from danswer.configs.constants import POSTGRES_CELERY_APP_NAME
|
||||||
|
from danswer.configs.constants import PostgresAdvisoryLocks
|
||||||
from danswer.connectors.factory import instantiate_connector
|
from danswer.connectors.factory import instantiate_connector
|
||||||
from danswer.connectors.models import InputType
|
from danswer.connectors.models import InputType
|
||||||
from danswer.db.connector_credential_pair import get_connector_credential_pair
|
from danswer.db.connector_credential_pair import get_connector_credential_pair
|
||||||
@@ -291,6 +297,121 @@ def check_for_cc_pair_deletion_task() -> None:
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@celery_app.task(
    name="kombu_message_cleanup_task",
    soft_time_limit=JOB_TIMEOUT,
    bind=True,
    base=AbortableTask,
)
def kombu_message_cleanup_task(self: Any) -> int:
    """Runs periodically to clean up the kombu_message table.

    Takes a Postgres advisory lock so that only one cleanup runs at a time,
    then pages through old, no-longer-visible rows via
    `kombu_message_cleanup_task_helper`, committing after every page.

    Returns:
        The number of rows deleted, or 0 if the advisory lock could not be
        taken.

    Raises:
        TaskRevokedError: if the task is aborted between pages. Pages that
            were already committed stay deleted.
    """

    # we will select messages older than this amount to clean up
    KOMBU_MESSAGE_CLEANUP_AGE = 7  # days
    KOMBU_MESSAGE_CLEANUP_PAGE_LIMIT = 1000

    ctx = {
        "last_processed_id": 0,
        "deleted": 0,
        "cleanup_age": KOMBU_MESSAGE_CLEANUP_AGE,
        "page_limit": KOMBU_MESSAGE_CLEANUP_PAGE_LIMIT,
    }
    lock_params = {"id": PostgresAdvisoryLocks.KOMBU_MESSAGE_CLEANUP_LOCK_ID.value}

    # pg_try_advisory_lock is a *session-level* lock: it belongs to the
    # database connection and survives commit/rollback. Pin one connection
    # for the whole task so the lock and its release happen on the same
    # connection, instead of letting the Session check a (possibly different)
    # connection out of the pool after every commit.
    with get_sqlalchemy_engine().connect() as connection:
        with Session(bind=connection) as db_session:
            # Exit the task if we can't take the advisory lock
            lock_acquired = db_session.execute(
                text("SELECT pg_try_advisory_lock(:id)"),
                lock_params,
            ).scalar()
            if not lock_acquired:
                return 0

            try:
                while True:
                    if self.is_aborted():
                        raise TaskRevokedError(
                            "kombu_message_cleanup_task was aborted."
                        )

                    if not kombu_message_cleanup_task_helper(ctx, db_session):
                        break

                    db_session.commit()
            finally:
                # Without an explicit unlock the lock would stay held by the
                # pooled connection after the task ends, and every later run
                # on another connection would bail out with 0.
                # Roll back first so the unlock also works when we get here
                # through an exception that left the transaction aborted.
                db_session.rollback()
                db_session.execute(
                    text("SELECT pg_advisory_unlock(:id)"),
                    lock_params,
                )
                db_session.commit()

    if ctx["deleted"] > 0:
        logger.info(f"Deleted {ctx['deleted']} orphaned messages from kombu_message.")

    return ctx["deleted"]
|
||||||
|
|
||||||
|
|
||||||
|
def kombu_message_cleanup_task_helper(ctx: dict, db_session: Session) -> bool:
    """
    Helper function to clean up old messages from the `kombu_message` table that are no longer relevant.

    This function retrieves one page of messages from the `kombu_message` table that are no longer
    visible and older than a specified interval. It checks if the corresponding task_id exists in the
    `celery_taskmeta` table. If the task_id does not exist, the message is deleted; if it does, the
    message is kept and a warning is logged.

    Messages whose payload cannot be parsed, or that carry no `headers.id`, are skipped with a
    warning instead of raising, so that a single malformed row cannot block the cleanup of every
    row behind it.

    The caller is responsible for committing; this function only executes statements.

    Args:
        ctx (dict): A context dictionary containing configuration parameters such as:
            - 'cleanup_age' (int): The age in days after which messages are considered old.
            - 'page_limit' (int): The maximum number of messages to process in one batch.
            - 'last_processed_id' (int): The ID of the last processed message to handle pagination.
              Updated in place as rows are processed.
            - 'deleted' (int): A counter to track the number of deleted messages.
              Incremented in place.
        db_session (Session): The SQLAlchemy database session for executing queries.

    Returns:
        bool: Returns True if there are more rows to process, False if not.
    """

    query = text(
        """
    SELECT id, timestamp, payload
    FROM kombu_message WHERE visible = 'false'
    AND timestamp < CURRENT_TIMESTAMP - INTERVAL :interval_days
    AND id > :last_processed_id
    ORDER BY id
    LIMIT :page_limit
"""
    )
    kombu_messages = db_session.execute(
        query,
        {
            "interval_days": f"{ctx['cleanup_age']} days",
            "page_limit": ctx["page_limit"],
            "last_processed_id": ctx["last_processed_id"],
        },
    ).fetchall()

    if not kombu_messages:
        return False

    # Built once per page rather than once per message.
    task_exists_query = text("SELECT 1 FROM celery_taskmeta WHERE task_id = :task_id")
    delete_query = text("DELETE FROM kombu_message WHERE id = :message_id")

    for msg in kombu_messages:
        message_id = msg[0]

        # Advance the pagination cursor up front so skipped rows are not
        # fetched again by the next page.
        ctx["last_processed_id"] = message_id

        try:
            headers = json.loads(msg[2])["headers"]
            task_id = headers["id"]
        except (TypeError, ValueError, KeyError) as e:
            # Payload is not JSON, not an object, or has no headers/id.
            # We cannot tell which task it belongs to, so leave it alone.
            logger.warning(
                f"Skipping kombu_message id={message_id}: "
                f"could not read task id from payload ({e!r})"
            )
            continue

        # Check if task_id exists in celery_taskmeta
        task_exists = db_session.execute(
            task_exists_query,
            {"task_id": task_id},
        ).fetchone()

        # If task_id does not exist, delete the message
        if not task_exists:
            result = db_session.execute(
                delete_query,
                {"message_id": message_id},
            )
            if result.rowcount > 0:  # type: ignore
                ctx["deleted"] += 1
        else:
            task_name = headers.get("task")
            logger.warning(
                f"Message found for task older than {ctx['cleanup_age']} days. "
                f"id={task_id} name={task_name}"
            )

    return True
|
||||||
|
|
||||||
|
|
||||||
@celery_app.task(
|
@celery_app.task(
|
||||||
name="check_for_prune_task",
|
name="check_for_prune_task",
|
||||||
soft_time_limit=JOB_TIMEOUT,
|
soft_time_limit=JOB_TIMEOUT,
|
||||||
@@ -341,3 +462,11 @@ celery_app.conf.beat_schedule.update(
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
# Register the kombu_message cleanup with celery beat; it fires once an hour.
celery_app.conf.beat_schedule.update(
    {
        "kombu-message-cleanup": dict(
            task="kombu_message_cleanup_task",
            schedule=timedelta(hours=1),
        ),
    }
)
|
||||||
|
@@ -1,3 +1,4 @@
|
|||||||
|
from enum import auto
|
||||||
from enum import Enum
|
from enum import Enum
|
||||||
|
|
||||||
SOURCE_TYPE = "source_type"
|
SOURCE_TYPE = "source_type"
|
||||||
@@ -161,3 +162,7 @@ class FileOrigin(str, Enum):
|
|||||||
CONNECTOR = "connector"
|
CONNECTOR = "connector"
|
||||||
GENERATED_REPORT = "generated_report"
|
GENERATED_REPORT = "generated_report"
|
||||||
OTHER = "other"
|
OTHER = "other"
|
||||||
|
|
||||||
|
|
||||||
|
class PostgresAdvisoryLocks(Enum):
    """Identifiers passed to Postgres advisory lock functions.

    Each member's `.value` is the integer key handed to the database, so a
    member's position determines its key while `auto()` is used.
    """

    # Guards the periodic cleanup of the kombu_message table.
    KOMBU_MESSAGE_CLEANUP_LOCK_ID = auto()
|
||||||
|
Reference in New Issue
Block a user