move to celeryd_init

This commit is contained in:
Richard Kuo (Danswer) 2025-01-13 02:46:03 -08:00
parent ccef350287
commit f5bdf9d2c9
2 changed files with 23 additions and 9 deletions

View File

@ -1,4 +1,5 @@
import logging
import multiprocessing
import time
from typing import Any
@ -167,6 +168,28 @@ def on_celeryd_init(sender: Any = None, conf: Any = None, **kwargs: Any) -> None
# logger.info(f"Multiprocessing start method - setting to spawn.")
# multiprocessing.set_start_method("spawn") # fork is unsafe, set to spawn
all_start_methods: list[str] = multiprocessing.get_all_start_methods()
logger.info(f"Multiprocessing all start methods: {all_start_methods}")
try:
multiprocessing.set_start_method("spawn") # fork is unsafe, set to spawn
except Exception:
logger.info(
"multiprocessing.set_start_method exceptioned. Trying force=True..."
)
try:
multiprocessing.set_start_method(
"spawn", force=True
) # fork is unsafe, set to spawn
except Exception:
logger.info(
"multiprocessing.set_start_method force=True exceptioned even with force=True."
)
logger.info(
f"Multiprocessing selected start method: {multiprocessing.get_start_method()}"
)
def wait_for_redis(sender: Any, **kwargs: Any) -> None:
"""Waits for redis to become ready subject to a hardcoded timeout.

View File

@ -1,4 +1,3 @@
import multiprocessing
from typing import Any
from celery import Celery
@ -57,14 +56,6 @@ def on_celeryd_init(sender: Any = None, conf: Any = None, **kwargs: Any) -> None
def on_worker_init(sender: Any, **kwargs: Any) -> None:
logger.info("worker_init signal received.")
all_start_methods: list[str] = multiprocessing.get_all_start_methods()
logger.info(f"Multiprocessing all start methods: {all_start_methods}")
multiprocessing.set_start_method("spawn") # fork is unsafe, set to spawn
logger.info(
f"Multiprocessing selected start method: {multiprocessing.get_start_method()}"
)
logger.info(f"Concurrency: {sender.concurrency}")
SqlEngine.set_app_name(POSTGRES_CELERY_WORKER_LIGHT_APP_NAME)