import time

from sqlalchemy.orm import Session

from onyx.configs.app_configs import DISABLE_INDEX_UPDATE_ON_SWAP
from onyx.configs.app_configs import MANAGED_VESPA
from onyx.configs.app_configs import VESPA_NUM_ATTEMPTS_ON_STARTUP
from onyx.configs.constants import KV_REINDEX_KEY
from onyx.configs.constants import KV_SEARCH_SETTINGS
from onyx.configs.model_configs import FAST_GEN_AI_MODEL_VERSION
from onyx.configs.model_configs import GEN_AI_API_KEY
from onyx.configs.model_configs import GEN_AI_MODEL_VERSION
from onyx.context.search.models import SavedSearchSettings
from onyx.context.search.retrieval.search_runner import (
    download_nltk_data,
)
from onyx.db.connector import check_connectors_exist
from onyx.db.connector import create_initial_default_connector
from onyx.db.connector_credential_pair import associate_default_cc_pair
from onyx.db.connector_credential_pair import get_connector_credential_pairs
from onyx.db.connector_credential_pair import resync_cc_pair
from onyx.db.credentials import create_initial_public_credential
from onyx.db.document import check_docs_exist
from onyx.db.index_attempt import cancel_indexing_attempts_past_model
from onyx.db.index_attempt import expire_index_attempts
from onyx.db.llm import fetch_default_provider
from onyx.db.llm import update_default_provider
from onyx.db.llm import upsert_llm_provider
from onyx.db.persona import delete_old_default_personas
from onyx.db.search_settings import get_active_search_settings
from onyx.db.search_settings import get_current_search_settings
from onyx.db.search_settings import get_secondary_search_settings
from onyx.db.search_settings import update_current_search_settings
from onyx.db.search_settings import update_secondary_search_settings
from onyx.db.swap_index import check_index_swap
from onyx.document_index.factory import get_default_document_index
from onyx.document_index.interfaces import DocumentIndex
from onyx.document_index.vespa.index import VespaIndex
from onyx.indexing.models import IndexingSetting
from onyx.key_value_store.factory import get_kv_store
from onyx.key_value_store.interface import KvKeyNotFoundError
from onyx.llm.llm_provider_options import OPEN_AI_MODEL_NAMES
from onyx.natural_language_processing.search_nlp_models import EmbeddingModel
from onyx.natural_language_processing.search_nlp_models import warm_up_bi_encoder
from onyx.natural_language_processing.search_nlp_models import warm_up_cross_encoder
from onyx.seeding.load_docs import seed_initial_documents
from onyx.seeding.load_yamls import load_chat_yamls
from onyx.server.manage.llm.models import LLMProviderUpsertRequest
from onyx.server.settings.store import load_settings
from onyx.server.settings.store import store_settings
from onyx.tools.built_in_tools import auto_add_search_tool_to_personas
from onyx.tools.built_in_tools import load_builtin_tools
from onyx.tools.built_in_tools import refresh_built_in_tools_cache
from onyx.utils.gpu_utils import gpu_status_request
from onyx.utils.logger import setup_logger
from shared_configs.configs import ALT_INDEX_SUFFIX
from shared_configs.configs import MODEL_SERVER_HOST
from shared_configs.configs import MODEL_SERVER_PORT
from shared_configs.configs import MULTI_TENANT
from shared_configs.configs import SUPPORTED_EMBEDDING_MODELS
from shared_configs.model_server_models import SupportedEmbeddingModel


logger = setup_logger()

def setup_onyx(
    db_session: Session, tenant_id: str, cohere_enabled: bool = False
) -> None:
    """
    Set up Onyx for a particular tenant. In the single-tenant case, this runs
    against the default schema on server startup. In the multi-tenant case, it
    is called when the tenant is created.

    The Tenant Service calls the tenants/create endpoint, which runs this.
    """
    check_index_swap(db_session=db_session)

    active_search_settings = get_active_search_settings(db_session)
    search_settings = active_search_settings.primary
    secondary_search_settings = active_search_settings.secondary

    # search_settings = get_current_search_settings(db_session)
    # multipass_config_1 = get_multipass_config(search_settings)

    # secondary_large_chunks_enabled: bool | None = None
    # secondary_search_settings = get_secondary_search_settings(db_session)
    # if secondary_search_settings:
    #     multipass_config_2 = get_multipass_config(secondary_search_settings)
    #     secondary_large_chunks_enabled = multipass_config_2.enable_large_chunks

    # Break out of a bad state where indexes are thrashing during a swap
    if secondary_search_settings and DISABLE_INDEX_UPDATE_ON_SWAP:
        expire_index_attempts(
            search_settings_id=search_settings.id, db_session=db_session
        )

        for cc_pair in get_connector_credential_pairs(db_session):
            resync_cc_pair(cc_pair, db_session=db_session)

    # Expire all indexing attempts for old embedding models; technically redundant
    cancel_indexing_attempts_past_model(db_session)

    logger.notice(f'Using Embedding model: "{search_settings.model_name}"')
    if search_settings.query_prefix or search_settings.passage_prefix:
        logger.notice(f'Query embedding prefix: "{search_settings.query_prefix}"')
        logger.notice(f'Passage embedding prefix: "{search_settings.passage_prefix}"')

    if search_settings:
        if not search_settings.disable_rerank_for_streaming:
            logger.notice("Reranking is enabled.")

        if search_settings.multilingual_expansion:
            logger.notice(
                f"Multilingual query expansion is enabled with {search_settings.multilingual_expansion}."
            )
        if (
            search_settings.rerank_model_name
            and not search_settings.provider_type
            and not search_settings.rerank_provider_type
        ):
            warm_up_cross_encoder(search_settings.rerank_model_name)

    logger.notice("Verifying query preprocessing (NLTK) data is downloaded")
    download_nltk_data()

    # Set up Postgres with the default credential, LLM providers, etc.
    setup_postgres(db_session)

    translate_saved_search_settings(db_session)

    # Record in the KV store whether the user needs to trigger a reindexing
    # to bring the document index into a good state
    if not MULTI_TENANT:
        mark_reindex_flag(db_session)

    # Ensure Vespa is set up correctly; this step is near the end because
    # Vespa takes a while to start up
    logger.notice("Verifying Document Index(es) are available.")
    document_index = get_default_document_index(
        search_settings,
        secondary_search_settings,
    )

    success = setup_vespa(
        document_index,
        IndexingSetting.from_db_model(search_settings),
        IndexingSetting.from_db_model(secondary_search_settings)
        if secondary_search_settings
        else None,
    )
    if not success:
        raise RuntimeError("Could not connect to Vespa within the specified timeout.")

    logger.notice(f"Model Server: http://{MODEL_SERVER_HOST}:{MODEL_SERVER_PORT}")
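    # Added note: provider_type is None when the embedding model is hosted
    # locally on the model server, in which case an initial warm-up request
    # avoids a slow first query. API-based providers need no warm-up.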
    if search_settings.provider_type is None:
        warm_up_bi_encoder(
            embedding_model=EmbeddingModel.from_db_model(
                search_settings=search_settings,
                server_host=MODEL_SERVER_HOST,
                server_port=MODEL_SERVER_PORT,
            ),
        )

    # Update the multipass indexing setting based on GPU availability
    update_default_multipass_indexing(db_session)

    seed_initial_documents(db_session, tenant_id, cohere_enabled)


def translate_saved_search_settings(db_session: Session) -> None:
    kv_store = get_kv_store()

    try:
        search_settings_dict = kv_store.load(KV_SEARCH_SETTINGS)
        if isinstance(search_settings_dict, dict):
            # Update current search settings
            current_settings = get_current_search_settings(db_session)

            # Update non-preserved fields
            if current_settings:
                current_settings_dict = SavedSearchSettings.from_db_model(
                    current_settings
                ).dict()

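                # Dict-merge precedence: values saved in the KV store override
                # the corresponding fields loaded from the database (the same
                # applies to the secondary settings below).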
                new_current_settings = SavedSearchSettings(
                    **{**current_settings_dict, **search_settings_dict}
                )
                update_current_search_settings(db_session, new_current_settings)

            # Update secondary search settings
            secondary_settings = get_secondary_search_settings(db_session)
            if secondary_settings:
                secondary_settings_dict = SavedSearchSettings.from_db_model(
                    secondary_settings
                ).dict()

                new_secondary_settings = SavedSearchSettings(
                    **{**secondary_settings_dict, **search_settings_dict}
                )
                update_secondary_search_settings(
                    db_session,
                    new_secondary_settings,
                )
            # Delete the KV store entry after a successful update
            kv_store.delete(KV_SEARCH_SETTINGS)
            logger.notice("Search settings updated and KV store entry deleted.")
        else:
            logger.notice("KV store search settings entry is empty.")
    except KvKeyNotFoundError:
        logger.notice("No search config found in KV store.")


def mark_reindex_flag(db_session: Session) -> None:
    kv_store = get_kv_store()
    try:
        value = kv_store.load(KV_REINDEX_KEY)
        logger.debug(f"Re-indexing flag has value {value}")
        return
    except KvKeyNotFoundError:
        # Only need to set the flag if it hasn't been set yet
        pass

    # If the first deployment happens after these changes, the flag is handled
    # when the rest of the changes go in. The case to avoid is the flag being
    # set to False while the user still indexes documents on the old version.
    docs_exist = check_docs_exist(db_session)
    connectors_exist = check_connectors_exist(db_session)
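    # Added note: an existing deployment (docs or connectors already present)
    # is flagged for a reindex to migrate to the new index format; a fresh
    # deployment starts clean and can skip it.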
    if docs_exist or connectors_exist:
        kv_store.store(KV_REINDEX_KEY, True)
    else:
        kv_store.store(KV_REINDEX_KEY, False)


def setup_vespa(
    document_index: DocumentIndex,
    index_setting: IndexingSetting,
    secondary_index_setting: IndexingSetting | None,
    num_attempts: int = VESPA_NUM_ATTEMPTS_ON_STARTUP,
) -> bool:
    # Vespa startup is a bit slow, so give it a few seconds
    WAIT_SECONDS = 5
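    # In the worst case this blocks startup for roughly
    # num_attempts * WAIT_SECONDS seconds before giving up.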
    for x in range(num_attempts):
        try:
            logger.notice(f"Setting up Vespa (attempt {x+1}/{num_attempts})...")
            document_index.ensure_indices_exist(
                index_embedding_dim=index_setting.model_dim,
                secondary_index_embedding_dim=secondary_index_setting.model_dim
                if secondary_index_setting
                else None,
            )

            logger.notice("Vespa setup complete.")
            return True
        except Exception:
            logger.exception(
                f"Vespa setup did not succeed. The Vespa service may not be ready yet. Retrying in {WAIT_SECONDS} seconds."
            )
            time.sleep(WAIT_SECONDS)

    logger.error(
        f"Vespa setup did not succeed. Attempt limit reached. ({num_attempts})"
    )
    return False


def setup_postgres(db_session: Session) -> None:
    logger.notice("Verifying default connector/credential exist.")
    create_initial_public_credential(db_session)
    create_initial_default_connector(db_session)
    associate_default_cc_pair(db_session)
    delete_old_default_personas(db_session)

    logger.notice("Loading built-in tools")
    load_builtin_tools(db_session)

    logger.notice("Loading default Prompts and Personas")
    load_chat_yamls(db_session)

    refresh_built_in_tools_cache(db_session)
    auto_add_search_tool_to_personas(db_session)

    if GEN_AI_API_KEY and fetch_default_provider(db_session) is None:
        # Only for dev flows
        logger.notice("Setting up default OpenAI LLM for dev.")

        llm_model = GEN_AI_MODEL_VERSION or "gpt-4o-mini"
        fast_model = FAST_GEN_AI_MODEL_VERSION or "gpt-4o-mini"
        model_req = LLMProviderUpsertRequest(
            name="DevEnvPresetOpenAI",
            provider="openai",
            api_key=GEN_AI_API_KEY,
            api_base=None,
            api_version=None,
            custom_config=None,
            default_model_name=llm_model,
            fast_default_model_name=fast_model,
            is_public=True,
            groups=[],
            display_model_names=OPEN_AI_MODEL_NAMES,
            model_names=OPEN_AI_MODEL_NAMES,
        )
        new_llm_provider = upsert_llm_provider(
            llm_provider=model_req, db_session=db_session
        )
        update_default_provider(provider_id=new_llm_provider.id, db_session=db_session)


def update_default_multipass_indexing(db_session: Session) -> None:
    docs_exist = check_docs_exist(db_session)
    connectors_exist = check_connectors_exist(db_session)
    logger.debug(f"Docs exist: {docs_exist}, Connectors exist: {connectors_exist}")

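    # Added note: multipass indexing is heavier at index time, so it is only
    # enabled by default when a GPU (or a cloud embedding provider) can
    # absorb the extra cost.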
    if not docs_exist and not connectors_exist:
        logger.info(
            "No existing docs or connectors found. Checking GPU availability for multipass indexing."
        )
        gpu_available = gpu_status_request()
        logger.info(f"GPU available: {gpu_available}")

        current_settings = get_current_search_settings(db_session)

        logger.notice(f"Updating multipass indexing setting to: {gpu_available}")
        updated_settings = SavedSearchSettings.from_db_model(current_settings)
        # Enable multipass indexing if GPU is available or if using a cloud provider
        updated_settings.multipass_indexing = (
            gpu_available or current_settings.cloud_provider is not None
        )
        update_current_search_settings(db_session, updated_settings)

        # Update settings with GPU availability
        settings = load_settings()
        settings.gpu_enabled = gpu_available
        store_settings(settings)
        logger.notice(f"Updated settings with GPU availability: {gpu_available}")
    else:
        logger.debug(
            "Existing docs or connectors found. Skipping multipass indexing update."
        )


def setup_multitenant_onyx() -> None:
    # For Managed Vespa, the schema is sent over via the Vespa Console manually.
    if not MANAGED_VESPA:
        setup_vespa_multitenant(SUPPORTED_EMBEDDING_MODELS)


def setup_vespa_multitenant(supported_indices: list[SupportedEmbeddingModel]) -> bool:
    # These retry values are for local testing
    WAIT_SECONDS = 5
    VESPA_ATTEMPTS = 5
    for x in range(VESPA_ATTEMPTS):
        try:
            logger.notice(f"Setting up Vespa (attempt {x+1}/{VESPA_ATTEMPTS})...")
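            # Each supported model gets both a primary index and an alt
            # (secondary) index named with ALT_INDEX_SUFFIX; the dims list is
            # doubled so it stays aligned with the doubled names list.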
            VespaIndex.register_multitenant_indices(
                indices=[index.index_name for index in supported_indices]
                + [
                    f"{index.index_name}{ALT_INDEX_SUFFIX}"
                    for index in supported_indices
                ],
                embedding_dims=[index.dim for index in supported_indices]
                + [index.dim for index in supported_indices],
            )

            logger.notice("Vespa setup complete.")
            return True
        except Exception:
            logger.notice(
                f"Vespa setup did not succeed. The Vespa service may not be ready yet. Retrying in {WAIT_SECONDS} seconds."
            )
            time.sleep(WAIT_SECONDS)

    logger.error(
        f"Vespa setup did not succeed. Attempt limit reached. ({VESPA_ATTEMPTS})"
    )
    return False
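

if __name__ == "__main__":
    # Hedged usage sketch: run the single-tenant setup against a local
    # Postgres. The DSN and tenant id below are hypothetical placeholders;
    # a real deployment obtains its Session through the application's own
    # session factory rather than creating an engine here.
    from sqlalchemy import create_engine

    engine = create_engine("postgresql://user:pass@localhost:5432/onyx")  # hypothetical DSN
    with Session(engine) as session:
        setup_onyx(db_session=session, tenant_id="public")  # "public" = assumed default schema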