mirror of
https://github.com/danswer-ai/danswer.git
synced 2025-09-27 20:38:32 +02:00
init sqlalchemy in child process (#2987)
This commit is contained in:
@@ -21,6 +21,8 @@ celery_app.config_from_object("danswer.background.celery.configs.beat")
|
|||||||
@beat_init.connect
|
@beat_init.connect
|
||||||
def on_beat_init(sender: Any, **kwargs: Any) -> None:
|
def on_beat_init(sender: Any, **kwargs: Any) -> None:
|
||||||
logger.info("beat_init signal received.")
|
logger.info("beat_init signal received.")
|
||||||
|
|
||||||
|
# celery beat shouldn't touch the db at all. But just setting a low minimum here.
|
||||||
SqlEngine.set_app_name(POSTGRES_CELERY_BEAT_APP_NAME)
|
SqlEngine.set_app_name(POSTGRES_CELERY_BEAT_APP_NAME)
|
||||||
SqlEngine.init_engine(pool_size=2, max_overflow=0)
|
SqlEngine.init_engine(pool_size=2, max_overflow=0)
|
||||||
app_base.wait_for_redis(sender, **kwargs)
|
app_base.wait_for_redis(sender, **kwargs)
|
||||||
|
@@ -58,7 +58,7 @@ def on_worker_init(sender: Any, **kwargs: Any) -> None:
|
|||||||
logger.info(f"Multiprocessing start method: {multiprocessing.get_start_method()}")
|
logger.info(f"Multiprocessing start method: {multiprocessing.get_start_method()}")
|
||||||
|
|
||||||
SqlEngine.set_app_name(POSTGRES_CELERY_WORKER_HEAVY_APP_NAME)
|
SqlEngine.set_app_name(POSTGRES_CELERY_WORKER_HEAVY_APP_NAME)
|
||||||
SqlEngine.init_engine(pool_size=8, max_overflow=0)
|
SqlEngine.init_engine(pool_size=4, max_overflow=12)
|
||||||
|
|
||||||
app_base.wait_for_redis(sender, **kwargs)
|
app_base.wait_for_redis(sender, **kwargs)
|
||||||
app_base.on_secondary_worker_init(sender, **kwargs)
|
app_base.on_secondary_worker_init(sender, **kwargs)
|
||||||
|
@@ -166,19 +166,6 @@ def on_worker_init(sender: Any, **kwargs: Any) -> None:
|
|||||||
r.delete(key)
|
r.delete(key)
|
||||||
|
|
||||||
|
|
||||||
# @worker_process_init.connect
|
|
||||||
# def on_worker_process_init(sender: Any, **kwargs: Any) -> None:
|
|
||||||
# """This only runs inside child processes when the worker is in pool=prefork mode.
|
|
||||||
# This may be technically unnecessary since we're finding prefork pools to be
|
|
||||||
# unstable and currently aren't planning on using them."""
|
|
||||||
# logger.info("worker_process_init signal received.")
|
|
||||||
# SqlEngine.set_app_name(POSTGRES_CELERY_WORKER_INDEXING_CHILD_APP_NAME)
|
|
||||||
# SqlEngine.init_engine(pool_size=5, max_overflow=0)
|
|
||||||
|
|
||||||
# # https://stackoverflow.com/questions/43944787/sqlalchemy-celery-with-scoped-session-error
|
|
||||||
# SqlEngine.get_engine().dispose(close=False)
|
|
||||||
|
|
||||||
|
|
||||||
@worker_ready.connect
|
@worker_ready.connect
|
||||||
def on_worker_ready(sender: Any, **kwargs: Any) -> None:
|
def on_worker_ready(sender: Any, **kwargs: Any) -> None:
|
||||||
app_base.on_worker_ready(sender, **kwargs)
|
app_base.on_worker_ready(sender, **kwargs)
|
||||||
|
@@ -11,7 +11,8 @@ from typing import Any
|
|||||||
from typing import Literal
|
from typing import Literal
|
||||||
from typing import Optional
|
from typing import Optional
|
||||||
|
|
||||||
from danswer.db.engine import get_sqlalchemy_engine
|
from danswer.configs.constants import POSTGRES_CELERY_WORKER_INDEXING_CHILD_APP_NAME
|
||||||
|
from danswer.db.engine import SqlEngine
|
||||||
from danswer.utils.logger import setup_logger
|
from danswer.utils.logger import setup_logger
|
||||||
|
|
||||||
logger = setup_logger()
|
logger = setup_logger()
|
||||||
@@ -37,7 +38,9 @@ def _initializer(
|
|||||||
if kwargs is None:
|
if kwargs is None:
|
||||||
kwargs = {}
|
kwargs = {}
|
||||||
|
|
||||||
get_sqlalchemy_engine().dispose(close=False)
|
logger.info("Initializing spawned worker child process.")
|
||||||
|
SqlEngine.set_app_name(POSTGRES_CELERY_WORKER_INDEXING_CHILD_APP_NAME)
|
||||||
|
SqlEngine.init_engine(pool_size=4, max_overflow=12, pool_recycle=60)
|
||||||
return func(*args, **kwargs)
|
return func(*args, **kwargs)
|
||||||
|
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user