Mirror of https://github.com/danswer-ai/danswer.git (synced 2025-10-11 05:36:03 +02:00)
* Update mode to be a default parameter in `FileStore.read`
* Move query history exporting process to be a background job instead
* Move hardcoded report-file-naming to a common utility function
* Add type annotations
* Update download component
* Implement button to re-ping and download CSV file; fix up some backend file-checking logic
* De-indent logic (w/ early return)
* Return different error codes depending on the type of task status
* Add more resistant failure retrying mechanisms
* Remove default parameter in helper function
* Use popup for error messaging
* Update return code
* Update web/src/app/ee/admin/performance/query-history/DownloadAsCSV.tsx

Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>

* Add type to useState call
* Update backend/ee/onyx/server/query_history/api.py

Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>

* Update backend/onyx/file_store/file_store.py

Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>

* Update backend/ee/onyx/background/celery/apps/primary.py

Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>

* Move rerender call to after check
* Run formatter
* Add type conversions back (smh greptile)
* Remove duplicated call to save_file
* Move non-fallible logic out of try-except block
* Pass date-ranges into API call
* Convert to ISO strings before passing them into the API call
* Add API to list all tasks
* Create new pydantic model to represent tasks to return instead
* Change helper to only fetch query-history tasks
* Use `shared_tasks` instead of old method
* Address more comments from PR; consolidate how task name is generated
* Mark task as failed if any exception is raised
* Change the task object which is returned back to the FE
* Add a table to display previously generated query-history CSVs
* Add timestamps to task; delete tasks as soon as file finishes processing
* Raise exception if start_time is not present
* Convert hard-coded string to constant
* Add "Generated At" field to table
* Return task list in sorted order (based on start-time)
* Implement pagination
* Remove unused props and clean up tailwind classes
* Change the name of kickoff button
* Redesign how previous query exports are viewed
* Make button a constant width even when contents change
* Remove timezone information before comparing
* Decrease interval time for re-pinging API
* Add timezone to start-time creation
* Add a refreshInterval for getting updated task status
* Add new background queue
* Edit small verbiage and remove error popup when max-retries is hit
* Change up heavy worker to recognize new task in new module
* Ensure `celery_app` is imported
* Change how `celery_app` is imported and defined
* Update comment on why `celery_app` must be imported
* Add basic skeleton for new beat task to clean up any dead / failed query-history-export tasks
* Move cleanup task to a different worker / queue
* Implement cleanup task
* Add return type
* Address comment on PR
* Remove delimiter from prefix
* Change name of function to be more descriptive
* Remove delimiter from prefix constant
* Move function invocation closer to usage location
* Move imports to top of file
* Move variable up a scope due to undefined error
* Remove dangling if-statement
* Make function more pure-functional
* Remove redefinition

---------

Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>
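The commit above moves the query-history CSV export into a Celery background job: the frontend kicks off the export, re-pings for task status, and downloads the finished file, with the work routed to the heavy worker's new csv_generation queue (visible in the file below). The following is a minimal sketch of that dispatch-and-poll flow, using only standard Celery calls; the task and helper names (export_query_history_csv, kick_off_export, export_status) and the report-naming scheme are hypothetical, not the actual Onyx identifiers.

from celery import shared_task
from celery.result import AsyncResult


@shared_task(bind=True, name="export_query_history_csv")  # hypothetical task name
def export_query_history_csv(self, start_iso: str, end_iso: str) -> str:
    # Build the CSV for the given ISO date range in the background and persist
    # it to the file store; the report name doubles as the download handle.
    report_name = f"query_history_{start_iso}_{end_iso}.csv"  # illustrative naming scheme
    # ... fetch the query-history rows, write them as CSV, save via the FileStore ...
    return report_name


def kick_off_export(start_iso: str, end_iso: str) -> str:
    # Dispatch onto the dedicated csv_generation queue and hand the task id
    # back to the frontend so it can poll for completion.
    result = export_query_history_csv.apply_async(
        args=[start_iso, end_iso], queue="csv_generation"
    )
    return result.id


def export_status(task_id: str) -> str:
    # Re-pinging maps onto Celery task state, which the API can translate into
    # different HTTP status codes (still pending vs. ready vs. failed).
    return AsyncResult(task_id).state

Per the cleanup bullets above, a separate beat task on its own worker/queue then sweeps up any dead or failed export tasks.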
185 lines · 5.0 KiB · Python
# Launch the Onyx Celery workers and the beat scheduler as subprocesses and
# interleave their output on a single console, tagging each line with the
# worker's name.
import subprocess
import threading


def monitor_process(process_name: str, process: subprocess.Popen) -> None:
    # Stream the child's combined stdout/stderr line by line until it exits.
    assert process.stdout is not None

    while True:
        output = process.stdout.readline()

        if output:
            print(f"{process_name}: {output.strip()}")

        if process.poll() is not None:
            break


def run_jobs() -> None:
    # command setup
    cmd_worker_primary = [
        "celery",
        "-A",
        "onyx.background.celery.versioned_apps.primary",
        "worker",
        "--pool=threads",
        "--concurrency=6",
        "--prefetch-multiplier=1",
        "--loglevel=INFO",
        "--hostname=primary@%n",
        "-Q",
        "celery",
    ]

    cmd_worker_light = [
        "celery",
        "-A",
        "onyx.background.celery.versioned_apps.light",
        "worker",
        "--pool=threads",
        "--concurrency=16",
        "--prefetch-multiplier=8",
        "--loglevel=INFO",
        "--hostname=light@%n",
        "-Q",
        "vespa_metadata_sync,connector_deletion,doc_permissions_upsert,checkpoint_cleanup",
    ]

    cmd_worker_heavy = [
        "celery",
        "-A",
        "onyx.background.celery.versioned_apps.heavy",
        "worker",
        "--pool=threads",
        "--concurrency=6",
        "--prefetch-multiplier=1",
        "--loglevel=INFO",
        "--hostname=heavy@%n",
        "-Q",
        "connector_pruning,connector_doc_permissions_sync,connector_external_group_sync,csv_generation",
    ]

    cmd_worker_indexing = [
        "celery",
        "-A",
        "onyx.background.celery.versioned_apps.indexing",
        "worker",
        "--pool=threads",
        "--concurrency=1",
        "--prefetch-multiplier=1",
        "--loglevel=INFO",
        "--hostname=indexing@%n",
        "--queues=connector_indexing",
    ]

    cmd_worker_user_files_indexing = [
        "celery",
        "-A",
        "onyx.background.celery.versioned_apps.indexing",
        "worker",
        "--pool=threads",
        "--concurrency=1",
        "--prefetch-multiplier=1",
        "--loglevel=INFO",
        "--hostname=user_files_indexing@%n",
        "--queues=user_files_indexing",
    ]

    cmd_worker_monitoring = [
        "celery",
        "-A",
        "onyx.background.celery.versioned_apps.monitoring",
        "worker",
        "--pool=threads",
        "--concurrency=1",
        "--prefetch-multiplier=1",
        "--loglevel=INFO",
        "--hostname=monitoring@%n",
        "--queues=monitoring",
    ]

    cmd_beat = [
        "celery",
        "-A",
        "onyx.background.celery.versioned_apps.beat",
        "beat",
        "--loglevel=INFO",
    ]

    # spawn processes
    worker_primary_process = subprocess.Popen(
        cmd_worker_primary, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True
    )

    worker_light_process = subprocess.Popen(
        cmd_worker_light, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True
    )

    worker_heavy_process = subprocess.Popen(
        cmd_worker_heavy, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True
    )

    worker_indexing_process = subprocess.Popen(
        cmd_worker_indexing, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True
    )

    worker_user_files_indexing_process = subprocess.Popen(
        cmd_worker_user_files_indexing,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        text=True,
    )

    worker_monitoring_process = subprocess.Popen(
        cmd_worker_monitoring,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        text=True,
    )

    beat_process = subprocess.Popen(
        cmd_beat, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True
    )

    # monitor threads
    worker_primary_thread = threading.Thread(
        target=monitor_process, args=("PRIMARY", worker_primary_process)
    )
    worker_light_thread = threading.Thread(
        target=monitor_process, args=("LIGHT", worker_light_process)
    )
    worker_heavy_thread = threading.Thread(
        target=monitor_process, args=("HEAVY", worker_heavy_process)
    )
    worker_indexing_thread = threading.Thread(
        target=monitor_process, args=("INDEX", worker_indexing_process)
    )
    worker_user_files_indexing_thread = threading.Thread(
        target=monitor_process,
        args=("USER_FILES_INDEX", worker_user_files_indexing_process),
    )
    worker_monitoring_thread = threading.Thread(
        target=monitor_process, args=("MONITORING", worker_monitoring_process)
    )
    beat_thread = threading.Thread(target=monitor_process, args=("BEAT", beat_process))

    worker_primary_thread.start()
    worker_light_thread.start()
    worker_heavy_thread.start()
    worker_indexing_thread.start()
    worker_user_files_indexing_thread.start()
    worker_monitoring_thread.start()
    beat_thread.start()

    worker_primary_thread.join()
    worker_light_thread.join()
    worker_heavy_thread.join()
    worker_indexing_thread.join()
    worker_user_files_indexing_thread.join()
    worker_monitoring_thread.join()
    beat_thread.join()


if __name__ == "__main__":
    run_jobs()