Mirror of https://github.com/danswer-ai/danswer.git (synced 2025-03-27 10:13:05 +01:00)

* first cut at redis
* some new helper functions for the db
* ignore kombu tables in alembic migrations (used by celery)
* multiline commands for readability, add vespa_metadata_sync queue to worker
* typo fix
* fix returning tuple fields
* add constants
* fix _get_access_for_document
* docstrings!
* fix double function declaration and typing
* fix type hinting
* add a global redis pool (see the first sketch after this list)
* Add get_document function
* use task_logger in various celery tasks
* add celeryconfig.py to simplify configuration. Will be used in a subsequent commit (see the second sketch after this list)
* Add celery redis helper. used in a subsequent PR
* kombu warning getting spammy since celery is not self managing its queue in Postgres any more
* add last_modified and last_synced to documents
* fix task naming convention
* use celeryconfig.py
* the big one. adds queues and tasks, updates functions to use the queues with priorities, etc
* change vespa index log line to debug
* mypy fixes
* update alembic migration
* fix fence ordering, rename to "monitor", fix fetch_versioned_implementation call
* mypy
* switch to monotonic time
* fix startup dependencies on redis
* rebase alembic migration
* kombu cleanup - fail silently
* mypy
* add redis_host environment override
* update REDIS_HOST env var in docker-compose.dev.yml
* update the rest of the docker files
* in flight
* harden indexing-status endpoint against db changes happening in the background. Needs further improvement but OK for now.
* allow no task syncs to run because we create certain objects with no entries but initially marked as out of date
* add back writing to vespa on indexing
* actually working connector deletion
* update contributing guide
* backporting fixes from background_deletion
* renaming cache to cache_volume
* add redis password to various deployments
* try setting up pr testing for helm
* fix indent
* hopefully this release version actually exists
* fix command line option to --chart-dirs
* fetch-depth 0
* edit values.yaml
* try setting ct working directory
* bypass testing only on change for now
* move files and lint them
* update helm testing
* some issues suggest using --config works
* add vespa repo
* add postgresql repo
* increase timeout
* try amd64 runner
* fix redis password reference
* add comment to helm chart testing workflow
* rename helm testing workflow to disable it
* adding clarifying comments
* address code review
* missed a file
* remove commented warning ... just not needed
* fix imports
* refactor to use update_single
* mypy fixes
* add vespa test
* multiple celery workers
* update logs as well and set prefetch multipliers appropriate to the worker intent
* add db refresh to connector deletion
* add some preliminary locking
* celery auto associates tasks created inside another task, which bloats the result metadata considerably. trail=False prevents this. (see the third sketch after this list)
* organize tasks into separate files
* code review fixes
* move monitor_usergroup_taskset to ee, improve logging
* add multi workers to dev_run_background_jobs.py
* update supervisord with some recommended settings for celery
* name celery workers and shorten dev script prefixing
* add configurable sql alchemy engine settings on startup (needed for various intents like API server, different celery workers and tasks, etc)
* fix comments
* autoscale sqlalchemy pool size to celery concurrency (allow override later?) (see the fourth sketch after this list)
* supervisord needs the percent symbols escaped
* use name as primary check, some minor refactoring and type hinting too.
* addressing code review
* fix import
* fix prune_documents_task references

---------

Co-authored-by: Richard Kuo <rkuo@rkuo.com>
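The "global redis pool" item maps naturally onto redis-py's `ConnectionPool`. A minimal sketch, assuming the redis-py client; `REDIS_HOST` is the environment override mentioned in the log, while the other variable names, defaults, and the `get_redis_client` helper are illustrative assumptions rather than Danswer's actual settings module:

```python
import os

import redis

# REDIS_HOST is the env override referenced above; port/password defaults
# here are illustrative assumptions
REDIS_HOST = os.environ.get("REDIS_HOST", "localhost")
REDIS_PORT = int(os.environ.get("REDIS_PORT", "6379"))
REDIS_PASSWORD = os.environ.get("REDIS_PASSWORD") or None

# one pool per process; clients built from it share connections instead of
# opening a fresh TCP connection per call
_redis_pool = redis.ConnectionPool(
    host=REDIS_HOST,
    port=REDIS_PORT,
    password=REDIS_PASSWORD,
    db=0,
)


def get_redis_client() -> redis.Redis:
    # hypothetical helper name; cheap to call since the pool is shared
    return redis.Redis(connection_pool=_redis_pool)
```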
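The celeryconfig.py mentioned above is Celery's standard pattern for a shared settings module, loaded with `config_from_object`. A sketch under that assumption: the queue names match the worker `-Q` flags in the script below and `prune_documents_task` appears in the log, but the broker URLs, the other task names, and the priority values are hypothetical:

```python
# celeryconfig.py -- shared Celery settings module (sketch)
broker_url = "redis://localhost:6379/0"
result_backend = "redis://localhost:6379/1"

# route tasks to dedicated queues so each worker can specialize via -Q;
# the first two task names are hypothetical, the queue names are real
task_routes = {
    "vespa_metadata_sync_task": {"queue": "vespa_metadata_sync"},
    "connector_deletion_task": {"queue": "connector_deletion"},
    "prune_documents_task": {"queue": "connector_pruning"},
}

# the Redis transport emulates message priorities with multiple lists
# per queue; values here are illustrative
task_default_priority = 5
broker_transport_options = {"priority_steps": list(range(10))}
```

Each app would then load it with `celery_app.config_from_object("celeryconfig")`.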
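The trail=False item refers to Celery's result "trail": with the default `trail=True`, a task records every subtask it spawns in its own result metadata, which balloons for large fan-outs. A minimal sketch, assuming a Redis broker and hypothetical task names (it also shows the `task_logger` usage mentioned above):

```python
from celery import Celery
from celery.utils.log import get_task_logger

celery_app = Celery(__name__, broker="redis://localhost:6379/0")
task_logger = get_task_logger(__name__)


@celery_app.task(trail=False)  # don't append spawned subtasks to this result
def sync_all_documents() -> None:
    for doc_id in ("doc-1", "doc-2"):  # illustrative ids
        # each .delay() spawns a child task; with the default trail=True,
        # every child AsyncResult would be recorded in this task's result
        sync_single_document.delay(doc_id)


@celery_app.task
def sync_single_document(doc_id: str) -> None:
    task_logger.info(f"syncing {doc_id}")
```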
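Autoscaling the SQLAlchemy pool to Celery concurrency is, in essence, sizing `pool_size` from the worker's thread count at startup. A sketch, assuming `create_engine` is called once per process; the function name and the overflow margin are illustrative:

```python
from sqlalchemy import create_engine
from sqlalchemy.engine import Engine


def build_engine(db_url: str, celery_concurrency: int) -> Engine:
    # one pooled connection per worker thread so threads don't block on
    # checkout under full load, plus a small overflow margin for bursts
    return create_engine(
        db_url,
        pool_size=celery_concurrency,
        max_overflow=4,
        pool_pre_ping=True,  # discard stale connections before handing them out
    )
```

For example, `build_engine(url, 16)` would match the light worker started with `--concurrency=16` in the script below.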
160 lines · 4.1 KiB · Python

import argparse
import os
import subprocess
import threading


def monitor_process(process_name: str, process: subprocess.Popen) -> None:
    """Stream a child process's combined stdout/stderr, prefixed with its name."""
    assert process.stdout is not None

    while True:
        output = process.stdout.readline()

        if output:
            print(f"{process_name}: {output.strip()}")

        if process.poll() is not None:
            break


def run_jobs(exclude_indexing: bool) -> None:
    # command setup: one Celery worker per queue group, plus beat for scheduling
    cmd_worker_primary = [
        "celery",
        "-A",
        "ee.danswer.background.celery.celery_app",
        "worker",
        "--pool=threads",
        "--concurrency=6",
        "--loglevel=INFO",
        "-n",
        "primary@%n",
        "-Q",
        "celery",
    ]

    cmd_worker_light = [
        "celery",
        "-A",
        "ee.danswer.background.celery.celery_app",
        "worker",
        "--pool=threads",
        "--concurrency=16",
        "--loglevel=INFO",
        "-n",
        "light@%n",
        "-Q",
        "vespa_metadata_sync,connector_deletion",
    ]

    cmd_worker_heavy = [
        "celery",
        "-A",
        "ee.danswer.background.celery.celery_app",
        "worker",
        "--pool=threads",
        "--concurrency=6",
        "--loglevel=INFO",
        "-n",
        "heavy@%n",
        "-Q",
        "connector_pruning",
    ]

    cmd_beat = [
        "celery",
        "-A",
        "ee.danswer.background.celery.celery_app",
        "beat",
        "--loglevel=INFO",
    ]

    # spawn processes
    worker_primary_process = subprocess.Popen(
        cmd_worker_primary, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True
    )

    worker_light_process = subprocess.Popen(
        cmd_worker_light, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True
    )

    worker_heavy_process = subprocess.Popen(
        cmd_worker_heavy, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True
    )

    beat_process = subprocess.Popen(
        cmd_beat, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True
    )

    # monitor threads
    worker_primary_thread = threading.Thread(
        target=monitor_process, args=("PRIMARY", worker_primary_process)
    )
    worker_light_thread = threading.Thread(
        target=monitor_process, args=("LIGHT", worker_light_process)
    )
    worker_heavy_thread = threading.Thread(
        target=monitor_process, args=("HEAVY", worker_heavy_process)
    )
    beat_thread = threading.Thread(target=monitor_process, args=("BEAT", beat_process))

    worker_primary_thread.start()
    worker_light_thread.start()
    worker_heavy_thread.start()
    beat_thread.start()

    if not exclude_indexing:
        update_env = os.environ.copy()
        update_env["PYTHONPATH"] = "."
        cmd_indexing = ["python", "danswer/background/update.py"]

        indexing_process = subprocess.Popen(
            cmd_indexing,
            env=update_env,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
        )

        indexing_thread = threading.Thread(
            target=monitor_process, args=("INDEXING", indexing_process)
        )

        indexing_thread.start()
        indexing_thread.join()

        try:
            update_env = os.environ.copy()
            update_env["PYTHONPATH"] = "."
            cmd_perm_sync = ["python", "ee/danswer/background/permission_sync.py"]

            perm_sync_process = subprocess.Popen(
                cmd_perm_sync,
                env=update_env,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                text=True,
            )

            perm_sync_thread = threading.Thread(
                target=monitor_process, args=("PERM_SYNC", perm_sync_process)
            )
            perm_sync_thread.start()
            perm_sync_thread.join()
        except Exception:
            # the permission sync script lives under ee/; fail silently if it
            # can't be started
            pass

    worker_primary_thread.join()
    worker_light_thread.join()
    worker_heavy_thread.join()
    beat_thread.join()


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Run background jobs.")
    parser.add_argument(
        "--no-indexing", action="store_true", help="Do not run indexing process"
    )
    args = parser.parse_args()

    run_jobs(args.no_indexing)
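
Run the script directly to start all four Celery processes plus the indexing watcher, or skip the indexing (and permission sync) block with the flag it defines: `python dev_run_background_jobs.py --no-indexing` (the filename comes from the commit message; the script's location in the repo is not shown here).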