diff --git a/backend/onyx/background/celery/apps/app_base.py b/backend/onyx/background/celery/apps/app_base.py index 22529a66c2..5e767dfbef 100644 --- a/backend/onyx/background/celery/apps/app_base.py +++ b/backend/onyx/background/celery/apps/app_base.py @@ -1,5 +1,4 @@ import logging -import multiprocessing import time from typing import Any @@ -163,7 +162,10 @@ def on_task_postrun( def on_celeryd_init(sender: Any = None, conf: Any = None, **kwargs: Any) -> None: """The first signal sent on celery worker startup""" - multiprocessing.set_start_method("spawn") # fork is unsafe, set to spawn + # rkuo: commenting out as set_start_method seems to work here on macOS + # but not in the cloud and it is unclear why. + # logger.info(f"Multiprocessing start method - setting to spawn.") + # multiprocessing.set_start_method("spawn") # fork is unsafe, set to spawn def wait_for_redis(sender: Any, **kwargs: Any) -> None: diff --git a/backend/onyx/background/celery/apps/heavy.py b/backend/onyx/background/celery/apps/heavy.py index f45e6df9aa..ee8958e7dd 100644 --- a/backend/onyx/background/celery/apps/heavy.py +++ b/backend/onyx/background/celery/apps/heavy.py @@ -56,6 +56,7 @@ def on_celeryd_init(sender: Any = None, conf: Any = None, **kwargs: Any) -> None @worker_init.connect def on_worker_init(sender: Any, **kwargs: Any) -> None: logger.info("worker_init signal received.") + multiprocessing.set_start_method("spawn") # fork is unsafe, set to spawn logger.info(f"Multiprocessing start method: {multiprocessing.get_start_method()}") SqlEngine.set_app_name(POSTGRES_CELERY_WORKER_HEAVY_APP_NAME) diff --git a/backend/onyx/background/celery/apps/indexing.py b/backend/onyx/background/celery/apps/indexing.py index 9262b632dc..46282772ff 100644 --- a/backend/onyx/background/celery/apps/indexing.py +++ b/backend/onyx/background/celery/apps/indexing.py @@ -57,6 +57,7 @@ def on_celeryd_init(sender: Any = None, conf: Any = None, **kwargs: Any) -> None @worker_init.connect def on_worker_init(sender: Any, **kwargs: Any) -> None: logger.info("worker_init signal received.") + multiprocessing.set_start_method("spawn") # fork is unsafe, set to spawn logger.info(f"Multiprocessing start method: {multiprocessing.get_start_method()}") SqlEngine.set_app_name(POSTGRES_CELERY_WORKER_INDEXING_APP_NAME) diff --git a/backend/onyx/background/celery/apps/light.py b/backend/onyx/background/celery/apps/light.py index e6567b1477..11f1341a1e 100644 --- a/backend/onyx/background/celery/apps/light.py +++ b/backend/onyx/background/celery/apps/light.py @@ -56,7 +56,9 @@ def on_celeryd_init(sender: Any = None, conf: Any = None, **kwargs: Any) -> None @worker_init.connect def on_worker_init(sender: Any, **kwargs: Any) -> None: logger.info("worker_init signal received.") + multiprocessing.set_start_method("spawn") # fork is unsafe, set to spawn logger.info(f"Multiprocessing start method: {multiprocessing.get_start_method()}") + logger.info(f"Concurrency: {sender.concurrency}") SqlEngine.set_app_name(POSTGRES_CELERY_WORKER_LIGHT_APP_NAME) SqlEngine.init_engine(pool_size=sender.concurrency, max_overflow=8) diff --git a/backend/onyx/background/celery/apps/primary.py b/backend/onyx/background/celery/apps/primary.py index caa697f883..af2105b8c6 100644 --- a/backend/onyx/background/celery/apps/primary.py +++ b/backend/onyx/background/celery/apps/primary.py @@ -80,6 +80,7 @@ def on_celeryd_init(sender: Any = None, conf: Any = None, **kwargs: Any) -> None @worker_init.connect def on_worker_init(sender: Any, **kwargs: Any) -> None: logger.info("worker_init signal received.") + multiprocessing.set_start_method("spawn") # fork is unsafe, set to spawn logger.info(f"Multiprocessing start method: {multiprocessing.get_start_method()}") SqlEngine.set_app_name(POSTGRES_CELERY_WORKER_PRIMARY_APP_NAME) diff --git a/backend/onyx/background/celery/tasks/indexing/tasks.py b/backend/onyx/background/celery/tasks/indexing/tasks.py index 9fd73972d0..b29dd1e8a0 100644 --- a/backend/onyx/background/celery/tasks/indexing/tasks.py +++ b/backend/onyx/background/celery/tasks/indexing/tasks.py @@ -1,3 +1,4 @@ +import multiprocessing import os import sys import time @@ -853,11 +854,14 @@ def connector_indexing_proxy_task( search_settings_id: int, tenant_id: str | None, ) -> None: - """celery tasks are forked, but forking is unstable. This proxies work to a spawned task.""" + """celery tasks are forked, but forking is unstable. + This is a thread that proxies work to a spawned task.""" + task_logger.info( f"Indexing watchdog - starting: attempt={index_attempt_id} " f"cc_pair={cc_pair_id} " - f"search_settings={search_settings_id}" + f"search_settings={search_settings_id} " + f"multiprocessing={multiprocessing.get_start_method()}" ) if not self.request.id: