Redis Cache for KV Store (#2603)

* k

* k

* k

* k
This commit is contained in:
Yuhong Sun 2024-10-01 11:31:18 -07:00 committed by GitHub
parent f513c5bbed
commit fffb9c155a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
39 changed files with 250 additions and 269 deletions

View File

@ -9,7 +9,7 @@ import json
from typing import cast from typing import cast
from alembic import op from alembic import op
import sqlalchemy as sa import sqlalchemy as sa
from danswer.dynamic_configs.factory import get_dynamic_config_store from danswer.key_value_store.factory import get_kv_store
# revision identifiers, used by Alembic. # revision identifiers, used by Alembic.
revision = "703313b75876" revision = "703313b75876"
@ -54,9 +54,7 @@ def upgrade() -> None:
) )
try: try:
settings_json = cast( settings_json = cast(str, get_kv_store().load("token_budget_settings"))
str, get_dynamic_config_store().load("token_budget_settings")
)
settings = json.loads(settings_json) settings = json.loads(settings_json)
is_enabled = settings.get("enable_token_budget", False) is_enabled = settings.get("enable_token_budget", False)
@ -71,7 +69,7 @@ def upgrade() -> None:
) )
# Delete the dynamic config # Delete the dynamic config
get_dynamic_config_store().delete("token_budget_settings") get_kv_store().delete("token_budget_settings")
except Exception: except Exception:
# Ignore if the dynamic config is not found # Ignore if the dynamic config is not found

View File

@ -1,20 +1,20 @@
from typing import cast from typing import cast
from danswer.configs.constants import KV_USER_STORE_KEY from danswer.configs.constants import KV_USER_STORE_KEY
from danswer.dynamic_configs.factory import get_dynamic_config_store from danswer.key_value_store.factory import get_kv_store
from danswer.dynamic_configs.interface import ConfigNotFoundError from danswer.key_value_store.interface import JSON_ro
from danswer.dynamic_configs.interface import JSON_ro from danswer.key_value_store.interface import KvKeyNotFoundError
def get_invited_users() -> list[str]: def get_invited_users() -> list[str]:
try: try:
store = get_dynamic_config_store() store = get_kv_store()
return cast(list, store.load(KV_USER_STORE_KEY)) return cast(list, store.load(KV_USER_STORE_KEY))
except ConfigNotFoundError: except KvKeyNotFoundError:
return list() return list()
def write_invited_users(emails: list[str]) -> int: def write_invited_users(emails: list[str]) -> int:
store = get_dynamic_config_store() store = get_kv_store()
store.store(KV_USER_STORE_KEY, cast(JSON_ro, emails)) store.store(KV_USER_STORE_KEY, cast(JSON_ro, emails))
return len(emails) return len(emails)

View File

