saved files

This commit is contained in:
Richard Kuo (Danswer) 2025-01-13 10:46:20 -08:00
parent f5bdf9d2c9
commit be3cfdd4a6
3 changed files with 1 additions and 40 deletions

View File

@ -1,4 +1,3 @@
import multiprocessing
from typing import Any
from celery import Celery
@ -57,24 +56,6 @@ def on_celeryd_init(sender: Any = None, conf: Any = None, **kwargs: Any) -> None
def on_worker_init(sender: Any, **kwargs: Any) -> None:
logger.info("worker_init signal received.")
all_start_methods: list[str] = multiprocessing.get_all_start_methods()
logger.info(f"Multiprocessing all start methods: {all_start_methods}")
try:
multiprocessing.set_start_method("spawn") # fork is unsafe, set to spawn
except Exception:
logger.info("multiprocessing.set_start_method exceptioned.")
try:
multiprocessing.set_start_method(
"spawn", force=True
) # fork is unsafe, set to spawn
except Exception:
logger.info("multiprocessing.set_start_method force=True exceptioned.")
logger.info(
f"Multiprocessing selected start method: {multiprocessing.get_start_method()}"
)
SqlEngine.set_app_name(POSTGRES_CELERY_WORKER_HEAVY_APP_NAME)
SqlEngine.init_engine(pool_size=4, max_overflow=12)

View File

@ -1,4 +1,3 @@
import multiprocessing
from typing import Any
from celery import Celery
@ -58,21 +57,11 @@ def on_celeryd_init(sender: Any = None, conf: Any = None, **kwargs: Any) -> None
def on_worker_init(sender: Any, **kwargs: Any) -> None:
logger.info("worker_init signal received.")
all_start_methods: list[str] = multiprocessing.get_all_start_methods()
logger.info(f"Multiprocessing all start methods: {all_start_methods}")
multiprocessing.set_start_method("spawn") # fork is unsafe, set to spawn
logger.info(
f"Multiprocessing selected start method: {multiprocessing.get_start_method()}"
)
SqlEngine.set_app_name(POSTGRES_CELERY_WORKER_INDEXING_APP_NAME)
# rkuo: been seeing transient connection exceptions here, so upping the connection count
# from just concurrency/concurrency to concurrency/concurrency*2
SqlEngine.init_engine(
pool_size=sender.concurrency, max_overflow=sender.concurrency * 2
)
SqlEngine.init_engine(pool_size=sender.concurrency, max_overflow=8)
app_base.wait_for_redis(sender, **kwargs)
app_base.wait_for_db(sender, **kwargs)

View File

@ -1,5 +1,4 @@
import logging
import multiprocessing
from typing import Any
from typing import cast
@ -81,14 +80,6 @@ def on_celeryd_init(sender: Any = None, conf: Any = None, **kwargs: Any) -> None
def on_worker_init(sender: Any, **kwargs: Any) -> None:
logger.info("worker_init signal received.")
all_start_methods: list[str] = multiprocessing.get_all_start_methods()
logger.info(f"Multiprocessing all start methods: {all_start_methods}")
multiprocessing.set_start_method("spawn") # fork is unsafe, set to spawn
logger.info(
f"Multiprocessing selected start method: {multiprocessing.get_start_method()}"
)
SqlEngine.set_app_name(POSTGRES_CELERY_WORKER_PRIMARY_APP_NAME)
SqlEngine.init_engine(pool_size=8, max_overflow=0)