From a19290cb27f90379b698e93c85115f7128d94cef Mon Sep 17 00:00:00 2001
From: Weves
Date: Sat, 27 Apr 2024 14:33:14 -0700
Subject: [PATCH] Address 'PGRES_TUPLES_OK and no message from the libpq'
 issues

---
 .../danswer/background/indexing/job_client.py | 19 ++++++++++++++++++-
 1 file changed, 18 insertions(+), 1 deletion(-)

diff --git a/backend/danswer/background/indexing/job_client.py b/backend/danswer/background/indexing/job_client.py
index 6b1344b59..e9ddad58e 100644
--- a/backend/danswer/background/indexing/job_client.py
+++ b/backend/danswer/background/indexing/job_client.py
@@ -11,6 +11,7 @@ from typing import Any
 from typing import Literal
 from typing import Optional
 
+from danswer.db.engine import get_sqlalchemy_engine
 from danswer.utils.logger import setup_logger
 
 logger = setup_logger()
@@ -24,6 +25,22 @@ JobStatusType = (
 )
 
 
+def _initializer(
+    func: Callable, args: list | tuple, kwargs: dict[str, Any] | None = None
+) -> Any:
+    """Ensure the parent proc's database connections are not touched
+    in the new connection pool
+
+    Based on the recommended approach in the SQLAlchemy docs found:
+    https://docs.sqlalchemy.org/en/20/core/pooling.html#using-connection-pools-with-multiprocessing-or-os-fork
+    """
+    if kwargs is None:
+        kwargs = {}
+
+    # Dispose the engine inherited from the parent without closing the
+    # parent's live connections (close=False), so this child process
+    # lazily opens fresh connections instead of sharing forked sockets.
+    get_sqlalchemy_engine().dispose(close=False)
+    return func(*args, **kwargs)
+
+
 @dataclass
 class SimpleJob:
     """Drop in replacement for `dask.distributed.Future`"""
@@ -94,7 +111,7 @@ class SimpleJobClient:
         job_id = self.job_id_counter
         self.job_id_counter += 1
 
-        process = Process(target=func, args=args, daemon=True)
+        process = Process(target=_initializer, args=(func, args), daemon=True)
         job = SimpleJob(id=job_id, process=process)
         process.start()