diff --git a/backend/onyx/background/celery/apps/heavy.py b/backend/onyx/background/celery/apps/heavy.py index c49ccfa75..7216e858d 100644 --- a/backend/onyx/background/celery/apps/heavy.py +++ b/backend/onyx/background/celery/apps/heavy.py @@ -1,4 +1,3 @@ -import multiprocessing from typing import Any from celery import Celery @@ -57,24 +56,6 @@ def on_celeryd_init(sender: Any = None, conf: Any = None, **kwargs: Any) -> None def on_worker_init(sender: Any, **kwargs: Any) -> None: logger.info("worker_init signal received.") - all_start_methods: list[str] = multiprocessing.get_all_start_methods() - logger.info(f"Multiprocessing all start methods: {all_start_methods}") - - try: - multiprocessing.set_start_method("spawn") # fork is unsafe, set to spawn - except Exception: - logger.info("multiprocessing.set_start_method exceptioned.") - try: - multiprocessing.set_start_method( - "spawn", force=True - ) # fork is unsafe, set to spawn - except Exception: - logger.info("multiprocessing.set_start_method force=True exceptioned.") - - logger.info( - f"Multiprocessing selected start method: {multiprocessing.get_start_method()}" - ) - SqlEngine.set_app_name(POSTGRES_CELERY_WORKER_HEAVY_APP_NAME) SqlEngine.init_engine(pool_size=4, max_overflow=12) diff --git a/backend/onyx/background/celery/apps/indexing.py b/backend/onyx/background/celery/apps/indexing.py index 1db1641ae..0c116984f 100644 --- a/backend/onyx/background/celery/apps/indexing.py +++ b/backend/onyx/background/celery/apps/indexing.py @@ -1,4 +1,3 @@ -import multiprocessing from typing import Any from celery import Celery @@ -58,21 +57,11 @@ def on_celeryd_init(sender: Any = None, conf: Any = None, **kwargs: Any) -> None def on_worker_init(sender: Any, **kwargs: Any) -> None: logger.info("worker_init signal received.") - all_start_methods: list[str] = multiprocessing.get_all_start_methods() - logger.info(f"Multiprocessing all start methods: {all_start_methods}") - - multiprocessing.set_start_method("spawn") # fork is unsafe, set to spawn - logger.info( - f"Multiprocessing selected start method: {multiprocessing.get_start_method()}" - ) - SqlEngine.set_app_name(POSTGRES_CELERY_WORKER_INDEXING_APP_NAME) # rkuo: been seeing transient connection exceptions here, so upping the connection count # from just concurrency/concurrency to concurrency/concurrency*2 - SqlEngine.init_engine( - pool_size=sender.concurrency, max_overflow=sender.concurrency * 2 - ) + SqlEngine.init_engine(pool_size=sender.concurrency, max_overflow=8) app_base.wait_for_redis(sender, **kwargs) app_base.wait_for_db(sender, **kwargs) diff --git a/backend/onyx/background/celery/apps/primary.py b/backend/onyx/background/celery/apps/primary.py index 23f248527..b4f9868ac 100644 --- a/backend/onyx/background/celery/apps/primary.py +++ b/backend/onyx/background/celery/apps/primary.py @@ -1,5 +1,4 @@ import logging -import multiprocessing from typing import Any from typing import cast @@ -81,14 +80,6 @@ def on_celeryd_init(sender: Any = None, conf: Any = None, **kwargs: Any) -> None def on_worker_init(sender: Any, **kwargs: Any) -> None: logger.info("worker_init signal received.") - all_start_methods: list[str] = multiprocessing.get_all_start_methods() - logger.info(f"Multiprocessing all start methods: {all_start_methods}") - - multiprocessing.set_start_method("spawn") # fork is unsafe, set to spawn - logger.info( - f"Multiprocessing selected start method: {multiprocessing.get_start_method()}" - ) - SqlEngine.set_app_name(POSTGRES_CELERY_WORKER_PRIMARY_APP_NAME) SqlEngine.init_engine(pool_size=8, max_overflow=0)