@ -4,29 +4,29 @@ from typing import cast
from danswer.auth.schemas import UserRole from danswer.auth.schemas import UserRole
from danswer.configs.constants import KV_NO_AUTH_USER_PREFERENCES_KEY from danswer.configs.constants import KV_NO_AUTH_USER_PREFERENCES_KEY
from danswer.dynamic_configs.store import ConfigNotFoundError from danswer.key_value_store.store import KeyValueStore
from danswer.dynamic_configs.store import DynamicConfigStore from danswer.key_value_store.store import KvKeyNotFoundError
from danswer.server.manage.models import UserInfo from danswer.server.manage.models import UserInfo
from danswer.server.manage.models import UserPreferences from danswer.server.manage.models import UserPreferences
def set_no_auth_user_preferences( def set_no_auth_user_preferences(
store: DynamicConfigStore, preferences: UserPreferences store: KeyValueStore, preferences: UserPreferences
) -> None: ) -> None:
store.store(KV_NO_AUTH_USER_PREFERENCES_KEY, preferences.model_dump()) store.store(KV_NO_AUTH_USER_PREFERENCES_KEY, preferences.model_dump())
def load_no_auth_user_preferences(store: DynamicConfigStore) -> UserPreferences: def load_no_auth_user_preferences(store: KeyValueStore) -> UserPreferences:
try: try:
preferences_data = cast( preferences_data = cast(
Mapping[str, Any], store.load(KV_NO_AUTH_USER_PREFERENCES_KEY) Mapping[str, Any], store.load(KV_NO_AUTH_USER_PREFERENCES_KEY)
) )
return UserPreferences(**preferences_data) return UserPreferences(**preferences_data)
except ConfigNotFoundError: except KvKeyNotFoundError:
return UserPreferences(chosen_assistants=None, default_model=None) return UserPreferences(chosen_assistants=None, default_model=None)
def fetch_no_auth_user(store: DynamicConfigStore) -> UserInfo: def fetch_no_auth_user(store: KeyValueStore) -> UserInfo:
return UserInfo( return UserInfo(
id="__no_auth_user__", id="__no_auth_user__",
email="anonymous@danswer.ai", email="anonymous@danswer.ai",

View File

@ -30,7 +30,7 @@ from danswer.configs.constants import POSTGRES_CELERY_WORKER_HEAVY_APP_NAME
from danswer.configs.constants import POSTGRES_CELERY_WORKER_LIGHT_APP_NAME from danswer.configs.constants import POSTGRES_CELERY_WORKER_LIGHT_APP_NAME
from danswer.configs.constants import POSTGRES_CELERY_WORKER_PRIMARY_APP_NAME from danswer.configs.constants import POSTGRES_CELERY_WORKER_PRIMARY_APP_NAME
from danswer.db.engine import SqlEngine from danswer.db.engine import SqlEngine
from danswer.redis.redis_pool import RedisPool from danswer.redis.redis_pool import get_redis_client
from danswer.utils.logger import ColoredFormatter from danswer.utils.logger import ColoredFormatter
from danswer.utils.logger import PlainFormatter from danswer.utils.logger import PlainFormatter
from danswer.utils.logger import setup_logger from danswer.utils.logger import setup_logger
@ -40,8 +40,6 @@ logger = setup_logger()
# use this within celery tasks to get celery task specific logging # use this within celery tasks to get celery task specific logging
task_logger = get_task_logger(__name__) task_logger = get_task_logger(__name__)
redis_pool = RedisPool()
celery_app = Celery(__name__) celery_app = Celery(__name__)
celery_app.config_from_object( celery_app.config_from_object(
"danswer.background.celery.celeryconfig" "danswer.background.celery.celeryconfig"
@ -79,13 +77,13 @@ def celery_task_postrun(
if not task_id: if not task_id:
return return
r = get_redis_client()
if task_id.startswith(RedisConnectorCredentialPair.PREFIX): if task_id.startswith(RedisConnectorCredentialPair.PREFIX):
r = redis_pool.get_client()
r.srem(RedisConnectorCredentialPair.get_taskset_key(), task_id) r.srem(RedisConnectorCredentialPair.get_taskset_key(), task_id)
return return
if task_id.startswith(RedisDocumentSet.PREFIX): if task_id.startswith(RedisDocumentSet.PREFIX):
r = redis_pool.get_client()
document_set_id = RedisDocumentSet.get_id_from_task_id(task_id) document_set_id = RedisDocumentSet.get_id_from_task_id(task_id)
if document_set_id is not None: if document_set_id is not None:
rds = RedisDocumentSet(document_set_id) rds = RedisDocumentSet(document_set_id)
@ -93,7 +91,6 @@ def celery_task_postrun(
return return
if task_id.startswith(RedisUserGroup.PREFIX): if task_id.startswith(RedisUserGroup.PREFIX):
r = redis_pool.get_client()
usergroup_id = RedisUserGroup.get_id_from_task_id(task_id) usergroup_id = RedisUserGroup.get_id_from_task_id(task_id)
if usergroup_id is not None: if usergroup_id is not None:
rug = RedisUserGroup(usergroup_id) rug = RedisUserGroup(usergroup_id)
@ -101,7 +98,6 @@ def celery_task_postrun(
return return
if task_id.startswith(RedisConnectorDeletion.PREFIX): if task_id.startswith(RedisConnectorDeletion.PREFIX):
r = redis_pool.get_client()
cc_pair_id = RedisConnectorDeletion.get_id_from_task_id(task_id) cc_pair_id = RedisConnectorDeletion.get_id_from_task_id(task_id)
if cc_pair_id is not None: if cc_pair_id is not None:
rcd = RedisConnectorDeletion(cc_pair_id) rcd = RedisConnectorDeletion(cc_pair_id)
@ -130,7 +126,7 @@ def on_worker_init(sender: Any, **kwargs: Any) -> None:
SqlEngine.set_app_name(POSTGRES_CELERY_WORKER_PRIMARY_APP_NAME) SqlEngine.set_app_name(POSTGRES_CELERY_WORKER_PRIMARY_APP_NAME)
SqlEngine.init_engine(pool_size=8, max_overflow=0) SqlEngine.init_engine(pool_size=8, max_overflow=0)
r = redis_pool.get_client() r = get_redis_client()
WAIT_INTERVAL = 5 WAIT_INTERVAL = 5
WAIT_LIMIT = 60 WAIT_LIMIT = 60
@ -190,7 +186,7 @@ def on_worker_init(sender: Any, **kwargs: Any) -> None:
# This is singleton work that should be done on startup exactly once # This is singleton work that should be done on startup exactly once
# by the primary worker # by the primary worker
r = redis_pool.get_client() r = get_redis_client()
# For the moment, we're assuming that we are the only primary worker # For the moment, we're assuming that we are the only primary worker
# that should be running. # that should be running.
@ -364,7 +360,7 @@ class HubPeriodicTask(bootsteps.StartStopStep):
if not hasattr(worker, "primary_worker_lock"): if not hasattr(worker, "primary_worker_lock"):
return return
r = redis_pool.get_client() r = get_redis_client()
lock: redis.lock.Lock = worker.primary_worker_lock lock: redis.lock.Lock = worker.primary_worker_lock

View File

@ -25,12 +25,12 @@ from danswer.db.models import TaskQueueState
from danswer.db.tasks import check_task_is_live_and_not_timed_out from danswer.db.tasks import check_task_is_live_and_not_timed_out
from danswer.db.tasks import get_latest_task from danswer.db.tasks import get_latest_task
from danswer.db.tasks import get_latest_task_by_type from danswer.db.tasks import get_latest_task_by_type
from danswer.redis.redis_pool import RedisPool from danswer.redis.redis_pool import get_redis_client
from danswer.server.documents.models import DeletionAttemptSnapshot from danswer.server.documents.models import DeletionAttemptSnapshot
from danswer.utils.logger import setup_logger from danswer.utils.logger import setup_logger
logger = setup_logger() logger = setup_logger()
redis_pool = RedisPool()
def _get_deletion_status( def _get_deletion_status(
@ -47,7 +47,7 @@ def _get_deletion_status(
rcd = RedisConnectorDeletion(cc_pair.id) rcd = RedisConnectorDeletion(cc_pair.id)
r = redis_pool.get_client() r = get_redis_client()
if not r.exists(rcd.fence_key): if not r.exists(rcd.fence_key):
return None return None

View File

@ -18,9 +18,8 @@ from danswer.db.enums import IndexingStatus
from danswer.db.index_attempt import get_last_attempt from danswer.db.index_attempt import get_last_attempt
from danswer.db.models import ConnectorCredentialPair from danswer.db.models import ConnectorCredentialPair
from danswer.db.search_settings import get_current_search_settings from danswer.db.search_settings import get_current_search_settings
from danswer.redis.redis_pool import RedisPool from danswer.redis.redis_pool import get_redis_client
redis_pool = RedisPool()
# use this within celery tasks to get celery task specific logging # use this within celery tasks to get celery task specific logging
task_logger = get_task_logger(__name__) task_logger = get_task_logger(__name__)
@ -32,7 +31,7 @@ task_logger = get_task_logger(__name__)
trail=False, trail=False,
) )
def check_for_connector_deletion_task() -> None: def check_for_connector_deletion_task() -> None:
r = redis_pool.get_client() r = get_redis_client()
lock_beat = r.lock( lock_beat = r.lock(
DanswerRedisLocks.CHECK_CONNECTOR_DELETION_BEAT_LOCK, DanswerRedisLocks.CHECK_CONNECTOR_DELETION_BEAT_LOCK,

View File

@ -41,14 +41,13 @@ from danswer.db.models import UserGroup
from danswer.document_index.document_index_utils import get_both_index_names from danswer.document_index.document_index_utils import get_both_index_names
from danswer.document_index.factory import get_default_document_index from danswer.document_index.factory import get_default_document_index
from danswer.document_index.interfaces import UpdateRequest from danswer.document_index.interfaces import UpdateRequest
from danswer.redis.redis_pool import RedisPool from danswer.redis.redis_pool import get_redis_client
from danswer.utils.variable_functionality import fetch_versioned_implementation from danswer.utils.variable_functionality import fetch_versioned_implementation
from danswer.utils.variable_functionality import ( from danswer.utils.variable_functionality import (
fetch_versioned_implementation_with_fallback, fetch_versioned_implementation_with_fallback,
) )
from danswer.utils.variable_functionality import noop_fallback from danswer.utils.variable_functionality import noop_fallback
redis_pool = RedisPool()
# use this within celery tasks to get celery task specific logging # use this within celery tasks to get celery task specific logging
task_logger = get_task_logger(__name__) task_logger = get_task_logger(__name__)
@ -65,7 +64,7 @@ def check_for_vespa_sync_task() -> None:
"""Runs periodically to check if any document needs syncing. """Runs periodically to check if any document needs syncing.
Generates sets of tasks for Celery if syncing is needed.""" Generates sets of tasks for Celery if syncing is needed."""
r = redis_pool.get_client() r = get_redis_client()
lock_beat = r.lock( lock_beat = r.lock(
DanswerRedisLocks.CHECK_VESPA_SYNC_BEAT_LOCK, DanswerRedisLocks.CHECK_VESPA_SYNC_BEAT_LOCK,
@ -426,7 +425,7 @@ def monitor_vespa_sync() -> None:
This task lock timeout is CELERY_METADATA_SYNC_BEAT_LOCK_TIMEOUT seconds, so don't This task lock timeout is CELERY_METADATA_SYNC_BEAT_LOCK_TIMEOUT seconds, so don't
do anything too expensive in this function! do anything too expensive in this function!
""" """
r = redis_pool.get_client() r = get_redis_client()
lock_beat = r.lock( lock_beat = r.lock(
DanswerRedisLocks.MONITOR_VESPA_SYNC_BEAT_LOCK, DanswerRedisLocks.MONITOR_VESPA_SYNC_BEAT_LOCK,

View File

@ -342,9 +342,6 @@ INDEXING_EXCEPTION_LIMIT = int(os.environ.get("INDEXING_EXCEPTION_LIMIT", 0))
##### #####
# Miscellaneous # Miscellaneous
##### #####
# File based Key Value store no longer used
DYNAMIC_CONFIG_STORE = "PostgresBackedDynamicConfigStore"
JOB_TIMEOUT = 60 * 60 * 6 # 6 hours default JOB_TIMEOUT = 60 * 60 * 6 # 6 hours default
# used to allow the background indexing jobs to use a different embedding # used to allow the background indexing jobs to use a different embedding
# model server than the API server # model server than the API server

View File

@ -25,7 +25,7 @@ from danswer.connectors.gmail.constants import (
from danswer.connectors.gmail.constants import SCOPES from danswer.connectors.gmail.constants import SCOPES
from danswer.db.credentials import update_credential_json from danswer.db.credentials import update_credential_json
from danswer.db.models import User from danswer.db.models import User
from danswer.dynamic_configs.factory import get_dynamic_config_store from danswer.key_value_store.factory import get_kv_store
from danswer.server.documents.models import CredentialBase from danswer.server.documents.models import CredentialBase
from danswer.server.documents.models import GoogleAppCredentials from danswer.server.documents.models import GoogleAppCredentials
from danswer.server.documents.models import GoogleServiceAccountKey from danswer.server.documents.models import GoogleServiceAccountKey
@ -72,7 +72,7 @@ def get_gmail_creds_for_service_account(
def verify_csrf(credential_id: int, state: str) -> None: def verify_csrf(credential_id: int, state: str) -> None:
csrf = get_dynamic_config_store().load(KV_CRED_KEY.format(str(credential_id))) csrf = get_kv_store().load(KV_CRED_KEY.format(str(credential_id)))
if csrf != state: if csrf != state:
raise PermissionError( raise PermissionError(
"State from Gmail Connector callback does not match expected" "State from Gmail Connector callback does not match expected"
@ -80,7 +80,7 @@ def verify_csrf(credential_id: int, state: str) -> None:
def get_gmail_auth_url(credential_id: int) -> str: def get_gmail_auth_url(credential_id: int) -> str:
creds_str = str(get_dynamic_config_store().load(KV_GMAIL_CRED_KEY)) creds_str = str(get_kv_store().load(KV_GMAIL_CRED_KEY))
credential_json = json.loads(creds_str) credential_json = json.loads(creds_str)
flow = InstalledAppFlow.from_client_config( flow = InstalledAppFlow.from_client_config(
credential_json, credential_json,
@ -92,14 +92,14 @@ def get_gmail_auth_url(credential_id: int) -> str:
parsed_url = cast(ParseResult, urlparse(auth_url)) parsed_url = cast(ParseResult, urlparse(auth_url))
params = parse_qs(parsed_url.query) params = parse_qs(parsed_url.query)
get_dynamic_config_store().store( get_kv_store().store(
KV_CRED_KEY.format(credential_id), params.get("state", [None])[0], encrypt=True KV_CRED_KEY.format(credential_id), params.get("state", [None])[0], encrypt=True
) # type: ignore ) # type: ignore
return str(auth_url) return str(auth_url)
def get_auth_url(credential_id: int) -> str: def get_auth_url(credential_id: int) -> str:
creds_str = str(get_dynamic_config_store().load(KV_GMAIL_CRED_KEY)) creds_str = str(get_kv_store().load(KV_GMAIL_CRED_KEY))
credential_json = json.loads(creds_str) credential_json = json.loads(creds_str)
flow = InstalledAppFlow.from_client_config( flow = InstalledAppFlow.from_client_config(
credential_json, credential_json,
@ -111,7 +111,7 @@ def get_auth_url(credential_id: int) -> str:
parsed_url = cast(ParseResult, urlparse(auth_url)) parsed_url = cast(ParseResult, urlparse(auth_url))
params = parse_qs(parsed_url.query) params = parse_qs(parsed_url.query)
get_dynamic_config_store().store( get_kv_store().store(
KV_CRED_KEY.format(credential_id), params.get("state", [None])[0], encrypt=True KV_CRED_KEY.format(credential_id), params.get("state", [None])[0], encrypt=True
) # type: ignore ) # type: ignore
return str(auth_url) return str(auth_url)
@ -158,42 +158,40 @@ def build_service_account_creds(
def get_google_app_gmail_cred() -> GoogleAppCredentials: def get_google_app_gmail_cred() -> GoogleAppCredentials:
creds_str = str(get_dynamic_config_store().load(KV_GMAIL_CRED_KEY)) creds_str = str(get_kv_store().load(KV_GMAIL_CRED_KEY))
return GoogleAppCredentials(**json.loads(creds_str)) return GoogleAppCredentials(**json.loads(creds_str))
def upsert_google_app_gmail_cred(app_credentials: GoogleAppCredentials) -> None: def upsert_google_app_gmail_cred(app_credentials: GoogleAppCredentials) -> None:
get_dynamic_config_store().store( get_kv_store().store(KV_GMAIL_CRED_KEY, app_credentials.json(), encrypt=True)
KV_GMAIL_CRED_KEY, app_credentials.json(), encrypt=True
)
def delete_google_app_gmail_cred() -> None: def delete_google_app_gmail_cred() -> None:
get_dynamic_config_store().delete(KV_GMAIL_CRED_KEY) get_kv_store().delete(KV_GMAIL_CRED_KEY)
def get_gmail_service_account_key() -> GoogleServiceAccountKey: def get_gmail_service_account_key() -> GoogleServiceAccountKey:
creds_str = str(get_dynamic_config_store().load(KV_GMAIL_SERVICE_ACCOUNT_KEY)) creds_str = str(get_kv_store().load(KV_GMAIL_SERVICE_ACCOUNT_KEY))
return GoogleServiceAccountKey(**json.loads(creds_str)) return GoogleServiceAccountKey(**json.loads(creds_str))
def upsert_gmail_service_account_key( def upsert_gmail_service_account_key(
service_account_key: GoogleServiceAccountKey, service_account_key: GoogleServiceAccountKey,
) -> None: ) -> None:
get_dynamic_config_store().store( get_kv_store().store(
KV_GMAIL_SERVICE_ACCOUNT_KEY, service_account_key.json(), encrypt=True KV_GMAIL_SERVICE_ACCOUNT_KEY, service_account_key.json(), encrypt=True
) )
def upsert_service_account_key(service_account_key: GoogleServiceAccountKey) -> None: def upsert_service_account_key(service_account_key: GoogleServiceAccountKey) -> None:
get_dynamic_config_store().store( get_kv_store().store(
KV_GMAIL_SERVICE_ACCOUNT_KEY, service_account_key.json(), encrypt=True KV_GMAIL_SERVICE_ACCOUNT_KEY, service_account_key.json(), encrypt=True
) )
def delete_gmail_service_account_key() -> None: def delete_gmail_service_account_key() -> None:
get_dynamic_config_store().delete(KV_GMAIL_SERVICE_ACCOUNT_KEY) get_kv_store().delete(KV_GMAIL_SERVICE_ACCOUNT_KEY)
def delete_service_account_key() -> None: def delete_service_account_key() -> None:
get_dynamic_config_store().delete(KV_GMAIL_SERVICE_ACCOUNT_KEY) get_kv_store().delete(KV_GMAIL_SERVICE_ACCOUNT_KEY)

View File

@ -28,7 +28,7 @@ from danswer.connectors.google_drive.constants import FETCH_GROUPS_SCOPES
from danswer.connectors.google_drive.constants import FETCH_PERMISSIONS_SCOPES from danswer.connectors.google_drive.constants import FETCH_PERMISSIONS_SCOPES
from danswer.db.credentials import update_credential_json from danswer.db.credentials import update_credential_json
from danswer.db.models import User from danswer.db.models import User
from danswer.dynamic_configs.factory import get_dynamic_config_store from danswer.key_value_store.factory import get_kv_store
from danswer.server.documents.models import CredentialBase from danswer.server.documents.models import CredentialBase
from danswer.server.documents.models import GoogleAppCredentials from danswer.server.documents.models import GoogleAppCredentials
from danswer.server.documents.models import GoogleServiceAccountKey from danswer.server.documents.models import GoogleServiceAccountKey
@ -134,7 +134,7 @@ def get_google_drive_creds(
def verify_csrf(credential_id: int, state: str) -> None: def verify_csrf(credential_id: int, state: str) -> None:
csrf = get_dynamic_config_store().load(KV_CRED_KEY.format(str(credential_id))) csrf = get_kv_store().load(KV_CRED_KEY.format(str(credential_id)))
if csrf != state: if csrf != state:
raise PermissionError( raise PermissionError(
"State from Google Drive Connector callback does not match expected" "State from Google Drive Connector callback does not match expected"
@ -142,7 +142,7 @@ def verify_csrf(credential_id: int, state: str) -> None:
def get_auth_url(credential_id: int) -> str: def get_auth_url(credential_id: int) -> str:
creds_str = str(get_dynamic_config_store().load(KV_GOOGLE_DRIVE_CRED_KEY)) creds_str = str(get_kv_store().load(KV_GOOGLE_DRIVE_CRED_KEY))
credential_json = json.loads(creds_str) credential_json = json.loads(creds_str)
flow = InstalledAppFlow.from_client_config( flow = InstalledAppFlow.from_client_config(
credential_json, credential_json,
@ -154,7 +154,7 @@ def get_auth_url(credential_id: int) -> str:
parsed_url = cast(ParseResult, urlparse(auth_url)) parsed_url = cast(ParseResult, urlparse(auth_url))
params = parse_qs(parsed_url.query) params = parse_qs(parsed_url.query)
get_dynamic_config_store().store( get_kv_store().store(
KV_CRED_KEY.format(credential_id), params.get("state", [None])[0], encrypt=True KV_CRED_KEY.format(credential_id), params.get("state", [None])[0], encrypt=True
) # type: ignore ) # type: ignore
return str(auth_url) return str(auth_url)
@ -202,32 +202,28 @@ def build_service_account_creds(
def get_google_app_cred() -> GoogleAppCredentials: def get_google_app_cred() -> GoogleAppCredentials:
creds_str = str(get_dynamic_config_store().load(KV_GOOGLE_DRIVE_CRED_KEY)) creds_str = str(get_kv_store().load(KV_GOOGLE_DRIVE_CRED_KEY))
return GoogleAppCredentials(**json.loads(creds_str)) return GoogleAppCredentials(**json.loads(creds_str))
def upsert_google_app_cred(app_credentials: GoogleAppCredentials) -> None: def upsert_google_app_cred(app_credentials: GoogleAppCredentials) -> None:
get_dynamic_config_store().store( get_kv_store().store(KV_GOOGLE_DRIVE_CRED_KEY, app_credentials.json(), encrypt=True)
KV_GOOGLE_DRIVE_CRED_KEY, app_credentials.json(), encrypt=True
)
def delete_google_app_cred() -> None: def delete_google_app_cred() -> None:
get_dynamic_config_store().delete(KV_GOOGLE_DRIVE_CRED_KEY) get_kv_store().delete(KV_GOOGLE_DRIVE_CRED_KEY)
def get_service_account_key() -> GoogleServiceAccountKey: def get_service_account_key() -> GoogleServiceAccountKey:
creds_str = str( creds_str = str(get_kv_store().load(KV_GOOGLE_DRIVE_SERVICE_ACCOUNT_KEY))
get_dynamic_config_store().load(KV_GOOGLE_DRIVE_SERVICE_ACCOUNT_KEY)
)
return GoogleServiceAccountKey(**json.loads(creds_str)) return GoogleServiceAccountKey(**json.loads(creds_str))
def upsert_service_account_key(service_account_key: GoogleServiceAccountKey) -> None: def upsert_service_account_key(service_account_key: GoogleServiceAccountKey) -> None:
get_dynamic_config_store().store( get_kv_store().store(
KV_GOOGLE_DRIVE_SERVICE_ACCOUNT_KEY, service_account_key.json(), encrypt=True KV_GOOGLE_DRIVE_SERVICE_ACCOUNT_KEY, service_account_key.json(), encrypt=True
) )
def delete_service_account_key() -> None: def delete_service_account_key() -> None:
get_dynamic_config_store().delete(KV_GOOGLE_DRIVE_SERVICE_ACCOUNT_KEY) get_kv_store().delete(KV_GOOGLE_DRIVE_SERVICE_ACCOUNT_KEY)

View File

@ -49,7 +49,7 @@ from danswer.danswerbot.slack.utils import rephrase_slack_message
from danswer.danswerbot.slack.utils import respond_in_thread from danswer.danswerbot.slack.utils import respond_in_thread
from danswer.db.engine import get_sqlalchemy_engine from danswer.db.engine import get_sqlalchemy_engine
from danswer.db.search_settings import get_current_search_settings from danswer.db.search_settings import get_current_search_settings
from danswer.dynamic_configs.interface import ConfigNotFoundError from danswer.key_value_store.interface import KvKeyNotFoundError
from danswer.natural_language_processing.search_nlp_models import EmbeddingModel from danswer.natural_language_processing.search_nlp_models import EmbeddingModel
from danswer.natural_language_processing.search_nlp_models import warm_up_bi_encoder from danswer.natural_language_processing.search_nlp_models import warm_up_bi_encoder
from danswer.one_shot_answer.models import ThreadMessage from danswer.one_shot_answer.models import ThreadMessage
@ -522,7 +522,7 @@ if __name__ == "__main__":
# Let the handlers run in the background + re-check for token updates every 60 seconds # Let the handlers run in the background + re-check for token updates every 60 seconds
Event().wait(timeout=60) Event().wait(timeout=60)
except ConfigNotFoundError: except KvKeyNotFoundError:
# try again every 30 seconds. This is needed since the user may add tokens # try again every 30 seconds. This is needed since the user may add tokens
# via the UI at any point in the programs lifecycle - if we just allow it to # via the UI at any point in the programs lifecycle - if we just allow it to
# fail, then the user will need to restart the containers after adding tokens # fail, then the user will need to restart the containers after adding tokens

View File

@ -2,7 +2,7 @@ import os
from typing import cast from typing import cast
from danswer.configs.constants import KV_SLACK_BOT_TOKENS_CONFIG_KEY from danswer.configs.constants import KV_SLACK_BOT_TOKENS_CONFIG_KEY
from danswer.dynamic_configs.factory import get_dynamic_config_store from danswer.key_value_store.factory import get_kv_store
from danswer.server.manage.models import SlackBotTokens from danswer.server.manage.models import SlackBotTokens
@ -13,7 +13,7 @@ def fetch_tokens() -> SlackBotTokens:
if app_token and bot_token: if app_token and bot_token:
return SlackBotTokens(app_token=app_token, bot_token=bot_token) return SlackBotTokens(app_token=app_token, bot_token=bot_token)
dynamic_config_store = get_dynamic_config_store() dynamic_config_store = get_kv_store()
return SlackBotTokens( return SlackBotTokens(
**cast(dict, dynamic_config_store.load(key=KV_SLACK_BOT_TOKENS_CONFIG_KEY)) **cast(dict, dynamic_config_store.load(key=KV_SLACK_BOT_TOKENS_CONFIG_KEY))
) )
@ -22,7 +22,7 @@ def fetch_tokens() -> SlackBotTokens:
def save_tokens( def save_tokens(
tokens: SlackBotTokens, tokens: SlackBotTokens,
) -> None: ) -> None:
dynamic_config_store = get_dynamic_config_store() dynamic_config_store = get_kv_store()
dynamic_config_store.store( dynamic_config_store.store(
key=KV_SLACK_BOT_TOKENS_CONFIG_KEY, val=dict(tokens), encrypt=True key=KV_SLACK_BOT_TOKENS_CONFIG_KEY, val=dict(tokens), encrypt=True
) )

View File

@ -50,7 +50,7 @@ from danswer.db.enums import IndexingStatus
from danswer.db.enums import IndexModelStatus from danswer.db.enums import IndexModelStatus
from danswer.db.enums import TaskStatus from danswer.db.enums import TaskStatus
from danswer.db.pydantic_type import PydanticType from danswer.db.pydantic_type import PydanticType
from danswer.dynamic_configs.interface import JSON_ro from danswer.key_value_store.interface import JSON_ro
from danswer.file_store.models import FileDescriptor from danswer.file_store.models import FileDescriptor
from danswer.llm.override_models import LLMOverride from danswer.llm.override_models import LLMOverride
from danswer.llm.override_models import PromptOverride from danswer.llm.override_models import PromptOverride

View File

@ -11,7 +11,7 @@ from danswer.db.index_attempt import (
from danswer.db.search_settings import get_current_search_settings from danswer.db.search_settings import get_current_search_settings
from danswer.db.search_settings import get_secondary_search_settings from danswer.db.search_settings import get_secondary_search_settings
from danswer.db.search_settings import update_search_settings_status from danswer.db.search_settings import update_search_settings_status
from danswer.dynamic_configs.factory import get_dynamic_config_store from danswer.key_value_store.factory import get_kv_store
from danswer.utils.logger import setup_logger from danswer.utils.logger import setup_logger
logger = setup_logger() logger = setup_logger()
@ -54,7 +54,7 @@ def check_index_swap(db_session: Session) -> None:
) )
if cc_pair_count > 0: if cc_pair_count > 0:
kv_store = get_dynamic_config_store() kv_store = get_kv_store()
kv_store.store(KV_REINDEX_KEY, False) kv_store.store(KV_REINDEX_KEY, False)
# Expire jobs for the now past index/embedding model # Expire jobs for the now past index/embedding model

View File

@ -58,8 +58,8 @@ from danswer.document_index.vespa_constants import VESPA_APPLICATION_ENDPOINT
from danswer.document_index.vespa_constants import VESPA_DIM_REPLACEMENT_PAT from danswer.document_index.vespa_constants import VESPA_DIM_REPLACEMENT_PAT
from danswer.document_index.vespa_constants import VESPA_TIMEOUT from danswer.document_index.vespa_constants import VESPA_TIMEOUT
from danswer.document_index.vespa_constants import YQL_BASE from danswer.document_index.vespa_constants import YQL_BASE
from danswer.dynamic_configs.factory import get_dynamic_config_store
from danswer.indexing.models import DocMetadataAwareIndexChunk from danswer.indexing.models import DocMetadataAwareIndexChunk
from danswer.key_value_store.factory import get_kv_store
from danswer.search.models import IndexFilters from danswer.search.models import IndexFilters
from danswer.search.models import InferenceChunkUncleaned from danswer.search.models import InferenceChunkUncleaned
from danswer.utils.batching import batch_generator from danswer.utils.batching import batch_generator
@ -140,7 +140,7 @@ class VespaIndex(DocumentIndex):
SEARCH_THREAD_NUMBER_PAT, str(VESPA_SEARCHER_THREADS) SEARCH_THREAD_NUMBER_PAT, str(VESPA_SEARCHER_THREADS)
) )
kv_store = get_dynamic_config_store() kv_store = get_kv_store()
needs_reindexing = False needs_reindexing = False
try: try:

View File

@ -1,15 +0,0 @@
from danswer.configs.app_configs import DYNAMIC_CONFIG_STORE
from danswer.dynamic_configs.interface import DynamicConfigStore
from danswer.dynamic_configs.store import FileSystemBackedDynamicConfigStore
from danswer.dynamic_configs.store import PostgresBackedDynamicConfigStore
def get_dynamic_config_store() -> DynamicConfigStore:
dynamic_config_store_type = DYNAMIC_CONFIG_STORE
if dynamic_config_store_type == FileSystemBackedDynamicConfigStore.__name__:
raise NotImplementedError("File based config store no longer supported")
if dynamic_config_store_type == PostgresBackedDynamicConfigStore.__name__:
return PostgresBackedDynamicConfigStore()
# TODO: change exception type
raise Exception("Unknown dynamic config store type")

View File

@ -1,102 +0,0 @@
import json
import os
from collections.abc import Iterator
from contextlib import contextmanager
from pathlib import Path
from typing import cast
from filelock import FileLock
from sqlalchemy.orm import Session
from danswer.db.engine import get_session_factory
from danswer.db.models import KVStore
from danswer.dynamic_configs.interface import ConfigNotFoundError
from danswer.dynamic_configs.interface import DynamicConfigStore
from danswer.dynamic_configs.interface import JSON_ro
FILE_LOCK_TIMEOUT = 10
def _get_file_lock(file_name: Path) -> FileLock:
return FileLock(file_name.with_suffix(".lock"))
class FileSystemBackedDynamicConfigStore(DynamicConfigStore):
def __init__(self, dir_path: str) -> None:
# TODO (chris): maybe require all possible keys to be passed in
# at app start somehow to prevent key overlaps
self.dir_path = Path(dir_path)
def store(self, key: str, val: JSON_ro, encrypt: bool = False) -> None:
file_path = self.dir_path / key
lock = _get_file_lock(file_path)
with lock.acquire(timeout=FILE_LOCK_TIMEOUT):
with open(file_path, "w+") as f:
json.dump(val, f)
def load(self, key: str) -> JSON_ro:
file_path = self.dir_path / key
if not file_path.exists():
raise ConfigNotFoundError
lock = _get_file_lock(file_path)
with lock.acquire(timeout=FILE_LOCK_TIMEOUT):
with open(self.dir_path / key) as f:
return cast(JSON_ro, json.load(f))
def delete(self, key: str) -> None:
file_path = self.dir_path / key
if not file_path.exists():
raise ConfigNotFoundError
lock = _get_file_lock(file_path)
with lock.acquire(timeout=FILE_LOCK_TIMEOUT):
os.remove(file_path)
class PostgresBackedDynamicConfigStore(DynamicConfigStore):
@contextmanager
def get_session(self) -> Iterator[Session]:
factory = get_session_factory()
session: Session = factory()
try:
yield session
finally:
session.close()
def store(self, key: str, val: JSON_ro, encrypt: bool = False) -> None:
# The actual encryption/decryption is done in Postgres, we just need to choose
# which field to set
encrypted_val = val if encrypt else None
plain_val = val if not encrypt else None
with self.get_session() as session:
obj = session.query(KVStore).filter_by(key=key).first()
if obj:
obj.value = plain_val
obj.encrypted_value = encrypted_val
else:
obj = KVStore(
key=key, value=plain_val, encrypted_value=encrypted_val
) # type: ignore
session.query(KVStore).filter_by(key=key).delete() # just in case
session.add(obj)
session.commit()
def load(self, key: str) -> JSON_ro:
with self.get_session() as session:
obj = session.query(KVStore).filter_by(key=key).first()
if not obj:
raise ConfigNotFoundError
if obj.value is not None:
return cast(JSON_ro, obj.value)
if obj.encrypted_value is not None:
return cast(JSON_ro, obj.encrypted_value)
return None
def delete(self, key: str) -> None:
with self.get_session() as session:
result = session.query(KVStore).filter_by(key=key).delete() # type: ignore
if result == 0:
raise ConfigNotFoundError
session.commit()

View File

@ -8,8 +8,8 @@ from unstructured_client.models import operations # type: ignore
from unstructured_client.models import shared from unstructured_client.models import shared
from danswer.configs.constants import KV_UNSTRUCTURED_API_KEY from danswer.configs.constants import KV_UNSTRUCTURED_API_KEY
from danswer.dynamic_configs.factory import get_dynamic_config_store from danswer.key_value_store.factory import get_kv_store
from danswer.dynamic_configs.interface import ConfigNotFoundError from danswer.key_value_store.interface import KvKeyNotFoundError
from danswer.utils.logger import setup_logger from danswer.utils.logger import setup_logger
@ -17,20 +17,20 @@ logger = setup_logger()
def get_unstructured_api_key() -> str | None: def get_unstructured_api_key() -> str | None:
kv_store = get_dynamic_config_store() kv_store = get_kv_store()
try: try:
return cast(str, kv_store.load(KV_UNSTRUCTURED_API_KEY)) return cast(str, kv_store.load(KV_UNSTRUCTURED_API_KEY))
except ConfigNotFoundError: except KvKeyNotFoundError:
return None return None
def update_unstructured_api_key(api_key: str) -> None: def update_unstructured_api_key(api_key: str) -> None:
kv_store = get_dynamic_config_store() kv_store = get_kv_store()
kv_store.store(KV_UNSTRUCTURED_API_KEY, api_key) kv_store.store(KV_UNSTRUCTURED_API_KEY, api_key)
def delete_unstructured_api_key() -> None: def delete_unstructured_api_key() -> None:
kv_store = get_dynamic_config_store() kv_store = get_kv_store()
kv_store.delete(KV_UNSTRUCTURED_API_KEY) kv_store.delete(KV_UNSTRUCTURED_API_KEY)

View File

@ -0,0 +1,7 @@
from danswer.key_value_store.interface import KeyValueStore
from danswer.key_value_store.store import PgRedisKVStore
def get_kv_store() -> KeyValueStore:
# this is the only one supported currently
return PgRedisKVStore()

View File

@ -9,11 +9,11 @@ JSON_ro: TypeAlias = (
) )
class ConfigNotFoundError(Exception): class KvKeyNotFoundError(Exception):
pass pass
class DynamicConfigStore: class KeyValueStore:
@abc.abstractmethod @abc.abstractmethod
def store(self, key: str, val: JSON_ro, encrypt: bool = False) -> None: def store(self, key: str, val: JSON_ro, encrypt: bool = False) -> None:
raise NotImplementedError raise NotImplementedError

View File

@ -0,0 +1,99 @@
import json
from collections.abc import Iterator
from contextlib import contextmanager
from typing import cast
from sqlalchemy.orm import Session
from danswer.db.engine import get_session_factory
from danswer.db.models import KVStore
from danswer.key_value_store.interface import JSON_ro
from danswer.key_value_store.interface import KeyValueStore
from danswer.key_value_store.interface import KvKeyNotFoundError
from danswer.redis.redis_pool import get_redis_client
from danswer.utils.logger import setup_logger
logger = setup_logger()
REDIS_KEY_PREFIX = "danswer_kv_store:"
KV_REDIS_KEY_EXPIRATION = 60 * 60 * 24 # 1 Day
class PgRedisKVStore(KeyValueStore):
def __init__(self) -> None:
self.redis_client = get_redis_client()
@contextmanager
def get_session(self) -> Iterator[Session]:
factory = get_session_factory()
session: Session = factory()
try:
yield session
finally:
session.close()
def store(self, key: str, val: JSON_ro, encrypt: bool = False) -> None:
# Not encrypted in Redis, but encrypted in Postgres
try:
self.redis_client.set(
REDIS_KEY_PREFIX + key, json.dumps(val), ex=KV_REDIS_KEY_EXPIRATION
)
except Exception as e:
# Fallback gracefully to Postgres if Redis fails
logger.error(f"Failed to set value in Redis for key '{key}': {str(e)}")
encrypted_val = val if encrypt else None
plain_val = val if not encrypt else None
with self.get_session() as session:
obj = session.query(KVStore).filter_by(key=key).first()
if obj:
obj.value = plain_val
obj.encrypted_value = encrypted_val
else:
obj = KVStore(
key=key, value=plain_val, encrypted_value=encrypted_val
) # type: ignore
session.query(KVStore).filter_by(key=key).delete() # just in case
session.add(obj)
session.commit()
def load(self, key: str) -> JSON_ro:
try:
redis_value = self.redis_client.get(REDIS_KEY_PREFIX + key)
if redis_value:
assert isinstance(redis_value, bytes)
return json.loads(redis_value.decode("utf-8"))
except Exception as e:
logger.error(f"Failed to get value from Redis for key '{key}': {str(e)}")
with self.get_session() as session:
obj = session.query(KVStore).filter_by(key=key).first()
if not obj:
raise KvKeyNotFoundError
if obj.value is not None:
value = obj.value
elif obj.encrypted_value is not None:
value = obj.encrypted_value
else:
value = None
try:
self.redis_client.set(REDIS_KEY_PREFIX + key, json.dumps(value))
except Exception as e:
logger.error(f"Failed to set value in Redis for key '{key}': {str(e)}")
return cast(JSON_ro, value)
def delete(self, key: str) -> None:
try:
self.redis_client.delete(REDIS_KEY_PREFIX + key)
except Exception as e:
logger.error(f"Failed to delete value from Redis for key '{key}': {str(e)}")
with self.get_session() as session:
result = session.query(KVStore).filter_by(key=key).delete() # type: ignore
if result == 0:
raise KvKeyNotFoundError
session.commit()

View File

@ -65,9 +65,9 @@ from danswer.db.search_settings import update_secondary_search_settings
from danswer.db.swap_index import check_index_swap from danswer.db.swap_index import check_index_swap
from danswer.document_index.factory import get_default_document_index from danswer.document_index.factory import get_default_document_index
from danswer.document_index.interfaces import DocumentIndex from danswer.document_index.interfaces import DocumentIndex
from danswer.dynamic_configs.factory import get_dynamic_config_store
from danswer.dynamic_configs.interface import ConfigNotFoundError
from danswer.indexing.models import IndexingSetting from danswer.indexing.models import IndexingSetting
from danswer.key_value_store.factory import get_kv_store
from danswer.key_value_store.interface import KvKeyNotFoundError
from danswer.natural_language_processing.search_nlp_models import EmbeddingModel from danswer.natural_language_processing.search_nlp_models import EmbeddingModel
from danswer.natural_language_processing.search_nlp_models import warm_up_bi_encoder from danswer.natural_language_processing.search_nlp_models import warm_up_bi_encoder
from danswer.natural_language_processing.search_nlp_models import warm_up_cross_encoder from danswer.natural_language_processing.search_nlp_models import warm_up_cross_encoder
@ -256,7 +256,7 @@ def update_default_multipass_indexing(db_session: Session) -> None:
def translate_saved_search_settings(db_session: Session) -> None: def translate_saved_search_settings(db_session: Session) -> None:
kv_store = get_dynamic_config_store() kv_store = get_kv_store()
try: try:
search_settings_dict = kv_store.load(KV_SEARCH_SETTINGS) search_settings_dict = kv_store.load(KV_SEARCH_SETTINGS)
@ -294,17 +294,17 @@ def translate_saved_search_settings(db_session: Session) -> None:
logger.notice("Search settings updated and KV store entry deleted.") logger.notice("Search settings updated and KV store entry deleted.")
else: else:
logger.notice("KV store search settings is empty.") logger.notice("KV store search settings is empty.")
except ConfigNotFoundError: except KvKeyNotFoundError:
logger.notice("No search config found in KV store.") logger.notice("No search config found in KV store.")
def mark_reindex_flag(db_session: Session) -> None: def mark_reindex_flag(db_session: Session) -> None:
kv_store = get_dynamic_config_store() kv_store = get_kv_store()
try: try:
value = kv_store.load(KV_REINDEX_KEY) value = kv_store.load(KV_REINDEX_KEY)
logger.debug(f"Re-indexing flag has value {value}") logger.debug(f"Re-indexing flag has value {value}")
return return
except ConfigNotFoundError: except KvKeyNotFoundError:
# Only need to update the flag if it hasn't been set # Only need to update the flag if it hasn't been set
pass pass

View File

@ -74,6 +74,13 @@ class RedisPool:
) )
redis_pool = RedisPool()
def get_redis_client() -> Redis:
return redis_pool.get_client()
# # Usage example # # Usage example
# redis_pool = RedisPool() # redis_pool = RedisPool()
# redis_client = redis_pool.get_client() # redis_client = redis_pool.get_client()

View File

@ -1,8 +1,8 @@
from typing import cast from typing import cast
from danswer.configs.constants import KV_SEARCH_SETTINGS from danswer.configs.constants import KV_SEARCH_SETTINGS
from danswer.dynamic_configs.factory import get_dynamic_config_store from danswer.key_value_store.factory import get_kv_store
from danswer.dynamic_configs.interface import ConfigNotFoundError from danswer.key_value_store.interface import KvKeyNotFoundError
from danswer.search.models import SavedSearchSettings from danswer.search.models import SavedSearchSettings
from danswer.utils.logger import setup_logger from danswer.utils.logger import setup_logger
@ -17,10 +17,10 @@ def get_kv_search_settings() -> SavedSearchSettings | None:
if the value is updated by another process/instance of the API server. If this reads from an in memory cache like if the value is updated by another process/instance of the API server. If this reads from an in memory cache like
reddis then it will be ok. Until then this has some performance implications (though minor) reddis then it will be ok. Until then this has some performance implications (though minor)
""" """
kv_store = get_dynamic_config_store() kv_store = get_kv_store()
try: try:
return SavedSearchSettings(**cast(dict, kv_store.load(KV_SEARCH_SETTINGS))) return SavedSearchSettings(**cast(dict, kv_store.load(KV_SEARCH_SETTINGS)))
except ConfigNotFoundError: except KvKeyNotFoundError:
return None return None
except Exception as e: except Exception as e:
logger.error(f"Error loading search settings: {e}") logger.error(f"Error loading search settings: {e}")

View File

@ -74,8 +74,8 @@ from danswer.db.models import IndexingStatus
from danswer.db.models import User from danswer.db.models import User
from danswer.db.models import UserRole from danswer.db.models import UserRole
from danswer.db.search_settings import get_current_search_settings from danswer.db.search_settings import get_current_search_settings
from danswer.dynamic_configs.interface import ConfigNotFoundError
from danswer.file_store.file_store import get_default_file_store from danswer.file_store.file_store import get_default_file_store
from danswer.key_value_store.interface import KvKeyNotFoundError
from danswer.server.documents.models import AuthStatus from danswer.server.documents.models import AuthStatus
from danswer.server.documents.models import AuthUrl from danswer.server.documents.models import AuthUrl
from danswer.server.documents.models import ConnectorCredentialPairIdentifier from danswer.server.documents.models import ConnectorCredentialPairIdentifier
@ -116,7 +116,7 @@ def check_google_app_gmail_credentials_exist(
) -> dict[str, str]: ) -> dict[str, str]:
try: try:
return {"client_id": get_google_app_gmail_cred().web.client_id} return {"client_id": get_google_app_gmail_cred().web.client_id}
except ConfigNotFoundError: except KvKeyNotFoundError:
raise HTTPException(status_code=404, detail="Google App Credentials not found") raise HTTPException(status_code=404, detail="Google App Credentials not found")
@ -140,7 +140,7 @@ def delete_google_app_gmail_credentials(
) -> StatusResponse: ) -> StatusResponse:
try: try:
delete_google_app_gmail_cred() delete_google_app_gmail_cred()
except ConfigNotFoundError as e: except KvKeyNotFoundError as e:
raise HTTPException(status_code=400, detail=str(e)) raise HTTPException(status_code=400, detail=str(e))
return StatusResponse( return StatusResponse(
@ -154,7 +154,7 @@ def check_google_app_credentials_exist(
) -> dict[str, str]: ) -> dict[str, str]:
try: try:
return {"client_id": get_google_app_cred().web.client_id} return {"client_id": get_google_app_cred().web.client_id}
except ConfigNotFoundError: except KvKeyNotFoundError:
raise HTTPException(status_code=404, detail="Google App Credentials not found") raise HTTPException(status_code=404, detail="Google App Credentials not found")
@ -178,7 +178,7 @@ def delete_google_app_credentials(
) -> StatusResponse: ) -> StatusResponse:
try: try:
delete_google_app_cred() delete_google_app_cred()
except ConfigNotFoundError as e: except KvKeyNotFoundError as e:
raise HTTPException(status_code=400, detail=str(e)) raise HTTPException(status_code=400, detail=str(e))
return StatusResponse( return StatusResponse(
@ -192,7 +192,7 @@ def check_google_service_gmail_account_key_exist(
) -> dict[str, str]: ) -> dict[str, str]:
try: try:
return {"service_account_email": get_gmail_service_account_key().client_email} return {"service_account_email": get_gmail_service_account_key().client_email}
except ConfigNotFoundError: except KvKeyNotFoundError:
raise HTTPException( raise HTTPException(
status_code=404, detail="Google Service Account Key not found" status_code=404, detail="Google Service Account Key not found"
) )
@ -218,7 +218,7 @@ def delete_google_service_gmail_account_key(
) -> StatusResponse: ) -> StatusResponse:
try: try:
delete_gmail_service_account_key() delete_gmail_service_account_key()
except ConfigNotFoundError as e: except KvKeyNotFoundError as e:
raise HTTPException(status_code=400, detail=str(e)) raise HTTPException(status_code=400, detail=str(e))
return StatusResponse( return StatusResponse(
@ -232,7 +232,7 @@ def check_google_service_account_key_exist(
) -> dict[str, str]: ) -> dict[str, str]:
try: try:
return {"service_account_email": get_service_account_key().client_email} return {"service_account_email": get_service_account_key().client_email}
except ConfigNotFoundError: except KvKeyNotFoundError:
raise HTTPException( raise HTTPException(
status_code=404, detail="Google Service Account Key not found" status_code=404, detail="Google Service Account Key not found"
) )
@ -258,7 +258,7 @@ def delete_google_service_account_key(
) -> StatusResponse: ) -> StatusResponse:
try: try:
delete_service_account_key() delete_service_account_key()
except ConfigNotFoundError as e: except KvKeyNotFoundError as e:
raise HTTPException(status_code=400, detail=str(e)) raise HTTPException(status_code=400, detail=str(e))
return StatusResponse( return StatusResponse(
@ -280,7 +280,7 @@ def upsert_service_account_credential(
DocumentSource.GOOGLE_DRIVE, DocumentSource.GOOGLE_DRIVE,
delegated_user_email=service_account_credential_request.google_drive_delegated_user, delegated_user_email=service_account_credential_request.google_drive_delegated_user,
) )
except ConfigNotFoundError as e: except KvKeyNotFoundError as e:
raise HTTPException(status_code=400, detail=str(e)) raise HTTPException(status_code=400, detail=str(e))
# first delete all existing service account credentials # first delete all existing service account credentials
@ -306,7 +306,7 @@ def upsert_gmail_service_account_credential(
DocumentSource.GMAIL, DocumentSource.GMAIL,
delegated_user_email=service_account_credential_request.gmail_delegated_user, delegated_user_email=service_account_credential_request.gmail_delegated_user,
) )
except ConfigNotFoundError as e: except KvKeyNotFoundError as e:
raise HTTPException(status_code=400, detail=str(e)) raise HTTPException(status_code=400, detail=str(e))
# first delete all existing service account credentials # first delete all existing service account credentials

View File

@ -29,9 +29,9 @@ from danswer.db.index_attempt import cancel_indexing_attempts_for_ccpair
from danswer.db.models import User from danswer.db.models import User
from danswer.document_index.document_index_utils import get_both_index_names from danswer.document_index.document_index_utils import get_both_index_names
from danswer.document_index.factory import get_default_document_index from danswer.document_index.factory import get_default_document_index
from danswer.dynamic_configs.factory import get_dynamic_config_store
from danswer.dynamic_configs.interface import ConfigNotFoundError
from danswer.file_store.file_store import get_default_file_store from danswer.file_store.file_store import get_default_file_store
from danswer.key_value_store.factory import get_kv_store
from danswer.key_value_store.interface import KvKeyNotFoundError
from danswer.llm.factory import get_default_llms from danswer.llm.factory import get_default_llms
from danswer.llm.utils import test_llm from danswer.llm.utils import test_llm
from danswer.server.documents.models import ConnectorCredentialPairIdentifier from danswer.server.documents.models import ConnectorCredentialPairIdentifier
@ -114,7 +114,7 @@ def validate_existing_genai_api_key(
_: User = Depends(current_admin_user), _: User = Depends(current_admin_user),
) -> None: ) -> None:
# Only validate every so often # Only validate every so often
kv_store = get_dynamic_config_store() kv_store = get_kv_store()
curr_time = datetime.now(tz=timezone.utc) curr_time = datetime.now(tz=timezone.utc)
try: try:
last_check = datetime.fromtimestamp( last_check = datetime.fromtimestamp(
@ -123,7 +123,7 @@ def validate_existing_genai_api_key(
check_freq_sec = timedelta(seconds=GENERATIVE_MODEL_ACCESS_CHECK_FREQ) check_freq_sec = timedelta(seconds=GENERATIVE_MODEL_ACCESS_CHECK_FREQ)
if curr_time - last_check < check_freq_sec: if curr_time - last_check < check_freq_sec:
return return
except ConfigNotFoundError: except KvKeyNotFoundError:
# First time checking the key, nothing unusual # First time checking the key, nothing unusual
pass pass

View File

@ -18,7 +18,7 @@ from danswer.db.slack_bot_config import fetch_slack_bot_configs
from danswer.db.slack_bot_config import insert_slack_bot_config from danswer.db.slack_bot_config import insert_slack_bot_config
from danswer.db.slack_bot_config import remove_slack_bot_config from danswer.db.slack_bot_config import remove_slack_bot_config
from danswer.db.slack_bot_config import update_slack_bot_config from danswer.db.slack_bot_config import update_slack_bot_config
from danswer.dynamic_configs.interface import ConfigNotFoundError from danswer.key_value_store.interface import KvKeyNotFoundError
from danswer.server.manage.models import SlackBotConfig from danswer.server.manage.models import SlackBotConfig
from danswer.server.manage.models import SlackBotConfigCreationRequest from danswer.server.manage.models import SlackBotConfigCreationRequest
from danswer.server.manage.models import SlackBotTokens from danswer.server.manage.models import SlackBotTokens
@ -212,5 +212,5 @@ def put_tokens(
def get_tokens(_: User | None = Depends(current_admin_user)) -> SlackBotTokens: def get_tokens(_: User | None = Depends(current_admin_user)) -> SlackBotTokens:
try: try:
return fetch_tokens() return fetch_tokens()
except ConfigNotFoundError: except KvKeyNotFoundError:
raise HTTPException(status_code=404, detail="No tokens found") raise HTTPException(status_code=404, detail="No tokens found")

View File

@ -38,7 +38,7 @@ from danswer.db.models import User
from danswer.db.models import User__UserGroup from danswer.db.models import User__UserGroup
from danswer.db.users import get_user_by_email from danswer.db.users import get_user_by_email
from danswer.db.users import list_users from danswer.db.users import list_users
from danswer.dynamic_configs.factory import get_dynamic_config_store from danswer.key_value_store.factory import get_kv_store
from danswer.server.manage.models import AllUsersResponse from danswer.server.manage.models import AllUsersResponse
from danswer.server.manage.models import UserByEmail from danswer.server.manage.models import UserByEmail
from danswer.server.manage.models import UserInfo from danswer.server.manage.models import UserInfo
@ -367,7 +367,7 @@ def verify_user_logged_in(
# if auth type is disabled, return a dummy user with preferences from # if auth type is disabled, return a dummy user with preferences from
# the key-value store # the key-value store
if AUTH_TYPE == AuthType.DISABLED: if AUTH_TYPE == AuthType.DISABLED:
store = get_dynamic_config_store() store = get_kv_store()
return fetch_no_auth_user(store) return fetch_no_auth_user(store)
raise HTTPException( raise HTTPException(
@ -405,7 +405,7 @@ def update_user_default_model(
) -> None: ) -> None:
if user is None: if user is None:
if AUTH_TYPE == AuthType.DISABLED: if AUTH_TYPE == AuthType.DISABLED:
store = get_dynamic_config_store() store = get_kv_store()
no_auth_user = fetch_no_auth_user(store) no_auth_user = fetch_no_auth_user(store)
no_auth_user.preferences.default_model = request.default_model no_auth_user.preferences.default_model = request.default_model
set_no_auth_user_preferences(store, no_auth_user.preferences) set_no_auth_user_preferences(store, no_auth_user.preferences)
@ -433,7 +433,7 @@ def update_user_assistant_list(
) -> None: ) -> None:
if user is None: if user is None:
if AUTH_TYPE == AuthType.DISABLED: if AUTH_TYPE == AuthType.DISABLED:
store = get_dynamic_config_store() store = get_kv_store()
no_auth_user = fetch_no_auth_user(store) no_auth_user = fetch_no_auth_user(store)
no_auth_user.preferences.chosen_assistants = request.chosen_assistants no_auth_user.preferences.chosen_assistants = request.chosen_assistants
@ -487,7 +487,7 @@ def update_user_assistant_visibility(
) -> None: ) -> None:
if user is None: if user is None:
if AUTH_TYPE == AuthType.DISABLED: if AUTH_TYPE == AuthType.DISABLED:
store = get_dynamic_config_store() store = get_kv_store()
no_auth_user = fetch_no_auth_user(store) no_auth_user = fetch_no_auth_user(store)
preferences = no_auth_user.preferences preferences = no_auth_user.preferences
updated_preferences = update_assistant_list(preferences, assistant_id, show) updated_preferences = update_assistant_list(preferences, assistant_id, show)

View File

@ -19,8 +19,8 @@ from danswer.db.notification import dismiss_notification
from danswer.db.notification import get_notification_by_id from danswer.db.notification import get_notification_by_id
from danswer.db.notification import get_notifications from danswer.db.notification import get_notifications
from danswer.db.notification import update_notification_last_shown from danswer.db.notification import update_notification_last_shown
from danswer.dynamic_configs.factory import get_dynamic_config_store from danswer.key_value_store.factory import get_kv_store
from danswer.dynamic_configs.interface import ConfigNotFoundError from danswer.key_value_store.interface import KvKeyNotFoundError
from danswer.server.settings.models import Notification from danswer.server.settings.models import Notification
from danswer.server.settings.models import Settings from danswer.server.settings.models import Settings
from danswer.server.settings.models import UserSettings from danswer.server.settings.models import UserSettings
@ -58,9 +58,9 @@ def fetch_settings(
user_notifications = get_user_notifications(user, db_session) user_notifications = get_user_notifications(user, db_session)
try: try:
kv_store = get_dynamic_config_store() kv_store = get_kv_store()
needs_reindexing = cast(bool, kv_store.load(KV_REINDEX_KEY)) needs_reindexing = cast(bool, kv_store.load(KV_REINDEX_KEY))
except ConfigNotFoundError: except KvKeyNotFoundError:
needs_reindexing = False needs_reindexing = False
return UserSettings( return UserSettings(
@ -97,7 +97,7 @@ def get_user_notifications(
# Reindexing flag should only be shown to admins, basic users can't trigger it anyway # Reindexing flag should only be shown to admins, basic users can't trigger it anyway
return [] return []
kv_store = get_dynamic_config_store() kv_store = get_kv_store()
try: try:
needs_index = cast(bool, kv_store.load(KV_REINDEX_KEY)) needs_index = cast(bool, kv_store.load(KV_REINDEX_KEY))
if not needs_index: if not needs_index:
@ -105,7 +105,7 @@ def get_user_notifications(
notif_type=NotificationType.REINDEX, db_session=db_session notif_type=NotificationType.REINDEX, db_session=db_session
) )
return [] return []
except ConfigNotFoundError: except KvKeyNotFoundError:
# If something goes wrong and the flag is gone, better to not start a reindexing # If something goes wrong and the flag is gone, better to not start a reindexing
# it's a heavyweight long running job and maybe this flag is cleaned up later # it's a heavyweight long running job and maybe this flag is cleaned up later
logger.warning("Could not find reindex flag") logger.warning("Could not find reindex flag")

View File

@ -1,16 +1,16 @@
from typing import cast from typing import cast
from danswer.configs.constants import KV_SETTINGS_KEY from danswer.configs.constants import KV_SETTINGS_KEY
from danswer.dynamic_configs.factory import get_dynamic_config_store from danswer.key_value_store.factory import get_kv_store
from danswer.dynamic_configs.interface import ConfigNotFoundError from danswer.key_value_store.interface import KvKeyNotFoundError
from danswer.server.settings.models import Settings from danswer.server.settings.models import Settings
def load_settings() -> Settings: def load_settings() -> Settings:
dynamic_config_store = get_dynamic_config_store() dynamic_config_store = get_kv_store()
try: try:
settings = Settings(**cast(dict, dynamic_config_store.load(KV_SETTINGS_KEY))) settings = Settings(**cast(dict, dynamic_config_store.load(KV_SETTINGS_KEY)))
except ConfigNotFoundError: except KvKeyNotFoundError:
settings = Settings() settings = Settings()
dynamic_config_store.store(KV_SETTINGS_KEY, settings.model_dump()) dynamic_config_store.store(KV_SETTINGS_KEY, settings.model_dump())
@ -18,4 +18,4 @@ def load_settings() -> Settings:
def store_settings(settings: Settings) -> None: def store_settings(settings: Settings) -> None:
get_dynamic_config_store().store(KV_SETTINGS_KEY, settings.model_dump()) get_kv_store().store(KV_SETTINGS_KEY, settings.model_dump())

View File

@ -8,7 +8,7 @@ from langchain_core.messages import HumanMessage
from langchain_core.messages import SystemMessage from langchain_core.messages import SystemMessage
from pydantic import BaseModel from pydantic import BaseModel
from danswer.dynamic_configs.interface import JSON_ro from danswer.key_value_store.interface import JSON_ro
from danswer.llm.answering.models import PreviousMessage from danswer.llm.answering.models import PreviousMessage
from danswer.llm.interfaces import LLM from danswer.llm.interfaces import LLM
from danswer.tools.custom.base_tool_types import ToolResultType from danswer.tools.custom.base_tool_types import ToolResultType

View File

@ -9,7 +9,7 @@ from pydantic import BaseModel
from danswer.chat.chat_utils import combine_message_chain from danswer.chat.chat_utils import combine_message_chain
from danswer.configs.model_configs import GEN_AI_HISTORY_CUTOFF from danswer.configs.model_configs import GEN_AI_HISTORY_CUTOFF
from danswer.dynamic_configs.interface import JSON_ro from danswer.key_value_store.interface import JSON_ro
from danswer.llm.answering.models import PreviousMessage from danswer.llm.answering.models import PreviousMessage
from danswer.llm.headers import build_llm_extra_headers from danswer.llm.headers import build_llm_extra_headers
from danswer.llm.interfaces import LLM from danswer.llm.interfaces import LLM

View File

@ -10,7 +10,7 @@ from danswer.chat.chat_utils import combine_message_chain
from danswer.chat.models import LlmDoc from danswer.chat.models import LlmDoc
from danswer.configs.constants import DocumentSource from danswer.configs.constants import DocumentSource
from danswer.configs.model_configs import GEN_AI_HISTORY_CUTOFF from danswer.configs.model_configs import GEN_AI_HISTORY_CUTOFF
from danswer.dynamic_configs.interface import JSON_ro from danswer.key_value_store.interface import JSON_ro
from danswer.llm.answering.models import PreviousMessage from danswer.llm.answering.models import PreviousMessage
from danswer.llm.interfaces import LLM from danswer.llm.interfaces import LLM
from danswer.llm.utils import message_to_string from danswer.llm.utils import message_to_string

View File

@ -16,7 +16,7 @@ from danswer.configs.chat_configs import CONTEXT_CHUNKS_BELOW
from danswer.configs.model_configs import GEN_AI_MODEL_FALLBACK_MAX_TOKENS from danswer.configs.model_configs import GEN_AI_MODEL_FALLBACK_MAX_TOKENS
from danswer.db.models import Persona from danswer.db.models import Persona
from danswer.db.models import User from danswer.db.models import User
from danswer.dynamic_configs.interface import JSON_ro from danswer.key_value_store.interface import JSON_ro
from danswer.llm.answering.models import ContextualPruningConfig from danswer.llm.answering.models import ContextualPruningConfig
from danswer.llm.answering.models import DocumentPruningConfig from danswer.llm.answering.models import DocumentPruningConfig
from danswer.llm.answering.models import PreviousMessage from danswer.llm.answering.models import PreviousMessage

View File

@ -2,7 +2,7 @@ import abc
from collections.abc import Generator from collections.abc import Generator
from typing import Any from typing import Any
from danswer.dynamic_configs.interface import JSON_ro from danswer.key_value_store.interface import JSON_ro
from danswer.llm.answering.models import PreviousMessage from danswer.llm.answering.models import PreviousMessage
from danswer.llm.interfaces import LLM from danswer.llm.interfaces import LLM
from danswer.tools.models import ToolResponse from danswer.tools.models import ToolResponse

View File

@ -12,8 +12,8 @@ from danswer.configs.constants import KV_CUSTOMER_UUID_KEY
from danswer.configs.constants import KV_INSTANCE_DOMAIN_KEY from danswer.configs.constants import KV_INSTANCE_DOMAIN_KEY
from danswer.db.engine import get_sqlalchemy_engine from danswer.db.engine import get_sqlalchemy_engine
from danswer.db.models import User from danswer.db.models import User
from danswer.dynamic_configs.factory import get_dynamic_config_store from danswer.key_value_store.factory import get_kv_store
from danswer.dynamic_configs.interface import ConfigNotFoundError from danswer.key_value_store.interface import KvKeyNotFoundError
_DANSWER_TELEMETRY_ENDPOINT = "https://telemetry.danswer.ai/anonymous_telemetry" _DANSWER_TELEMETRY_ENDPOINT = "https://telemetry.danswer.ai/anonymous_telemetry"
_CACHED_UUID: str | None = None _CACHED_UUID: str | None = None
@ -34,11 +34,11 @@ def get_or_generate_uuid() -> str:
if _CACHED_UUID is not None: if _CACHED_UUID is not None:
return _CACHED_UUID return _CACHED_UUID
kv_store = get_dynamic_config_store() kv_store = get_kv_store()
try: try:
_CACHED_UUID = cast(str, kv_store.load(KV_CUSTOMER_UUID_KEY)) _CACHED_UUID = cast(str, kv_store.load(KV_CUSTOMER_UUID_KEY))
except ConfigNotFoundError: except KvKeyNotFoundError:
_CACHED_UUID = str(uuid.uuid4()) _CACHED_UUID = str(uuid.uuid4())
kv_store.store(KV_CUSTOMER_UUID_KEY, _CACHED_UUID, encrypt=True) kv_store.store(KV_CUSTOMER_UUID_KEY, _CACHED_UUID, encrypt=True)
@ -51,11 +51,11 @@ def _get_or_generate_instance_domain() -> str | None:
if _CACHED_INSTANCE_DOMAIN is not None: if _CACHED_INSTANCE_DOMAIN is not None:
return _CACHED_INSTANCE_DOMAIN return _CACHED_INSTANCE_DOMAIN
kv_store = get_dynamic_config_store() kv_store = get_kv_store()
try: try:
_CACHED_INSTANCE_DOMAIN = cast(str, kv_store.load(KV_INSTANCE_DOMAIN_KEY)) _CACHED_INSTANCE_DOMAIN = cast(str, kv_store.load(KV_INSTANCE_DOMAIN_KEY))
except ConfigNotFoundError: except KvKeyNotFoundError:
with Session(get_sqlalchemy_engine()) as db_session: with Session(get_sqlalchemy_engine()) as db_session:
first_user = db_session.query(User).first() first_user = db_session.query(User).first()
if first_user: if first_user:

View File

@ -11,9 +11,9 @@ from sqlalchemy.orm import Session
from danswer.configs.constants import FileOrigin from danswer.configs.constants import FileOrigin
from danswer.configs.constants import KV_CUSTOM_ANALYTICS_SCRIPT_KEY from danswer.configs.constants import KV_CUSTOM_ANALYTICS_SCRIPT_KEY
from danswer.configs.constants import KV_ENTERPRISE_SETTINGS_KEY from danswer.configs.constants import KV_ENTERPRISE_SETTINGS_KEY
from danswer.dynamic_configs.factory import get_dynamic_config_store
from danswer.dynamic_configs.interface import ConfigNotFoundError
from danswer.file_store.file_store import get_default_file_store from danswer.file_store.file_store import get_default_file_store
from danswer.key_value_store.factory import get_kv_store
from danswer.key_value_store.interface import KvKeyNotFoundError
from danswer.utils.logger import setup_logger from danswer.utils.logger import setup_logger
from ee.danswer.server.enterprise_settings.models import AnalyticsScriptUpload from ee.danswer.server.enterprise_settings.models import AnalyticsScriptUpload
from ee.danswer.server.enterprise_settings.models import EnterpriseSettings from ee.danswer.server.enterprise_settings.models import EnterpriseSettings
@ -23,12 +23,12 @@ logger = setup_logger()
def load_settings() -> EnterpriseSettings: def load_settings() -> EnterpriseSettings:
dynamic_config_store = get_dynamic_config_store() dynamic_config_store = get_kv_store()
try: try:
settings = EnterpriseSettings( settings = EnterpriseSettings(
**cast(dict, dynamic_config_store.load(KV_ENTERPRISE_SETTINGS_KEY)) **cast(dict, dynamic_config_store.load(KV_ENTERPRISE_SETTINGS_KEY))
) )
except ConfigNotFoundError: except KvKeyNotFoundError:
settings = EnterpriseSettings() settings = EnterpriseSettings()
dynamic_config_store.store(KV_ENTERPRISE_SETTINGS_KEY, settings.model_dump()) dynamic_config_store.store(KV_ENTERPRISE_SETTINGS_KEY, settings.model_dump())
@ -36,17 +36,17 @@ def load_settings() -> EnterpriseSettings:
def store_settings(settings: EnterpriseSettings) -> None: def store_settings(settings: EnterpriseSettings) -> None:
get_dynamic_config_store().store(KV_ENTERPRISE_SETTINGS_KEY, settings.model_dump()) get_kv_store().store(KV_ENTERPRISE_SETTINGS_KEY, settings.model_dump())
_CUSTOM_ANALYTICS_SECRET_KEY = os.environ.get("CUSTOM_ANALYTICS_SECRET_KEY") _CUSTOM_ANALYTICS_SECRET_KEY = os.environ.get("CUSTOM_ANALYTICS_SECRET_KEY")
def load_analytics_script() -> str | None: def load_analytics_script() -> str | None:
dynamic_config_store = get_dynamic_config_store() dynamic_config_store = get_kv_store()
try: try:
return cast(str, dynamic_config_store.load(KV_CUSTOM_ANALYTICS_SCRIPT_KEY)) return cast(str, dynamic_config_store.load(KV_CUSTOM_ANALYTICS_SCRIPT_KEY))
except ConfigNotFoundError: except KvKeyNotFoundError:
return None return None
@ -57,9 +57,7 @@ def store_analytics_script(analytics_script_upload: AnalyticsScriptUpload) -> No
): ):
raise ValueError("Invalid secret key") raise ValueError("Invalid secret key")
get_dynamic_config_store().store( get_kv_store().store(KV_CUSTOM_ANALYTICS_SCRIPT_KEY, analytics_script_upload.script)
KV_CUSTOM_ANALYTICS_SCRIPT_KEY, analytics_script_upload.script
)
_LOGO_FILENAME = "__logo__" _LOGO_FILENAME = "__logo__"

View File

@ -4,6 +4,10 @@ mypy_path = "$MYPY_CONFIG_FILE_DIR"
explicit_package_bases = true explicit_package_bases = true
disallow_untyped_defs = true disallow_untyped_defs = true
[[tool.mypy.overrides]]
module = "alembic.versions.*"
disable_error_code = ["var-annotated"]
[tool.ruff] [tool.ruff]
ignore = [] ignore = []
line-length = 130 line-length = 130