diff --git a/backend/alembic/versions/dbaa756c2ccf_embedding_models.py b/backend/alembic/versions/dbaa756c2ccf_embedding_models.py
new file mode 100644
index 000000000..421356645
--- /dev/null
+++ b/backend/alembic/versions/dbaa756c2ccf_embedding_models.py
@@ -0,0 +1,70 @@
+"""Embedding Models
+
+Revision ID: dbaa756c2ccf
+Revises: 7f726bad5367
+Create Date: 2024-01-25 17:12:31.813160
+
+"""
+from alembic import op
+import sqlalchemy as sa
+
+from danswer.db.models import IndexModelStatus
+
+# revision identifiers, used by Alembic.
+revision = "dbaa756c2ccf"
+down_revision = "7f726bad5367"
+branch_labels = None
+depends_on = None
+
+
+def upgrade() -> None:
+ op.create_table(
+ "embedding_model",
+ sa.Column("id", sa.Integer(), nullable=False),
+ sa.Column("model_name", sa.String(), nullable=False),
+ sa.Column("model_dim", sa.Integer(), nullable=False),
+ sa.Column("normalize", sa.Boolean(), nullable=False),
+ sa.Column("query_prefix", sa.String(), nullable=False),
+ sa.Column("passage_prefix", sa.String(), nullable=False),
+ sa.Column(
+ "status",
+ sa.Enum(IndexModelStatus, native=False),
+ nullable=False,
+ ),
+ sa.PrimaryKeyConstraint("id"),
+ )
+ op.add_column(
+ "index_attempt",
+ sa.Column("embedding_model_id", sa.Integer(), nullable=True),
+ )
+ op.create_foreign_key(
+ "index_attempt__embedding_model_fk",
+ "index_attempt",
+ "embedding_model",
+ ["embedding_model_id"],
+ ["id"],
+ )
+ op.create_index(
+ "ix_embedding_model_present_unique",
+ "embedding_model",
+ ["status"],
+ unique=True,
+ postgresql_where=sa.text("status = 'PRESENT'"),
+ )
+ op.create_index(
+ "ix_embedding_model_future_unique",
+ "embedding_model",
+ ["status"],
+ unique=True,
+ postgresql_where=sa.text("status = 'FUTURE'"),
+ )
+
+
+def downgrade() -> None:
+    op.drop_constraint(
+        "index_attempt__embedding_model_fk", "index_attempt", type_="foreignkey"
+    )
+    op.drop_column("index_attempt", "embedding_model_id")
+    op.drop_index("ix_embedding_model_present_unique", table_name="embedding_model")
+    op.drop_index("ix_embedding_model_future_unique", table_name="embedding_model")
+    op.drop_table("embedding_model")
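The two partial unique indexes above are what enforce the model-lifecycle invariant: any number of PAST rows may accumulate, but at most one row can be PRESENT and at most one FUTURE at any time. A minimal sketch of that behavior (the DSN and column values are illustrative assumptions, not part of this PR):

# Sketch only: the invariant enforced by the partial unique indexes above.
from sqlalchemy import create_engine, text
from sqlalchemy.exc import IntegrityError

engine = create_engine("postgresql+psycopg2://localhost/danswer")  # assumed DSN
insert = text(
    "INSERT INTO embedding_model "
    "(model_name, model_dim, normalize, query_prefix, passage_prefix, status) "
    "VALUES (:name, :dim, false, '', '', 'FUTURE')"
)

with engine.connect() as conn:
    conn.execute(insert, {"name": "model-a", "dim": 384})
    try:
        # A second FUTURE row violates ix_embedding_model_future_unique
        conn.execute(insert, {"name": "model-b", "dim": 768})
    except IntegrityError:
        pass  # expected; PAST rows, by contrast, are unlimited
    conn.rollback()
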
diff --git a/backend/danswer/background/celery/celery.py b/backend/danswer/background/celery/celery.py
index 90706feb6..80a8a2a13 100644
--- a/backend/danswer/background/celery/celery.py
+++ b/backend/danswer/background/celery/celery.py
@@ -30,7 +30,6 @@ from danswer.db.tasks import check_live_task_not_timed_out
from danswer.db.tasks import get_latest_task
from danswer.document_index.document_index_utils import get_both_index_names
from danswer.document_index.factory import get_default_document_index
-from danswer.document_index.interfaces import DocumentIndex
from danswer.document_index.interfaces import UpdateRequest
from danswer.utils.batching import batch_generator
from danswer.utils.logger import setup_logger
@@ -79,9 +78,13 @@ def cleanup_connector_credential_pair_task(
try:
# The bulk of the work is in here, updates Postgres and Vespa
+ curr_ind_name, sec_ind_name = get_both_index_names(db_session)
+ document_index = get_default_document_index(
+ primary_index_name=curr_ind_name, secondary_index_name=sec_ind_name
+ )
return delete_connector_credential_pair(
db_session=db_session,
- document_index=get_default_document_index(),
+ document_index=document_index,
cc_pair=cc_pair,
)
except Exception as e:
@@ -95,9 +98,7 @@ def sync_document_set_task(document_set_id: int) -> None:
"""For document sets marked as not up to date, sync the state from postgres
into the datastore. Also handles deletions."""
- def _sync_document_batch(
- document_ids: list[str], document_index: DocumentIndex
- ) -> None:
+ def _sync_document_batch(document_ids: list[str]) -> None:
logger.debug(f"Syncing document sets for: {document_ids}")
# begin a transaction, release lock at the end
with Session(get_sqlalchemy_engine()) as db_session:
@@ -115,21 +116,21 @@ def sync_document_set_task(document_set_id: int) -> None:
}
# update Vespa
- for index_name in get_both_index_names():
- update_requests = [
- UpdateRequest(
- document_ids=[document_id],
- document_sets=set(document_set_map.get(document_id, [])),
- )
- for document_id in document_ids
- ]
- document_index.update(
- update_requests=update_requests, index_name=index_name
+ curr_ind_name, sec_ind_name = get_both_index_names(db_session)
+ document_index = get_default_document_index(
+ primary_index_name=curr_ind_name, secondary_index_name=sec_ind_name
+ )
+ update_requests = [
+ UpdateRequest(
+ document_ids=[document_id],
+ document_sets=set(document_set_map.get(document_id, [])),
)
+ for document_id in document_ids
+ ]
+ document_index.update(update_requests=update_requests)
with Session(get_sqlalchemy_engine()) as db_session:
try:
- document_index = get_default_document_index()
documents_to_update = fetch_documents_for_document_set(
document_set_id=document_set_id,
db_session=db_session,
@@ -139,8 +140,7 @@ def sync_document_set_task(document_set_id: int) -> None:
documents_to_update, _SYNC_BATCH_SIZE
):
_sync_document_batch(
- document_ids=[document.id for document in document_batch],
- document_index=document_index,
+ document_ids=[document.id for document in document_batch]
)
# if there are no connectors, then delete the document set. Otherwise, just
diff --git a/backend/danswer/background/connector_deletion.py b/backend/danswer/background/connector_deletion.py
index cd7f35bcf..845850144 100644
--- a/backend/danswer/background/connector_deletion.py
+++ b/backend/danswer/background/connector_deletion.py
@@ -31,7 +31,6 @@ from danswer.db.document_set import (
from danswer.db.engine import get_sqlalchemy_engine
from danswer.db.index_attempt import delete_index_attempts
from danswer.db.models import ConnectorCredentialPair
-from danswer.document_index.document_index_utils import get_both_index_names
from danswer.document_index.interfaces import DocumentIndex
from danswer.document_index.interfaces import UpdateRequest
from danswer.server.documents.models import ConnectorCredentialPairIdentifier
@@ -63,8 +62,7 @@ def _delete_connector_credential_pair_batch(
]
logger.debug(f"Deleting documents: {document_ids_to_delete}")
- for index_name in get_both_index_names():
- document_index.delete(doc_ids=document_ids_to_delete, index_name=index_name)
+ document_index.delete(doc_ids=document_ids_to_delete)
delete_documents_complete(
db_session=db_session,
@@ -92,10 +90,7 @@ def _delete_connector_credential_pair_batch(
]
logger.debug(f"Updating documents: {document_ids_to_update}")
- for index_name in get_both_index_names():
- document_index.update(
- update_requests=update_requests, index_name=index_name
- )
+ document_index.update(update_requests=update_requests)
delete_document_by_connector_credential_pair(
db_session=db_session,
diff --git a/backend/danswer/background/indexing/run_indexing.py b/backend/danswer/background/indexing/run_indexing.py
index 3bcb367b4..496e6a155 100644
--- a/backend/danswer/background/indexing/run_indexing.py
+++ b/backend/danswer/background/indexing/run_indexing.py
@@ -26,7 +26,6 @@ from danswer.db.index_attempt import mark_attempt_succeeded
from danswer.db.index_attempt import update_docs_indexed
from danswer.db.models import IndexAttempt
from danswer.db.models import IndexingStatus
-from danswer.document_index.document_index_utils import get_index_name
from danswer.indexing.indexing_pipeline import build_indexing_pipeline
from danswer.utils.logger import IndexAttemptSingleton
from danswer.utils.logger import setup_logger
@@ -104,7 +103,7 @@ def _run_indexing(
)
# TODO UPDATE THIS FOR SECONDARY INDEXING
- indexing_pipeline = build_indexing_pipeline(index_name=get_index_name())
+ indexing_pipeline = build_indexing_pipeline()
db_connector = index_attempt.connector
db_credential = index_attempt.credential
diff --git a/backend/danswer/chat/process_message.py b/backend/danswer/chat/process_message.py
index 31073da3a..44b0c022d 100644
--- a/backend/danswer/chat/process_message.py
+++ b/backend/danswer/chat/process_message.py
@@ -35,6 +35,7 @@ from danswer.db.chat import translate_db_search_doc_to_server_search_doc
from danswer.db.models import ChatMessage
from danswer.db.models import SearchDoc as DbSearchDoc
from danswer.db.models import User
+from danswer.document_index.document_index_utils import get_index_name
from danswer.document_index.factory import get_default_document_index
from danswer.indexing.models import InferenceChunk
from danswer.llm.exceptions import GenAIDisabledException
@@ -187,7 +188,9 @@ def stream_chat_message(
llm = None
llm_tokenizer = get_default_llm_token_encode()
- document_index = get_default_document_index()
+ document_index = get_default_document_index(
+ primary_index_name=get_index_name(db_session), secondary_index_name=None
+ )
# Every chat Session begins with an empty root message
root_message = get_or_create_root_message(
@@ -256,7 +259,7 @@ def stream_chat_message(
# May extend to include chunk ranges
llm_docs: list[LlmDoc] = inference_documents_from_ids(
doc_identifiers=identifier_tuples,
- document_index=get_default_document_index(),
+ document_index=document_index,
)
doc_id_to_rank_map = map_document_id_order(
cast(list[InferenceChunk | LlmDoc], llm_docs)
diff --git a/backend/danswer/configs/constants.py b/backend/danswer/configs/constants.py
index ea5628fa1..0151634ed 100644
--- a/backend/danswer/configs/constants.py
+++ b/backend/danswer/configs/constants.py
@@ -47,12 +47,6 @@ SECTION_SEPARATOR = "\n\n"
# For combining attributes, doesn't have to be unique/perfect to work
INDEX_SEPARATOR = "==="
-# Index Related
-CURRENT_EMBEDDING_MODEL = "CURRENT_EMBEDDING_MODEL"
-CURRENT_EMBEDDING_DIM = "CURRENT_EMBEDDING_DIM"
-UPCOMING_EMBEDDING_MODEL = "UPCOMING_EMBEDDING_MODEL"
-UPCOMING_EMBEDDING_DIM = "UPCOMING_EMBEDDING_DIM"
-
# Messages
DISABLED_GEN_AI_MSG = (
diff --git a/backend/danswer/danswerbot/slack/handlers/handle_buttons.py b/backend/danswer/danswerbot/slack/handlers/handle_buttons.py
index aac0df96e..0a88c0b96 100644
--- a/backend/danswer/danswerbot/slack/handlers/handle_buttons.py
+++ b/backend/danswer/danswerbot/slack/handlers/handle_buttons.py
@@ -27,6 +27,7 @@ from danswer.danswerbot.slack.utils import update_emote_react
from danswer.db.engine import get_sqlalchemy_engine
from danswer.db.feedback import create_chat_message_feedback
from danswer.db.feedback import create_doc_retrieval_feedback
+from danswer.document_index.document_index_utils import get_both_index_names
from danswer.document_index.factory import get_default_document_index
from danswer.utils.logger import setup_logger
@@ -102,11 +103,16 @@ def handle_slack_feedback(
else:
feedback = SearchFeedbackType.HIDE
+ curr_ind_name, sec_ind_name = get_both_index_names(db_session)
+ document_index = get_default_document_index(
+ primary_index_name=curr_ind_name, secondary_index_name=sec_ind_name
+ )
+
create_doc_retrieval_feedback(
message_id=message_id,
document_id=doc_id,
document_rank=doc_rank,
- document_index=get_default_document_index(),
+ document_index=document_index,
db_session=db_session,
clicked=False, # Not tracking this for Slack
feedback=feedback,
diff --git a/backend/danswer/db/embedding_model.py b/backend/danswer/db/embedding_model.py
new file mode 100644
index 000000000..abacc9c85
--- /dev/null
+++ b/backend/danswer/db/embedding_model.py
@@ -0,0 +1,43 @@
+from sqlalchemy import select
+from sqlalchemy.orm import Session
+
+from danswer.db.models import EmbeddingModel
+from danswer.db.models import IndexModelStatus
+from danswer.indexing.models import EmbeddingModelDetail
+from danswer.utils.logger import setup_logger
+
+logger = setup_logger()
+
+
+def create_embedding_model(
+ model_details: EmbeddingModelDetail,
+ db_session: Session,
+ status: IndexModelStatus = IndexModelStatus.FUTURE,
+) -> EmbeddingModel:
+ embedding_model = EmbeddingModel(
+ model_name=model_details.model_name,
+ model_dim=model_details.model_dim,
+ normalize=model_details.normalize,
+ query_prefix=model_details.query_prefix,
+ passage_prefix=model_details.passage_prefix,
+ status=status,
+ )
+
+ db_session.add(embedding_model)
+ db_session.commit()
+
+ return embedding_model
+
+
+def get_latest_embedding_model_by_status(
+ status: IndexModelStatus, db_session: Session
+) -> EmbeddingModel | None:
+ query = (
+ select(EmbeddingModel)
+ .where(EmbeddingModel.status == status)
+ .order_by(EmbeddingModel.id.desc())
+ )
+ result = db_session.execute(query)
+ latest_model = result.scalars().first()
+
+ return latest_model
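A hypothetical usage sketch of these helpers, tying the FUTURE status to the index-swap flow (the model values are examples, not defaults shipped by this PR):

# Hypothetical usage of create_embedding_model / get_latest_embedding_model_by_status.
from sqlalchemy.orm import Session

from danswer.db.embedding_model import (
    create_embedding_model,
    get_latest_embedding_model_by_status,
)
from danswer.db.engine import get_sqlalchemy_engine
from danswer.db.models import IndexModelStatus
from danswer.indexing.models import EmbeddingModelDetail

with Session(get_sqlalchemy_engine()) as db_session:
    # Queue a new model: it is created as FUTURE and only becomes PRESENT
    # once reindexing into its index completes (handled elsewhere).
    create_embedding_model(
        model_details=EmbeddingModelDetail(
            model_name="intfloat/e5-base-v2",  # example model name
            model_dim=768,
            normalize=True,
            query_prefix="query: ",
            passage_prefix="passage: ",
        ),
        db_session=db_session,
    )
    future = get_latest_embedding_model_by_status(
        status=IndexModelStatus.FUTURE, db_session=db_session
    )
    assert future is not None and future.model_name == "intfloat/e5-base-v2"
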
diff --git a/backend/danswer/db/feedback.py b/backend/danswer/db/feedback.py
index ad7381375..474e46a3a 100644
--- a/backend/danswer/db/feedback.py
+++ b/backend/danswer/db/feedback.py
@@ -12,7 +12,6 @@ from danswer.db.chat import get_chat_message
from danswer.db.models import ChatMessageFeedback
from danswer.db.models import Document as DbDocument
from danswer.db.models import DocumentRetrievalFeedback
-from danswer.document_index.document_index_utils import get_both_index_names
from danswer.document_index.interfaces import DocumentIndex
from danswer.document_index.interfaces import UpdateRequest
@@ -58,8 +57,7 @@ def update_document_boost(
boost=boost,
)
- for index_name in get_both_index_names():
- document_index.update(update_requests=[update], index_name=index_name)
+ document_index.update(update_requests=[update])
db_session.commit()
@@ -79,8 +77,7 @@ def update_document_hidden(
hidden=hidden,
)
- for index_name in get_both_index_names():
- document_index.update(update_requests=[update], index_name=index_name)
+ document_index.update(update_requests=[update])
db_session.commit()
@@ -126,8 +123,7 @@ def create_doc_retrieval_feedback(
document_ids=[document_id], boost=db_doc.boost, hidden=db_doc.hidden
)
     # Updates are generally batched for efficiency; in this case only 1 doc/value is updated
- for index_name in get_both_index_names():
- document_index.update(update_requests=[update], index_name=index_name)
+ document_index.update(update_requests=[update])
db_session.add(retrieval_feedback)
db_session.commit()
diff --git a/backend/danswer/db/models.py b/backend/danswer/db/models.py
index fbbfa8dd1..fcc46682d 100644
--- a/backend/danswer/db/models.py
+++ b/backend/danswer/db/models.py
@@ -62,6 +62,12 @@ class TaskStatus(str, PyEnum):
FAILURE = "FAILURE"
+class IndexModelStatus(str, PyEnum):
+ PAST = "PAST"
+ PRESENT = "PRESENT"
+ FUTURE = "FUTURE"
+
+
class Base(DeclarativeBase):
pass
@@ -363,6 +369,37 @@ class Credential(Base):
user: Mapped[User | None] = relationship("User", back_populates="credentials")
+class EmbeddingModel(Base):
+ __tablename__ = "embedding_model"
+ # ID is used also to indicate the order that the models are configured by the admin
+ id: Mapped[int] = mapped_column(primary_key=True)
+ model_name: Mapped[str] = mapped_column(String)
+ model_dim: Mapped[int] = mapped_column(Integer)
+ normalize: Mapped[bool] = mapped_column(Boolean)
+ query_prefix: Mapped[str] = mapped_column(String)
+ passage_prefix: Mapped[str] = mapped_column(String)
+ status: Mapped[IndexModelStatus] = mapped_column(Enum(IndexModelStatus))
+
+ index_attempts: Mapped[List["IndexAttempt"]] = relationship(
+ "IndexAttempt", back_populates="embedding_model"
+ )
+
+ __table_args__ = (
+ Index(
+ "ix_embedding_model_present_unique",
+ "status",
+ unique=True,
+ postgresql_where=(status == IndexModelStatus.PRESENT),
+ ),
+ Index(
+ "ix_embedding_model_future_unique",
+ "status",
+ unique=True,
+ postgresql_where=(status == IndexModelStatus.FUTURE),
+ ),
+ )
+
+
class IndexAttempt(Base):
"""
Represents an attempt to index a group of 1 or more documents from a
@@ -387,6 +424,11 @@ class IndexAttempt(Base):
error_msg: Mapped[str | None] = mapped_column(
Text, default=None
) # only filled if status = "failed"
+ # Nullable because in the past, we didn't allow swapping out embedding models live
+ embedding_model_id: Mapped[int | None] = mapped_column(
+ ForeignKey("embedding_model.id"),
+ nullable=True,
+ )
time_created: Mapped[datetime.datetime] = mapped_column(
DateTime(timezone=True),
server_default=func.now(),
@@ -408,6 +450,9 @@ class IndexAttempt(Base):
credential: Mapped[Credential] = relationship(
"Credential", back_populates="index_attempts"
)
+ embedding_model: Mapped[EmbeddingModel] = relationship(
+ "EmbeddingModel", back_populates="index_attempts"
+ )
__table_args__ = (
Index(
diff --git a/backend/danswer/document_index/document_index_utils.py b/backend/danswer/document_index/document_index_utils.py
index ae10bccf9..dd369b499 100644
--- a/backend/danswer/document_index/document_index_utils.py
+++ b/backend/danswer/document_index/document_index_utils.py
@@ -1,11 +1,10 @@
import math
import uuid
-from typing import cast
-from danswer.configs.constants import CURRENT_EMBEDDING_MODEL
-from danswer.configs.constants import UPCOMING_EMBEDDING_MODEL
-from danswer.dynamic_configs import get_dynamic_config_store
-from danswer.dynamic_configs.interface import ConfigNotFoundError
+from sqlalchemy.orm import Session
+
+from danswer.db.embedding_model import get_latest_embedding_model_by_status
+from danswer.db.models import IndexModelStatus
from danswer.indexing.models import IndexChunk
from danswer.indexing.models import InferenceChunk
@@ -17,34 +16,43 @@ def clean_model_name(model_str: str) -> str:
return model_str.replace("/", "_").replace("-", "_").replace(".", "_")
-def get_index_name(secondary_index: bool = False) -> str:
- # TODO make this more efficient if needed
- kv_store = get_dynamic_config_store()
- if not secondary_index:
- try:
- embed_model = cast(str, kv_store.load(CURRENT_EMBEDDING_MODEL))
- return f"danswer_chunk_{clean_model_name(embed_model)}"
- except ConfigNotFoundError:
- return "danswer_chunk"
+def get_index_name(
+ db_session: Session,
+ secondary_index: bool = False,
+) -> str:
+ if secondary_index:
+ model = get_latest_embedding_model_by_status(
+ status=IndexModelStatus.FUTURE, db_session=db_session
+ )
+ if model is None:
+ raise RuntimeError("No secondary index being built")
+ return f"danswer_chunk_{clean_model_name(model.model_name)}"
- embed_model = cast(str, kv_store.load(UPCOMING_EMBEDDING_MODEL))
- return f"danswer_chunk_{clean_model_name(embed_model)}"
+ model = get_latest_embedding_model_by_status(
+ status=IndexModelStatus.PRESENT, db_session=db_session
+ )
+ if not model:
+ return "danswer_chunk"
+ return f"danswer_chunk_{clean_model_name(model.model_name)}"
-def get_both_index_names() -> list[str]:
- kv_store = get_dynamic_config_store()
- try:
- embed_model = cast(str, kv_store.load(CURRENT_EMBEDDING_MODEL))
- indices = [f"danswer_chunk_{clean_model_name(embed_model)}"]
- except ConfigNotFoundError:
- indices = ["danswer_chunk"]
+def get_both_index_names(db_session: Session) -> tuple[str, str | None]:
+ model = get_latest_embedding_model_by_status(
+ status=IndexModelStatus.PRESENT, db_session=db_session
+ )
+ curr_index = (
+ "danswer_chunk"
+ if not model
+ else f"danswer_chunk_{clean_model_name(model.model_name)}"
+ )
- try:
- embed_model = cast(str, kv_store.load(UPCOMING_EMBEDDING_MODEL))
- indices.append(f"danswer_chunk_{clean_model_name(embed_model)}")
- return indices
- except ConfigNotFoundError:
- return indices
+ model_new = get_latest_embedding_model_by_status(
+ status=IndexModelStatus.FUTURE, db_session=db_session
+ )
+ if not model_new:
+ return curr_index, None
+
+ return curr_index, f"danswer_chunk_{clean_model_name(model_new.model_name)}"
def translate_boost_count_to_multiplier(boost: int) -> float:
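For reference, clean_model_name (unchanged above) is what maps a model name to a Vespa-safe schema name, so the resolution implemented here behaves as sketched below (model name is an example):

# Naming behavior implied by get_index_name / get_both_index_names above.
from danswer.document_index.document_index_utils import clean_model_name

assert clean_model_name("intfloat/e5-base-v2") == "intfloat_e5_base_v2"

# PRESENT model "intfloat/e5-base-v2" -> "danswer_chunk_intfloat_e5_base_v2"
# No embedding_model rows (legacy)    -> "danswer_chunk"
# FUTURE model set                    -> second element of get_both_index_names
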
diff --git a/backend/danswer/document_index/factory.py b/backend/danswer/document_index/factory.py
index 40a44cf6a..17701d98e 100644
--- a/backend/danswer/document_index/factory.py
+++ b/backend/danswer/document_index/factory.py
@@ -2,6 +2,14 @@ from danswer.document_index.interfaces import DocumentIndex
from danswer.document_index.vespa.index import VespaIndex
-def get_default_document_index() -> DocumentIndex:
+def get_default_document_index(
+ primary_index_name: str,
+ secondary_index_name: str | None,
+) -> DocumentIndex:
+ """Primary index is the index that is used for querying/updating etc.
+ Secondary index is for when both the currently used index and the upcoming
+ index both need to be updated, updates are applied to both indices"""
# Currently only supporting Vespa
- return VespaIndex()
+ return VespaIndex(
+ index_name=primary_index_name, secondary_index_name=secondary_index_name
+ )
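Call sites in this PR use the factory in two modes, sketched here for clarity (index names are examples):

# The two usage modes of the new factory signature, as seen at call sites.
from danswer.document_index.factory import get_default_document_index

# Read path: queries always run against the primary index only.
read_index = get_default_document_index(
    primary_index_name="danswer_chunk", secondary_index_name=None
)

# Write path during a model migration: updates/deletes fan out to both.
write_index = get_default_document_index(
    primary_index_name="danswer_chunk",
    secondary_index_name="danswer_chunk_intfloat_e5_base_v2",  # example
)
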
diff --git a/backend/danswer/document_index/interfaces.py b/backend/danswer/document_index/interfaces.py
index 1943f2369..51fe0366d 100644
--- a/backend/danswer/document_index/interfaces.py
+++ b/backend/danswer/document_index/interfaces.py
@@ -4,7 +4,6 @@ from datetime import datetime
from typing import Any
from danswer.access.models import DocumentAccess
-from danswer.configs.model_configs import DOC_EMBEDDING_DIM
from danswer.indexing.models import DocMetadataAwareIndexChunk
from danswer.indexing.models import InferenceChunk
from danswer.search.models import IndexFilters
@@ -46,12 +45,23 @@ class UpdateRequest:
class Verifiable(abc.ABC):
@abc.abstractmethod
- def __init__(self, index_name: str, *args: Any, **kwargs: Any) -> None:
+ def __init__(
+ self,
+ index_name: str,
+ secondary_index_name: str | None,
+ *args: Any,
+ **kwargs: Any
+ ) -> None:
super().__init__(*args, **kwargs)
self.index_name = index_name
+ self.secondary_index_name = secondary_index_name
@abc.abstractmethod
- def ensure_indices_exist(self, embedding_dim: int = DOC_EMBEDDING_DIM) -> None:
+ def ensure_indices_exist(
+ self,
+ index_embedding_dim: int,
+ secondary_index_embedding_dim: int | None,
+ ) -> None:
raise NotImplementedError
@@ -60,7 +70,6 @@ class Indexable(abc.ABC):
def index(
self,
chunks: list[DocMetadataAwareIndexChunk],
- index_name: str,
) -> set[DocumentInsertionRecord]:
"""Indexes document chunks into the Document Index and return the IDs of all the documents indexed"""
raise NotImplementedError
@@ -68,14 +77,14 @@ class Indexable(abc.ABC):
class Deletable(abc.ABC):
@abc.abstractmethod
- def delete(self, doc_ids: list[str], index_name: str) -> None:
+ def delete(self, doc_ids: list[str]) -> None:
"""Removes the specified documents from the Index"""
raise NotImplementedError
class Updatable(abc.ABC):
@abc.abstractmethod
- def update(self, update_requests: list[UpdateRequest], index_name: str) -> None:
+ def update(self, update_requests: list[UpdateRequest]) -> None:
"""Updates metadata for the specified documents sets in the Index"""
raise NotImplementedError
@@ -87,7 +96,6 @@ class IdRetrievalCapable(abc.ABC):
document_id: str,
chunk_ind: int | None,
filters: IndexFilters,
- index_name: str,
) -> list[InferenceChunk]:
raise NotImplementedError
@@ -99,7 +107,6 @@ class KeywordCapable(abc.ABC):
query: str,
filters: IndexFilters,
time_decay_multiplier: float,
- index_name: str,
num_to_retrieve: int,
offset: int = 0,
) -> list[InferenceChunk]:
@@ -113,7 +120,6 @@ class VectorCapable(abc.ABC):
query: str,
filters: IndexFilters,
time_decay_multiplier: float,
- index_name: str,
num_to_retrieve: int,
offset: int = 0,
) -> list[InferenceChunk]:
@@ -128,7 +134,6 @@ class HybridCapable(abc.ABC):
filters: IndexFilters,
time_decay_multiplier: float,
num_to_retrieve: int,
- index_name: str,
offset: int = 0,
hybrid_alpha: float | None = None,
) -> list[InferenceChunk]:
@@ -141,7 +146,6 @@ class AdminCapable(abc.ABC):
self,
query: str,
filters: IndexFilters,
- index_name: str,
num_to_retrieve: int,
offset: int = 0,
) -> list[InferenceChunk]:
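Since index names are now constructor state rather than per-call arguments, a test double for the slimmed-down interface only needs to record them; a minimal hypothetical sketch (not part of this PR):

# Hypothetical in-memory double for the updated interface, useful in tests.
from danswer.document_index.interfaces import UpdateRequest

class FakeDocumentIndex:
    def __init__(self, index_name: str, secondary_index_name: str | None) -> None:
        self.index_name = index_name
        self.secondary_index_name = secondary_index_name
        self.deleted: list[str] = []
        self.updates: list[UpdateRequest] = []

    def delete(self, doc_ids: list[str]) -> None:
        # Real implementations fan writes out to both indices internally
        self.deleted.extend(doc_ids)

    def update(self, update_requests: list[UpdateRequest]) -> None:
        self.updates.extend(update_requests)
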
diff --git a/backend/danswer/document_index/vespa/index.py b/backend/danswer/document_index/vespa/index.py
index fef57f862..192af61b7 100644
--- a/backend/danswer/document_index/vespa/index.py
+++ b/backend/danswer/document_index/vespa/index.py
@@ -20,7 +20,6 @@ import requests
from retry import retry
from danswer.configs.app_configs import LOG_VESPA_TIMING_INFORMATION
-from danswer.configs.app_configs import VESPA_DEPLOYMENT_ZIP
from danswer.configs.app_configs import VESPA_HOST
from danswer.configs.app_configs import VESPA_PORT
from danswer.configs.app_configs import VESPA_TENANT_PORT
@@ -34,8 +33,6 @@ from danswer.configs.constants import BLURB
from danswer.configs.constants import BOOST
from danswer.configs.constants import CHUNK_ID
from danswer.configs.constants import CONTENT
-from danswer.configs.constants import CURRENT_EMBEDDING_DIM
-from danswer.configs.constants import CURRENT_EMBEDDING_MODEL
from danswer.configs.constants import DOC_UPDATED_AT
from danswer.configs.constants import DOCUMENT_ID
from danswer.configs.constants import DOCUMENT_SETS
@@ -55,21 +52,15 @@ from danswer.configs.constants import SOURCE_TYPE
from danswer.configs.constants import TITLE
from danswer.configs.constants import TITLE_EMBEDDING
from danswer.configs.constants import TITLE_SEPARATOR
-from danswer.configs.constants import UPCOMING_EMBEDDING_DIM
-from danswer.configs.constants import UPCOMING_EMBEDDING_MODEL
-from danswer.configs.model_configs import DOC_EMBEDDING_DIM
from danswer.configs.model_configs import SEARCH_DISTANCE_CUTOFF
from danswer.connectors.cross_connector_utils.miscellaneous_utils import (
get_experts_stores_representations,
)
-from danswer.document_index.document_index_utils import clean_model_name
from danswer.document_index.document_index_utils import get_uuid_from_chunk
from danswer.document_index.interfaces import DocumentIndex
from danswer.document_index.interfaces import DocumentInsertionRecord
from danswer.document_index.interfaces import UpdateRequest
from danswer.document_index.vespa.utils import remove_invalid_unicode_chars
-from danswer.dynamic_configs import get_dynamic_config_store
-from danswer.dynamic_configs.interface import ConfigNotFoundError
from danswer.indexing.models import DocMetadataAwareIndexChunk
from danswer.indexing.models import InferenceChunk
from danswer.search.models import IndexFilters
@@ -621,9 +612,11 @@ def in_memory_zip_from_file_bytes(file_contents: dict[str, bytes]) -> BinaryIO:
return zip_buffer
-def _create_document_xml_lines(doc_names: list[str]) -> str:
+def _create_document_xml_lines(doc_names: list[str | None]) -> str:
     doc_lines = [
-        f'<document type="{doc_name}" mode="index" />' for doc_name in doc_names
+        f'<document type="{doc_name}" mode="index" />'
+        for doc_name in doc_names
+        if doc_name
]
return "\n".join(doc_lines)
@@ -650,17 +643,15 @@ class VespaIndex(DocumentIndex):
f"from {{index_name}} where "
)
- def __init__(self, deployment_zip: str = VESPA_DEPLOYMENT_ZIP) -> None:
- # Vespa index name isn't configurable via code alone because of the config .sd file that needs
- # to be updated + zipped + deployed, not supporting the option for simplicity
- self.deployment_zip = deployment_zip
+ def __init__(self, index_name: str, secondary_index_name: str | None) -> None:
+ self.index_name = index_name
+ self.secondary_index_name = secondary_index_name
- def ensure_indices_exist(self, embedding_dim: int = DOC_EMBEDDING_DIM) -> None:
- """Verifying indices is more involved as there is no good way to
- verify the deployed app against the zip locally. But deploying the latest app.zip will ensure that
- the index is up-to-date with the expected schema and this does not erase the existing index.
- If the changes cannot be applied without conflict with existing data, it will fail with a non 200
- """
+ def ensure_indices_exist(
+ self,
+ index_embedding_dim: int,
+ secondary_index_embedding_dim: int | None,
+ ) -> None:
deploy_url = f"{VESPA_APPLICATION_ENDPOINT}/tenant/default/prepareandactivate"
logger.debug(f"Sending Vespa zip to {deploy_url}")
@@ -670,30 +661,12 @@ class VespaIndex(DocumentIndex):
schema_file = os.path.join(vespa_schema_path, "schemas", "danswer_chunk.sd")
services_file = os.path.join(vespa_schema_path, "services.xml")
- kv_store = get_dynamic_config_store()
- try:
- curr_embed_model = cast(str, kv_store.load(CURRENT_EMBEDDING_MODEL))
- schema_name = f"danswer_chunk_{clean_model_name(curr_embed_model)}"
- embedding_dim = cast(int, kv_store.load(CURRENT_EMBEDDING_DIM))
- except ConfigNotFoundError:
- schema_name = "danswer_chunk"
-
- doc_names = [schema_name]
-
- try:
- upcoming_embed_model = cast(str, kv_store.load(UPCOMING_EMBEDDING_MODEL))
- upcoming_schema_name = (
- f"danswer_chunk_{clean_model_name(upcoming_embed_model)}"
- )
- upcoming_embedding_dim = cast(int, kv_store.load(UPCOMING_EMBEDDING_DIM))
- doc_names.append(upcoming_schema_name)
- except ConfigNotFoundError:
- upcoming_schema_name = None
-
with open(services_file, "r") as services_f:
services_template = services_f.read()
- doc_lines = _create_document_xml_lines(doc_names)
+ schema_names = [self.index_name, self.secondary_index_name]
+
+ doc_lines = _create_document_xml_lines(schema_names)
services = services_template.replace(DOCUMENT_REPLACEMENT_PAT, doc_lines)
zip_dict = {
@@ -704,17 +677,15 @@ class VespaIndex(DocumentIndex):
schema_template = schema_f.read()
schema = schema_template.replace(
- DANSWER_CHUNK_REPLACEMENT_PAT, schema_name
- ).replace(VESPA_DIM_REPLACEMENT_PAT, str(embedding_dim))
- zip_dict[f"schemas/{schema_name}.sd"] = schema.encode("utf-8")
+ DANSWER_CHUNK_REPLACEMENT_PAT, self.index_name
+ ).replace(VESPA_DIM_REPLACEMENT_PAT, str(index_embedding_dim))
+ zip_dict[f"schemas/{schema_names[0]}.sd"] = schema.encode("utf-8")
- if upcoming_schema_name:
+ if self.secondary_index_name:
upcoming_schema = schema_template.replace(
- DANSWER_CHUNK_REPLACEMENT_PAT, upcoming_schema_name
- ).replace(VESPA_DIM_REPLACEMENT_PAT, str(upcoming_embedding_dim))
- zip_dict[f"schemas/{upcoming_schema_name}.sd"] = upcoming_schema.encode(
- "utf-8"
- )
+ DANSWER_CHUNK_REPLACEMENT_PAT, self.secondary_index_name
+ ).replace(VESPA_DIM_REPLACEMENT_PAT, str(secondary_index_embedding_dim))
+ zip_dict[f"schemas/{schema_names[1]}.sd"] = upcoming_schema.encode("utf-8")
zip_file = in_memory_zip_from_file_bytes(zip_dict)
@@ -728,9 +699,9 @@ class VespaIndex(DocumentIndex):
def index(
self,
chunks: list[DocMetadataAwareIndexChunk],
- index_name: str,
) -> set[DocumentInsertionRecord]:
- return _clear_and_index_vespa_chunks(chunks=chunks, index_name=index_name)
+        # IMPORTANT: This must be done one index at a time; do not use the secondary index here
+ return _clear_and_index_vespa_chunks(chunks=chunks, index_name=self.index_name)
@staticmethod
def _apply_updates_batched(
@@ -774,7 +745,7 @@ class VespaIndex(DocumentIndex):
failure_msg = f"Failed to update document: {future_to_document_id[future]}"
raise requests.HTTPError(failure_msg) from e
- def update(self, update_requests: list[UpdateRequest], index_name: str) -> None:
+ def update(self, update_requests: list[UpdateRequest]) -> None:
logger.info(f"Updating {len(update_requests)} documents in Vespa")
start = time.time()
@@ -802,50 +773,61 @@ class VespaIndex(DocumentIndex):
logger.error("Update request received but nothing to update")
continue
- for document_id in update_request.document_ids:
- for doc_chunk_id in _get_vespa_chunk_ids_by_document_id(
- document_id=document_id, index_name=index_name
- ):
- processed_updates_requests.append(
- _VespaUpdateRequest(
- document_id=document_id,
- url=f"{DOCUMENT_ID_ENDPOINT.format(index_name=index_name)}/{doc_chunk_id}",
- update_request=update_dict,
+ index_names = [self.index_name]
+ if self.secondary_index_name:
+ index_names.append(self.secondary_index_name)
+
+ for index_name in index_names:
+ for document_id in update_request.document_ids:
+ for doc_chunk_id in _get_vespa_chunk_ids_by_document_id(
+ document_id=document_id, index_name=index_name
+ ):
+ processed_updates_requests.append(
+ _VespaUpdateRequest(
+ document_id=document_id,
+ url=f"{DOCUMENT_ID_ENDPOINT.format(index_name=index_name)}/{doc_chunk_id}",
+ update_request=update_dict,
+ )
)
- )
self._apply_updates_batched(processed_updates_requests)
logger.info(
"Finished updating Vespa documents in %s seconds", time.time() - start
)
- def delete(self, doc_ids: list[str], index_name: str) -> None:
+ def delete(self, doc_ids: list[str]) -> None:
logger.info(f"Deleting {len(doc_ids)} documents from Vespa")
        # NOTE: using `httpx` here since `requests` doesn't support HTTP2. This is beneficial for
# indexing / updates / deletes since we have to make a large volume of requests.
with httpx.Client(http2=True) as http_client:
- _delete_vespa_docs(
- document_ids=doc_ids, index_name=index_name, http_client=http_client
- )
+ index_names = [self.index_name]
+ if self.secondary_index_name:
+ index_names.append(self.secondary_index_name)
+
+ for index_name in index_names:
+ _delete_vespa_docs(
+ document_ids=doc_ids, index_name=index_name, http_client=http_client
+ )
def id_based_retrieval(
self,
document_id: str,
chunk_ind: int | None,
filters: IndexFilters,
- index_name: str,
) -> list[InferenceChunk]:
if chunk_ind is None:
vespa_chunk_ids = _get_vespa_chunk_ids_by_document_id(
- document_id=document_id, index_name=index_name, index_filters=filters
+ document_id=document_id,
+ index_name=self.index_name,
+ index_filters=filters,
)
if not vespa_chunk_ids:
return []
functions_with_args: list[tuple[Callable, tuple]] = [
- (_inference_chunk_by_vespa_id, (vespa_chunk_id, index_name))
+ (_inference_chunk_by_vespa_id, (vespa_chunk_id, self.index_name))
for vespa_chunk_id in vespa_chunk_ids
]
@@ -861,7 +843,7 @@ class VespaIndex(DocumentIndex):
else:
filters_str = _build_vespa_filters(filters=filters, include_hidden=True)
yql = (
- VespaIndex.yql_base.format(index_name=index_name)
+ VespaIndex.yql_base.format(index_name=self.index_name)
+ filters_str
+ f"({DOCUMENT_ID} contains '{document_id}' and {CHUNK_ID} contains '{chunk_ind}')"
)
@@ -872,7 +854,6 @@ class VespaIndex(DocumentIndex):
query: str,
filters: IndexFilters,
time_decay_multiplier: float,
- index_name: str,
num_to_retrieve: int = NUM_RETURNED_HITS,
offset: int = 0,
edit_keyword_query: bool = EDIT_KEYWORD_QUERY,
@@ -880,7 +861,7 @@ class VespaIndex(DocumentIndex):
# IMPORTANT: THIS FUNCTION IS NOT UP TO DATE, DOES NOT WORK CORRECTLY
vespa_where_clauses = _build_vespa_filters(filters)
yql = (
- VespaIndex.yql_base.format(index_name=index_name)
+ VespaIndex.yql_base.format(index_name=self.index_name)
+ vespa_where_clauses
# `({defaultIndex: "content_summary"}userInput(@query))` section is
# needed for highlighting while the N-gram highlighting is broken /
@@ -908,7 +889,6 @@ class VespaIndex(DocumentIndex):
query: str,
filters: IndexFilters,
time_decay_multiplier: float,
- index_name: str,
num_to_retrieve: int = NUM_RETURNED_HITS,
offset: int = 0,
distance_cutoff: float | None = SEARCH_DISTANCE_CUTOFF,
@@ -917,7 +897,7 @@ class VespaIndex(DocumentIndex):
# IMPORTANT: THIS FUNCTION IS NOT UP TO DATE, DOES NOT WORK CORRECTLY
vespa_where_clauses = _build_vespa_filters(filters)
yql = (
- VespaIndex.yql_base.format(index_name=index_name)
+ VespaIndex.yql_base.format(index_name=self.index_name)
+ vespa_where_clauses
+ f"(({{targetHits: {10 * num_to_retrieve}}}nearestNeighbor(embeddings, query_embedding)) "
# `({defaultIndex: "content_summary"}userInput(@query))` section is
@@ -953,7 +933,6 @@ class VespaIndex(DocumentIndex):
filters: IndexFilters,
time_decay_multiplier: float,
num_to_retrieve: int,
- index_name: str,
offset: int = 0,
hybrid_alpha: float | None = HYBRID_ALPHA,
title_content_ratio: float | None = TITLE_CONTENT_RATIO,
@@ -964,7 +943,7 @@ class VespaIndex(DocumentIndex):
# Needs to be at least as much as the value set in Vespa schema config
target_hits = max(10 * num_to_retrieve, 1000)
yql = (
- VespaIndex.yql_base.format(index_name=index_name)
+ VespaIndex.yql_base.format(index_name=self.index_name)
+ vespa_where_clauses
+ f"(({{targetHits: {target_hits}}}nearestNeighbor(embeddings, query_embedding)) "
+ f"or ({{targetHits: {target_hits}}}nearestNeighbor(title_embedding, query_embedding)) "
@@ -1003,13 +982,12 @@ class VespaIndex(DocumentIndex):
self,
query: str,
filters: IndexFilters,
- index_name: str,
num_to_retrieve: int = NUM_RETURNED_HITS,
offset: int = 0,
) -> list[InferenceChunk]:
vespa_where_clauses = _build_vespa_filters(filters, include_hidden=True)
yql = (
- VespaIndex.yql_base.format(index_name=index_name)
+ VespaIndex.yql_base.format(index_name=self.index_name)
+ vespa_where_clauses
+ '({grammar: "weakAnd"}userInput(@query) '
# `({defaultIndex: "content_summary"}userInput(@query))` section is
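The contract VespaIndex now follows: writes (update/delete) target every live schema so the upcoming index never drifts, while index() and all read paths (id_based_retrieval, keyword/semantic/hybrid/admin retrieval) stay pinned to self.index_name. Condensed as a sketch:

# Condensed restatement of the read/write fan-out logic above.
def _write_targets(index_name: str, secondary_index_name: str | None) -> list[str]:
    targets = [index_name]
    if secondary_index_name:
        targets.append(secondary_index_name)
    return targets

assert _write_targets("danswer_chunk_a", "danswer_chunk_b") == [
    "danswer_chunk_a",
    "danswer_chunk_b",
]
assert _write_targets("danswer_chunk_a", None) == ["danswer_chunk_a"]
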
diff --git a/backend/danswer/indexing/indexing_pipeline.py b/backend/danswer/indexing/indexing_pipeline.py
index 23fa2bd86..1b6891a32 100644
--- a/backend/danswer/indexing/indexing_pipeline.py
+++ b/backend/danswer/indexing/indexing_pipeline.py
@@ -19,6 +19,7 @@ from danswer.db.document_set import fetch_document_sets_for_documents
from danswer.db.engine import get_sqlalchemy_engine
from danswer.db.tag import create_or_add_document_tag
from danswer.db.tag import create_or_add_document_tag_list
+from danswer.document_index.document_index_utils import get_index_name
from danswer.document_index.factory import get_default_document_index
from danswer.document_index.interfaces import DocumentIndex
from danswer.document_index.interfaces import DocumentMetadata
@@ -96,7 +97,6 @@ def index_doc_batch(
chunker: Chunker,
embedder: Embedder,
document_index: DocumentIndex,
- index_name: str,
documents: list[Document],
index_attempt_metadata: IndexAttemptMetadata,
ignore_time_skip: bool = False,
@@ -188,9 +188,7 @@ def index_doc_batch(
# A document will not be spread across different batches, so all the
# documents with chunks in this set, are fully represented by the chunks
# in this set
- insertion_records = document_index.index(
- chunks=access_aware_chunks, index_name=index_name
- )
+ insertion_records = document_index.index(chunks=access_aware_chunks)
successful_doc_ids = [record.document_id for record in insertion_records]
successful_docs = [
@@ -218,7 +216,6 @@ def build_indexing_pipeline(
chunker: Chunker | None = None,
embedder: Embedder | None = None,
document_index: DocumentIndex | None = None,
- index_name: str,
ignore_time_skip: bool = False,
) -> IndexingPipelineProtocol:
"""Builds a pipline which takes in a list (batch) of docs and indexes them."""
@@ -226,13 +223,16 @@ def build_indexing_pipeline(
embedder = embedder or DefaultEmbedder()
- document_index = document_index or get_default_document_index()
+ if not document_index:
+ with Session(get_sqlalchemy_engine()) as db_session:
+ document_index = get_default_document_index(
+ primary_index_name=get_index_name(db_session), secondary_index_name=None
+ )
return partial(
index_doc_batch,
chunker=chunker,
embedder=embedder,
document_index=document_index,
- index_name=index_name,
ignore_time_skip=ignore_time_skip,
)
diff --git a/backend/danswer/indexing/models.py b/backend/danswer/indexing/models.py
index 331a5fed4..c875c88bd 100644
--- a/backend/danswer/indexing/models.py
+++ b/backend/danswer/indexing/models.py
@@ -2,6 +2,8 @@ from dataclasses import dataclass
from dataclasses import fields
from datetime import datetime
+from pydantic import BaseModel
+
from danswer.access.models import DocumentAccess
from danswer.configs.constants import DocumentSource
from danswer.connectors.models import Document
@@ -120,3 +122,11 @@ class InferenceChunk(BaseChunk):
break
short_blurb += " " + word
return f"Inference Chunk: {self.document_id} - {short_blurb}..."
+
+
+class EmbeddingModelDetail(BaseModel):
+ model_name: str
+ model_dim: int
+ normalize: bool
+ query_prefix: str | None
+ passage_prefix: str | None
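EmbeddingModelDetail is the transfer model that both the admin API and create_embedding_model accept. One plausible way to build it from the existing env-driven defaults; this mapping is an assumption for illustration, not something this PR wires up:

# Assumed mapping from existing model_configs values to the new detail model.
from danswer.configs.model_configs import (
    ASYM_PASSAGE_PREFIX,
    ASYM_QUERY_PREFIX,
    DOC_EMBEDDING_DIM,
    DOCUMENT_ENCODER_MODEL,
    NORMALIZE_EMBEDDINGS,
)
from danswer.indexing.models import EmbeddingModelDetail

current_model = EmbeddingModelDetail(
    model_name=DOCUMENT_ENCODER_MODEL,
    model_dim=DOC_EMBEDDING_DIM,
    normalize=NORMALIZE_EMBEDDINGS,
    query_prefix=ASYM_QUERY_PREFIX,
    passage_prefix=ASYM_PASSAGE_PREFIX,
)
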
diff --git a/backend/danswer/main.py b/backend/danswer/main.py
index 8746b37ab..295dc3173 100644
--- a/backend/danswer/main.py
+++ b/backend/danswer/main.py
@@ -35,6 +35,7 @@ from danswer.configs.chat_configs import MULTILINGUAL_QUERY_EXPANSION
from danswer.configs.constants import AuthType
from danswer.configs.model_configs import ASYM_PASSAGE_PREFIX
from danswer.configs.model_configs import ASYM_QUERY_PREFIX
+from danswer.configs.model_configs import DOC_EMBEDDING_DIM
from danswer.configs.model_configs import DOCUMENT_ENCODER_MODEL
from danswer.configs.model_configs import ENABLE_RERANKING_REAL_TIME_FLOW
from danswer.configs.model_configs import FAST_GEN_AI_MODEL_VERSION
@@ -44,7 +45,10 @@ from danswer.configs.model_configs import GEN_AI_MODEL_VERSION
from danswer.db.connector import create_initial_default_connector
from danswer.db.connector_credential_pair import associate_default_cc_pair
from danswer.db.credentials import create_initial_public_credential
+from danswer.db.embedding_model import get_latest_embedding_model_by_status
from danswer.db.engine import get_sqlalchemy_engine
+from danswer.db.models import IndexModelStatus
+from danswer.document_index.document_index_utils import clean_model_name
from danswer.document_index.factory import get_default_document_index
from danswer.llm.factory import get_default_llm
from danswer.search.search_nlp_models import warm_up_models
@@ -278,7 +282,34 @@ def get_application() -> FastAPI:
load_chat_yamls()
logger.info("Verifying Document Index(s) is/are available.")
- get_default_document_index().ensure_indices_exist()
+ primary_embedding_model = get_latest_embedding_model_by_status(
+ status=IndexModelStatus.PRESENT, db_session=db_session
+ )
+ secondary_embedding_model = get_latest_embedding_model_by_status(
+ status=IndexModelStatus.FUTURE, db_session=db_session
+ )
+ primary_index = (
+ f"danswer_chunk_{clean_model_name(primary_embedding_model.model_name)}"
+ if primary_embedding_model
+ else "danswer_chunk"
+ )
+ second_index = (
+ f"danswer_chunk_{clean_model_name(secondary_embedding_model.model_name)}"
+ if secondary_embedding_model
+ else None
+ )
+
+ document_index = get_default_document_index(
+ primary_index_name=primary_index, secondary_index_name=second_index
+ )
+ document_index.ensure_indices_exist(
+ index_embedding_dim=primary_embedding_model.model_dim
+ if primary_embedding_model
+ else DOC_EMBEDDING_DIM,
+ secondary_index_embedding_dim=secondary_embedding_model.model_dim
+ if secondary_embedding_model
+ else None,
+ )
optional_telemetry(
record_type=RecordType.VERSION, data={"version": __version__}
diff --git a/backend/danswer/one_shot_answer/answer_question.py b/backend/danswer/one_shot_answer/answer_question.py
index 1ebbd2980..4593b4f65 100644
--- a/backend/danswer/one_shot_answer/answer_question.py
+++ b/backend/danswer/one_shot_answer/answer_question.py
@@ -22,6 +22,7 @@ from danswer.db.chat import get_persona_by_id
from danswer.db.chat import get_prompt_by_id
from danswer.db.chat import translate_db_message_to_chat_message_detail
from danswer.db.models import User
+from danswer.document_index.document_index_utils import get_index_name
from danswer.document_index.factory import get_default_document_index
from danswer.indexing.models import InferenceChunk
from danswer.llm.utils import get_default_llm_token_encode
@@ -89,7 +90,10 @@ def stream_answer_objects(
)
llm_tokenizer = get_default_llm_token_encode()
- document_index = get_default_document_index()
+
+ document_index = get_default_document_index(
+ primary_index_name=get_index_name(db_session), secondary_index_name=None
+ )
# Create a chat session which will just store the root message, the query, and the AI response
root_message = get_or_create_root_message(
diff --git a/backend/danswer/search/search_runner.py b/backend/danswer/search/search_runner.py
index c920d59b6..54e69c31e 100644
--- a/backend/danswer/search/search_runner.py
+++ b/backend/danswer/search/search_runner.py
@@ -17,7 +17,6 @@ from danswer.configs.model_configs import CROSS_ENCODER_RANGE_MAX
from danswer.configs.model_configs import CROSS_ENCODER_RANGE_MIN
from danswer.configs.model_configs import SIM_SCORE_RANGE_HIGH
from danswer.configs.model_configs import SIM_SCORE_RANGE_LOW
-from danswer.document_index.document_index_utils import get_index_name
from danswer.document_index.document_index_utils import (
translate_boost_count_to_multiplier,
)
@@ -143,14 +142,12 @@ def doc_index_retrieval(
document_index: DocumentIndex,
hybrid_alpha: float = HYBRID_ALPHA,
) -> list[InferenceChunk]:
- index = get_index_name()
if query.search_type == SearchType.KEYWORD:
top_chunks = document_index.keyword_retrieval(
query=query.query,
filters=query.filters,
time_decay_multiplier=query.recency_bias_multiplier,
num_to_retrieve=query.num_hits,
- index_name=index,
)
elif query.search_type == SearchType.SEMANTIC:
@@ -159,7 +156,6 @@ def doc_index_retrieval(
filters=query.filters,
time_decay_multiplier=query.recency_bias_multiplier,
num_to_retrieve=query.num_hits,
- index_name=index,
)
elif query.search_type == SearchType.HYBRID:
@@ -170,7 +166,6 @@ def doc_index_retrieval(
num_to_retrieve=query.num_hits,
offset=query.offset,
hybrid_alpha=hybrid_alpha,
- index_name=index,
)
else:
diff --git a/backend/danswer/server/danswer_api/ingestion.py b/backend/danswer/server/danswer_api/ingestion.py
index 712310b34..2a0281657 100644
--- a/backend/danswer/server/danswer_api/ingestion.py
+++ b/backend/danswer/server/danswer_api/ingestion.py
@@ -16,6 +16,7 @@ from danswer.db.connector_credential_pair import get_connector_credential_pair
from danswer.db.credentials import fetch_credential_by_id
from danswer.db.engine import get_session
from danswer.document_index.document_index_utils import get_both_index_names
+from danswer.document_index.factory import get_default_document_index
from danswer.dynamic_configs import get_dynamic_config_store
from danswer.dynamic_configs.interface import ConfigNotFoundError
from danswer.indexing.indexing_pipeline import build_indexing_pipeline
@@ -142,10 +143,14 @@ def document_ingestion(
if document.source == DocumentSource.INGESTION_API:
document.source = DocumentSource.FILE
- index_names = get_both_index_names()
+    # Need to index into both the primary and secondary indices if possible
+ curr_ind_name, sec_ind_name = get_both_index_names(db_session)
+ curr_doc_index = get_default_document_index(
+ primary_index_name=curr_ind_name, secondary_index_name=None
+ )
indexing_pipeline = build_indexing_pipeline(
- ignore_time_skip=True, index_name=index_names[0]
+ ignore_time_skip=True, document_index=curr_doc_index
)
new_doc, chunks = indexing_pipeline(
@@ -157,8 +162,16 @@ def document_ingestion(
)
# If there's a secondary index being built, index the doc but don't use it for return here
- if len(index_names) > 1:
- indexing_pipeline(
+ if sec_ind_name:
+        sec_doc_index = get_default_document_index(
+            primary_index_name=sec_ind_name, secondary_index_name=None
+        )
+
+ sec_ind_pipeline = build_indexing_pipeline(
+ ignore_time_skip=True, document_index=sec_doc_index
+ )
+
+ sec_ind_pipeline(
documents=[document],
index_attempt_metadata=IndexAttemptMetadata(
connector_id=connector_id,
diff --git a/backend/danswer/server/documents/document.py b/backend/danswer/server/documents/document.py
index 7552322f2..57f515bff 100644
--- a/backend/danswer/server/documents/document.py
+++ b/backend/danswer/server/documents/document.py
@@ -27,7 +27,9 @@ def get_document_info(
user: User | None = Depends(current_user),
db_session: Session = Depends(get_session),
) -> DocumentInfo:
- document_index = get_default_document_index()
+ document_index = get_default_document_index(
+ primary_index_name=get_index_name(db_session), secondary_index_name=None
+ )
user_acl_filters = build_access_filters_for_user(user, db_session)
filters = IndexFilters(access_control_list=user_acl_filters)
@@ -36,7 +38,6 @@ def get_document_info(
document_id=document_id,
chunk_ind=None,
filters=filters,
- index_name=get_index_name(),
)
if not inference_chunks:
@@ -60,7 +61,9 @@ def get_chunk_info(
user: User | None = Depends(current_user),
db_session: Session = Depends(get_session),
) -> ChunkInfo:
- document_index = get_default_document_index()
+ document_index = get_default_document_index(
+ primary_index_name=get_index_name(db_session), secondary_index_name=None
+ )
user_acl_filters = build_access_filters_for_user(user, db_session)
filters = IndexFilters(access_control_list=user_acl_filters)
@@ -69,7 +72,6 @@ def get_chunk_info(
document_id=document_id,
chunk_ind=chunk_id,
filters=filters,
- index_name=get_index_name(),
)
if not inference_chunks:
diff --git a/backend/danswer/server/gpts/api.py b/backend/danswer/server/gpts/api.py
index 67b637bf8..772726b2f 100644
--- a/backend/danswer/server/gpts/api.py
+++ b/backend/danswer/server/gpts/api.py
@@ -7,6 +7,7 @@ from pydantic import BaseModel
from sqlalchemy.orm import Session
from danswer.db.engine import get_session
+from danswer.document_index.document_index_utils import get_index_name
from danswer.document_index.factory import get_default_document_index
from danswer.search.access_filters import build_access_filters_for_user
from danswer.search.models import IndexFilters
@@ -83,9 +84,13 @@ def gpt_search(
skip_llm_chunk_filter=True,
)
+ document_index = get_default_document_index(
+ primary_index_name=get_index_name(db_session), secondary_index_name=None
+ )
+
top_chunks, __ = full_chunk_search(
query=search_query,
- document_index=get_default_document_index(),
+ document_index=document_index,
)
return GptSearchResponse(
diff --git a/backend/danswer/server/manage/administrative.py b/backend/danswer/server/manage/administrative.py
index 4fbe07256..50236c5d0 100644
--- a/backend/danswer/server/manage/administrative.py
+++ b/backend/danswer/server/manage/administrative.py
@@ -18,6 +18,7 @@ from danswer.db.feedback import fetch_docs_ranked_by_boost
from danswer.db.feedback import update_document_boost
from danswer.db.feedback import update_document_hidden
from danswer.db.models import User
+from danswer.document_index.document_index_utils import get_both_index_names
from danswer.document_index.factory import get_default_document_index
from danswer.dynamic_configs import get_dynamic_config_store
from danswer.dynamic_configs.interface import ConfigNotFoundError
@@ -68,12 +69,17 @@ def document_boost_update(
_: User | None = Depends(current_admin_user),
db_session: Session = Depends(get_session),
) -> None:
+ curr_ind_name, sec_ind_name = get_both_index_names(db_session)
+ document_index = get_default_document_index(
+ primary_index_name=curr_ind_name, secondary_index_name=sec_ind_name
+ )
+
try:
update_document_boost(
db_session=db_session,
document_id=boost_update.document_id,
boost=boost_update.boost,
- document_index=get_default_document_index(),
+ document_index=document_index,
)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
@@ -85,12 +91,17 @@ def document_hidden_update(
_: User | None = Depends(current_admin_user),
db_session: Session = Depends(get_session),
) -> None:
+ curr_ind_name, sec_ind_name = get_both_index_names(db_session)
+ document_index = get_default_document_index(
+ primary_index_name=curr_ind_name, secondary_index_name=sec_ind_name
+ )
+
try:
update_document_hidden(
db_session=db_session,
document_id=hidden_update.document_id,
hidden=hidden_update.hidden,
- document_index=get_default_document_index(),
+ document_index=document_index,
)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
diff --git a/backend/danswer/server/manage/models.py b/backend/danswer/server/manage/models.py
index 64af01a9f..01cff7c97 100644
--- a/backend/danswer/server/manage/models.py
+++ b/backend/danswer/server/manage/models.py
@@ -103,3 +103,7 @@ class SlackBotConfig(BaseModel):
id: int
persona: PersonaSnapshot | None
channel_config: ChannelConfig
+
+
+class ModelVersionResponse(BaseModel):
+ model_name: str | None # None only applicable to secondary index
diff --git a/backend/danswer/server/manage/secondary_index.py b/backend/danswer/server/manage/secondary_index.py
new file mode 100644
index 000000000..198730e3d
--- /dev/null
+++ b/backend/danswer/server/manage/secondary_index.py
@@ -0,0 +1,33 @@
+from fastapi import APIRouter
+from fastapi import Depends
+
+from danswer.auth.users import current_admin_user
+from danswer.db.models import User
+from danswer.indexing.models import EmbeddingModelDetail
+from danswer.server.manage.models import ModelVersionResponse
+from danswer.utils.logger import setup_logger
+
+router = APIRouter(prefix="/secondary-index")
+logger = setup_logger()
+
+
+@router.post("/set-new-embedding-model")
+def set_new_embedding_model(
+ embed_model_details: EmbeddingModelDetail,
+ _: User | None = Depends(current_admin_user),
+) -> None:
+ raise NotImplementedError()
+
+
+@router.get("/get-current-embedding-model")
+def get_current_embedding_model(
+ _: User | None = Depends(current_admin_user),
+) -> ModelVersionResponse:
+ raise NotImplementedError()
+
+
+@router.get("/get-secondary-embedding-model")
+def get_secondary_embedding_model(
+ _: User | None = Depends(current_admin_user),
+) -> ModelVersionResponse:
+ raise NotImplementedError()
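The handlers are stubs for now; once implemented, the routes would be exercised like any other admin endpoint. A hypothetical client sketch (only the "/secondary-index" prefix comes from the router above; base URL, mount point, and auth handling are assumptions):

# Hypothetical client calls against the stubbed routes above.
import requests

BASE = "http://localhost:8080/secondary-index"  # assumed mount point

requests.post(
    f"{BASE}/set-new-embedding-model",
    json={
        "model_name": "intfloat/e5-base-v2",  # example model
        "model_dim": 768,
        "normalize": True,
        "query_prefix": "query: ",
        "passage_prefix": "passage: ",
    },
).raise_for_status()

current = requests.get(f"{BASE}/get-current-embedding-model").json()
upcoming = requests.get(f"{BASE}/get-secondary-embedding-model").json()
print(current["model_name"], upcoming["model_name"])  # latter may be None
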
diff --git a/backend/danswer/server/query_and_chat/chat_backend.py b/backend/danswer/server/query_and_chat/chat_backend.py
index 2a1714104..726c3342d 100644
--- a/backend/danswer/server/query_and_chat/chat_backend.py
+++ b/backend/danswer/server/query_and_chat/chat_backend.py
@@ -20,6 +20,7 @@ from danswer.db.engine import get_session
from danswer.db.feedback import create_chat_message_feedback
from danswer.db.feedback import create_doc_retrieval_feedback
from danswer.db.models import User
+from danswer.document_index.document_index_utils import get_both_index_names
from danswer.document_index.factory import get_default_document_index
from danswer.secondary_llm_flows.chat_session_naming import (
get_renamed_conversation_name,
@@ -228,12 +229,18 @@ def create_search_feedback(
"""This endpoint isn't protected - it does not check if the user has access to the document
Users could try changing boosts of arbitrary docs but this does not leak any data.
"""
+
+ curr_ind_name, sec_ind_name = get_both_index_names(db_session)
+ document_index = get_default_document_index(
+ primary_index_name=curr_ind_name, secondary_index_name=sec_ind_name
+ )
+
create_doc_retrieval_feedback(
message_id=feedback.message_id,
document_id=feedback.document_id,
document_rank=feedback.document_rank,
clicked=feedback.click,
feedback=feedback.search_feedback,
- document_index=get_default_document_index(),
+ document_index=document_index,
db_session=db_session,
)
diff --git a/backend/danswer/server/query_and_chat/query_backend.py b/backend/danswer/server/query_and_chat/query_backend.py
index 27d02973d..552bf266a 100644
--- a/backend/danswer/server/query_and_chat/query_backend.py
+++ b/backend/danswer/server/query_and_chat/query_backend.py
@@ -53,16 +53,18 @@ def admin_search(
time_cutoff=question.filters.time_cutoff,
access_control_list=user_acl_filters,
)
- document_index = get_default_document_index()
+
+ document_index = get_default_document_index(
+ primary_index_name=get_index_name(db_session), secondary_index_name=None
+ )
+
if not isinstance(document_index, VespaIndex):
raise HTTPException(
status_code=400,
detail="Cannot use admin-search when using a non-Vespa document index",
)
- matching_chunks = document_index.admin_retrieval(
- query=query, filters=final_filters, index_name=get_index_name()
- )
+ matching_chunks = document_index.admin_retrieval(query=query, filters=final_filters)
documents = chunks_to_search_docs(matching_chunks)
diff --git a/backend/danswer/utils/acl.py b/backend/danswer/utils/acl.py
index 01256a813..268457bfd 100644
--- a/backend/danswer/utils/acl.py
+++ b/backend/danswer/utils/acl.py
@@ -32,10 +32,6 @@ def set_acl_for_vespa(should_check_if_already_done: bool = False) -> None:
except ConfigNotFoundError:
pass
- vespa_index = get_default_document_index()
- if not isinstance(vespa_index, VespaIndex):
- raise ValueError("This script is only for Vespa indexes")
-
logger.info("Populating Access Control List fields in Vespa")
with Session(get_sqlalchemy_engine()) as db_session:
# for all documents, set the `access_control_list` field appropriately
@@ -46,15 +42,21 @@ def set_acl_for_vespa(should_check_if_already_done: bool = False) -> None:
document_ids=[document.id for document in documents],
)
- for index_name in get_both_index_names():
- update_requests = [
- UpdateRequest(
- document_ids=[document_id],
- access=DocumentAccess.build(user_ids, is_public),
- )
- for document_id, user_ids, is_public in document_access_info
- ]
- vespa_index.update(update_requests=update_requests, index_name=index_name)
+ curr_ind_name, sec_ind_name = get_both_index_names(db_session)
+ vespa_index = get_default_document_index(
+ primary_index_name=curr_ind_name, secondary_index_name=sec_ind_name
+ )
+ if not isinstance(vespa_index, VespaIndex):
+ raise ValueError("This script is only for Vespa indexes")
+
+ update_requests = [
+ UpdateRequest(
+ document_ids=[document_id],
+ access=DocumentAccess.build(user_ids, is_public),
+ )
+ for document_id, user_ids, is_public in document_access_info
+ ]
+ vespa_index.update(update_requests=update_requests)
dynamic_config_store.store(_COMPLETED_ACL_UPDATE_KEY, True)
diff --git a/backend/tests/regression/search_quality/eval_search.py b/backend/tests/regression/search_quality/eval_search.py
index fd83b8d93..9561ca9f6 100644
--- a/backend/tests/regression/search_quality/eval_search.py
+++ b/backend/tests/regression/search_quality/eval_search.py
@@ -5,8 +5,11 @@ from contextlib import contextmanager
from typing import Any
from typing import TextIO
+from sqlalchemy.orm import Session
+
from danswer.chat.chat_utils import get_chunks_for_qa
from danswer.db.engine import get_sqlalchemy_engine
+from danswer.document_index.document_index_utils import get_index_name
from danswer.document_index.factory import get_default_document_index
from danswer.indexing.models import InferenceChunk
from danswer.search.models import IndexFilters
@@ -93,9 +96,16 @@ def get_search_results(
retrieval_metrics = MetricsHander[RetrievalMetricsContainer]()
rerank_metrics = MetricsHander[RerankMetricsContainer]()
+ with Session(get_sqlalchemy_engine()) as db_session:
+ ind_name = get_index_name(db_session)
+
+ document_index = get_default_document_index(
+ primary_index_name=ind_name, secondary_index_name=None
+ )
+
top_chunks, llm_chunk_selection = full_chunk_search(
query=search_query,
- document_index=get_default_document_index(),
+ document_index=document_index,
retrieval_metrics_callback=retrieval_metrics.record_metric,
rerank_metrics_callback=rerank_metrics.record_metric,
)