From cf4ede2130d702c1d374c850d6de26e546560556 Mon Sep 17 00:00:00 2001 From: Yuhong Sun Date: Fri, 26 Jan 2024 18:40:53 -0800 Subject: [PATCH] Embedding Models Table (#1006) --- .../versions/dbaa756c2ccf_embedding_models.py | 70 +++++++++ backend/danswer/background/celery/celery.py | 36 ++--- .../danswer/background/connector_deletion.py | 9 +- .../background/indexing/run_indexing.py | 3 +- backend/danswer/chat/process_message.py | 7 +- backend/danswer/configs/constants.py | 6 - .../slack/handlers/handle_buttons.py | 8 +- backend/danswer/db/embedding_model.py | 43 ++++++ backend/danswer/db/feedback.py | 10 +- backend/danswer/db/models.py | 45 ++++++ .../document_index/document_index_utils.py | 66 +++++---- backend/danswer/document_index/factory.py | 12 +- backend/danswer/document_index/interfaces.py | 26 ++-- backend/danswer/document_index/vespa/index.py | 138 ++++++++---------- backend/danswer/indexing/indexing_pipeline.py | 14 +- backend/danswer/indexing/models.py | 10 ++ backend/danswer/main.py | 33 ++++- .../one_shot_answer/answer_question.py | 6 +- backend/danswer/search/search_runner.py | 5 - .../danswer/server/danswer_api/ingestion.py | 21 ++- backend/danswer/server/documents/document.py | 10 +- backend/danswer/server/gpts/api.py | 7 +- .../danswer/server/manage/administrative.py | 15 +- backend/danswer/server/manage/models.py | 4 + .../danswer/server/manage/secondary_index.py | 33 +++++ .../server/query_and_chat/chat_backend.py | 9 +- .../server/query_and_chat/query_backend.py | 10 +- backend/danswer/utils/acl.py | 28 ++-- .../regression/search_quality/eval_search.py | 12 +- 29 files changed, 487 insertions(+), 209 deletions(-) create mode 100644 backend/alembic/versions/dbaa756c2ccf_embedding_models.py create mode 100644 backend/danswer/db/embedding_model.py create mode 100644 backend/danswer/server/manage/secondary_index.py diff --git a/backend/alembic/versions/dbaa756c2ccf_embedding_models.py b/backend/alembic/versions/dbaa756c2ccf_embedding_models.py new file mode 100644 index 000000000..421356645 --- /dev/null +++ b/backend/alembic/versions/dbaa756c2ccf_embedding_models.py @@ -0,0 +1,70 @@ +"""Embedding Models + +Revision ID: dbaa756c2ccf +Revises: 7f726bad5367 +Create Date: 2024-01-25 17:12:31.813160 + +""" +from alembic import op +import sqlalchemy as sa + +from danswer.db.models import IndexModelStatus + +# revision identifiers, used by Alembic. 
+revision = "dbaa756c2ccf" +down_revision = "7f726bad5367" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + op.create_table( + "embedding_model", + sa.Column("id", sa.Integer(), nullable=False), + sa.Column("model_name", sa.String(), nullable=False), + sa.Column("model_dim", sa.Integer(), nullable=False), + sa.Column("normalize", sa.Boolean(), nullable=False), + sa.Column("query_prefix", sa.String(), nullable=False), + sa.Column("passage_prefix", sa.String(), nullable=False), + sa.Column( + "status", + sa.Enum(IndexModelStatus, native=False), + nullable=False, + ), + sa.PrimaryKeyConstraint("id"), + ) + op.add_column( + "index_attempt", + sa.Column("embedding_model_id", sa.Integer(), nullable=True), + ) + op.create_foreign_key( + "index_attempt__embedding_model_fk", + "index_attempt", + "embedding_model", + ["embedding_model_id"], + ["id"], + ) + op.create_index( + "ix_embedding_model_present_unique", + "embedding_model", + ["status"], + unique=True, + postgresql_where=sa.text("status = 'PRESENT'"), + ) + op.create_index( + "ix_embedding_model_future_unique", + "embedding_model", + ["status"], + unique=True, + postgresql_where=sa.text("status = 'FUTURE'"), + ) + + +def downgrade() -> None: + op.drop_constraint( + "index_attempt__embedding_model_fk", "index_attempt", type_="foreignkey" + ) + op.drop_column("index_attempt", "embedding_model_id") + op.drop_table("embedding_model") + op.drop_index("ix_embedding_model_present_unique", table_name="embedding_model") + op.drop_index("ix_embedding_model_future_unique", table_name="embedding_model") diff --git a/backend/danswer/background/celery/celery.py b/backend/danswer/background/celery/celery.py index 90706feb6..80a8a2a13 100644 --- a/backend/danswer/background/celery/celery.py +++ b/backend/danswer/background/celery/celery.py @@ -30,7 +30,6 @@ from danswer.db.tasks import check_live_task_not_timed_out from danswer.db.tasks import get_latest_task from danswer.document_index.document_index_utils import get_both_index_names from danswer.document_index.factory import get_default_document_index -from danswer.document_index.interfaces import DocumentIndex from danswer.document_index.interfaces import UpdateRequest from danswer.utils.batching import batch_generator from danswer.utils.logger import setup_logger @@ -79,9 +78,13 @@ def cleanup_connector_credential_pair_task( try: # The bulk of the work is in here, updates Postgres and Vespa + curr_ind_name, sec_ind_name = get_both_index_names(db_session) + document_index = get_default_document_index( + primary_index_name=curr_ind_name, secondary_index_name=sec_ind_name + ) return delete_connector_credential_pair( db_session=db_session, - document_index=get_default_document_index(), + document_index=document_index, cc_pair=cc_pair, ) except Exception as e: @@ -95,9 +98,7 @@ def sync_document_set_task(document_set_id: int) -> None: """For document sets marked as not up to date, sync the state from postgres into the datastore. 
Also handles deletions.""" - def _sync_document_batch( - document_ids: list[str], document_index: DocumentIndex - ) -> None: + def _sync_document_batch(document_ids: list[str]) -> None: logger.debug(f"Syncing document sets for: {document_ids}") # begin a transaction, release lock at the end with Session(get_sqlalchemy_engine()) as db_session: @@ -115,21 +116,21 @@ def sync_document_set_task(document_set_id: int) -> None: } # update Vespa - for index_name in get_both_index_names(): - update_requests = [ - UpdateRequest( - document_ids=[document_id], - document_sets=set(document_set_map.get(document_id, [])), - ) - for document_id in document_ids - ] - document_index.update( - update_requests=update_requests, index_name=index_name + curr_ind_name, sec_ind_name = get_both_index_names(db_session) + document_index = get_default_document_index( + primary_index_name=curr_ind_name, secondary_index_name=sec_ind_name + ) + update_requests = [ + UpdateRequest( + document_ids=[document_id], + document_sets=set(document_set_map.get(document_id, [])), ) + for document_id in document_ids + ] + document_index.update(update_requests=update_requests) with Session(get_sqlalchemy_engine()) as db_session: try: - document_index = get_default_document_index() documents_to_update = fetch_documents_for_document_set( document_set_id=document_set_id, db_session=db_session, @@ -139,8 +140,7 @@ def sync_document_set_task(document_set_id: int) -> None: documents_to_update, _SYNC_BATCH_SIZE ): _sync_document_batch( - document_ids=[document.id for document in document_batch], - document_index=document_index, + document_ids=[document.id for document in document_batch] ) # if there are no connectors, then delete the document set. Otherwise, just diff --git a/backend/danswer/background/connector_deletion.py b/backend/danswer/background/connector_deletion.py index cd7f35bcf..845850144 100644 --- a/backend/danswer/background/connector_deletion.py +++ b/backend/danswer/background/connector_deletion.py @@ -31,7 +31,6 @@ from danswer.db.document_set import ( from danswer.db.engine import get_sqlalchemy_engine from danswer.db.index_attempt import delete_index_attempts from danswer.db.models import ConnectorCredentialPair -from danswer.document_index.document_index_utils import get_both_index_names from danswer.document_index.interfaces import DocumentIndex from danswer.document_index.interfaces import UpdateRequest from danswer.server.documents.models import ConnectorCredentialPairIdentifier @@ -63,8 +62,7 @@ def _delete_connector_credential_pair_batch( ] logger.debug(f"Deleting documents: {document_ids_to_delete}") - for index_name in get_both_index_names(): - document_index.delete(doc_ids=document_ids_to_delete, index_name=index_name) + document_index.delete(doc_ids=document_ids_to_delete) delete_documents_complete( db_session=db_session, @@ -92,10 +90,7 @@ def _delete_connector_credential_pair_batch( ] logger.debug(f"Updating documents: {document_ids_to_update}") - for index_name in get_both_index_names(): - document_index.update( - update_requests=update_requests, index_name=index_name - ) + document_index.update(update_requests=update_requests) delete_document_by_connector_credential_pair( db_session=db_session, diff --git a/backend/danswer/background/indexing/run_indexing.py b/backend/danswer/background/indexing/run_indexing.py index 3bcb367b4..496e6a155 100644 --- a/backend/danswer/background/indexing/run_indexing.py +++ b/backend/danswer/background/indexing/run_indexing.py @@ -26,7 +26,6 @@ from danswer.db.index_attempt 
import mark_attempt_succeeded from danswer.db.index_attempt import update_docs_indexed from danswer.db.models import IndexAttempt from danswer.db.models import IndexingStatus -from danswer.document_index.document_index_utils import get_index_name from danswer.indexing.indexing_pipeline import build_indexing_pipeline from danswer.utils.logger import IndexAttemptSingleton from danswer.utils.logger import setup_logger @@ -104,7 +103,7 @@ def _run_indexing( ) # TODO UPDATE THIS FOR SECONDARY INDEXING - indexing_pipeline = build_indexing_pipeline(index_name=get_index_name()) + indexing_pipeline = build_indexing_pipeline() db_connector = index_attempt.connector db_credential = index_attempt.credential diff --git a/backend/danswer/chat/process_message.py b/backend/danswer/chat/process_message.py index 31073da3a..44b0c022d 100644 --- a/backend/danswer/chat/process_message.py +++ b/backend/danswer/chat/process_message.py @@ -35,6 +35,7 @@ from danswer.db.chat import translate_db_search_doc_to_server_search_doc from danswer.db.models import ChatMessage from danswer.db.models import SearchDoc as DbSearchDoc from danswer.db.models import User +from danswer.document_index.document_index_utils import get_index_name from danswer.document_index.factory import get_default_document_index from danswer.indexing.models import InferenceChunk from danswer.llm.exceptions import GenAIDisabledException @@ -187,7 +188,9 @@ def stream_chat_message( llm = None llm_tokenizer = get_default_llm_token_encode() - document_index = get_default_document_index() + document_index = get_default_document_index( + primary_index_name=get_index_name(db_session), secondary_index_name=None + ) # Every chat Session begins with an empty root message root_message = get_or_create_root_message( @@ -256,7 +259,7 @@ def stream_chat_message( # May extend to include chunk ranges llm_docs: list[LlmDoc] = inference_documents_from_ids( doc_identifiers=identifier_tuples, - document_index=get_default_document_index(), + document_index=document_index, ) doc_id_to_rank_map = map_document_id_order( cast(list[InferenceChunk | LlmDoc], llm_docs) diff --git a/backend/danswer/configs/constants.py b/backend/danswer/configs/constants.py index ea5628fa1..0151634ed 100644 --- a/backend/danswer/configs/constants.py +++ b/backend/danswer/configs/constants.py @@ -47,12 +47,6 @@ SECTION_SEPARATOR = "\n\n" # For combining attributes, doesn't have to be unique/perfect to work INDEX_SEPARATOR = "===" -# Index Related -CURRENT_EMBEDDING_MODEL = "CURRENT_EMBEDDING_MODEL" -CURRENT_EMBEDDING_DIM = "CURRENT_EMBEDDING_DIM" -UPCOMING_EMBEDDING_MODEL = "UPCOMING_EMBEDDING_MODEL" -UPCOMING_EMBEDDING_DIM = "UPCOMING_EMBEDDING_DIM" - # Messages DISABLED_GEN_AI_MSG = ( diff --git a/backend/danswer/danswerbot/slack/handlers/handle_buttons.py b/backend/danswer/danswerbot/slack/handlers/handle_buttons.py index aac0df96e..0a88c0b96 100644 --- a/backend/danswer/danswerbot/slack/handlers/handle_buttons.py +++ b/backend/danswer/danswerbot/slack/handlers/handle_buttons.py @@ -27,6 +27,7 @@ from danswer.danswerbot.slack.utils import update_emote_react from danswer.db.engine import get_sqlalchemy_engine from danswer.db.feedback import create_chat_message_feedback from danswer.db.feedback import create_doc_retrieval_feedback +from danswer.document_index.document_index_utils import get_both_index_names from danswer.document_index.factory import get_default_document_index from danswer.utils.logger import setup_logger @@ -102,11 +103,16 @@ def handle_slack_feedback( else: feedback = 
SearchFeedbackType.HIDE + curr_ind_name, sec_ind_name = get_both_index_names(db_session) + document_index = get_default_document_index( + primary_index_name=curr_ind_name, secondary_index_name=sec_ind_name + ) + create_doc_retrieval_feedback( message_id=message_id, document_id=doc_id, document_rank=doc_rank, - document_index=get_default_document_index(), + document_index=document_index, db_session=db_session, clicked=False, # Not tracking this for Slack feedback=feedback, diff --git a/backend/danswer/db/embedding_model.py b/backend/danswer/db/embedding_model.py new file mode 100644 index 000000000..abacc9c85 --- /dev/null +++ b/backend/danswer/db/embedding_model.py @@ -0,0 +1,43 @@ +from sqlalchemy import select +from sqlalchemy.orm import Session + +from danswer.db.models import EmbeddingModel +from danswer.db.models import IndexModelStatus +from danswer.indexing.models import EmbeddingModelDetail +from danswer.utils.logger import setup_logger + +logger = setup_logger() + + +def create_embedding_model( + model_details: EmbeddingModelDetail, + db_session: Session, + status: IndexModelStatus = IndexModelStatus.FUTURE, +) -> EmbeddingModel: + embedding_model = EmbeddingModel( + model_name=model_details.model_name, + model_dim=model_details.model_dim, + normalize=model_details.normalize, + query_prefix=model_details.query_prefix, + passage_prefix=model_details.passage_prefix, + status=status, + ) + + db_session.add(embedding_model) + db_session.commit() + + return embedding_model + + +def get_latest_embedding_model_by_status( + status: IndexModelStatus, db_session: Session +) -> EmbeddingModel | None: + query = ( + select(EmbeddingModel) + .where(EmbeddingModel.status == status) + .order_by(EmbeddingModel.id.desc()) + ) + result = db_session.execute(query) + latest_model = result.scalars().first() + + return latest_model diff --git a/backend/danswer/db/feedback.py b/backend/danswer/db/feedback.py index ad7381375..474e46a3a 100644 --- a/backend/danswer/db/feedback.py +++ b/backend/danswer/db/feedback.py @@ -12,7 +12,6 @@ from danswer.db.chat import get_chat_message from danswer.db.models import ChatMessageFeedback from danswer.db.models import Document as DbDocument from danswer.db.models import DocumentRetrievalFeedback -from danswer.document_index.document_index_utils import get_both_index_names from danswer.document_index.interfaces import DocumentIndex from danswer.document_index.interfaces import UpdateRequest @@ -58,8 +57,7 @@ def update_document_boost( boost=boost, ) - for index_name in get_both_index_names(): - document_index.update(update_requests=[update], index_name=index_name) + document_index.update(update_requests=[update]) db_session.commit() @@ -79,8 +77,7 @@ def update_document_hidden( hidden=hidden, ) - for index_name in get_both_index_names(): - document_index.update(update_requests=[update], index_name=index_name) + document_index.update(update_requests=[update]) db_session.commit() @@ -126,8 +123,7 @@ def create_doc_retrieval_feedback( document_ids=[document_id], boost=db_doc.boost, hidden=db_doc.hidden ) # Updates are generally batched for efficiency, this case only 1 doc/value is updated - for index_name in get_both_index_names(): - document_index.update(update_requests=[update], index_name=index_name) + document_index.update(update_requests=[update]) db_session.add(retrieval_feedback) db_session.commit() diff --git a/backend/danswer/db/models.py b/backend/danswer/db/models.py index fbbfa8dd1..fcc46682d 100644 --- a/backend/danswer/db/models.py +++ 
b/backend/danswer/db/models.py @@ -62,6 +62,12 @@ class TaskStatus(str, PyEnum): FAILURE = "FAILURE" +class IndexModelStatus(str, PyEnum): + PAST = "PAST" + PRESENT = "PRESENT" + FUTURE = "FUTURE" + + class Base(DeclarativeBase): pass @@ -363,6 +369,37 @@ class Credential(Base): user: Mapped[User | None] = relationship("User", back_populates="credentials") +class EmbeddingModel(Base): + __tablename__ = "embedding_model" + # ID is used also to indicate the order that the models are configured by the admin + id: Mapped[int] = mapped_column(primary_key=True) + model_name: Mapped[str] = mapped_column(String) + model_dim: Mapped[int] = mapped_column(Integer) + normalize: Mapped[bool] = mapped_column(Boolean) + query_prefix: Mapped[str] = mapped_column(String) + passage_prefix: Mapped[str] = mapped_column(String) + status: Mapped[IndexModelStatus] = mapped_column(Enum(IndexModelStatus)) + + index_attempts: Mapped[List["IndexAttempt"]] = relationship( + "IndexAttempt", back_populates="embedding_model" + ) + + __table_args__ = ( + Index( + "ix_embedding_model_present_unique", + "status", + unique=True, + postgresql_where=(status == IndexModelStatus.PRESENT), + ), + Index( + "ix_embedding_model_future_unique", + "status", + unique=True, + postgresql_where=(status == IndexModelStatus.FUTURE), + ), + ) + + class IndexAttempt(Base): """ Represents an attempt to index a group of 1 or more documents from a @@ -387,6 +424,11 @@ class IndexAttempt(Base): error_msg: Mapped[str | None] = mapped_column( Text, default=None ) # only filled if status = "failed" + # Nullable because in the past, we didn't allow swapping out embedding models live + embedding_model_id: Mapped[int | None] = mapped_column( + ForeignKey("embedding_model.id"), + nullable=True, + ) time_created: Mapped[datetime.datetime] = mapped_column( DateTime(timezone=True), server_default=func.now(), @@ -408,6 +450,9 @@ class IndexAttempt(Base): credential: Mapped[Credential] = relationship( "Credential", back_populates="index_attempts" ) + embedding_model: Mapped[EmbeddingModel] = relationship( + "EmbeddingModel", back_populates="index_attempts" + ) __table_args__ = ( Index( diff --git a/backend/danswer/document_index/document_index_utils.py b/backend/danswer/document_index/document_index_utils.py index ae10bccf9..dd369b499 100644 --- a/backend/danswer/document_index/document_index_utils.py +++ b/backend/danswer/document_index/document_index_utils.py @@ -1,11 +1,10 @@ import math import uuid -from typing import cast -from danswer.configs.constants import CURRENT_EMBEDDING_MODEL -from danswer.configs.constants import UPCOMING_EMBEDDING_MODEL -from danswer.dynamic_configs import get_dynamic_config_store -from danswer.dynamic_configs.interface import ConfigNotFoundError +from sqlalchemy.orm import Session + +from danswer.db.embedding_model import get_latest_embedding_model_by_status +from danswer.db.models import IndexModelStatus from danswer.indexing.models import IndexChunk from danswer.indexing.models import InferenceChunk @@ -17,34 +16,43 @@ def clean_model_name(model_str: str) -> str: return model_str.replace("/", "_").replace("-", "_").replace(".", "_") -def get_index_name(secondary_index: bool = False) -> str: - # TODO make this more efficient if needed - kv_store = get_dynamic_config_store() - if not secondary_index: - try: - embed_model = cast(str, kv_store.load(CURRENT_EMBEDDING_MODEL)) - return f"danswer_chunk_{clean_model_name(embed_model)}" - except ConfigNotFoundError: - return "danswer_chunk" +def get_index_name( + db_session: 
Session, + secondary_index: bool = False, +) -> str: + if secondary_index: + model = get_latest_embedding_model_by_status( + status=IndexModelStatus.FUTURE, db_session=db_session + ) + if model is None: + raise RuntimeError("No secondary index being built") + return f"danswer_chunk_{clean_model_name(model.model_name)}" - embed_model = cast(str, kv_store.load(UPCOMING_EMBEDDING_MODEL)) - return f"danswer_chunk_{clean_model_name(embed_model)}" + model = get_latest_embedding_model_by_status( + status=IndexModelStatus.PRESENT, db_session=db_session + ) + if not model: + return "danswer_chunk" + return f"danswer_chunk_{clean_model_name(model.model_name)}" -def get_both_index_names() -> list[str]: - kv_store = get_dynamic_config_store() - try: - embed_model = cast(str, kv_store.load(CURRENT_EMBEDDING_MODEL)) - indices = [f"danswer_chunk_{clean_model_name(embed_model)}"] - except ConfigNotFoundError: - indices = ["danswer_chunk"] +def get_both_index_names(db_session: Session) -> tuple[str, str | None]: + model = get_latest_embedding_model_by_status( + status=IndexModelStatus.PRESENT, db_session=db_session + ) + curr_index = ( + "danswer_chunk" + if not model + else f"danswer_chunk_{clean_model_name(model.model_name)}" + ) - try: - embed_model = cast(str, kv_store.load(UPCOMING_EMBEDDING_MODEL)) - indices.append(f"danswer_chunk_{clean_model_name(embed_model)}") - return indices - except ConfigNotFoundError: - return indices + model_new = get_latest_embedding_model_by_status( + status=IndexModelStatus.FUTURE, db_session=db_session + ) + if not model_new: + return curr_index, None + + return curr_index, f"danswer_chunk_{clean_model_name(model_new.model_name)}" def translate_boost_count_to_multiplier(boost: int) -> float: diff --git a/backend/danswer/document_index/factory.py b/backend/danswer/document_index/factory.py index 40a44cf6a..17701d98e 100644 --- a/backend/danswer/document_index/factory.py +++ b/backend/danswer/document_index/factory.py @@ -2,6 +2,14 @@ from danswer.document_index.interfaces import DocumentIndex from danswer.document_index.vespa.index import VespaIndex -def get_default_document_index() -> DocumentIndex: +def get_default_document_index( + primary_index_name: str, + secondary_index_name: str | None, +) -> DocumentIndex: + """Primary index is the index that is used for querying/updating etc. 
+    Secondary index is for when the currently used index and the upcoming
+    index both need to be updated; updates are applied to both indices."""
     # Currently only supporting Vespa
-    return VespaIndex()
+    return VespaIndex(
+        index_name=primary_index_name, secondary_index_name=secondary_index_name
+    )
diff --git a/backend/danswer/document_index/interfaces.py b/backend/danswer/document_index/interfaces.py
index 1943f2369..51fe0366d 100644
--- a/backend/danswer/document_index/interfaces.py
+++ b/backend/danswer/document_index/interfaces.py
@@ -4,7 +4,6 @@ from datetime import datetime
 from typing import Any
 
 from danswer.access.models import DocumentAccess
-from danswer.configs.model_configs import DOC_EMBEDDING_DIM
 from danswer.indexing.models import DocMetadataAwareIndexChunk
 from danswer.indexing.models import InferenceChunk
 from danswer.search.models import IndexFilters
@@ -46,12 +45,23 @@ class UpdateRequest:
 
 class Verifiable(abc.ABC):
     @abc.abstractmethod
-    def __init__(self, index_name: str, *args: Any, **kwargs: Any) -> None:
+    def __init__(
+        self,
+        index_name: str,
+        secondary_index_name: str | None,
+        *args: Any,
+        **kwargs: Any
+    ) -> None:
         super().__init__(*args, **kwargs)
         self.index_name = index_name
+        self.secondary_index_name = secondary_index_name
 
     @abc.abstractmethod
-    def ensure_indices_exist(self, embedding_dim: int = DOC_EMBEDDING_DIM) -> None:
+    def ensure_indices_exist(
+        self,
+        index_embedding_dim: int,
+        secondary_index_embedding_dim: int | None,
+    ) -> None:
         raise NotImplementedError
 
 
@@ -60,7 +70,6 @@ class Indexable(abc.ABC):
     def index(
         self,
         chunks: list[DocMetadataAwareIndexChunk],
-        index_name: str,
     ) -> set[DocumentInsertionRecord]:
         """Indexes document chunks into the Document Index and returns the IDs of all the documents indexed"""
         raise NotImplementedError
 
 
 class Deletable(abc.ABC):
     @abc.abstractmethod
-    def delete(self, doc_ids: list[str], index_name: str) -> None:
+    def delete(self, doc_ids: list[str]) -> None:
         """Removes the specified documents from the Index"""
         raise NotImplementedError
 
 
 class Updatable(abc.ABC):
     @abc.abstractmethod
-    def update(self, update_requests: list[UpdateRequest], index_name: str) -> None:
+    def update(self, update_requests: list[UpdateRequest]) -> None:
         """Updates metadata for the specified document sets in the Index"""
         raise NotImplementedError
 
 
@@ -87,7 +96,6 @@ class IdRetrievalCapable(abc.ABC):
         document_id: str,
         chunk_ind: int | None,
         filters: IndexFilters,
-        index_name: str,
     ) -> list[InferenceChunk]:
         raise NotImplementedError
 
 
@@ -99,7 +107,6 @@ class KeywordCapable(abc.ABC):
         query: str,
         filters: IndexFilters,
         time_decay_multiplier: float,
-        index_name: str,
         num_to_retrieve: int,
         offset: int = 0,
     ) -> list[InferenceChunk]:
@@ -113,7 +120,6 @@ class VectorCapable(abc.ABC):
         query: str,
         filters: IndexFilters,
         time_decay_multiplier: float,
-        index_name: str,
         num_to_retrieve: int,
         offset: int = 0,
     ) -> list[InferenceChunk]:
@@ -128,7 +134,6 @@ class HybridCapable(abc.ABC):
         filters: IndexFilters,
         time_decay_multiplier: float,
         num_to_retrieve: int,
-        index_name: str,
         offset: int = 0,
         hybrid_alpha: float | None = None,
     ) -> list[InferenceChunk]:
@@ -141,7 +146,6 @@ class AdminCapable(abc.ABC):
         self,
         query: str,
         filters: IndexFilters,
-        index_name: str,
         num_to_retrieve: int,
         offset: int = 0,
     ) -> list[InferenceChunk]:
diff --git a/backend/danswer/document_index/vespa/index.py b/backend/danswer/document_index/vespa/index.py
index fef57f862..192af61b7 100644
--- a/backend/danswer/document_index/vespa/index.py
+++ b/backend/danswer/document_index/vespa/index.py
@@ -20,7 +20,6 @@ import requests
 from retry import retry
 
 from danswer.configs.app_configs import LOG_VESPA_TIMING_INFORMATION
-from danswer.configs.app_configs import VESPA_DEPLOYMENT_ZIP
 from danswer.configs.app_configs import VESPA_HOST
 from danswer.configs.app_configs import VESPA_PORT
 from danswer.configs.app_configs import VESPA_TENANT_PORT
@@ -34,8 +33,6 @@ from danswer.configs.constants import BLURB
 from danswer.configs.constants import BOOST
 from danswer.configs.constants import CHUNK_ID
 from danswer.configs.constants import CONTENT
-from danswer.configs.constants import CURRENT_EMBEDDING_DIM
-from danswer.configs.constants import CURRENT_EMBEDDING_MODEL
 from danswer.configs.constants import DOC_UPDATED_AT
 from danswer.configs.constants import DOCUMENT_ID
 from danswer.configs.constants import DOCUMENT_SETS
@@ -55,21 +52,15 @@ from danswer.configs.constants import SOURCE_TYPE
 from danswer.configs.constants import TITLE
 from danswer.configs.constants import TITLE_EMBEDDING
 from danswer.configs.constants import TITLE_SEPARATOR
-from danswer.configs.constants import UPCOMING_EMBEDDING_DIM
-from danswer.configs.constants import UPCOMING_EMBEDDING_MODEL
-from danswer.configs.model_configs import DOC_EMBEDDING_DIM
 from danswer.configs.model_configs import SEARCH_DISTANCE_CUTOFF
 from danswer.connectors.cross_connector_utils.miscellaneous_utils import (
     get_experts_stores_representations,
 )
-from danswer.document_index.document_index_utils import clean_model_name
 from danswer.document_index.document_index_utils import get_uuid_from_chunk
 from danswer.document_index.interfaces import DocumentIndex
 from danswer.document_index.interfaces import DocumentInsertionRecord
 from danswer.document_index.interfaces import UpdateRequest
 from danswer.document_index.vespa.utils import remove_invalid_unicode_chars
-from danswer.dynamic_configs import get_dynamic_config_store
-from danswer.dynamic_configs.interface import ConfigNotFoundError
 from danswer.indexing.models import DocMetadataAwareIndexChunk
 from danswer.indexing.models import InferenceChunk
 from danswer.search.models import IndexFilters
@@ -621,9 +612,11 @@ def in_memory_zip_from_file_bytes(file_contents: dict[str, bytes]) -> BinaryIO:
     return zip_buffer
 
 
-def _create_document_xml_lines(doc_names: list[str]) -> str:
+def _create_document_xml_lines(doc_names: list[str | None]) -> str:
     doc_lines = [
-        f'<document type="{doc_name}" mode="index" />' for doc_name in doc_names
+        f'<document type="{doc_name}" mode="index" />'
+        for doc_name in doc_names
+        if doc_name
     ]
     return "\n".join(doc_lines)
 
@@ -650,17 +643,15 @@ class VespaIndex(DocumentIndex):
         f"from {{index_name}} where "
     )
 
-    def __init__(self, deployment_zip: str = VESPA_DEPLOYMENT_ZIP) -> None:
-        # Vespa index name isn't configurable via code alone because of the config .sd file that needs
-        # to be updated + zipped + deployed, not supporting the option for simplicity
-        self.deployment_zip = deployment_zip
+    def __init__(self, index_name: str, secondary_index_name: str | None) -> None:
+        self.index_name = index_name
+        self.secondary_index_name = secondary_index_name
 
-    def ensure_indices_exist(self, embedding_dim: int = DOC_EMBEDDING_DIM) -> None:
-        """Verifying indices is more involved as there is no good way to
-        verify the deployed app against the zip locally. But deploying the latest app.zip will ensure that
-        the index is up-to-date with the expected schema and this does not erase the existing index.
-        If the changes cannot be applied without conflict with existing data, it will fail with a non 200
-        """
+    def ensure_indices_exist(
+        self,
+        index_embedding_dim: int,
+        secondary_index_embedding_dim: int | None,
+    ) -> None:
         deploy_url = f"{VESPA_APPLICATION_ENDPOINT}/tenant/default/prepareandactivate"
         logger.debug(f"Sending Vespa zip to {deploy_url}")
 
         vespa_schema_path = os.path.join(
             os.getcwd(), "danswer", "document_index", "vespa", "app_config"
         )
         schema_file = os.path.join(vespa_schema_path, "schemas", "danswer_chunk.sd")
         services_file = os.path.join(vespa_schema_path, "services.xml")
 
-        kv_store = get_dynamic_config_store()
-        try:
-            curr_embed_model = cast(str, kv_store.load(CURRENT_EMBEDDING_MODEL))
-            schema_name = f"danswer_chunk_{clean_model_name(curr_embed_model)}"
-            embedding_dim = cast(int, kv_store.load(CURRENT_EMBEDDING_DIM))
-        except ConfigNotFoundError:
-            schema_name = "danswer_chunk"
-
-        doc_names = [schema_name]
-
-        try:
-            upcoming_embed_model = cast(str, kv_store.load(UPCOMING_EMBEDDING_MODEL))
-            upcoming_schema_name = (
-                f"danswer_chunk_{clean_model_name(upcoming_embed_model)}"
-            )
-            upcoming_embedding_dim = cast(int, kv_store.load(UPCOMING_EMBEDDING_DIM))
-            doc_names.append(upcoming_schema_name)
-        except ConfigNotFoundError:
-            upcoming_schema_name = None
-
         with open(services_file, "r") as services_f:
             services_template = services_f.read()
 
-        doc_lines = _create_document_xml_lines(doc_names)
+        schema_names = [self.index_name, self.secondary_index_name]
+
+        doc_lines = _create_document_xml_lines(schema_names)
         services = services_template.replace(DOCUMENT_REPLACEMENT_PAT, doc_lines)
 
         zip_dict = {
             "services.xml": services.encode("utf-8"),
         }
 
         with open(schema_file, "r") as schema_f:
             schema_template = schema_f.read()
 
         schema = schema_template.replace(
-            DANSWER_CHUNK_REPLACEMENT_PAT, schema_name
-        ).replace(VESPA_DIM_REPLACEMENT_PAT, str(embedding_dim))
-        zip_dict[f"schemas/{schema_name}.sd"] = schema.encode("utf-8")
+            DANSWER_CHUNK_REPLACEMENT_PAT, self.index_name
+        ).replace(VESPA_DIM_REPLACEMENT_PAT, str(index_embedding_dim))
+        zip_dict[f"schemas/{schema_names[0]}.sd"] = schema.encode("utf-8")
 
-        if upcoming_schema_name:
+        if self.secondary_index_name:
             upcoming_schema = schema_template.replace(
-                DANSWER_CHUNK_REPLACEMENT_PAT, upcoming_schema_name
-            ).replace(VESPA_DIM_REPLACEMENT_PAT, str(upcoming_embedding_dim))
-            zip_dict[f"schemas/{upcoming_schema_name}.sd"] = upcoming_schema.encode(
-                "utf-8"
-            )
+                DANSWER_CHUNK_REPLACEMENT_PAT, self.secondary_index_name
+            ).replace(VESPA_DIM_REPLACEMENT_PAT, str(secondary_index_embedding_dim))
+            zip_dict[f"schemas/{schema_names[1]}.sd"] = upcoming_schema.encode("utf-8")
 
         zip_file = in_memory_zip_from_file_bytes(zip_dict)
 
@@ -728,9 +699,9 @@
     def index(
         self,
         chunks: list[DocMetadataAwareIndexChunk],
-        index_name: str,
     ) -> set[DocumentInsertionRecord]:
-        return _clear_and_index_vespa_chunks(chunks=chunks, index_name=index_name)
+        # IMPORTANT: This must be done one index at a time, do not use secondary index here
+        return _clear_and_index_vespa_chunks(chunks=chunks, index_name=self.index_name)
 
     @staticmethod
     def _apply_updates_batched(
@@ -774,7 +745,7 @@
                 failure_msg = f"Failed to update document: {future_to_document_id[future]}"
                 raise requests.HTTPError(failure_msg) from e
 
-    def update(self, update_requests: list[UpdateRequest], index_name: str) -> None:
+    def update(self, update_requests: list[UpdateRequest]) -> None:
         logger.info(f"Updating {len(update_requests)} documents in Vespa")
         start = time.time()
 
@@ -802,50 +773,61 @@ class VespaIndex(DocumentIndex):
                 logger.error("Update request received but nothing to update")
                 continue
 
-            for document_id in update_request.document_ids:
-                for doc_chunk_id in _get_vespa_chunk_ids_by_document_id(
-                    document_id=document_id, index_name=index_name
-                ):
-                    processed_updates_requests.append(
-                        _VespaUpdateRequest(
-                            document_id=document_id,
-                            url=f"{DOCUMENT_ID_ENDPOINT.format(index_name=index_name)}/{doc_chunk_id}",
-                            update_request=update_dict,
-                        )
-                    )
+            index_names = [self.index_name]
+            if self.secondary_index_name:
+                index_names.append(self.secondary_index_name)
+
+            for index_name in index_names:
+                for document_id in update_request.document_ids:
+                    for doc_chunk_id in _get_vespa_chunk_ids_by_document_id(
+                        document_id=document_id, index_name=index_name
+                    ):
+                        processed_updates_requests.append(
+                            _VespaUpdateRequest(
+                                document_id=document_id,
+                                url=f"{DOCUMENT_ID_ENDPOINT.format(index_name=index_name)}/{doc_chunk_id}",
+                                update_request=update_dict,
+                            )
+                        )
 
         self._apply_updates_batched(processed_updates_requests)
         logger.info(
             "Finished updating Vespa documents in %s seconds", time.time() - start
         )
 
-    def delete(self, doc_ids: list[str], index_name: str) -> None:
+    def delete(self, doc_ids: list[str]) -> None:
         logger.info(f"Deleting {len(doc_ids)} documents from Vespa")
 
         # NOTE: using `httpx` here since `requests` doesn't support HTTP2. This is beneficial for
        # indexing / updates / deletes since we have to make a large volume of requests.
         with httpx.Client(http2=True) as http_client:
-            _delete_vespa_docs(
-                document_ids=doc_ids, index_name=index_name, http_client=http_client
-            )
+            index_names = [self.index_name]
+            if self.secondary_index_name:
+                index_names.append(self.secondary_index_name)
+
+            for index_name in index_names:
+                _delete_vespa_docs(
+                    document_ids=doc_ids, index_name=index_name, http_client=http_client
+                )
 
     def id_based_retrieval(
         self,
         document_id: str,
         chunk_ind: int | None,
         filters: IndexFilters,
-        index_name: str,
     ) -> list[InferenceChunk]:
         if chunk_ind is None:
             vespa_chunk_ids = _get_vespa_chunk_ids_by_document_id(
-                document_id=document_id, index_name=index_name, index_filters=filters
+                document_id=document_id,
+                index_name=self.index_name,
+                index_filters=filters,
             )
 
             if not vespa_chunk_ids:
                 return []
 
             functions_with_args: list[tuple[Callable, tuple]] = [
-                (_inference_chunk_by_vespa_id, (vespa_chunk_id, index_name))
+                (_inference_chunk_by_vespa_id, (vespa_chunk_id, self.index_name))
                 for vespa_chunk_id in vespa_chunk_ids
             ]
 
@@ -861,7 +843,7 @@ class VespaIndex(DocumentIndex):
         else:
             filters_str = _build_vespa_filters(filters=filters, include_hidden=True)
             yql = (
-                VespaIndex.yql_base.format(index_name=index_name)
+                VespaIndex.yql_base.format(index_name=self.index_name)
                 + filters_str
                 + f"({DOCUMENT_ID} contains '{document_id}' and {CHUNK_ID} contains '{chunk_ind}')"
             )
@@ -872,7 +854,6 @@
         query: str,
         filters: IndexFilters,
         time_decay_multiplier: float,
-        index_name: str,
         num_to_retrieve: int = NUM_RETURNED_HITS,
         offset: int = 0,
         edit_keyword_query: bool = EDIT_KEYWORD_QUERY,
     ) -> list[InferenceChunk]:
         # IMPORTANT: THIS FUNCTION IS NOT UP TO DATE, DOES NOT WORK CORRECTLY
         vespa_where_clauses = _build_vespa_filters(filters)
         yql = (
-            VespaIndex.yql_base.format(index_name=index_name)
+            VespaIndex.yql_base.format(index_name=self.index_name)
             + vespa_where_clauses
             # `({defaultIndex: "content_summary"}userInput(@query))` section is
             # needed for highlighting while the N-gram highlighting is broken /
@@ -908,7 +889,6 @@ class 
VespaIndex(DocumentIndex): query: str, filters: IndexFilters, time_decay_multiplier: float, - index_name: str, num_to_retrieve: int = NUM_RETURNED_HITS, offset: int = 0, distance_cutoff: float | None = SEARCH_DISTANCE_CUTOFF, @@ -917,7 +897,7 @@ class VespaIndex(DocumentIndex): # IMPORTANT: THIS FUNCTION IS NOT UP TO DATE, DOES NOT WORK CORRECTLY vespa_where_clauses = _build_vespa_filters(filters) yql = ( - VespaIndex.yql_base.format(index_name=index_name) + VespaIndex.yql_base.format(index_name=self.index_name) + vespa_where_clauses + f"(({{targetHits: {10 * num_to_retrieve}}}nearestNeighbor(embeddings, query_embedding)) " # `({defaultIndex: "content_summary"}userInput(@query))` section is @@ -953,7 +933,6 @@ class VespaIndex(DocumentIndex): filters: IndexFilters, time_decay_multiplier: float, num_to_retrieve: int, - index_name: str, offset: int = 0, hybrid_alpha: float | None = HYBRID_ALPHA, title_content_ratio: float | None = TITLE_CONTENT_RATIO, @@ -964,7 +943,7 @@ class VespaIndex(DocumentIndex): # Needs to be at least as much as the value set in Vespa schema config target_hits = max(10 * num_to_retrieve, 1000) yql = ( - VespaIndex.yql_base.format(index_name=index_name) + VespaIndex.yql_base.format(index_name=self.index_name) + vespa_where_clauses + f"(({{targetHits: {target_hits}}}nearestNeighbor(embeddings, query_embedding)) " + f"or ({{targetHits: {target_hits}}}nearestNeighbor(title_embedding, query_embedding)) " @@ -1003,13 +982,12 @@ class VespaIndex(DocumentIndex): self, query: str, filters: IndexFilters, - index_name: str, num_to_retrieve: int = NUM_RETURNED_HITS, offset: int = 0, ) -> list[InferenceChunk]: vespa_where_clauses = _build_vespa_filters(filters, include_hidden=True) yql = ( - VespaIndex.yql_base.format(index_name=index_name) + VespaIndex.yql_base.format(index_name=self.index_name) + vespa_where_clauses + '({grammar: "weakAnd"}userInput(@query) ' # `({defaultIndex: "content_summary"}userInput(@query))` section is diff --git a/backend/danswer/indexing/indexing_pipeline.py b/backend/danswer/indexing/indexing_pipeline.py index 23fa2bd86..1b6891a32 100644 --- a/backend/danswer/indexing/indexing_pipeline.py +++ b/backend/danswer/indexing/indexing_pipeline.py @@ -19,6 +19,7 @@ from danswer.db.document_set import fetch_document_sets_for_documents from danswer.db.engine import get_sqlalchemy_engine from danswer.db.tag import create_or_add_document_tag from danswer.db.tag import create_or_add_document_tag_list +from danswer.document_index.document_index_utils import get_index_name from danswer.document_index.factory import get_default_document_index from danswer.document_index.interfaces import DocumentIndex from danswer.document_index.interfaces import DocumentMetadata @@ -96,7 +97,6 @@ def index_doc_batch( chunker: Chunker, embedder: Embedder, document_index: DocumentIndex, - index_name: str, documents: list[Document], index_attempt_metadata: IndexAttemptMetadata, ignore_time_skip: bool = False, @@ -188,9 +188,7 @@ def index_doc_batch( # A document will not be spread across different batches, so all the # documents with chunks in this set, are fully represented by the chunks # in this set - insertion_records = document_index.index( - chunks=access_aware_chunks, index_name=index_name - ) + insertion_records = document_index.index(chunks=access_aware_chunks) successful_doc_ids = [record.document_id for record in insertion_records] successful_docs = [ @@ -218,7 +216,6 @@ def build_indexing_pipeline( chunker: Chunker | None = None, embedder: Embedder | None = None, 
document_index: DocumentIndex | None = None,
-    index_name: str,
     ignore_time_skip: bool = False,
 ) -> IndexingPipelineProtocol:
     """Builds a pipeline which takes in a list (batch) of docs and indexes them."""
@@ -226,13 +223,16 @@
 
     embedder = embedder or DefaultEmbedder()
 
-    document_index = document_index or get_default_document_index()
+    if not document_index:
+        with Session(get_sqlalchemy_engine()) as db_session:
+            document_index = get_default_document_index(
+                primary_index_name=get_index_name(db_session), secondary_index_name=None
+            )
 
     return partial(
         index_doc_batch,
         chunker=chunker,
         embedder=embedder,
         document_index=document_index,
-        index_name=index_name,
         ignore_time_skip=ignore_time_skip,
     )
diff --git a/backend/danswer/indexing/models.py b/backend/danswer/indexing/models.py
index 331a5fed4..c875c88bd 100644
--- a/backend/danswer/indexing/models.py
+++ b/backend/danswer/indexing/models.py
@@ -2,6 +2,8 @@ from dataclasses import dataclass
 from dataclasses import fields
 from datetime import datetime
 
+from pydantic import BaseModel
+
 from danswer.access.models import DocumentAccess
 from danswer.configs.constants import DocumentSource
 from danswer.connectors.models import Document
@@ -120,3 +122,11 @@ class InferenceChunk(BaseChunk):
             break
         short_blurb += " " + word
     return f"Inference Chunk: {self.document_id} - {short_blurb}..."
+
+
+class EmbeddingModelDetail(BaseModel):
+    model_name: str
+    model_dim: int
+    normalize: bool
+    query_prefix: str | None
+    passage_prefix: str | None
diff --git a/backend/danswer/main.py b/backend/danswer/main.py
index 8746b37ab..295dc3173 100644
--- a/backend/danswer/main.py
+++ b/backend/danswer/main.py
@@ -35,6 +35,7 @@ from danswer.configs.chat_configs import MULTILINGUAL_QUERY_EXPANSION
 from danswer.configs.constants import AuthType
 from danswer.configs.model_configs import ASYM_PASSAGE_PREFIX
 from danswer.configs.model_configs import ASYM_QUERY_PREFIX
+from danswer.configs.model_configs import DOC_EMBEDDING_DIM
 from danswer.configs.model_configs import DOCUMENT_ENCODER_MODEL
 from danswer.configs.model_configs import ENABLE_RERANKING_REAL_TIME_FLOW
 from danswer.configs.model_configs import FAST_GEN_AI_MODEL_VERSION
@@ -44,7 +45,10 @@ from danswer.configs.model_configs import GEN_AI_MODEL_VERSION
 from danswer.db.connector import create_initial_default_connector
 from danswer.db.connector_credential_pair import associate_default_cc_pair
 from danswer.db.credentials import create_initial_public_credential
+from danswer.db.embedding_model import get_latest_embedding_model_by_status
 from danswer.db.engine import get_sqlalchemy_engine
+from danswer.db.models import IndexModelStatus
+from danswer.document_index.document_index_utils import clean_model_name
 from danswer.document_index.factory import get_default_document_index
 from danswer.llm.factory import get_default_llm
 from danswer.search.search_nlp_models import warm_up_models
@@ -278,7 +282,34 @@ def get_application() -> FastAPI:
             load_chat_yamls()
 
             logger.info("Verifying Document Index(s) is/are available.")
-            get_default_document_index().ensure_indices_exist()
+            primary_embedding_model = get_latest_embedding_model_by_status(
+                status=IndexModelStatus.PRESENT, db_session=db_session
+            )
+            secondary_embedding_model = get_latest_embedding_model_by_status(
+                status=IndexModelStatus.FUTURE, db_session=db_session
+            )
+            primary_index = (
+                f"danswer_chunk_{clean_model_name(primary_embedding_model.model_name)}"
+                if primary_embedding_model
+                else "danswer_chunk"
+            )
+            second_index = (
+                
f"danswer_chunk_{clean_model_name(secondary_embedding_model.model_name)}" + if secondary_embedding_model + else None + ) + + document_index = get_default_document_index( + primary_index_name=primary_index, secondary_index_name=second_index + ) + document_index.ensure_indices_exist( + index_embedding_dim=primary_embedding_model.model_dim + if primary_embedding_model + else DOC_EMBEDDING_DIM, + secondary_index_embedding_dim=secondary_embedding_model.model_dim + if secondary_embedding_model + else None, + ) optional_telemetry( record_type=RecordType.VERSION, data={"version": __version__} diff --git a/backend/danswer/one_shot_answer/answer_question.py b/backend/danswer/one_shot_answer/answer_question.py index 1ebbd2980..4593b4f65 100644 --- a/backend/danswer/one_shot_answer/answer_question.py +++ b/backend/danswer/one_shot_answer/answer_question.py @@ -22,6 +22,7 @@ from danswer.db.chat import get_persona_by_id from danswer.db.chat import get_prompt_by_id from danswer.db.chat import translate_db_message_to_chat_message_detail from danswer.db.models import User +from danswer.document_index.document_index_utils import get_index_name from danswer.document_index.factory import get_default_document_index from danswer.indexing.models import InferenceChunk from danswer.llm.utils import get_default_llm_token_encode @@ -89,7 +90,10 @@ def stream_answer_objects( ) llm_tokenizer = get_default_llm_token_encode() - document_index = get_default_document_index() + + document_index = get_default_document_index( + primary_index_name=get_index_name(db_session), secondary_index_name=None + ) # Create a chat session which will just store the root message, the query, and the AI response root_message = get_or_create_root_message( diff --git a/backend/danswer/search/search_runner.py b/backend/danswer/search/search_runner.py index c920d59b6..54e69c31e 100644 --- a/backend/danswer/search/search_runner.py +++ b/backend/danswer/search/search_runner.py @@ -17,7 +17,6 @@ from danswer.configs.model_configs import CROSS_ENCODER_RANGE_MAX from danswer.configs.model_configs import CROSS_ENCODER_RANGE_MIN from danswer.configs.model_configs import SIM_SCORE_RANGE_HIGH from danswer.configs.model_configs import SIM_SCORE_RANGE_LOW -from danswer.document_index.document_index_utils import get_index_name from danswer.document_index.document_index_utils import ( translate_boost_count_to_multiplier, ) @@ -143,14 +142,12 @@ def doc_index_retrieval( document_index: DocumentIndex, hybrid_alpha: float = HYBRID_ALPHA, ) -> list[InferenceChunk]: - index = get_index_name() if query.search_type == SearchType.KEYWORD: top_chunks = document_index.keyword_retrieval( query=query.query, filters=query.filters, time_decay_multiplier=query.recency_bias_multiplier, num_to_retrieve=query.num_hits, - index_name=index, ) elif query.search_type == SearchType.SEMANTIC: @@ -159,7 +156,6 @@ def doc_index_retrieval( filters=query.filters, time_decay_multiplier=query.recency_bias_multiplier, num_to_retrieve=query.num_hits, - index_name=index, ) elif query.search_type == SearchType.HYBRID: @@ -170,7 +166,6 @@ def doc_index_retrieval( num_to_retrieve=query.num_hits, offset=query.offset, hybrid_alpha=hybrid_alpha, - index_name=index, ) else: diff --git a/backend/danswer/server/danswer_api/ingestion.py b/backend/danswer/server/danswer_api/ingestion.py index 712310b34..2a0281657 100644 --- a/backend/danswer/server/danswer_api/ingestion.py +++ b/backend/danswer/server/danswer_api/ingestion.py @@ -16,6 +16,7 @@ from danswer.db.connector_credential_pair import 
get_connector_credential_pair
 from danswer.db.credentials import fetch_credential_by_id
 from danswer.db.engine import get_session
 from danswer.document_index.document_index_utils import get_both_index_names
+from danswer.document_index.factory import get_default_document_index
 from danswer.dynamic_configs import get_dynamic_config_store
 from danswer.dynamic_configs.interface import ConfigNotFoundError
 from danswer.indexing.indexing_pipeline import build_indexing_pipeline
@@ -142,10 +143,14 @@ def document_ingestion(
     if document.source == DocumentSource.INGESTION_API:
         document.source = DocumentSource.FILE
 
-    index_names = get_both_index_names()
+    # Index into the primary index and, if one is being built, the secondary index
+    curr_ind_name, sec_ind_name = get_both_index_names(db_session)
+    curr_doc_index = get_default_document_index(
+        primary_index_name=curr_ind_name, secondary_index_name=None
+    )
 
     indexing_pipeline = build_indexing_pipeline(
-        ignore_time_skip=True, index_name=index_names[0]
+        ignore_time_skip=True, document_index=curr_doc_index
    )
 
     new_doc, chunks = indexing_pipeline(
         documents=[document],
         index_attempt_metadata=IndexAttemptMetadata(
             connector_id=connector_id,
             credential_id=credential_id,
         ),
     )
 
     # If there's a secondary index being built, index the doc but don't use it for return here
-    if len(index_names) > 1:
-        indexing_pipeline(
+    if sec_ind_name:
+        sec_doc_index = get_default_document_index(
+            primary_index_name=sec_ind_name, secondary_index_name=None
+        )
+
+        sec_ind_pipeline = build_indexing_pipeline(
+            ignore_time_skip=True, document_index=sec_doc_index
+        )
+
+        sec_ind_pipeline(
             documents=[document],
             index_attempt_metadata=IndexAttemptMetadata(
                 connector_id=connector_id,
diff --git a/backend/danswer/server/documents/document.py b/backend/danswer/server/documents/document.py
index 7552322f2..57f515bff 100644
--- a/backend/danswer/server/documents/document.py
+++ b/backend/danswer/server/documents/document.py
@@ -27,7 +27,9 @@ def get_document_info(
     user: User | None = Depends(current_user),
     db_session: Session = Depends(get_session),
 ) -> DocumentInfo:
-    document_index = get_default_document_index()
+    document_index = get_default_document_index(
+        primary_index_name=get_index_name(db_session), secondary_index_name=None
+    )
 
     user_acl_filters = build_access_filters_for_user(user, db_session)
     filters = IndexFilters(access_control_list=user_acl_filters)
@@ -36,7 +38,6 @@
         document_id=document_id,
         chunk_ind=None,
         filters=filters,
-        index_name=get_index_name(),
     )
 
     if not inference_chunks:
@@ -60,7 +61,9 @@ def get_chunk_info(
     user: User | None = Depends(current_user),
     db_session: Session = Depends(get_session),
 ) -> ChunkInfo:
-    document_index = get_default_document_index()
+    document_index = get_default_document_index(
+        primary_index_name=get_index_name(db_session), secondary_index_name=None
+    )
 
     user_acl_filters = build_access_filters_for_user(user, db_session)
     filters = IndexFilters(access_control_list=user_acl_filters)
@@ -69,7 +72,6 @@
         document_id=document_id,
         chunk_ind=chunk_id,
         filters=filters,
-        index_name=get_index_name(),
     )
 
     if not inference_chunks:
diff --git a/backend/danswer/server/gpts/api.py b/backend/danswer/server/gpts/api.py
index 67b637bf8..772726b2f 100644
--- a/backend/danswer/server/gpts/api.py
+++ b/backend/danswer/server/gpts/api.py
@@ -7,6 +7,7 @@ from pydantic import BaseModel
 from sqlalchemy.orm import Session
 
 from danswer.db.engine import get_session
+from danswer.document_index.document_index_utils import get_index_name
 from danswer.document_index.factory import get_default_document_index
 from 
danswer.search.access_filters import build_access_filters_for_user from danswer.search.models import IndexFilters @@ -83,9 +84,13 @@ def gpt_search( skip_llm_chunk_filter=True, ) + document_index = get_default_document_index( + primary_index_name=get_index_name(db_session), secondary_index_name=None + ) + top_chunks, __ = full_chunk_search( query=search_query, - document_index=get_default_document_index(), + document_index=document_index, ) return GptSearchResponse( diff --git a/backend/danswer/server/manage/administrative.py b/backend/danswer/server/manage/administrative.py index 4fbe07256..50236c5d0 100644 --- a/backend/danswer/server/manage/administrative.py +++ b/backend/danswer/server/manage/administrative.py @@ -18,6 +18,7 @@ from danswer.db.feedback import fetch_docs_ranked_by_boost from danswer.db.feedback import update_document_boost from danswer.db.feedback import update_document_hidden from danswer.db.models import User +from danswer.document_index.document_index_utils import get_both_index_names from danswer.document_index.factory import get_default_document_index from danswer.dynamic_configs import get_dynamic_config_store from danswer.dynamic_configs.interface import ConfigNotFoundError @@ -68,12 +69,17 @@ def document_boost_update( _: User | None = Depends(current_admin_user), db_session: Session = Depends(get_session), ) -> None: + curr_ind_name, sec_ind_name = get_both_index_names(db_session) + document_index = get_default_document_index( + primary_index_name=curr_ind_name, secondary_index_name=sec_ind_name + ) + try: update_document_boost( db_session=db_session, document_id=boost_update.document_id, boost=boost_update.boost, - document_index=get_default_document_index(), + document_index=document_index, ) except ValueError as e: raise HTTPException(status_code=400, detail=str(e)) @@ -85,12 +91,17 @@ def document_hidden_update( _: User | None = Depends(current_admin_user), db_session: Session = Depends(get_session), ) -> None: + curr_ind_name, sec_ind_name = get_both_index_names(db_session) + document_index = get_default_document_index( + primary_index_name=curr_ind_name, secondary_index_name=sec_ind_name + ) + try: update_document_hidden( db_session=db_session, document_id=hidden_update.document_id, hidden=hidden_update.hidden, - document_index=get_default_document_index(), + document_index=document_index, ) except ValueError as e: raise HTTPException(status_code=400, detail=str(e)) diff --git a/backend/danswer/server/manage/models.py b/backend/danswer/server/manage/models.py index 64af01a9f..01cff7c97 100644 --- a/backend/danswer/server/manage/models.py +++ b/backend/danswer/server/manage/models.py @@ -103,3 +103,7 @@ class SlackBotConfig(BaseModel): id: int persona: PersonaSnapshot | None channel_config: ChannelConfig + + +class ModelVersionResponse(BaseModel): + model_name: str | None # None only applicable to secondary index diff --git a/backend/danswer/server/manage/secondary_index.py b/backend/danswer/server/manage/secondary_index.py new file mode 100644 index 000000000..198730e3d --- /dev/null +++ b/backend/danswer/server/manage/secondary_index.py @@ -0,0 +1,33 @@ +from fastapi import APIRouter +from fastapi import Depends + +from danswer.auth.users import current_admin_user +from danswer.db.models import User +from danswer.indexing.models import EmbeddingModelDetail +from danswer.server.manage.models import ModelVersionResponse +from danswer.utils.logger import setup_logger + +router = APIRouter(prefix="/secondary-index") +logger = setup_logger() + + 
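+# NOTE: the endpoints below are intentionally left as stubs in this patch; they
+# sketch the admin API surface for the model swap and raise NotImplementedError
+# until the swap flow itself is wired up.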
+@router.post("/set-new-embedding-model") +def set_new_embedding_model( + embed_model_details: EmbeddingModelDetail, + _: User | None = Depends(current_admin_user), +) -> None: + raise NotImplementedError() + + +@router.get("/get-current-embedding-model") +def get_current_embedding_model( + _: User | None = Depends(current_admin_user), +) -> ModelVersionResponse: + raise NotImplementedError() + + +@router.get("/get-secondary-embedding-model") +def get_secondary_embedding_model( + _: User | None = Depends(current_admin_user), +) -> ModelVersionResponse: + raise NotImplementedError() diff --git a/backend/danswer/server/query_and_chat/chat_backend.py b/backend/danswer/server/query_and_chat/chat_backend.py index 2a1714104..726c3342d 100644 --- a/backend/danswer/server/query_and_chat/chat_backend.py +++ b/backend/danswer/server/query_and_chat/chat_backend.py @@ -20,6 +20,7 @@ from danswer.db.engine import get_session from danswer.db.feedback import create_chat_message_feedback from danswer.db.feedback import create_doc_retrieval_feedback from danswer.db.models import User +from danswer.document_index.document_index_utils import get_both_index_names from danswer.document_index.factory import get_default_document_index from danswer.secondary_llm_flows.chat_session_naming import ( get_renamed_conversation_name, @@ -228,12 +229,18 @@ def create_search_feedback( """This endpoint isn't protected - it does not check if the user has access to the document Users could try changing boosts of arbitrary docs but this does not leak any data. """ + + curr_ind_name, sec_ind_name = get_both_index_names(db_session) + document_index = get_default_document_index( + primary_index_name=curr_ind_name, secondary_index_name=sec_ind_name + ) + create_doc_retrieval_feedback( message_id=feedback.message_id, document_id=feedback.document_id, document_rank=feedback.document_rank, clicked=feedback.click, feedback=feedback.search_feedback, - document_index=get_default_document_index(), + document_index=document_index, db_session=db_session, ) diff --git a/backend/danswer/server/query_and_chat/query_backend.py b/backend/danswer/server/query_and_chat/query_backend.py index 27d02973d..552bf266a 100644 --- a/backend/danswer/server/query_and_chat/query_backend.py +++ b/backend/danswer/server/query_and_chat/query_backend.py @@ -53,16 +53,18 @@ def admin_search( time_cutoff=question.filters.time_cutoff, access_control_list=user_acl_filters, ) - document_index = get_default_document_index() + + document_index = get_default_document_index( + primary_index_name=get_index_name(db_session), secondary_index_name=None + ) + if not isinstance(document_index, VespaIndex): raise HTTPException( status_code=400, detail="Cannot use admin-search when using a non-Vespa document index", ) - matching_chunks = document_index.admin_retrieval( - query=query, filters=final_filters, index_name=get_index_name() - ) + matching_chunks = document_index.admin_retrieval(query=query, filters=final_filters) documents = chunks_to_search_docs(matching_chunks) diff --git a/backend/danswer/utils/acl.py b/backend/danswer/utils/acl.py index 01256a813..268457bfd 100644 --- a/backend/danswer/utils/acl.py +++ b/backend/danswer/utils/acl.py @@ -32,10 +32,6 @@ def set_acl_for_vespa(should_check_if_already_done: bool = False) -> None: except ConfigNotFoundError: pass - vespa_index = get_default_document_index() - if not isinstance(vespa_index, VespaIndex): - raise ValueError("This script is only for Vespa indexes") - logger.info("Populating Access Control List fields in 
Vespa") with Session(get_sqlalchemy_engine()) as db_session: # for all documents, set the `access_control_list` field appropriately @@ -46,15 +42,21 @@ def set_acl_for_vespa(should_check_if_already_done: bool = False) -> None: document_ids=[document.id for document in documents], ) - for index_name in get_both_index_names(): - update_requests = [ - UpdateRequest( - document_ids=[document_id], - access=DocumentAccess.build(user_ids, is_public), - ) - for document_id, user_ids, is_public in document_access_info - ] - vespa_index.update(update_requests=update_requests, index_name=index_name) + curr_ind_name, sec_ind_name = get_both_index_names(db_session) + vespa_index = get_default_document_index( + primary_index_name=curr_ind_name, secondary_index_name=sec_ind_name + ) + if not isinstance(vespa_index, VespaIndex): + raise ValueError("This script is only for Vespa indexes") + + update_requests = [ + UpdateRequest( + document_ids=[document_id], + access=DocumentAccess.build(user_ids, is_public), + ) + for document_id, user_ids, is_public in document_access_info + ] + vespa_index.update(update_requests=update_requests) dynamic_config_store.store(_COMPLETED_ACL_UPDATE_KEY, True) diff --git a/backend/tests/regression/search_quality/eval_search.py b/backend/tests/regression/search_quality/eval_search.py index fd83b8d93..9561ca9f6 100644 --- a/backend/tests/regression/search_quality/eval_search.py +++ b/backend/tests/regression/search_quality/eval_search.py @@ -5,8 +5,11 @@ from contextlib import contextmanager from typing import Any from typing import TextIO +from sqlalchemy.orm import Session + from danswer.chat.chat_utils import get_chunks_for_qa from danswer.db.engine import get_sqlalchemy_engine +from danswer.document_index.document_index_utils import get_index_name from danswer.document_index.factory import get_default_document_index from danswer.indexing.models import InferenceChunk from danswer.search.models import IndexFilters @@ -93,9 +96,16 @@ def get_search_results( retrieval_metrics = MetricsHander[RetrievalMetricsContainer]() rerank_metrics = MetricsHander[RerankMetricsContainer]() + with Session(get_sqlalchemy_engine()) as db_session: + ind_name = get_index_name(db_session) + + document_index = get_default_document_index( + primary_index_name=ind_name, secondary_index_name=None + ) + top_chunks, llm_chunk_selection = full_chunk_search( query=search_query, - document_index=get_default_document_index(), + document_index=document_index, retrieval_metrics_callback=retrieval_metrics.record_metric, rerank_metrics_callback=rerank_metrics.record_metric, )