Updated Contributing for Celery (#629)

Author: Yuhong Sun
Date: 2023-10-25 18:26:02 -07:00
Committed by: GitHub
Parent: fbb05e630d
Commit: 9a51745fc9
5 changed files with 74 additions and 74 deletions


@@ -1,4 +1,5 @@
# This file is purely for development use, not included in any builds
import argparse
import os
import subprocess
import threading
@@ -16,18 +17,20 @@ def monitor_process(process_name: str, process: subprocess.Popen) -> None:
            break
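Only the tail of monitor_process is visible in this hunk. Going by its signature in the hunk header and the trailing break, a minimal sketch of what such a helper does (the exact print formatting is an assumption):

def monitor_process(process_name: str, process: subprocess.Popen) -> None:
    # Tail the child's stdout and prefix each line so WORKER / BEAT / INDEXING output is distinguishable
    assert process.stdout is not None
    while True:
        output = process.stdout.readline()
        if output:
            print(f"{process_name}: {output.strip()}")
        if process.poll() is not None:
            break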
def run_celery() -> None:
def run_jobs(exclude_indexing: bool) -> None:
    cmd_worker = [
        "celery",
        "-A",
        "danswer.background.celery",
        "worker",
        "--pool=threads",
        "--autoscale=3,10",
        "--loglevel=INFO",
        "--concurrency=1",
    ]
    cmd_beat = ["celery", "-A", "danswer.background.celery", "beat", "--loglevel=INFO"]
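Both lists are argv vectors handed straight to subprocess.Popen below, i.e. the same commands one would run from a shell: celery -A danswer.background.celery worker ... and celery -A danswer.background.celery beat --loglevel=INFO. On the worker side, --pool=threads selects Celery's thread-based execution pool, --autoscale lets Celery grow and shrink that pool between the two bounds given, --loglevel=INFO sets log verbosity, and --concurrency=1 pins the pool to a single worker; the beat process only schedules periodic tasks, so it needs no pool options.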
    # Redirect stderr to stdout for both processes
    worker_process = subprocess.Popen(
        cmd_worker, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True
    )
@@ -35,7 +38,6 @@ def run_celery() -> None:
        cmd_beat, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True
    )
    # Monitor outputs using threads
    worker_thread = threading.Thread(
        target=monitor_process, args=("WORKER", worker_process)
    )
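The matching thread for the beat process sits just outside this hunk; by symmetry with the worker it is presumably created along these lines (the beat_process variable name and the "BEAT" label are assumptions, since that line is not shown in the diff):

beat_thread = threading.Thread(
    target=monitor_process, args=("BEAT", beat_process)
)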
@@ -44,10 +46,37 @@ def run_celery() -> None:
    worker_thread.start()
    beat_thread.start()
    # Wait for threads to finish
    if not exclude_indexing:
        update_env = os.environ.copy()
        update_env["PYTHONPATH"] = "."
        update_env["DYNAMIC_CONFIG_DIR_PATH"] = "./dynamic_config_storage"
        update_env["FILE_CONNECTOR_TMP_STORAGE_PATH"] = "./dynamic_config_storage"
        cmd_indexing = ["python", "danswer/background/update.py"]
        indexing_process = subprocess.Popen(
            cmd_indexing,
            env=update_env,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
        )
        indexing_thread = threading.Thread(
            target=monitor_process, args=("INDEXING", indexing_process)
        )
        indexing_thread.start()
        indexing_thread.join()
    worker_thread.join()
    beat_thread.join()
if __name__ == "__main__":
    run_celery()
    parser = argparse.ArgumentParser(description="Run background jobs.")
    parser.add_argument(
        "--no-indexing", action="store_true", help="Do not run indexing process"
    )
    args = parser.parse_args()
    run_jobs(args.no_indexing)
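With this change the dev script starts the Celery worker, Celery beat, and, unless --no-indexing is passed, the indexing loop in danswer/background/update.py with PYTHONPATH and the local dynamic-config / file-connector storage paths pointed at the working directory. A typical invocation would look like the following (the script's filename is not shown in this diff, so the path below is a placeholder):

python <path-to-this-dev-script>                 # worker + beat + indexing
python <path-to-this-dev-script> --no-indexing   # worker + beat only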