From 962240031fc2c0616a69f5efcb43c9fe8cd6ad00 Mon Sep 17 00:00:00 2001 From: "Richard Kuo (Danswer)" Date: Thu, 9 Jan 2025 16:29:37 -0800 Subject: [PATCH 01/13] figuring out why multiprocessing set_start_method isn't working. --- backend/onyx/background/celery/apps/app_base.py | 6 ++++-- backend/onyx/background/celery/apps/heavy.py | 1 + backend/onyx/background/celery/apps/indexing.py | 1 + backend/onyx/background/celery/apps/light.py | 2 ++ backend/onyx/background/celery/apps/primary.py | 1 + backend/onyx/background/celery/tasks/indexing/tasks.py | 8 ++++++-- 6 files changed, 15 insertions(+), 4 deletions(-) diff --git a/backend/onyx/background/celery/apps/app_base.py b/backend/onyx/background/celery/apps/app_base.py index 22529a66c2..5e767dfbef 100644 --- a/backend/onyx/background/celery/apps/app_base.py +++ b/backend/onyx/background/celery/apps/app_base.py @@ -1,5 +1,4 @@ import logging -import multiprocessing import time from typing import Any @@ -163,7 +162,10 @@ def on_task_postrun( def on_celeryd_init(sender: Any = None, conf: Any = None, **kwargs: Any) -> None: """The first signal sent on celery worker startup""" - multiprocessing.set_start_method("spawn") # fork is unsafe, set to spawn + # rkuo: commenting out as set_start_method seems to work here on macOS + # but not in the cloud and it is unclear why. + # logger.info(f"Multiprocessing start method - setting to spawn.") + # multiprocessing.set_start_method("spawn") # fork is unsafe, set to spawn def wait_for_redis(sender: Any, **kwargs: Any) -> None: diff --git a/backend/onyx/background/celery/apps/heavy.py b/backend/onyx/background/celery/apps/heavy.py index f45e6df9aa..ee8958e7dd 100644 --- a/backend/onyx/background/celery/apps/heavy.py +++ b/backend/onyx/background/celery/apps/heavy.py @@ -56,6 +56,7 @@ def on_celeryd_init(sender: Any = None, conf: Any = None, **kwargs: Any) -> None @worker_init.connect def on_worker_init(sender: Any, **kwargs: Any) -> None: logger.info("worker_init signal received.") + multiprocessing.set_start_method("spawn") # fork is unsafe, set to spawn logger.info(f"Multiprocessing start method: {multiprocessing.get_start_method()}") SqlEngine.set_app_name(POSTGRES_CELERY_WORKER_HEAVY_APP_NAME) diff --git a/backend/onyx/background/celery/apps/indexing.py b/backend/onyx/background/celery/apps/indexing.py index 9262b632dc..46282772ff 100644 --- a/backend/onyx/background/celery/apps/indexing.py +++ b/backend/onyx/background/celery/apps/indexing.py @@ -57,6 +57,7 @@ def on_celeryd_init(sender: Any = None, conf: Any = None, **kwargs: Any) -> None @worker_init.connect def on_worker_init(sender: Any, **kwargs: Any) -> None: logger.info("worker_init signal received.") + multiprocessing.set_start_method("spawn") # fork is unsafe, set to spawn logger.info(f"Multiprocessing start method: {multiprocessing.get_start_method()}") SqlEngine.set_app_name(POSTGRES_CELERY_WORKER_INDEXING_APP_NAME) diff --git a/backend/onyx/background/celery/apps/light.py b/backend/onyx/background/celery/apps/light.py index e6567b1477..11f1341a1e 100644 --- a/backend/onyx/background/celery/apps/light.py +++ b/backend/onyx/background/celery/apps/light.py @@ -56,7 +56,9 @@ def on_celeryd_init(sender: Any = None, conf: Any = None, **kwargs: Any) -> None @worker_init.connect def on_worker_init(sender: Any, **kwargs: Any) -> None: logger.info("worker_init signal received.") + multiprocessing.set_start_method("spawn") # fork is unsafe, set to spawn logger.info(f"Multiprocessing start method: {multiprocessing.get_start_method()}") + logger.info(f"Concurrency: {sender.concurrency}") SqlEngine.set_app_name(POSTGRES_CELERY_WORKER_LIGHT_APP_NAME) SqlEngine.init_engine(pool_size=sender.concurrency, max_overflow=8) diff --git a/backend/onyx/background/celery/apps/primary.py b/backend/onyx/background/celery/apps/primary.py index caa697f883..af2105b8c6 100644 --- a/backend/onyx/background/celery/apps/primary.py +++ b/backend/onyx/background/celery/apps/primary.py @@ -80,6 +80,7 @@ def on_celeryd_init(sender: Any = None, conf: Any = None, **kwargs: Any) -> None @worker_init.connect def on_worker_init(sender: Any, **kwargs: Any) -> None: logger.info("worker_init signal received.") + multiprocessing.set_start_method("spawn") # fork is unsafe, set to spawn logger.info(f"Multiprocessing start method: {multiprocessing.get_start_method()}") SqlEngine.set_app_name(POSTGRES_CELERY_WORKER_PRIMARY_APP_NAME) diff --git a/backend/onyx/background/celery/tasks/indexing/tasks.py b/backend/onyx/background/celery/tasks/indexing/tasks.py index 9fd73972d0..b29dd1e8a0 100644 --- a/backend/onyx/background/celery/tasks/indexing/tasks.py +++ b/backend/onyx/background/celery/tasks/indexing/tasks.py @@ -1,3 +1,4 @@ +import multiprocessing import os import sys import time @@ -853,11 +854,14 @@ def connector_indexing_proxy_task( search_settings_id: int, tenant_id: str | None, ) -> None: - """celery tasks are forked, but forking is unstable. This proxies work to a spawned task.""" + """celery tasks are forked, but forking is unstable. + This is a thread that proxies work to a spawned task.""" + task_logger.info( f"Indexing watchdog - starting: attempt={index_attempt_id} " f"cc_pair={cc_pair_id} " - f"search_settings={search_settings_id}" + f"search_settings={search_settings_id} " + f"multiprocessing={multiprocessing.get_start_method()}" ) if not self.request.id: From ac182c74b3862ba75097f6f346b7d39357b5878d Mon Sep 17 00:00:00 2001 From: "Richard Kuo (Danswer)" Date: Fri, 10 Jan 2025 12:11:33 -0800 Subject: [PATCH 02/13] log all start methods --- backend/onyx/background/celery/apps/indexing.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/backend/onyx/background/celery/apps/indexing.py b/backend/onyx/background/celery/apps/indexing.py index 46282772ff..818277ab47 100644 --- a/backend/onyx/background/celery/apps/indexing.py +++ b/backend/onyx/background/celery/apps/indexing.py @@ -57,6 +57,10 @@ def on_celeryd_init(sender: Any = None, conf: Any = None, **kwargs: Any) -> None @worker_init.connect def on_worker_init(sender: Any, **kwargs: Any) -> None: logger.info("worker_init signal received.") + + all_start_methods: list[str] = multiprocessing.get_all_start_methods() + logger.info(f"Multiprocessing all start methods: {all_start_methods}") + multiprocessing.set_start_method("spawn") # fork is unsafe, set to spawn logger.info(f"Multiprocessing start method: {multiprocessing.get_start_method()}") From b6c2ecfecb88d786129a6b3c6cf81699e2b6b997 Mon Sep 17 00:00:00 2001 From: "Richard Kuo (Danswer)" Date: Fri, 10 Jan 2025 12:16:13 -0800 Subject: [PATCH 03/13] more debugging of start method --- backend/onyx/background/celery/apps/heavy.py | 8 +++++++- backend/onyx/background/celery/apps/indexing.py | 4 +++- backend/onyx/background/celery/apps/light.py | 9 ++++++++- backend/onyx/background/celery/apps/primary.py | 8 +++++++- 4 files changed, 25 insertions(+), 4 deletions(-) diff --git a/backend/onyx/background/celery/apps/heavy.py b/backend/onyx/background/celery/apps/heavy.py index ee8958e7dd..1a8b658744 100644 --- a/backend/onyx/background/celery/apps/heavy.py +++ b/backend/onyx/background/celery/apps/heavy.py @@ -56,8 +56,14 @@ def on_celeryd_init(sender: Any = None, conf: Any = None, **kwargs: Any) -> None @worker_init.connect def on_worker_init(sender: Any, **kwargs: Any) -> None: logger.info("worker_init signal received.") + + all_start_methods: list[str] = multiprocessing.get_all_start_methods() + logger.info(f"Multiprocessing all start methods: {all_start_methods}") + multiprocessing.set_start_method("spawn") # fork is unsafe, set to spawn - logger.info(f"Multiprocessing start method: {multiprocessing.get_start_method()}") + logger.info( + f"Multiprocessing selected start method: {multiprocessing.get_start_method()}" + ) SqlEngine.set_app_name(POSTGRES_CELERY_WORKER_HEAVY_APP_NAME) SqlEngine.init_engine(pool_size=4, max_overflow=12) diff --git a/backend/onyx/background/celery/apps/indexing.py b/backend/onyx/background/celery/apps/indexing.py index 818277ab47..1db1641ae5 100644 --- a/backend/onyx/background/celery/apps/indexing.py +++ b/backend/onyx/background/celery/apps/indexing.py @@ -62,7 +62,9 @@ def on_worker_init(sender: Any, **kwargs: Any) -> None: logger.info(f"Multiprocessing all start methods: {all_start_methods}") multiprocessing.set_start_method("spawn") # fork is unsafe, set to spawn - logger.info(f"Multiprocessing start method: {multiprocessing.get_start_method()}") + logger.info( + f"Multiprocessing selected start method: {multiprocessing.get_start_method()}" + ) SqlEngine.set_app_name(POSTGRES_CELERY_WORKER_INDEXING_APP_NAME) diff --git a/backend/onyx/background/celery/apps/light.py b/backend/onyx/background/celery/apps/light.py index 11f1341a1e..73058ad321 100644 --- a/backend/onyx/background/celery/apps/light.py +++ b/backend/onyx/background/celery/apps/light.py @@ -56,8 +56,15 @@ def on_celeryd_init(sender: Any = None, conf: Any = None, **kwargs: Any) -> None @worker_init.connect def on_worker_init(sender: Any, **kwargs: Any) -> None: logger.info("worker_init signal received.") + + all_start_methods: list[str] = multiprocessing.get_all_start_methods() + logger.info(f"Multiprocessing all start methods: {all_start_methods}") + multiprocessing.set_start_method("spawn") # fork is unsafe, set to spawn - logger.info(f"Multiprocessing start method: {multiprocessing.get_start_method()}") + logger.info( + f"Multiprocessing selected start method: {multiprocessing.get_start_method()}" + ) + logger.info(f"Concurrency: {sender.concurrency}") SqlEngine.set_app_name(POSTGRES_CELERY_WORKER_LIGHT_APP_NAME) diff --git a/backend/onyx/background/celery/apps/primary.py b/backend/onyx/background/celery/apps/primary.py index af2105b8c6..23f2485279 100644 --- a/backend/onyx/background/celery/apps/primary.py +++ b/backend/onyx/background/celery/apps/primary.py @@ -80,8 +80,14 @@ def on_celeryd_init(sender: Any = None, conf: Any = None, **kwargs: Any) -> None @worker_init.connect def on_worker_init(sender: Any, **kwargs: Any) -> None: logger.info("worker_init signal received.") + + all_start_methods: list[str] = multiprocessing.get_all_start_methods() + logger.info(f"Multiprocessing all start methods: {all_start_methods}") + multiprocessing.set_start_method("spawn") # fork is unsafe, set to spawn - logger.info(f"Multiprocessing start method: {multiprocessing.get_start_method()}") + logger.info( + f"Multiprocessing selected start method: {multiprocessing.get_start_method()}" + ) SqlEngine.set_app_name(POSTGRES_CELERY_WORKER_PRIMARY_APP_NAME) SqlEngine.init_engine(pool_size=8, max_overflow=0) From 2163a138ed57d604d97d8f3ed9fa6daa48be9a59 Mon Sep 17 00:00:00 2001 From: "Richard Kuo (Danswer)" Date: Fri, 10 Jan 2025 12:41:05 -0800 Subject: [PATCH 04/13] logging --- backend/onyx/background/celery/tasks/indexing/tasks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/onyx/background/celery/tasks/indexing/tasks.py b/backend/onyx/background/celery/tasks/indexing/tasks.py index b29dd1e8a0..771ee8e709 100644 --- a/backend/onyx/background/celery/tasks/indexing/tasks.py +++ b/backend/onyx/background/celery/tasks/indexing/tasks.py @@ -861,7 +861,7 @@ def connector_indexing_proxy_task( f"Indexing watchdog - starting: attempt={index_attempt_id} " f"cc_pair={cc_pair_id} " f"search_settings={search_settings_id} " - f"multiprocessing={multiprocessing.get_start_method()}" + f"mp_start_method={multiprocessing.get_start_method()}" ) if not self.request.id: From 384a38418bce7b1fa57091765585a8f7c50e4846 Mon Sep 17 00:00:00 2001 From: "Richard Kuo (Danswer)" Date: Fri, 10 Jan 2025 12:59:34 -0800 Subject: [PATCH 05/13] test set_spawn_method and handle exceptions --- backend/onyx/background/celery/apps/heavy.py | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/backend/onyx/background/celery/apps/heavy.py b/backend/onyx/background/celery/apps/heavy.py index 1a8b658744..c49ccfa751 100644 --- a/backend/onyx/background/celery/apps/heavy.py +++ b/backend/onyx/background/celery/apps/heavy.py @@ -60,7 +60,17 @@ def on_worker_init(sender: Any, **kwargs: Any) -> None: all_start_methods: list[str] = multiprocessing.get_all_start_methods() logger.info(f"Multiprocessing all start methods: {all_start_methods}") - multiprocessing.set_start_method("spawn") # fork is unsafe, set to spawn + try: + multiprocessing.set_start_method("spawn") # fork is unsafe, set to spawn + except Exception: + logger.info("multiprocessing.set_start_method exceptioned.") + try: + multiprocessing.set_start_method( + "spawn", force=True + ) # fork is unsafe, set to spawn + except Exception: + logger.info("multiprocessing.set_start_method force=True exceptioned.") + logger.info( f"Multiprocessing selected start method: {multiprocessing.get_start_method()}" ) From ccef35028700e9467e9ac501f329ed84ede7e963 Mon Sep 17 00:00:00 2001 From: "Richard Kuo (Danswer)" Date: Fri, 10 Jan 2025 14:19:31 -0800 Subject: [PATCH 06/13] try using spawn specifically --- backend/onyx/background/indexing/job_client.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/backend/onyx/background/indexing/job_client.py b/backend/onyx/background/indexing/job_client.py index 444894f8d6..c99e12fbc7 100644 --- a/backend/onyx/background/indexing/job_client.py +++ b/backend/onyx/background/indexing/job_client.py @@ -4,9 +4,10 @@ not follow the expected behavior, etc. NOTE: cannot use Celery directly due to https://github.com/celery/celery/issues/7007#issuecomment-1740139367""" +import multiprocessing as mp from collections.abc import Callable from dataclasses import dataclass -from multiprocessing import Process +from multiprocessing.context import SpawnProcess from typing import Any from typing import Literal from typing import Optional @@ -63,7 +64,7 @@ class SimpleJob: """Drop in replacement for `dask.distributed.Future`""" id: int - process: Optional["Process"] = None + process: Optional["SpawnProcess"] = None def cancel(self) -> bool: return self.release() @@ -131,7 +132,8 @@ class SimpleJobClient: job_id = self.job_id_counter self.job_id_counter += 1 - process = Process(target=_run_in_process, args=(func, args), daemon=True) + ctx = mp.get_context("spawn") + process = ctx.Process(target=_run_in_process, args=(func, args), daemon=True) job = SimpleJob(id=job_id, process=process) process.start() From f5bdf9d2c9f5a2130c98a22e20b9c89d7ae8efcc Mon Sep 17 00:00:00 2001 From: "Richard Kuo (Danswer)" Date: Mon, 13 Jan 2025 02:46:03 -0800 Subject: [PATCH 07/13] move to celeryd_init --- .../onyx/background/celery/apps/app_base.py | 23 +++++++++++++++++++ backend/onyx/background/celery/apps/light.py | 9 -------- 2 files changed, 23 insertions(+), 9 deletions(-) diff --git a/backend/onyx/background/celery/apps/app_base.py b/backend/onyx/background/celery/apps/app_base.py index 5e767dfbef..440b7cba32 100644 --- a/backend/onyx/background/celery/apps/app_base.py +++ b/backend/onyx/background/celery/apps/app_base.py @@ -1,4 +1,5 @@ import logging +import multiprocessing import time from typing import Any @@ -167,6 +168,28 @@ def on_celeryd_init(sender: Any = None, conf: Any = None, **kwargs: Any) -> None # logger.info(f"Multiprocessing start method - setting to spawn.") # multiprocessing.set_start_method("spawn") # fork is unsafe, set to spawn + all_start_methods: list[str] = multiprocessing.get_all_start_methods() + logger.info(f"Multiprocessing all start methods: {all_start_methods}") + + try: + multiprocessing.set_start_method("spawn") # fork is unsafe, set to spawn + except Exception: + logger.info( + "multiprocessing.set_start_method exceptioned. Trying force=True..." + ) + try: + multiprocessing.set_start_method( + "spawn", force=True + ) # fork is unsafe, set to spawn + except Exception: + logger.info( + "multiprocessing.set_start_method force=True exceptioned even with force=True." + ) + + logger.info( + f"Multiprocessing selected start method: {multiprocessing.get_start_method()}" + ) + def wait_for_redis(sender: Any, **kwargs: Any) -> None: """Waits for redis to become ready subject to a hardcoded timeout. diff --git a/backend/onyx/background/celery/apps/light.py b/backend/onyx/background/celery/apps/light.py index 73058ad321..695bda69cc 100644 --- a/backend/onyx/background/celery/apps/light.py +++ b/backend/onyx/background/celery/apps/light.py @@ -1,4 +1,3 @@ -import multiprocessing from typing import Any from celery import Celery @@ -57,14 +56,6 @@ def on_celeryd_init(sender: Any = None, conf: Any = None, **kwargs: Any) -> None def on_worker_init(sender: Any, **kwargs: Any) -> None: logger.info("worker_init signal received.") - all_start_methods: list[str] = multiprocessing.get_all_start_methods() - logger.info(f"Multiprocessing all start methods: {all_start_methods}") - - multiprocessing.set_start_method("spawn") # fork is unsafe, set to spawn - logger.info( - f"Multiprocessing selected start method: {multiprocessing.get_start_method()}" - ) - logger.info(f"Concurrency: {sender.concurrency}") SqlEngine.set_app_name(POSTGRES_CELERY_WORKER_LIGHT_APP_NAME) From be3cfdd4a6bd95d195b5948cc1e5822df39282dd Mon Sep 17 00:00:00 2001 From: "Richard Kuo (Danswer)" Date: Mon, 13 Jan 2025 10:46:20 -0800 Subject: [PATCH 08/13] saved files --- backend/onyx/background/celery/apps/heavy.py | 19 ------------------- .../onyx/background/celery/apps/indexing.py | 13 +------------ .../onyx/background/celery/apps/primary.py | 9 --------- 3 files changed, 1 insertion(+), 40 deletions(-) diff --git a/backend/onyx/background/celery/apps/heavy.py b/backend/onyx/background/celery/apps/heavy.py index c49ccfa751..7216e858d4 100644 --- a/backend/onyx/background/celery/apps/heavy.py +++ b/backend/onyx/background/celery/apps/heavy.py @@ -1,4 +1,3 @@ -import multiprocessing from typing import Any from celery import Celery @@ -57,24 +56,6 @@ def on_celeryd_init(sender: Any = None, conf: Any = None, **kwargs: Any) -> None def on_worker_init(sender: Any, **kwargs: Any) -> None: logger.info("worker_init signal received.") - all_start_methods: list[str] = multiprocessing.get_all_start_methods() - logger.info(f"Multiprocessing all start methods: {all_start_methods}") - - try: - multiprocessing.set_start_method("spawn") # fork is unsafe, set to spawn - except Exception: - logger.info("multiprocessing.set_start_method exceptioned.") - try: - multiprocessing.set_start_method( - "spawn", force=True - ) # fork is unsafe, set to spawn - except Exception: - logger.info("multiprocessing.set_start_method force=True exceptioned.") - - logger.info( - f"Multiprocessing selected start method: {multiprocessing.get_start_method()}" - ) - SqlEngine.set_app_name(POSTGRES_CELERY_WORKER_HEAVY_APP_NAME) SqlEngine.init_engine(pool_size=4, max_overflow=12) diff --git a/backend/onyx/background/celery/apps/indexing.py b/backend/onyx/background/celery/apps/indexing.py index 1db1641ae5..0c116984f7 100644 --- a/backend/onyx/background/celery/apps/indexing.py +++ b/backend/onyx/background/celery/apps/indexing.py @@ -1,4 +1,3 @@ -import multiprocessing from typing import Any from celery import Celery @@ -58,21 +57,11 @@ def on_celeryd_init(sender: Any = None, conf: Any = None, **kwargs: Any) -> None def on_worker_init(sender: Any, **kwargs: Any) -> None: logger.info("worker_init signal received.") - all_start_methods: list[str] = multiprocessing.get_all_start_methods() - logger.info(f"Multiprocessing all start methods: {all_start_methods}") - - multiprocessing.set_start_method("spawn") # fork is unsafe, set to spawn - logger.info( - f"Multiprocessing selected start method: {multiprocessing.get_start_method()}" - ) - SqlEngine.set_app_name(POSTGRES_CELERY_WORKER_INDEXING_APP_NAME) # rkuo: been seeing transient connection exceptions here, so upping the connection count # from just concurrency/concurrency to concurrency/concurrency*2 - SqlEngine.init_engine( - pool_size=sender.concurrency, max_overflow=sender.concurrency * 2 - ) + SqlEngine.init_engine(pool_size=sender.concurrency, max_overflow=8) app_base.wait_for_redis(sender, **kwargs) app_base.wait_for_db(sender, **kwargs) diff --git a/backend/onyx/background/celery/apps/primary.py b/backend/onyx/background/celery/apps/primary.py index 23f2485279..b4f9868ac5 100644 --- a/backend/onyx/background/celery/apps/primary.py +++ b/backend/onyx/background/celery/apps/primary.py @@ -1,5 +1,4 @@ import logging -import multiprocessing from typing import Any from typing import cast @@ -81,14 +80,6 @@ def on_celeryd_init(sender: Any = None, conf: Any = None, **kwargs: Any) -> None def on_worker_init(sender: Any, **kwargs: Any) -> None: logger.info("worker_init signal received.") - all_start_methods: list[str] = multiprocessing.get_all_start_methods() - logger.info(f"Multiprocessing all start methods: {all_start_methods}") - - multiprocessing.set_start_method("spawn") # fork is unsafe, set to spawn - logger.info( - f"Multiprocessing selected start method: {multiprocessing.get_start_method()}" - ) - SqlEngine.set_app_name(POSTGRES_CELERY_WORKER_PRIMARY_APP_NAME) SqlEngine.init_engine(pool_size=8, max_overflow=0) From 9a09222b7d42f00255206b340798a62e27211aab Mon Sep 17 00:00:00 2001 From: "Richard Kuo (Danswer)" Date: Mon, 13 Jan 2025 10:58:33 -0800 Subject: [PATCH 09/13] add comments --- backend/onyx/background/celery/apps/app_base.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/backend/onyx/background/celery/apps/app_base.py b/backend/onyx/background/celery/apps/app_base.py index 440b7cba32..3f0d50950b 100644 --- a/backend/onyx/background/celery/apps/app_base.py +++ b/backend/onyx/background/celery/apps/app_base.py @@ -163,10 +163,10 @@ def on_task_postrun( def on_celeryd_init(sender: Any = None, conf: Any = None, **kwargs: Any) -> None: """The first signal sent on celery worker startup""" - # rkuo: commenting out as set_start_method seems to work here on macOS - # but not in the cloud and it is unclear why. - # logger.info(f"Multiprocessing start method - setting to spawn.") - # multiprocessing.set_start_method("spawn") # fork is unsafe, set to spawn + + # NOTE(rkuo): start method "fork" is unsafe and we really need it to be "spawn" + # But something is blocking set_start_method from working in the cloud unless + # force=True. so we use force=True as a fallback. all_start_methods: list[str] = multiprocessing.get_all_start_methods() logger.info(f"Multiprocessing all start methods: {all_start_methods}") From b6dd999c1b795484ba222fbbfe371a7baff84c79 Mon Sep 17 00:00:00 2001 From: "Richard Kuo (Danswer)" Date: Mon, 13 Jan 2025 11:31:57 -0800 Subject: [PATCH 10/13] add some type hints --- backend/onyx/background/celery/apps/app_base.py | 2 +- backend/onyx/background/celery/apps/heavy.py | 7 ++++--- backend/onyx/background/celery/apps/indexing.py | 7 ++++--- backend/onyx/background/celery/apps/light.py | 10 +++++----- backend/onyx/background/celery/apps/primary.py | 7 ++++--- 5 files changed, 18 insertions(+), 15 deletions(-) diff --git a/backend/onyx/background/celery/apps/app_base.py b/backend/onyx/background/celery/apps/app_base.py index 3f0d50950b..9b320aae42 100644 --- a/backend/onyx/background/celery/apps/app_base.py +++ b/backend/onyx/background/celery/apps/app_base.py @@ -161,7 +161,7 @@ def on_task_postrun( return -def on_celeryd_init(sender: Any = None, conf: Any = None, **kwargs: Any) -> None: +def on_celeryd_init(sender: str, conf: Any = None, **kwargs: Any) -> None: """The first signal sent on celery worker startup""" # NOTE(rkuo): start method "fork" is unsafe and we really need it to be "spawn" diff --git a/backend/onyx/background/celery/apps/heavy.py b/backend/onyx/background/celery/apps/heavy.py index 7216e858d4..4854940fd9 100644 --- a/backend/onyx/background/celery/apps/heavy.py +++ b/backend/onyx/background/celery/apps/heavy.py @@ -3,6 +3,7 @@ from typing import Any from celery import Celery from celery import signals from celery import Task +from celery.apps.worker import Worker from celery.signals import celeryd_init from celery.signals import worker_init from celery.signals import worker_ready @@ -48,16 +49,16 @@ def on_task_postrun( @celeryd_init.connect -def on_celeryd_init(sender: Any = None, conf: Any = None, **kwargs: Any) -> None: +def on_celeryd_init(sender: str, conf: Any = None, **kwargs: Any) -> None: app_base.on_celeryd_init(sender, conf, **kwargs) @worker_init.connect -def on_worker_init(sender: Any, **kwargs: Any) -> None: +def on_worker_init(sender: Worker, **kwargs: Any) -> None: logger.info("worker_init signal received.") SqlEngine.set_app_name(POSTGRES_CELERY_WORKER_HEAVY_APP_NAME) - SqlEngine.init_engine(pool_size=4, max_overflow=12) + SqlEngine.init_engine(pool_size=sender.concurrency, max_overflow=8) # type: ignore app_base.wait_for_redis(sender, **kwargs) app_base.wait_for_db(sender, **kwargs) diff --git a/backend/onyx/background/celery/apps/indexing.py b/backend/onyx/background/celery/apps/indexing.py index 0c116984f7..89681ea741 100644 --- a/backend/onyx/background/celery/apps/indexing.py +++ b/backend/onyx/background/celery/apps/indexing.py @@ -3,6 +3,7 @@ from typing import Any from celery import Celery from celery import signals from celery import Task +from celery.apps.worker import Worker from celery.signals import celeryd_init from celery.signals import worker_init from celery.signals import worker_process_init @@ -49,19 +50,19 @@ def on_task_postrun( @celeryd_init.connect -def on_celeryd_init(sender: Any = None, conf: Any = None, **kwargs: Any) -> None: +def on_celeryd_init(sender: str, conf: Any = None, **kwargs: Any) -> None: app_base.on_celeryd_init(sender, conf, **kwargs) @worker_init.connect -def on_worker_init(sender: Any, **kwargs: Any) -> None: +def on_worker_init(sender: Worker, **kwargs: Any) -> None: logger.info("worker_init signal received.") SqlEngine.set_app_name(POSTGRES_CELERY_WORKER_INDEXING_APP_NAME) # rkuo: been seeing transient connection exceptions here, so upping the connection count # from just concurrency/concurrency to concurrency/concurrency*2 - SqlEngine.init_engine(pool_size=sender.concurrency, max_overflow=8) + SqlEngine.init_engine(pool_size=sender.concurrency, max_overflow=8) # type: ignore app_base.wait_for_redis(sender, **kwargs) app_base.wait_for_db(sender, **kwargs) diff --git a/backend/onyx/background/celery/apps/light.py b/backend/onyx/background/celery/apps/light.py index 695bda69cc..abc2cfab12 100644 --- a/backend/onyx/background/celery/apps/light.py +++ b/backend/onyx/background/celery/apps/light.py @@ -3,6 +3,7 @@ from typing import Any from celery import Celery from celery import signals from celery import Task +from celery.apps.worker import Worker from celery.signals import celeryd_init from celery.signals import worker_init from celery.signals import worker_ready @@ -14,7 +15,6 @@ from onyx.db.engine import SqlEngine from onyx.utils.logger import setup_logger from shared_configs.configs import MULTI_TENANT - logger = setup_logger() celery_app = Celery(__name__) @@ -48,18 +48,18 @@ def on_task_postrun( @celeryd_init.connect -def on_celeryd_init(sender: Any = None, conf: Any = None, **kwargs: Any) -> None: +def on_celeryd_init(sender: str, conf: Any = None, **kwargs: Any) -> None: app_base.on_celeryd_init(sender, conf, **kwargs) @worker_init.connect -def on_worker_init(sender: Any, **kwargs: Any) -> None: +def on_worker_init(sender: Worker, **kwargs: Any) -> None: logger.info("worker_init signal received.") - logger.info(f"Concurrency: {sender.concurrency}") + logger.info(f"Concurrency: {sender.concurrency}") # type: ignore SqlEngine.set_app_name(POSTGRES_CELERY_WORKER_LIGHT_APP_NAME) - SqlEngine.init_engine(pool_size=sender.concurrency, max_overflow=8) + SqlEngine.init_engine(pool_size=sender.concurrency, max_overflow=8) # type: ignore app_base.wait_for_redis(sender, **kwargs) app_base.wait_for_db(sender, **kwargs) diff --git a/backend/onyx/background/celery/apps/primary.py b/backend/onyx/background/celery/apps/primary.py index b4f9868ac5..8056e3d5e1 100644 --- a/backend/onyx/background/celery/apps/primary.py +++ b/backend/onyx/background/celery/apps/primary.py @@ -6,6 +6,7 @@ from celery import bootsteps # type: ignore from celery import Celery from celery import signals from celery import Task +from celery.apps.worker import Worker from celery.exceptions import WorkerShutdown from celery.signals import celeryd_init from celery.signals import worker_init @@ -72,12 +73,12 @@ def on_task_postrun( @celeryd_init.connect -def on_celeryd_init(sender: Any = None, conf: Any = None, **kwargs: Any) -> None: +def on_celeryd_init(sender: str, conf: Any = None, **kwargs: Any) -> None: app_base.on_celeryd_init(sender, conf, **kwargs) @worker_init.connect -def on_worker_init(sender: Any, **kwargs: Any) -> None: +def on_worker_init(sender: Worker, **kwargs: Any) -> None: logger.info("worker_init signal received.") SqlEngine.set_app_name(POSTGRES_CELERY_WORKER_PRIMARY_APP_NAME) @@ -133,7 +134,7 @@ def on_worker_init(sender: Any, **kwargs: Any) -> None: raise WorkerShutdown("Primary worker lock could not be acquired!") # tacking on our own user data to the sender - sender.primary_worker_lock = lock + sender.primary_worker_lock = lock # type: ignore # As currently designed, when this worker starts as "primary", we reinitialize redis # to a clean state (for our purposes, anyway) From d96d2fc6e963f0f5c1993a268814f5dce4685a24 Mon Sep 17 00:00:00 2001 From: "Richard Kuo (Danswer)" Date: Mon, 13 Jan 2025 11:35:58 -0800 Subject: [PATCH 11/13] add comment --- backend/onyx/background/indexing/job_client.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/backend/onyx/background/indexing/job_client.py b/backend/onyx/background/indexing/job_client.py index c99e12fbc7..a679eebe7f 100644 --- a/backend/onyx/background/indexing/job_client.py +++ b/backend/onyx/background/indexing/job_client.py @@ -132,6 +132,8 @@ class SimpleJobClient: job_id = self.job_id_counter self.job_id_counter += 1 + # this approach allows us to always "spawn" a new process regardless of + # get_start_method's current setting ctx = mp.get_context("spawn") process = ctx.Process(target=_run_in_process, args=(func, args), daemon=True) job = SimpleJob(id=job_id, process=process) From 4f8e48df7c8497f53d6df96e81a437b95fe7f8c6 Mon Sep 17 00:00:00 2001 From: "Richard Kuo (Danswer)" Date: Mon, 13 Jan 2025 11:50:04 -0800 Subject: [PATCH 12/13] try more sql settings --- backend/onyx/background/celery/apps/indexing.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/backend/onyx/background/celery/apps/indexing.py b/backend/onyx/background/celery/apps/indexing.py index 89681ea741..a241ff19b0 100644 --- a/backend/onyx/background/celery/apps/indexing.py +++ b/backend/onyx/background/celery/apps/indexing.py @@ -60,9 +60,13 @@ def on_worker_init(sender: Worker, **kwargs: Any) -> None: SqlEngine.set_app_name(POSTGRES_CELERY_WORKER_INDEXING_APP_NAME) - # rkuo: been seeing transient connection exceptions here, so upping the connection count - # from just concurrency/concurrency to concurrency/concurrency*2 - SqlEngine.init_engine(pool_size=sender.concurrency, max_overflow=8) # type: ignore + # rkuo: Transient errors keep happening in the worker threads for indexing + # "SSL connection has been closed unexpectedly" + # fixing spawn method didn't help (although it seemed like it should) + # setting pre ping might help. + SqlEngine.init_engine( + pool_size=sender.concurrency, max_overflow=8, pool_pre_ping=True + ) # type: ignore app_base.wait_for_redis(sender, **kwargs) app_base.wait_for_db(sender, **kwargs) From 7d86b2833585fce6c553504fd3201e8423648ebe Mon Sep 17 00:00:00 2001 From: "Richard Kuo (Danswer)" Date: Mon, 13 Jan 2025 12:14:32 -0800 Subject: [PATCH 13/13] maybe we don't need pre ping yet --- backend/onyx/background/celery/apps/app_base.py | 4 ++-- backend/onyx/background/celery/apps/indexing.py | 10 ++++------ 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/backend/onyx/background/celery/apps/app_base.py b/backend/onyx/background/celery/apps/app_base.py index 9b320aae42..40a98f38ab 100644 --- a/backend/onyx/background/celery/apps/app_base.py +++ b/backend/onyx/background/celery/apps/app_base.py @@ -175,7 +175,7 @@ def on_celeryd_init(sender: str, conf: Any = None, **kwargs: Any) -> None: multiprocessing.set_start_method("spawn") # fork is unsafe, set to spawn except Exception: logger.info( - "multiprocessing.set_start_method exceptioned. Trying force=True..." + "Multiprocessing set_start_method exceptioned. Trying force=True..." ) try: multiprocessing.set_start_method( @@ -183,7 +183,7 @@ def on_celeryd_init(sender: str, conf: Any = None, **kwargs: Any) -> None: ) # fork is unsafe, set to spawn except Exception: logger.info( - "multiprocessing.set_start_method force=True exceptioned even with force=True." + "Multiprocessing set_start_method force=True exceptioned even with force=True." ) logger.info( diff --git a/backend/onyx/background/celery/apps/indexing.py b/backend/onyx/background/celery/apps/indexing.py index a241ff19b0..e222da5e3b 100644 --- a/backend/onyx/background/celery/apps/indexing.py +++ b/backend/onyx/background/celery/apps/indexing.py @@ -60,13 +60,11 @@ def on_worker_init(sender: Worker, **kwargs: Any) -> None: SqlEngine.set_app_name(POSTGRES_CELERY_WORKER_INDEXING_APP_NAME) - # rkuo: Transient errors keep happening in the worker threads for indexing + # rkuo: Transient errors keep happening in the indexing watchdog threads. # "SSL connection has been closed unexpectedly" - # fixing spawn method didn't help (although it seemed like it should) - # setting pre ping might help. - SqlEngine.init_engine( - pool_size=sender.concurrency, max_overflow=8, pool_pre_ping=True - ) # type: ignore + # actually setting the spawn method in the cloud fixes 95% of these. + # setting pre ping might help even more, but not worrying about that yet + SqlEngine.init_engine(pool_size=sender.concurrency, max_overflow=8) # type: ignore app_base.wait_for_redis(sender, **kwargs) app_base.wait_for_db(sender, **kwargs)