From 3a6d32da7c24a8cc4d6866f0c10a78987f969a07 Mon Sep 17 00:00:00 2001 From: Yuhong Sun Date: Tue, 19 Mar 2024 16:21:22 -0700 Subject: [PATCH] Port KV Store to Postgres (#1227) --- .../173cae5bba26_port_config_store.py | 29 ++++++++++++++ backend/danswer/configs/app_configs.py | 4 +- .../connectors/gmail/connector_auth.py | 2 +- .../connectors/google_drive/connector_auth.py | 2 +- backend/danswer/danswerbot/slack/tokens.py | 2 +- backend/danswer/db/engine.py | 4 ++ backend/danswer/db/models.py | 8 ++++ backend/danswer/dynamic_configs/__init__.py | 13 ------ backend/danswer/dynamic_configs/factory.py | 16 ++++++++ .../danswer/dynamic_configs/port_configs.py | 40 +++++++++++++++++++ .../{file_system => }/store.py | 40 +++++++++++++++++++ backend/danswer/llm/utils.py | 2 +- backend/danswer/main.py | 3 ++ .../danswer/server/danswer_api/ingestion.py | 2 +- .../danswer/server/manage/administrative.py | 2 +- backend/danswer/utils/acl.py | 2 +- backend/danswer/utils/telemetry.py | 2 +- 17 files changed, 150 insertions(+), 23 deletions(-) create mode 100644 backend/alembic/versions/173cae5bba26_port_config_store.py create mode 100644 backend/danswer/dynamic_configs/factory.py create mode 100644 backend/danswer/dynamic_configs/port_configs.py rename backend/danswer/dynamic_configs/{file_system => }/store.py (52%) diff --git a/backend/alembic/versions/173cae5bba26_port_config_store.py b/backend/alembic/versions/173cae5bba26_port_config_store.py new file mode 100644 index 000000000..4087086bf --- /dev/null +++ b/backend/alembic/versions/173cae5bba26_port_config_store.py @@ -0,0 +1,29 @@ +"""Port Config Store + +Revision ID: 173cae5bba26 +Revises: e50154680a5c +Create Date: 2024-03-19 15:30:44.425436 + +""" +from alembic import op +import sqlalchemy as sa +from sqlalchemy.dialects import postgresql + +# revision identifiers, used by Alembic. +revision = "173cae5bba26" +down_revision = "e50154680a5c" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + op.create_table( + "key_value_store", + sa.Column("key", sa.String(), nullable=False), + sa.Column("value", postgresql.JSONB(astext_type=sa.Text()), nullable=False), + sa.PrimaryKeyConstraint("key"), + ) + + +def downgrade() -> None: + op.drop_table("key_value_store") diff --git a/backend/danswer/configs/app_configs.py b/backend/danswer/configs/app_configs.py index cff1b8e5c..08ac2fc23 100644 --- a/backend/danswer/configs/app_configs.py +++ b/backend/danswer/configs/app_configs.py @@ -224,8 +224,8 @@ INDEXING_MODEL_SERVER_HOST = ( ##### # Miscellaneous ##### -DYNAMIC_CONFIG_STORE = os.environ.get( - "DYNAMIC_CONFIG_STORE", "FileSystemBackedDynamicConfigStore" +DYNAMIC_CONFIG_STORE = ( + os.environ.get("DYNAMIC_CONFIG_STORE") or "PostgresBackedDynamicConfigStore" ) DYNAMIC_CONFIG_DIR_PATH = os.environ.get("DYNAMIC_CONFIG_DIR_PATH", "/home/storage") JOB_TIMEOUT = 60 * 60 * 6 # 6 hours default diff --git a/backend/danswer/connectors/gmail/connector_auth.py b/backend/danswer/connectors/gmail/connector_auth.py index f6cfa5a74..39dd9aacf 100644 --- a/backend/danswer/connectors/gmail/connector_auth.py +++ b/backend/danswer/connectors/gmail/connector_auth.py @@ -24,7 +24,7 @@ from danswer.connectors.gmail.constants import GMAIL_SERVICE_ACCOUNT_KEY from danswer.connectors.gmail.constants import SCOPES from danswer.db.credentials import update_credential_json from danswer.db.models import User -from danswer.dynamic_configs import get_dynamic_config_store +from danswer.dynamic_configs.factory import get_dynamic_config_store from danswer.server.documents.models import CredentialBase from danswer.server.documents.models import GoogleAppCredentials from danswer.server.documents.models import GoogleServiceAccountKey diff --git a/backend/danswer/connectors/google_drive/connector_auth.py b/backend/danswer/connectors/google_drive/connector_auth.py index f65e17772..65c34393c 100644 --- a/backend/danswer/connectors/google_drive/connector_auth.py +++ b/backend/danswer/connectors/google_drive/connector_auth.py @@ -24,7 +24,7 @@ from danswer.connectors.google_drive.constants import GOOGLE_DRIVE_SERVICE_ACCOU from danswer.connectors.google_drive.constants import SCOPES from danswer.db.credentials import update_credential_json from danswer.db.models import User -from danswer.dynamic_configs import get_dynamic_config_store +from danswer.dynamic_configs.factory import get_dynamic_config_store from danswer.server.documents.models import CredentialBase from danswer.server.documents.models import GoogleAppCredentials from danswer.server.documents.models import GoogleServiceAccountKey diff --git a/backend/danswer/danswerbot/slack/tokens.py b/backend/danswer/danswerbot/slack/tokens.py index c9c128628..34d2b79a3 100644 --- a/backend/danswer/danswerbot/slack/tokens.py +++ b/backend/danswer/danswerbot/slack/tokens.py @@ -1,7 +1,7 @@ import os from typing import cast -from danswer.dynamic_configs import get_dynamic_config_store +from danswer.dynamic_configs.factory import get_dynamic_config_store from danswer.server.manage.models import SlackBotTokens diff --git a/backend/danswer/db/engine.py b/backend/danswer/db/engine.py index 146f11ef8..6803c51aa 100644 --- a/backend/danswer/db/engine.py +++ b/backend/danswer/db/engine.py @@ -10,6 +10,7 @@ from sqlalchemy.ext.asyncio import AsyncEngine from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import create_async_engine from sqlalchemy.orm import Session +from sqlalchemy.orm import sessionmaker from danswer.configs.app_configs import POSTGRES_DB from danswer.configs.app_configs import POSTGRES_HOST @@ -80,3 +81,6 @@ async def get_async_session() -> AsyncGenerator[AsyncSession, None]: get_sqlalchemy_async_engine(), expire_on_commit=False ) as async_session: yield async_session + + +SessionFactory = sessionmaker(bind=get_sqlalchemy_engine()) diff --git a/backend/danswer/db/models.py b/backend/danswer/db/models.py index 5ca3bdbe9..2405ffed6 100644 --- a/backend/danswer/db/models.py +++ b/backend/danswer/db/models.py @@ -35,6 +35,7 @@ from danswer.configs.constants import DocumentSource from danswer.configs.constants import MessageType from danswer.configs.constants import SearchFeedbackType from danswer.connectors.models import InputType +from danswer.dynamic_configs.interface import JSON_ro from danswer.search.models import RecencyBiasSetting from danswer.search.models import SearchType @@ -851,3 +852,10 @@ class TaskQueueState(Base): register_time: Mapped[datetime.datetime] = mapped_column( DateTime(timezone=True), server_default=func.now() ) + + +class KVStore(Base): + __tablename__ = "key_value_store" + + key: Mapped[str] = mapped_column(String, primary_key=True) + value: Mapped[JSON_ro] = mapped_column(postgresql.JSONB(), nullable=False) diff --git a/backend/danswer/dynamic_configs/__init__.py b/backend/danswer/dynamic_configs/__init__.py index 0fc2233fa..e69de29bb 100644 --- a/backend/danswer/dynamic_configs/__init__.py +++ b/backend/danswer/dynamic_configs/__init__.py @@ -1,13 +0,0 @@ -from danswer.configs.app_configs import DYNAMIC_CONFIG_DIR_PATH -from danswer.configs.app_configs import DYNAMIC_CONFIG_STORE -from danswer.dynamic_configs.file_system.store import FileSystemBackedDynamicConfigStore -from danswer.dynamic_configs.interface import DynamicConfigStore - - -def get_dynamic_config_store() -> DynamicConfigStore: - dynamic_config_store_type = DYNAMIC_CONFIG_STORE - if dynamic_config_store_type == FileSystemBackedDynamicConfigStore.__name__: - return FileSystemBackedDynamicConfigStore(DYNAMIC_CONFIG_DIR_PATH) - - # TODO: change exception type - raise Exception("Unknown dynamic config store type") diff --git a/backend/danswer/dynamic_configs/factory.py b/backend/danswer/dynamic_configs/factory.py new file mode 100644 index 000000000..a82bc315c --- /dev/null +++ b/backend/danswer/dynamic_configs/factory.py @@ -0,0 +1,16 @@ +from danswer.configs.app_configs import DYNAMIC_CONFIG_DIR_PATH +from danswer.configs.app_configs import DYNAMIC_CONFIG_STORE +from danswer.dynamic_configs.interface import DynamicConfigStore +from danswer.dynamic_configs.store import FileSystemBackedDynamicConfigStore +from danswer.dynamic_configs.store import PostgresBackedDynamicConfigStore + + +def get_dynamic_config_store() -> DynamicConfigStore: + dynamic_config_store_type = DYNAMIC_CONFIG_STORE + if dynamic_config_store_type == FileSystemBackedDynamicConfigStore.__name__: + return FileSystemBackedDynamicConfigStore(DYNAMIC_CONFIG_DIR_PATH) + if dynamic_config_store_type == PostgresBackedDynamicConfigStore.__name__: + return PostgresBackedDynamicConfigStore() + + # TODO: change exception type + raise Exception("Unknown dynamic config store type") diff --git a/backend/danswer/dynamic_configs/port_configs.py b/backend/danswer/dynamic_configs/port_configs.py new file mode 100644 index 000000000..34abcff74 --- /dev/null +++ b/backend/danswer/dynamic_configs/port_configs.py @@ -0,0 +1,40 @@ +import json +from pathlib import Path + +from danswer.configs.app_configs import DYNAMIC_CONFIG_DIR_PATH +from danswer.dynamic_configs.factory import PostgresBackedDynamicConfigStore +from danswer.dynamic_configs.interface import ConfigNotFoundError + + +def read_file_system_store(directory_path: str) -> dict: + store = {} + base_path = Path(directory_path) + for file_path in base_path.iterdir(): + if file_path.is_file() and "." not in file_path.name: + with open(file_path, "r") as file: + key = file_path.stem + value = json.load(file) + + if value: + store[key] = value + return store + + +def insert_into_postgres(store_data: dict) -> None: + port_once_key = "file_store_ported" + config_store = PostgresBackedDynamicConfigStore() + try: + config_store.load(port_once_key) + return + except ConfigNotFoundError: + pass + + for key, value in store_data.items(): + config_store.store(key, value) + + config_store.store(port_once_key, True) + + +def port_filesystem_to_postgres(directory_path: str = DYNAMIC_CONFIG_DIR_PATH) -> None: + store_data = read_file_system_store(directory_path) + insert_into_postgres(store_data) diff --git a/backend/danswer/dynamic_configs/file_system/store.py b/backend/danswer/dynamic_configs/store.py similarity index 52% rename from backend/danswer/dynamic_configs/file_system/store.py rename to backend/danswer/dynamic_configs/store.py index 75cc0d740..043d762d4 100644 --- a/backend/danswer/dynamic_configs/file_system/store.py +++ b/backend/danswer/dynamic_configs/store.py @@ -1,10 +1,15 @@ import json import os +from collections.abc import Iterator +from contextlib import contextmanager from pathlib import Path from typing import cast from filelock import FileLock +from sqlalchemy.orm import Session +from danswer.db.engine import SessionFactory +from danswer.db.models import KVStore from danswer.dynamic_configs.interface import ConfigNotFoundError from danswer.dynamic_configs.interface import DynamicConfigStore from danswer.dynamic_configs.interface import JSON_ro @@ -46,3 +51,38 @@ class FileSystemBackedDynamicConfigStore(DynamicConfigStore): lock = _get_file_lock(file_path) with lock.acquire(timeout=FILE_LOCK_TIMEOUT): os.remove(file_path) + + +class PostgresBackedDynamicConfigStore(DynamicConfigStore): + @contextmanager + def get_session(self) -> Iterator[Session]: + session: Session = SessionFactory() + try: + yield session + finally: + session.close() + + def store(self, key: str, val: JSON_ro) -> None: + with self.get_session() as session: + obj = session.query(KVStore).filter_by(key=key).first() + if obj: + obj.value = val + else: + obj = KVStore(key=key, value=val) # type: ignore + session.query(KVStore).filter_by(key=key).delete() + session.add(obj) + session.commit() + + def load(self, key: str) -> JSON_ro: + with self.get_session() as session: + obj = session.query(KVStore).filter_by(key=key).first() + if not obj: + raise ConfigNotFoundError + return cast(JSON_ro, obj.value) + + def delete(self, key: str) -> None: + with self.get_session() as session: + result = session.query(KVStore).filter_by(key=key).delete() # type: ignore + if result == 0: + raise ConfigNotFoundError + session.commit() diff --git a/backend/danswer/llm/utils.py b/backend/danswer/llm/utils.py index 5685213b4..b1983180c 100644 --- a/backend/danswer/llm/utils.py +++ b/backend/danswer/llm/utils.py @@ -30,7 +30,7 @@ from danswer.configs.model_configs import GEN_AI_MAX_TOKENS from danswer.configs.model_configs import GEN_AI_MODEL_PROVIDER from danswer.configs.model_configs import GEN_AI_MODEL_VERSION from danswer.db.models import ChatMessage -from danswer.dynamic_configs import get_dynamic_config_store +from danswer.dynamic_configs.factory import get_dynamic_config_store from danswer.dynamic_configs.interface import ConfigNotFoundError from danswer.indexing.models import InferenceChunk from danswer.llm.interfaces import LLM diff --git a/backend/danswer/main.py b/backend/danswer/main.py index ad7bb14b5..e770cc8ab 100644 --- a/backend/danswer/main.py +++ b/backend/danswer/main.py @@ -51,6 +51,7 @@ from danswer.db.engine import get_sqlalchemy_engine from danswer.db.index_attempt import cancel_indexing_attempts_past_model from danswer.db.index_attempt import expire_index_attempts from danswer.document_index.factory import get_default_document_index +from danswer.dynamic_configs.port_configs import port_filesystem_to_postgres from danswer.llm.factory import get_default_llm from danswer.llm.utils import get_default_llm_version from danswer.search.search_nlp_models import warm_up_models @@ -168,6 +169,8 @@ async def lifespan(app: FastAPI) -> AsyncGenerator: f"Using multilingual flow with languages: {MULTILINGUAL_QUERY_EXPANSION}" ) + port_filesystem_to_postgres() + with Session(engine) as db_session: db_embedding_model = get_current_db_embedding_model(db_session) secondary_db_embedding_model = get_secondary_db_embedding_model(db_session) diff --git a/backend/danswer/server/danswer_api/ingestion.py b/backend/danswer/server/danswer_api/ingestion.py index 8856e20d6..7fce8d1d3 100644 --- a/backend/danswer/server/danswer_api/ingestion.py +++ b/backend/danswer/server/danswer_api/ingestion.py @@ -19,7 +19,7 @@ from danswer.db.embedding_model import get_secondary_db_embedding_model from danswer.db.engine import get_session from danswer.document_index.document_index_utils import get_both_index_names from danswer.document_index.factory import get_default_document_index -from danswer.dynamic_configs import get_dynamic_config_store +from danswer.dynamic_configs.factory import get_dynamic_config_store from danswer.dynamic_configs.interface import ConfigNotFoundError from danswer.indexing.embedder import DefaultIndexingEmbedder from danswer.indexing.indexing_pipeline import build_indexing_pipeline diff --git a/backend/danswer/server/manage/administrative.py b/backend/danswer/server/manage/administrative.py index dea338eea..26e2dfd54 100644 --- a/backend/danswer/server/manage/administrative.py +++ b/backend/danswer/server/manage/administrative.py @@ -24,7 +24,7 @@ from danswer.db.feedback import update_document_hidden from danswer.db.models import User from danswer.document_index.document_index_utils import get_both_index_names from danswer.document_index.factory import get_default_document_index -from danswer.dynamic_configs import get_dynamic_config_store +from danswer.dynamic_configs.factory import get_dynamic_config_store from danswer.dynamic_configs.interface import ConfigNotFoundError from danswer.llm.exceptions import GenAIDisabledException from danswer.llm.factory import get_default_llm diff --git a/backend/danswer/utils/acl.py b/backend/danswer/utils/acl.py index 268457bfd..8fbadb300 100644 --- a/backend/danswer/utils/acl.py +++ b/backend/danswer/utils/acl.py @@ -11,7 +11,7 @@ from danswer.document_index.document_index_utils import get_both_index_names from danswer.document_index.factory import get_default_document_index from danswer.document_index.interfaces import UpdateRequest from danswer.document_index.vespa.index import VespaIndex -from danswer.dynamic_configs import get_dynamic_config_store +from danswer.dynamic_configs.factory import get_dynamic_config_store from danswer.dynamic_configs.interface import ConfigNotFoundError from danswer.utils.logger import setup_logger diff --git a/backend/danswer/utils/telemetry.py b/backend/danswer/utils/telemetry.py index 65e9f4709..0a21cf66e 100644 --- a/backend/danswer/utils/telemetry.py +++ b/backend/danswer/utils/telemetry.py @@ -6,7 +6,7 @@ from typing import cast import requests from danswer.configs.app_configs import DISABLE_TELEMETRY -from danswer.dynamic_configs import get_dynamic_config_store +from danswer.dynamic_configs.factory import get_dynamic_config_store from danswer.dynamic_configs.interface import ConfigNotFoundError CUSTOMER_UUID_KEY = "customer_uuid"