diff --git a/backend/ee/onyx/background/celery/apps/heavy.py b/backend/ee/onyx/background/celery/apps/heavy.py index bd8ee6e694..ff6dd9ba6c 100644 --- a/backend/ee/onyx/background/celery/apps/heavy.py +++ b/backend/ee/onyx/background/celery/apps/heavy.py @@ -1,12 +1,10 @@ import csv import io from datetime import datetime -from datetime import timezone from celery import shared_task from celery import Task -from ee.onyx.background.task_name_builders import query_history_task_name from ee.onyx.server.query_history.api import fetch_and_process_chat_session_history from ee.onyx.server.query_history.api import ONYX_ANONYMIZED_EMAIL from ee.onyx.server.query_history.models import QuestionAnswerPairSnapshot @@ -19,10 +17,9 @@ from onyx.configs.constants import FileType from onyx.configs.constants import OnyxCeleryTask from onyx.configs.constants import QueryHistoryType from onyx.db.engine import get_session_with_current_tenant -from onyx.db.enums import TaskStatus from onyx.db.tasks import delete_task_with_id from onyx.db.tasks import mark_task_as_finished_with_id -from onyx.db.tasks import register_task +from onyx.db.tasks import mark_task_as_started_with_id from onyx.file_store.file_store import get_default_file_store from onyx.utils.logger import setup_logger @@ -37,13 +34,13 @@ logger = setup_logger() bind=True, trail=False, ) -def export_query_history_task(self: Task, *, start: datetime, end: datetime) -> None: +def export_query_history_task( + self: Task, *, start: datetime, end: datetime, start_time: datetime +) -> None: if not self.request.id: raise RuntimeError("No task id defined for this task; cannot identify it") task_id = self.request.id - start_time = datetime.now(tz=timezone.utc) - stream = io.StringIO() writer = csv.DictWriter( stream, @@ -53,12 +50,9 @@ def export_query_history_task(self: Task, *, start: datetime, end: datetime) -> with get_session_with_current_tenant() as db_session: try: - register_task( + mark_task_as_started_with_id( db_session=db_session, - task_name=query_history_task_name(start=start, end=end), task_id=task_id, - status=TaskStatus.STARTED, - start_time=start_time, ) snapshot_generator = fetch_and_process_chat_session_history( diff --git a/backend/ee/onyx/server/query_history/api.py b/backend/ee/onyx/server/query_history/api.py index 530f61adcc..6c03670280 100644 --- a/backend/ee/onyx/server/query_history/api.py +++ b/backend/ee/onyx/server/query_history/api.py @@ -1,3 +1,4 @@ +import uuid from collections.abc import Generator from datetime import datetime from datetime import timezone @@ -11,6 +12,7 @@ from fastapi import Query from fastapi.responses import StreamingResponse from sqlalchemy.orm import Session +from ee.onyx.background.task_name_builders import query_history_task_name from ee.onyx.db.query_history import get_all_query_history_export_tasks from ee.onyx.db.query_history import get_page_of_chat_sessions from ee.onyx.db.query_history import get_total_filtered_chat_sessions_count @@ -41,6 +43,7 @@ from onyx.db.models import ChatSession from onyx.db.models import User from onyx.db.pg_file_store import get_query_history_export_files from onyx.db.tasks import get_task_with_id +from onyx.db.tasks import register_task from onyx.file_store.file_store import get_default_file_store from onyx.server.documents.models import PaginatedReturn from onyx.server.query_and_chat.models import ChatSessionDetails @@ -310,17 +313,31 @@ def start_query_history_export( f"Start time must come before end time, but instead got the start time coming after; {start=} {end=}", ) - task = client_app.send_task( + task_id_uuid = uuid.uuid4() + task_id = str(task_id_uuid) + start_time = datetime.now(tz=timezone.utc) + + register_task( + db_session=db_session, + task_name=query_history_task_name(start=start, end=end), + task_id=task_id, + status=TaskStatus.PENDING, + start_time=start_time, + ) + + client_app.send_task( OnyxCeleryTask.EXPORT_QUERY_HISTORY_TASK, + task_id=task_id, priority=OnyxCeleryPriority.MEDIUM, queue=OnyxCeleryQueues.CSV_GENERATION, kwargs={ "start": start, "end": end, + "start_time": start_time, }, ) - return {"request_id": task.id} + return {"request_id": task_id} @router.get("/admin/query-history/export-status")