mirror of
https://github.com/danswer-ai/danswer.git
synced 2025-03-29 11:12:02 +01:00
Address 'PGRES_TUPLES_OK and no message from the libpq' issues
This commit is contained in:
parent
f5b3333df3
commit
a19290cb27
@ -11,6 +11,7 @@ from typing import Any
|
||||
from typing import Literal
|
||||
from typing import Optional
|
||||
|
||||
from danswer.db.engine import get_sqlalchemy_engine
|
||||
from danswer.utils.logger import setup_logger
|
||||
|
||||
logger = setup_logger()
|
||||
@ -24,6 +25,22 @@ JobStatusType = (
|
||||
)
|
||||
|
||||
|
||||
def _initializer(
|
||||
func: Callable, args: list | tuple, kwargs: dict[str, Any] | None = None
|
||||
) -> Any:
|
||||
"""Ensure the parent proc's database connections are not touched
|
||||
in the new connection pool
|
||||
|
||||
Based on the recommended approach in the SQLAlchemy docs found:
|
||||
https://docs.sqlalchemy.org/en/20/core/pooling.html#using-connection-pools-with-multiprocessing-or-os-fork
|
||||
"""
|
||||
if kwargs is None:
|
||||
kwargs = {}
|
||||
|
||||
get_sqlalchemy_engine().dispose(close=False)
|
||||
return func(*args, **kwargs)
|
||||
|
||||
|
||||
@dataclass
|
||||
class SimpleJob:
|
||||
"""Drop in replacement for `dask.distributed.Future`"""
|
||||
@ -94,7 +111,7 @@ class SimpleJobClient:
|
||||
job_id = self.job_id_counter
|
||||
self.job_id_counter += 1
|
||||
|
||||
process = Process(target=func, args=args, daemon=True)
|
||||
process = Process(target=_initializer(func=func, args=args), daemon=True)
|
||||
job = SimpleJob(id=job_id, process=process)
|
||||
process.start()
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user