danswer/backend/scripts/dev_run_background_jobs.py
Raunak Bhagat 79b981075e perf: Optimize query history exporting process (#4602)
* Update mode to be a default parameter in `FileStore.read`

* Move query history exporting process to be a background job instead

* Move hardcoded report-file-naming to a common utility function

* Add type annotations

* Update download component

* Implement button to re-ping and download CSV file; fix up some backend file-checking logic

* De-indent logic (w/ early return)

* Return different error codes depending on the type of task status

* Add more resilient retry handling for failures

* Remove default parameter in helper function

* Use popup for error messaging

* Update return code

* Update web/src/app/ee/admin/performance/query-history/DownloadAsCSV.tsx

Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>

* Add type to useState call

* Update backend/ee/onyx/server/query_history/api.py

Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>

* Update backend/onyx/file_store/file_store.py

Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>

* Update backend/ee/onyx/background/celery/apps/primary.py

Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>

* Move rerender call to after check

* Run formatter

* Add type conversions back (smh greptile)

* Remove duplicated call to save_file

* Move non-fallible logic out of try-except block

* Pass date-ranges into API call

* Convert to ISO strings before passing them into the API call

* Add API to list all tasks

* Create new pydantic model to represent tasks to return instead

* Change helper to only fetch query-history tasks

* Use `shared_tasks` instead of old method

* Address more comments from PR; consolidate how task name is generated

* Mark task as failed if any exception is raised

* Change the task object which is returned back to the FE

* Add a table to display previously generated query-history CSVs

* Add timestamps to task; delete tasks as soon as file finishes processing

* Raise exception if start_time is not present

* Convert hard-coded string to constant

* Add "Generated At" field to table

* Return task list in sorted order (based on start-time)

* Implement pagination

* Remove unused props and cleanup tailwind classes

* Change the name of kickoff button

* Redesign how previous query exports are viewed

* Make button a constant width even when contents change

* Remove timezone information before comparing

* Decrease interval time for re-pinging API

* Add timezone to start-time creation

* Add a refreshInterval for getting updated task status

* Add new background queue

* Make small wording edits and remove error popup when max-retries is hit

* Update heavy worker to recognize new task in new module

* Ensure `celery_app` is imported

* Change how `celery_app` is imported and defined

* Update comment on why `celery_app` must be imported

* Add basic skeleton for new beat task to clean up any dead / failed query-history-export tasks

* Move cleanup task to different worker / queue

* Implement cleanup task

* Add return type

* Address comment on PR

* Remove delimiter from prefix

* Change name of function to be more descriptive

* Remove delimiter from prefix constant

* Move function invocation closer to usage location

* Move imports to top of file

* Move variable up a scope due to undefined error

* Remove dangling if-statement

* Make function more pure-functional

* Remove redefinition

---------

Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>
2025-05-03 00:16:35 +00:00
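
The commit message above moves the query-history CSV export onto a Celery worker (consumed from the csv_generation queue added to the heavy worker in the script below) and has the frontend poll until the file is ready. The following is only a minimal sketch of that task pattern; the task name, file naming, and CSV columns are illustrative assumptions, not the actual Onyx implementation.

# Hypothetical sketch; names and file layout are assumptions made for illustration.
import csv
import tempfile
from pathlib import Path

from celery import shared_task


@shared_task(bind=True, max_retries=3, name="generate_query_history_csv")
def generate_query_history_csv(self, start_iso: str, end_iso: str) -> str:
    """Write the query-history export in the background and return its path."""
    try:
        # The real task would read rows from the query-history tables and save the
        # report through the FileStore; a header-only file stands in here.
        out_path = Path(tempfile.gettempdir()) / f"query_history_{start_iso}_{end_iso}.csv"
        with open(out_path, "w", newline="") as file:
            csv.writer(file).writerow(["user", "query", "time_sent"])
        return str(out_path)
    except Exception as exc:
        # Retry with a delay, in the spirit of the retry hardening noted in the
        # commit message; once max_retries is exhausted, Celery re-raises exc and
        # the task ends in FAILURE, which the frontend polling can surface.
        raise self.retry(exc=exc, countdown=30)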

185 lines
5.0 KiB
Python

import subprocess
import threading


def monitor_process(process_name: str, process: subprocess.Popen) -> None:
    assert process.stdout is not None

    while True:
        output = process.stdout.readline()

        if output:
            print(f"{process_name}: {output.strip()}")

        if process.poll() is not None:
            break
def run_jobs() -> None:
    # command setup
    cmd_worker_primary = [
        "celery",
        "-A",
        "onyx.background.celery.versioned_apps.primary",
        "worker",
        "--pool=threads",
        "--concurrency=6",
        "--prefetch-multiplier=1",
        "--loglevel=INFO",
        "--hostname=primary@%n",
        "-Q",
        "celery",
    ]

    cmd_worker_light = [
        "celery",
        "-A",
        "onyx.background.celery.versioned_apps.light",
        "worker",
        "--pool=threads",
        "--concurrency=16",
        "--prefetch-multiplier=8",
        "--loglevel=INFO",
        "--hostname=light@%n",
        "-Q",
        "vespa_metadata_sync,connector_deletion,doc_permissions_upsert,checkpoint_cleanup",
    ]
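
    # The heavy worker also consumes csv_generation, the queue added in this
    # commit for the background query-history CSV export.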
    cmd_worker_heavy = [
        "celery",
        "-A",
        "onyx.background.celery.versioned_apps.heavy",
        "worker",
        "--pool=threads",
        "--concurrency=6",
        "--prefetch-multiplier=1",
        "--loglevel=INFO",
        "--hostname=heavy@%n",
        "-Q",
        "connector_pruning,connector_doc_permissions_sync,connector_external_group_sync,csv_generation",
    ]

    cmd_worker_indexing = [
        "celery",
        "-A",
        "onyx.background.celery.versioned_apps.indexing",
        "worker",
        "--pool=threads",
        "--concurrency=1",
        "--prefetch-multiplier=1",
        "--loglevel=INFO",
        "--hostname=indexing@%n",
        "--queues=connector_indexing",
    ]

    cmd_worker_user_files_indexing = [
        "celery",
        "-A",
        "onyx.background.celery.versioned_apps.indexing",
        "worker",
        "--pool=threads",
        "--concurrency=1",
        "--prefetch-multiplier=1",
        "--loglevel=INFO",
        "--hostname=user_files_indexing@%n",
        "--queues=user_files_indexing",
    ]

    cmd_worker_monitoring = [
        "celery",
        "-A",
        "onyx.background.celery.versioned_apps.monitoring",
        "worker",
        "--pool=threads",
        "--concurrency=1",
        "--prefetch-multiplier=1",
        "--loglevel=INFO",
        "--hostname=monitoring@%n",
        "--queues=monitoring",
    ]

    cmd_beat = [
        "celery",
        "-A",
        "onyx.background.celery.versioned_apps.beat",
        "beat",
        "--loglevel=INFO",
    ]
    # spawn processes
    worker_primary_process = subprocess.Popen(
        cmd_worker_primary, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True
    )
    worker_light_process = subprocess.Popen(
        cmd_worker_light, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True
    )
    worker_heavy_process = subprocess.Popen(
        cmd_worker_heavy, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True
    )
    worker_indexing_process = subprocess.Popen(
        cmd_worker_indexing, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True
    )
    worker_user_files_indexing_process = subprocess.Popen(
        cmd_worker_user_files_indexing,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        text=True,
    )
    worker_monitoring_process = subprocess.Popen(
        cmd_worker_monitoring,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        text=True,
    )
    beat_process = subprocess.Popen(
        cmd_beat, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True
    )
    # monitor threads
    worker_primary_thread = threading.Thread(
        target=monitor_process, args=("PRIMARY", worker_primary_process)
    )
    worker_light_thread = threading.Thread(
        target=monitor_process, args=("LIGHT", worker_light_process)
    )
    worker_heavy_thread = threading.Thread(
        target=monitor_process, args=("HEAVY", worker_heavy_process)
    )
    worker_indexing_thread = threading.Thread(
        target=monitor_process, args=("INDEX", worker_indexing_process)
    )
    worker_user_files_indexing_thread = threading.Thread(
        target=monitor_process,
        args=("USER_FILES_INDEX", worker_user_files_indexing_process),
    )
    worker_monitoring_thread = threading.Thread(
        target=monitor_process, args=("MONITORING", worker_monitoring_process)
    )
    beat_thread = threading.Thread(target=monitor_process, args=("BEAT", beat_process))

    worker_primary_thread.start()
    worker_light_thread.start()
    worker_heavy_thread.start()
    worker_indexing_thread.start()
    worker_user_files_indexing_thread.start()
    worker_monitoring_thread.start()
    beat_thread.start()

    worker_primary_thread.join()
    worker_light_thread.join()
    worker_heavy_thread.join()
    worker_indexing_thread.join()
    worker_user_files_indexing_thread.join()
    worker_monitoring_thread.join()
    beat_thread.join()

if __name__ == "__main__":
    run_jobs()
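
The script simply blocks on the monitor threads until every spawned Celery process exits on its own; it installs no shutdown handling. If Ctrl-C cleanup were wanted in a local setup, a small helper along these lines could terminate the spawned workers. This is a sketch only, not part of the file above.

# Hypothetical helper, not part of dev_run_background_jobs.py: stop every
# spawned Celery process when the developer interrupts the script.
import subprocess


def terminate_all(processes: list[subprocess.Popen]) -> None:
    # Ask each worker to exit, then escalate if one does not comply in time.
    for proc in processes:
        if proc.poll() is None:  # still running
            proc.terminate()
    for proc in processes:
        try:
            proc.wait(timeout=10)
        except subprocess.TimeoutExpired:
            proc.kill()

run_jobs() could wrap its join calls in a try/except KeyboardInterrupt block and hand the seven Popen handles to such a helper.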