Port KV Store to Postgres (#1227)

This commit is contained in:
Yuhong Sun 2024-03-19 16:21:22 -07:00 committed by GitHub
parent fab2be510a
commit 3a6d32da7c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
17 changed files with 150 additions and 23 deletions

View File

@ -0,0 +1,29 @@
"""Port Config Store
Revision ID: 173cae5bba26
Revises: e50154680a5c
Create Date: 2024-03-19 15:30:44.425436
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql
# revision identifiers, used by Alembic.
revision = "173cae5bba26"
down_revision = "e50154680a5c"
branch_labels = None
depends_on = None
def upgrade() -> None:
op.create_table(
"key_value_store",
sa.Column("key", sa.String(), nullable=False),
sa.Column("value", postgresql.JSONB(astext_type=sa.Text()), nullable=False),
sa.PrimaryKeyConstraint("key"),
)
def downgrade() -> None:
op.drop_table("key_value_store")

View File

@ -224,8 +224,8 @@ INDEXING_MODEL_SERVER_HOST = (
#####
# Miscellaneous
#####
DYNAMIC_CONFIG_STORE = os.environ.get(
"DYNAMIC_CONFIG_STORE", "FileSystemBackedDynamicConfigStore"
DYNAMIC_CONFIG_STORE = (
os.environ.get("DYNAMIC_CONFIG_STORE") or "PostgresBackedDynamicConfigStore"
)
DYNAMIC_CONFIG_DIR_PATH = os.environ.get("DYNAMIC_CONFIG_DIR_PATH", "/home/storage")
JOB_TIMEOUT = 60 * 60 * 6 # 6 hours default

View File

@ -24,7 +24,7 @@ from danswer.connectors.gmail.constants import GMAIL_SERVICE_ACCOUNT_KEY
from danswer.connectors.gmail.constants import SCOPES
from danswer.db.credentials import update_credential_json
from danswer.db.models import User
from danswer.dynamic_configs import get_dynamic_config_store
from danswer.dynamic_configs.factory import get_dynamic_config_store
from danswer.server.documents.models import CredentialBase
from danswer.server.documents.models import GoogleAppCredentials
from danswer.server.documents.models import GoogleServiceAccountKey

View File

@ -24,7 +24,7 @@ from danswer.connectors.google_drive.constants import GOOGLE_DRIVE_SERVICE_ACCOU
from danswer.connectors.google_drive.constants import SCOPES
from danswer.db.credentials import update_credential_json
from danswer.db.models import User
from danswer.dynamic_configs import get_dynamic_config_store
from danswer.dynamic_configs.factory import get_dynamic_config_store
from danswer.server.documents.models import CredentialBase
from danswer.server.documents.models import GoogleAppCredentials
from danswer.server.documents.models import GoogleServiceAccountKey

View File

@ -1,7 +1,7 @@
import os
from typing import cast
from danswer.dynamic_configs import get_dynamic_config_store
from danswer.dynamic_configs.factory import get_dynamic_config_store
from danswer.server.manage.models import SlackBotTokens

View File

@ -10,6 +10,7 @@ from sqlalchemy.ext.asyncio import AsyncEngine
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.orm import Session
from sqlalchemy.orm import sessionmaker
from danswer.configs.app_configs import POSTGRES_DB
from danswer.configs.app_configs import POSTGRES_HOST
@ -80,3 +81,6 @@ async def get_async_session() -> AsyncGenerator[AsyncSession, None]:
get_sqlalchemy_async_engine(), expire_on_commit=False
) as async_session:
yield async_session
SessionFactory = sessionmaker(bind=get_sqlalchemy_engine())

View File

@ -35,6 +35,7 @@ from danswer.configs.constants import DocumentSource
from danswer.configs.constants import MessageType
from danswer.configs.constants import SearchFeedbackType
from danswer.connectors.models import InputType
from danswer.dynamic_configs.interface import JSON_ro
from danswer.search.models import RecencyBiasSetting
from danswer.search.models import SearchType
@ -851,3 +852,10 @@ class TaskQueueState(Base):
register_time: Mapped[datetime.datetime] = mapped_column(
DateTime(timezone=True), server_default=func.now()
)
class KVStore(Base):
__tablename__ = "key_value_store"
key: Mapped[str] = mapped_column(String, primary_key=True)
value: Mapped[JSON_ro] = mapped_column(postgresql.JSONB(), nullable=False)

View File

@ -1,13 +0,0 @@
from danswer.configs.app_configs import DYNAMIC_CONFIG_DIR_PATH
from danswer.configs.app_configs import DYNAMIC_CONFIG_STORE
from danswer.dynamic_configs.file_system.store import FileSystemBackedDynamicConfigStore
from danswer.dynamic_configs.interface import DynamicConfigStore
def get_dynamic_config_store() -> DynamicConfigStore:
dynamic_config_store_type = DYNAMIC_CONFIG_STORE
if dynamic_config_store_type == FileSystemBackedDynamicConfigStore.__name__:
return FileSystemBackedDynamicConfigStore(DYNAMIC_CONFIG_DIR_PATH)
# TODO: change exception type
raise Exception("Unknown dynamic config store type")

View File

@ -0,0 +1,16 @@
from danswer.configs.app_configs import DYNAMIC_CONFIG_DIR_PATH
from danswer.configs.app_configs import DYNAMIC_CONFIG_STORE
from danswer.dynamic_configs.interface import DynamicConfigStore
from danswer.dynamic_configs.store import FileSystemBackedDynamicConfigStore
from danswer.dynamic_configs.store import PostgresBackedDynamicConfigStore
def get_dynamic_config_store() -> DynamicConfigStore:
dynamic_config_store_type = DYNAMIC_CONFIG_STORE
if dynamic_config_store_type == FileSystemBackedDynamicConfigStore.__name__:
return FileSystemBackedDynamicConfigStore(DYNAMIC_CONFIG_DIR_PATH)
if dynamic_config_store_type == PostgresBackedDynamicConfigStore.__name__:
return PostgresBackedDynamicConfigStore()
# TODO: change exception type
raise Exception("Unknown dynamic config store type")

View File

@ -0,0 +1,40 @@
import json
from pathlib import Path
from danswer.configs.app_configs import DYNAMIC_CONFIG_DIR_PATH
from danswer.dynamic_configs.factory import PostgresBackedDynamicConfigStore
from danswer.dynamic_configs.interface import ConfigNotFoundError
def read_file_system_store(directory_path: str) -> dict:
store = {}
base_path = Path(directory_path)
for file_path in base_path.iterdir():
if file_path.is_file() and "." not in file_path.name:
with open(file_path, "r") as file:
key = file_path.stem
value = json.load(file)
if value:
store[key] = value
return store
def insert_into_postgres(store_data: dict) -> None:
port_once_key = "file_store_ported"
config_store = PostgresBackedDynamicConfigStore()
try:
config_store.load(port_once_key)
return
except ConfigNotFoundError:
pass
for key, value in store_data.items():
config_store.store(key, value)
config_store.store(port_once_key, True)
def port_filesystem_to_postgres(directory_path: str = DYNAMIC_CONFIG_DIR_PATH) -> None:
store_data = read_file_system_store(directory_path)
insert_into_postgres(store_data)

View File

@ -1,10 +1,15 @@
import json
import os
from collections.abc import Iterator
from contextlib import contextmanager
from pathlib import Path
from typing import cast
from filelock import FileLock
from sqlalchemy.orm import Session
from danswer.db.engine import SessionFactory
from danswer.db.models import KVStore
from danswer.dynamic_configs.interface import ConfigNotFoundError
from danswer.dynamic_configs.interface import DynamicConfigStore
from danswer.dynamic_configs.interface import JSON_ro
@ -46,3 +51,38 @@ class FileSystemBackedDynamicConfigStore(DynamicConfigStore):
lock = _get_file_lock(file_path)
with lock.acquire(timeout=FILE_LOCK_TIMEOUT):
os.remove(file_path)
class PostgresBackedDynamicConfigStore(DynamicConfigStore):
@contextmanager
def get_session(self) -> Iterator[Session]:
session: Session = SessionFactory()
try:
yield session
finally:
session.close()
def store(self, key: str, val: JSON_ro) -> None:
with self.get_session() as session:
obj = session.query(KVStore).filter_by(key=key).first()
if obj:
obj.value = val
else:
obj = KVStore(key=key, value=val) # type: ignore
session.query(KVStore).filter_by(key=key).delete()
session.add(obj)
session.commit()
def load(self, key: str) -> JSON_ro:
with self.get_session() as session:
obj = session.query(KVStore).filter_by(key=key).first()
if not obj:
raise ConfigNotFoundError
return cast(JSON_ro, obj.value)
def delete(self, key: str) -> None:
with self.get_session() as session:
result = session.query(KVStore).filter_by(key=key).delete() # type: ignore
if result == 0:
raise ConfigNotFoundError
session.commit()

View File

@ -30,7 +30,7 @@ from danswer.configs.model_configs import GEN_AI_MAX_TOKENS
from danswer.configs.model_configs import GEN_AI_MODEL_PROVIDER
from danswer.configs.model_configs import GEN_AI_MODEL_VERSION
from danswer.db.models import ChatMessage
from danswer.dynamic_configs import get_dynamic_config_store
from danswer.dynamic_configs.factory import get_dynamic_config_store
from danswer.dynamic_configs.interface import ConfigNotFoundError
from danswer.indexing.models import InferenceChunk
from danswer.llm.interfaces import LLM

View File

@ -51,6 +51,7 @@ from danswer.db.engine import get_sqlalchemy_engine
from danswer.db.index_attempt import cancel_indexing_attempts_past_model
from danswer.db.index_attempt import expire_index_attempts
from danswer.document_index.factory import get_default_document_index
from danswer.dynamic_configs.port_configs import port_filesystem_to_postgres
from danswer.llm.factory import get_default_llm
from danswer.llm.utils import get_default_llm_version
from danswer.search.search_nlp_models import warm_up_models
@ -168,6 +169,8 @@ async def lifespan(app: FastAPI) -> AsyncGenerator:
f"Using multilingual flow with languages: {MULTILINGUAL_QUERY_EXPANSION}"
)
port_filesystem_to_postgres()
with Session(engine) as db_session:
db_embedding_model = get_current_db_embedding_model(db_session)
secondary_db_embedding_model = get_secondary_db_embedding_model(db_session)

View File

@ -19,7 +19,7 @@ from danswer.db.embedding_model import get_secondary_db_embedding_model
from danswer.db.engine import get_session
from danswer.document_index.document_index_utils import get_both_index_names
from danswer.document_index.factory import get_default_document_index
from danswer.dynamic_configs import get_dynamic_config_store
from danswer.dynamic_configs.factory import get_dynamic_config_store
from danswer.dynamic_configs.interface import ConfigNotFoundError
from danswer.indexing.embedder import DefaultIndexingEmbedder
from danswer.indexing.indexing_pipeline import build_indexing_pipeline

View File

@ -24,7 +24,7 @@ from danswer.db.feedback import update_document_hidden
from danswer.db.models import User
from danswer.document_index.document_index_utils import get_both_index_names
from danswer.document_index.factory import get_default_document_index
from danswer.dynamic_configs import get_dynamic_config_store
from danswer.dynamic_configs.factory import get_dynamic_config_store
from danswer.dynamic_configs.interface import ConfigNotFoundError
from danswer.llm.exceptions import GenAIDisabledException
from danswer.llm.factory import get_default_llm

View File

@ -11,7 +11,7 @@ from danswer.document_index.document_index_utils import get_both_index_names
from danswer.document_index.factory import get_default_document_index
from danswer.document_index.interfaces import UpdateRequest
from danswer.document_index.vespa.index import VespaIndex
from danswer.dynamic_configs import get_dynamic_config_store
from danswer.dynamic_configs.factory import get_dynamic_config_store
from danswer.dynamic_configs.interface import ConfigNotFoundError
from danswer.utils.logger import setup_logger

View File

@ -6,7 +6,7 @@ from typing import cast
import requests
from danswer.configs.app_configs import DISABLE_TELEMETRY
from danswer.dynamic_configs import get_dynamic_config_store
from danswer.dynamic_configs.factory import get_dynamic_config_store
from danswer.dynamic_configs.interface import ConfigNotFoundError
CUSTOMER_UUID_KEY = "customer_uuid"