mirror of https://github.com/danswer-ai/danswer.git

Embedding Models Table (#1006)
parent 81c33cc325
commit cf4ede2130

backend/alembic/versions/dbaa756c2ccf_embedding_models.py (new file, +70)
@@ -0,0 +1,70 @@
+"""Embedding Models
+
+Revision ID: dbaa756c2ccf
+Revises: 7f726bad5367
+Create Date: 2024-01-25 17:12:31.813160
+
+"""
+from alembic import op
+import sqlalchemy as sa
+
+from danswer.db.models import IndexModelStatus
+
+# revision identifiers, used by Alembic.
+revision = "dbaa756c2ccf"
+down_revision = "7f726bad5367"
+branch_labels = None
+depends_on = None
+
+
+def upgrade() -> None:
+    op.create_table(
+        "embedding_model",
+        sa.Column("id", sa.Integer(), nullable=False),
+        sa.Column("model_name", sa.String(), nullable=False),
+        sa.Column("model_dim", sa.Integer(), nullable=False),
+        sa.Column("normalize", sa.Boolean(), nullable=False),
+        sa.Column("query_prefix", sa.String(), nullable=False),
+        sa.Column("passage_prefix", sa.String(), nullable=False),
+        sa.Column(
+            "status",
+            sa.Enum(IndexModelStatus, native=False),
+            nullable=False,
+        ),
+        sa.PrimaryKeyConstraint("id"),
+    )
+    op.add_column(
+        "index_attempt",
+        sa.Column("embedding_model_id", sa.Integer(), nullable=True),
+    )
+    op.create_foreign_key(
+        "index_attempt__embedding_model_fk",
+        "index_attempt",
+        "embedding_model",
+        ["embedding_model_id"],
+        ["id"],
+    )
+    op.create_index(
+        "ix_embedding_model_present_unique",
+        "embedding_model",
+        ["status"],
+        unique=True,
+        postgresql_where=sa.text("status = 'PRESENT'"),
+    )
+    op.create_index(
+        "ix_embedding_model_future_unique",
+        "embedding_model",
+        ["status"],
+        unique=True,
+        postgresql_where=sa.text("status = 'FUTURE'"),
+    )
+
+
+def downgrade() -> None:
+    op.drop_index("ix_embedding_model_present_unique", table_name="embedding_model")
+    op.drop_index("ix_embedding_model_future_unique", table_name="embedding_model")
+    op.drop_constraint(
+        "index_attempt__embedding_model_fk", "index_attempt", type_="foreignkey"
+    )
+    op.drop_column("index_attempt", "embedding_model_id")
+    op.drop_table("embedding_model")
@@ -30,7 +30,6 @@ from danswer.db.tasks import check_live_task_not_timed_out
 from danswer.db.tasks import get_latest_task
 from danswer.document_index.document_index_utils import get_both_index_names
 from danswer.document_index.factory import get_default_document_index
-from danswer.document_index.interfaces import DocumentIndex
 from danswer.document_index.interfaces import UpdateRequest
 from danswer.utils.batching import batch_generator
 from danswer.utils.logger import setup_logger
@@ -79,9 +78,13 @@ def cleanup_connector_credential_pair_task(

     try:
         # The bulk of the work is in here, updates Postgres and Vespa
+        curr_ind_name, sec_ind_name = get_both_index_names(db_session)
+        document_index = get_default_document_index(
+            primary_index_name=curr_ind_name, secondary_index_name=sec_ind_name
+        )
         return delete_connector_credential_pair(
             db_session=db_session,
-            document_index=get_default_document_index(),
+            document_index=document_index,
             cc_pair=cc_pair,
         )
     except Exception as e:
@@ -95,9 +98,7 @@ def sync_document_set_task(document_set_id: int) -> None:
     """For document sets marked as not up to date, sync the state from postgres
     into the datastore. Also handles deletions."""

-    def _sync_document_batch(
-        document_ids: list[str], document_index: DocumentIndex
-    ) -> None:
+    def _sync_document_batch(document_ids: list[str]) -> None:
         logger.debug(f"Syncing document sets for: {document_ids}")
         # begin a transaction, release lock at the end
         with Session(get_sqlalchemy_engine()) as db_session:
@@ -115,21 +116,21 @@ def sync_document_set_task(document_set_id: int) -> None:
                 }

                 # update Vespa
-                for index_name in get_both_index_names():
-                    update_requests = [
-                        UpdateRequest(
-                            document_ids=[document_id],
-                            document_sets=set(document_set_map.get(document_id, [])),
-                        )
-                        for document_id in document_ids
-                    ]
-                    document_index.update(
-                        update_requests=update_requests, index_name=index_name
-                    )
+                curr_ind_name, sec_ind_name = get_both_index_names(db_session)
+                document_index = get_default_document_index(
+                    primary_index_name=curr_ind_name, secondary_index_name=sec_ind_name
+                )
+                update_requests = [
+                    UpdateRequest(
+                        document_ids=[document_id],
+                        document_sets=set(document_set_map.get(document_id, [])),
+                    )
+                    for document_id in document_ids
+                ]
+                document_index.update(update_requests=update_requests)

     with Session(get_sqlalchemy_engine()) as db_session:
         try:
-            document_index = get_default_document_index()
             documents_to_update = fetch_documents_for_document_set(
                 document_set_id=document_set_id,
                 db_session=db_session,
@@ -139,8 +140,7 @@ def sync_document_set_task(document_set_id: int) -> None:
                 documents_to_update, _SYNC_BATCH_SIZE
             ):
                 _sync_document_batch(
-                    document_ids=[document.id for document in document_batch],
-                    document_index=document_index,
+                    document_ids=[document.id for document in document_batch]
                 )

             # if there are no connectors, then delete the document set. Otherwise, just
@@ -31,7 +31,6 @@ from danswer.db.document_set import (
 from danswer.db.engine import get_sqlalchemy_engine
 from danswer.db.index_attempt import delete_index_attempts
 from danswer.db.models import ConnectorCredentialPair
-from danswer.document_index.document_index_utils import get_both_index_names
 from danswer.document_index.interfaces import DocumentIndex
 from danswer.document_index.interfaces import UpdateRequest
 from danswer.server.documents.models import ConnectorCredentialPairIdentifier
@@ -63,8 +62,7 @@ def _delete_connector_credential_pair_batch(
     ]
     logger.debug(f"Deleting documents: {document_ids_to_delete}")

-    for index_name in get_both_index_names():
-        document_index.delete(doc_ids=document_ids_to_delete, index_name=index_name)
+    document_index.delete(doc_ids=document_ids_to_delete)

     delete_documents_complete(
         db_session=db_session,
@@ -92,10 +90,7 @@ def _delete_connector_credential_pair_batch(
     ]
     logger.debug(f"Updating documents: {document_ids_to_update}")

-    for index_name in get_both_index_names():
-        document_index.update(
-            update_requests=update_requests, index_name=index_name
-        )
+    document_index.update(update_requests=update_requests)

     delete_document_by_connector_credential_pair(
         db_session=db_session,
@@ -26,7 +26,6 @@ from danswer.db.index_attempt import mark_attempt_succeeded
 from danswer.db.index_attempt import update_docs_indexed
 from danswer.db.models import IndexAttempt
 from danswer.db.models import IndexingStatus
-from danswer.document_index.document_index_utils import get_index_name
 from danswer.indexing.indexing_pipeline import build_indexing_pipeline
 from danswer.utils.logger import IndexAttemptSingleton
 from danswer.utils.logger import setup_logger
@@ -104,7 +103,7 @@ def _run_indexing(
     )

     # TODO UPDATE THIS FOR SECONDARY INDEXING
-    indexing_pipeline = build_indexing_pipeline(index_name=get_index_name())
+    indexing_pipeline = build_indexing_pipeline()

     db_connector = index_attempt.connector
     db_credential = index_attempt.credential
@@ -35,6 +35,7 @@ from danswer.db.chat import translate_db_search_doc_to_server_search_doc
 from danswer.db.models import ChatMessage
 from danswer.db.models import SearchDoc as DbSearchDoc
 from danswer.db.models import User
+from danswer.document_index.document_index_utils import get_index_name
 from danswer.document_index.factory import get_default_document_index
 from danswer.indexing.models import InferenceChunk
 from danswer.llm.exceptions import GenAIDisabledException
@@ -187,7 +188,9 @@ def stream_chat_message(
         llm = None

     llm_tokenizer = get_default_llm_token_encode()
-    document_index = get_default_document_index()
+    document_index = get_default_document_index(
+        primary_index_name=get_index_name(db_session), secondary_index_name=None
+    )

     # Every chat Session begins with an empty root message
     root_message = get_or_create_root_message(
@@ -256,7 +259,7 @@ def stream_chat_message(
         # May extend to include chunk ranges
         llm_docs: list[LlmDoc] = inference_documents_from_ids(
             doc_identifiers=identifier_tuples,
-            document_index=get_default_document_index(),
+            document_index=document_index,
         )
         doc_id_to_rank_map = map_document_id_order(
             cast(list[InferenceChunk | LlmDoc], llm_docs)
@@ -47,12 +47,6 @@ SECTION_SEPARATOR = "\n\n"
 # For combining attributes, doesn't have to be unique/perfect to work
 INDEX_SEPARATOR = "==="

-# Index Related
-CURRENT_EMBEDDING_MODEL = "CURRENT_EMBEDDING_MODEL"
-CURRENT_EMBEDDING_DIM = "CURRENT_EMBEDDING_DIM"
-UPCOMING_EMBEDDING_MODEL = "UPCOMING_EMBEDDING_MODEL"
-UPCOMING_EMBEDDING_DIM = "UPCOMING_EMBEDDING_DIM"
-

 # Messages
 DISABLED_GEN_AI_MSG = (
@@ -27,6 +27,7 @@ from danswer.danswerbot.slack.utils import update_emote_react
 from danswer.db.engine import get_sqlalchemy_engine
 from danswer.db.feedback import create_chat_message_feedback
 from danswer.db.feedback import create_doc_retrieval_feedback
+from danswer.document_index.document_index_utils import get_both_index_names
 from danswer.document_index.factory import get_default_document_index
 from danswer.utils.logger import setup_logger

@@ -102,11 +103,16 @@ def handle_slack_feedback(
     else:
         feedback = SearchFeedbackType.HIDE

+    curr_ind_name, sec_ind_name = get_both_index_names(db_session)
+    document_index = get_default_document_index(
+        primary_index_name=curr_ind_name, secondary_index_name=sec_ind_name
+    )
+
     create_doc_retrieval_feedback(
         message_id=message_id,
         document_id=doc_id,
         document_rank=doc_rank,
-        document_index=get_default_document_index(),
+        document_index=document_index,
         db_session=db_session,
         clicked=False,  # Not tracking this for Slack
         feedback=feedback,
backend/danswer/db/embedding_model.py (new file, +43)
@@ -0,0 +1,43 @@
+from sqlalchemy import select
+from sqlalchemy.orm import Session
+
+from danswer.db.models import EmbeddingModel
+from danswer.db.models import IndexModelStatus
+from danswer.indexing.models import EmbeddingModelDetail
+from danswer.utils.logger import setup_logger
+
+logger = setup_logger()
+
+
+def create_embedding_model(
+    model_details: EmbeddingModelDetail,
+    db_session: Session,
+    status: IndexModelStatus = IndexModelStatus.FUTURE,
+) -> EmbeddingModel:
+    embedding_model = EmbeddingModel(
+        model_name=model_details.model_name,
+        model_dim=model_details.model_dim,
+        normalize=model_details.normalize,
+        query_prefix=model_details.query_prefix,
+        passage_prefix=model_details.passage_prefix,
+        status=status,
+    )
+
+    db_session.add(embedding_model)
+    db_session.commit()
+
+    return embedding_model
+
+
+def get_latest_embedding_model_by_status(
+    status: IndexModelStatus, db_session: Session
+) -> EmbeddingModel | None:
+    query = (
+        select(EmbeddingModel)
+        .where(EmbeddingModel.status == status)
+        .order_by(EmbeddingModel.id.desc())
+    )
+    result = db_session.execute(query)
+    latest_model = result.scalars().first()
+
+    return latest_model
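Together these two helpers form the lifecycle API the rest of the commit builds on: register the next model as FUTURE, then look up the latest row per status. A usage sketch, assuming the app's database is reachable (the model details are illustrative):

    from sqlalchemy.orm import Session

    from danswer.db.embedding_model import create_embedding_model
    from danswer.db.embedding_model import get_latest_embedding_model_by_status
    from danswer.db.engine import get_sqlalchemy_engine
    from danswer.db.models import IndexModelStatus
    from danswer.indexing.models import EmbeddingModelDetail

    with Session(get_sqlalchemy_engine()) as db_session:
        # Register an upcoming model; status defaults to FUTURE, so the
        # PRESENT model keeps serving queries while the new index builds.
        create_embedding_model(
            model_details=EmbeddingModelDetail(
                model_name="intfloat/e5-small-v2",  # illustrative choice
                model_dim=384,
                normalize=True,
                query_prefix="query: ",
                passage_prefix="passage: ",
            ),
            db_session=db_session,
        )

        upcoming = get_latest_embedding_model_by_status(
            status=IndexModelStatus.FUTURE, db_session=db_session
        )
        print(upcoming.model_name if upcoming else "no secondary model")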
@@ -12,7 +12,6 @@ from danswer.db.chat import get_chat_message
 from danswer.db.models import ChatMessageFeedback
 from danswer.db.models import Document as DbDocument
 from danswer.db.models import DocumentRetrievalFeedback
-from danswer.document_index.document_index_utils import get_both_index_names
 from danswer.document_index.interfaces import DocumentIndex
 from danswer.document_index.interfaces import UpdateRequest

@@ -58,8 +57,7 @@ def update_document_boost(
         boost=boost,
     )

-    for index_name in get_both_index_names():
-        document_index.update(update_requests=[update], index_name=index_name)
+    document_index.update(update_requests=[update])

     db_session.commit()

@@ -79,8 +77,7 @@ def update_document_hidden(
         hidden=hidden,
     )

-    for index_name in get_both_index_names():
-        document_index.update(update_requests=[update], index_name=index_name)
+    document_index.update(update_requests=[update])

     db_session.commit()

@@ -126,8 +123,7 @@ def create_doc_retrieval_feedback(
         document_ids=[document_id], boost=db_doc.boost, hidden=db_doc.hidden
     )
     # Updates are generally batched for efficiency, this case only 1 doc/value is updated
-    for index_name in get_both_index_names():
-        document_index.update(update_requests=[update], index_name=index_name)
+    document_index.update(update_requests=[update])

     db_session.add(retrieval_feedback)
     db_session.commit()
@@ -62,6 +62,12 @@ class TaskStatus(str, PyEnum):
     FAILURE = "FAILURE"


+class IndexModelStatus(str, PyEnum):
+    PAST = "PAST"
+    PRESENT = "PRESENT"
+    FUTURE = "FUTURE"
+
+
 class Base(DeclarativeBase):
     pass

@@ -363,6 +369,37 @@ class Credential(Base):
     user: Mapped[User | None] = relationship("User", back_populates="credentials")


+class EmbeddingModel(Base):
+    __tablename__ = "embedding_model"
+    # ID is used also to indicate the order that the models are configured by the admin
+    id: Mapped[int] = mapped_column(primary_key=True)
+    model_name: Mapped[str] = mapped_column(String)
+    model_dim: Mapped[int] = mapped_column(Integer)
+    normalize: Mapped[bool] = mapped_column(Boolean)
+    query_prefix: Mapped[str] = mapped_column(String)
+    passage_prefix: Mapped[str] = mapped_column(String)
+    status: Mapped[IndexModelStatus] = mapped_column(Enum(IndexModelStatus))
+
+    index_attempts: Mapped[List["IndexAttempt"]] = relationship(
+        "IndexAttempt", back_populates="embedding_model"
+    )
+
+    __table_args__ = (
+        Index(
+            "ix_embedding_model_present_unique",
+            "status",
+            unique=True,
+            postgresql_where=(status == IndexModelStatus.PRESENT),
+        ),
+        Index(
+            "ix_embedding_model_future_unique",
+            "status",
+            unique=True,
+            postgresql_where=(status == IndexModelStatus.FUTURE),
+        ),
+    )
+
+
 class IndexAttempt(Base):
     """
     Represents an attempt to index a group of 1 or more documents from a
@@ -387,6 +424,11 @@ class IndexAttempt(Base):
     error_msg: Mapped[str | None] = mapped_column(
         Text, default=None
     )  # only filled if status = "failed"
+    # Nullable because in the past, we didn't allow swapping out embedding models live
+    embedding_model_id: Mapped[int | None] = mapped_column(
+        ForeignKey("embedding_model.id"),
+        nullable=True,
+    )
     time_created: Mapped[datetime.datetime] = mapped_column(
         DateTime(timezone=True),
         server_default=func.now(),
@@ -408,6 +450,9 @@ class IndexAttempt(Base):
     credential: Mapped[Credential] = relationship(
         "Credential", back_populates="index_attempts"
     )
+    embedding_model: Mapped[EmbeddingModel] = relationship(
+        "EmbeddingModel", back_populates="index_attempts"
+    )

     __table_args__ = (
         Index(
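With the back-reference in place, each index attempt records which embedding model produced its vectors (NULL for attempts that predate this migration). A query sketch, assuming an open session:

    from sqlalchemy import select
    from sqlalchemy.orm import Session

    from danswer.db.engine import get_sqlalchemy_engine
    from danswer.db.models import EmbeddingModel
    from danswer.db.models import IndexAttempt
    from danswer.db.models import IndexModelStatus

    with Session(get_sqlalchemy_engine()) as db_session:
        # Attempts that indexed into the currently-live model's schema.
        stmt = (
            select(IndexAttempt)
            .join(IndexAttempt.embedding_model)
            .where(EmbeddingModel.status == IndexModelStatus.PRESENT)
        )
        for attempt in db_session.scalars(stmt):
            print(attempt.id, attempt.embedding_model.model_name)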
@@ -1,11 +1,10 @@
 import math
 import uuid
-from typing import cast

-from danswer.configs.constants import CURRENT_EMBEDDING_MODEL
-from danswer.configs.constants import UPCOMING_EMBEDDING_MODEL
-from danswer.dynamic_configs import get_dynamic_config_store
-from danswer.dynamic_configs.interface import ConfigNotFoundError
+from sqlalchemy.orm import Session
+
+from danswer.db.embedding_model import get_latest_embedding_model_by_status
+from danswer.db.models import IndexModelStatus
 from danswer.indexing.models import IndexChunk
 from danswer.indexing.models import InferenceChunk

@@ -17,34 +16,43 @@ def clean_model_name(model_str: str) -> str:
     return model_str.replace("/", "_").replace("-", "_").replace(".", "_")


-def get_index_name(secondary_index: bool = False) -> str:
-    # TODO make this more efficient if needed
-    kv_store = get_dynamic_config_store()
-    if not secondary_index:
-        try:
-            embed_model = cast(str, kv_store.load(CURRENT_EMBEDDING_MODEL))
-            return f"danswer_chunk_{clean_model_name(embed_model)}"
-        except ConfigNotFoundError:
-            return "danswer_chunk"
+def get_index_name(
+    db_session: Session,
+    secondary_index: bool = False,
+) -> str:
+    if secondary_index:
+        model = get_latest_embedding_model_by_status(
+            status=IndexModelStatus.FUTURE, db_session=db_session
+        )
+        if model is None:
+            raise RuntimeError("No secondary index being built")
+        return f"danswer_chunk_{clean_model_name(model.model_name)}"

-    embed_model = cast(str, kv_store.load(UPCOMING_EMBEDDING_MODEL))
-    return f"danswer_chunk_{clean_model_name(embed_model)}"
+    model = get_latest_embedding_model_by_status(
+        status=IndexModelStatus.PRESENT, db_session=db_session
+    )
+    if not model:
+        return "danswer_chunk"
+    return f"danswer_chunk_{clean_model_name(model.model_name)}"


-def get_both_index_names() -> list[str]:
-    kv_store = get_dynamic_config_store()
-    try:
-        embed_model = cast(str, kv_store.load(CURRENT_EMBEDDING_MODEL))
-        indices = [f"danswer_chunk_{clean_model_name(embed_model)}"]
-    except ConfigNotFoundError:
-        indices = ["danswer_chunk"]
+def get_both_index_names(db_session: Session) -> tuple[str, str | None]:
+    model = get_latest_embedding_model_by_status(
+        status=IndexModelStatus.PRESENT, db_session=db_session
+    )
+    curr_index = (
+        "danswer_chunk"
+        if not model
+        else f"danswer_chunk_{clean_model_name(model.model_name)}"
+    )

-    try:
-        embed_model = cast(str, kv_store.load(UPCOMING_EMBEDDING_MODEL))
-        indices.append(f"danswer_chunk_{clean_model_name(embed_model)}")
-        return indices
-    except ConfigNotFoundError:
-        return indices
+    model_new = get_latest_embedding_model_by_status(
+        status=IndexModelStatus.FUTURE, db_session=db_session
+    )
+    if not model_new:
+        return curr_index, None
+
+    return curr_index, f"danswer_chunk_{clean_model_name(model_new.model_name)}"


 def translate_boost_count_to_multiplier(boost: int) -> float:
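Index names are now derived from the database instead of the key-value store, so every caller shares a single source of truth. A small sketch of the two lookups (assumes the engine is configured):

    from sqlalchemy.orm import Session

    from danswer.db.engine import get_sqlalchemy_engine
    from danswer.document_index.document_index_utils import get_both_index_names
    from danswer.document_index.document_index_utils import get_index_name

    with Session(get_sqlalchemy_engine()) as db_session:
        # Primary index name; falls back to "danswer_chunk" when no
        # embedding_model row exists yet (e.g. fresh installs).
        primary = get_index_name(db_session)

        # Both names at once; the second element is None unless a FUTURE
        # model is registered and a new index is being built.
        curr_ind_name, sec_ind_name = get_both_index_names(db_session)
        print(primary, curr_ind_name, sec_ind_name)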
@@ -2,6 +2,14 @@ from danswer.document_index.interfaces import DocumentIndex
 from danswer.document_index.vespa.index import VespaIndex


-def get_default_document_index() -> DocumentIndex:
+def get_default_document_index(
+    primary_index_name: str,
+    secondary_index_name: str | None,
+) -> DocumentIndex:
+    """Primary index is the index that is used for querying/updating etc.
+    Secondary index is for when both the currently used index and the upcoming
+    index need to be updated; updates are applied to both indices"""
     # Currently only supporting Vespa
-    return VespaIndex()
+    return VespaIndex(
+        index_name=primary_index_name, secondary_index_name=secondary_index_name
+    )
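The calling convention that emerges throughout the rest of the diff: read paths pass secondary_index_name=None, while mutation paths pass both names so changes are mirrored into the in-progress index. A sketch under that assumption:

    from sqlalchemy.orm import Session

    from danswer.db.engine import get_sqlalchemy_engine
    from danswer.document_index.document_index_utils import get_both_index_names
    from danswer.document_index.document_index_utils import get_index_name
    from danswer.document_index.factory import get_default_document_index

    with Session(get_sqlalchemy_engine()) as db_session:
        # Query path: only the primary index is ever searched.
        search_index = get_default_document_index(
            primary_index_name=get_index_name(db_session),
            secondary_index_name=None,
        )

        # Mutation path (updates/deletes): pass both names so the upcoming
        # index, if any, receives the same changes.
        curr_ind_name, sec_ind_name = get_both_index_names(db_session)
        write_index = get_default_document_index(
            primary_index_name=curr_ind_name, secondary_index_name=sec_ind_name
        )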
@@ -4,7 +4,6 @@ from datetime import datetime
 from typing import Any

 from danswer.access.models import DocumentAccess
-from danswer.configs.model_configs import DOC_EMBEDDING_DIM
 from danswer.indexing.models import DocMetadataAwareIndexChunk
 from danswer.indexing.models import InferenceChunk
 from danswer.search.models import IndexFilters
@@ -46,12 +45,23 @@ class UpdateRequest:

 class Verifiable(abc.ABC):
     @abc.abstractmethod
-    def __init__(self, index_name: str, *args: Any, **kwargs: Any) -> None:
+    def __init__(
+        self,
+        index_name: str,
+        secondary_index_name: str | None,
+        *args: Any,
+        **kwargs: Any
+    ) -> None:
         super().__init__(*args, **kwargs)
         self.index_name = index_name
+        self.secondary_index_name = secondary_index_name

     @abc.abstractmethod
-    def ensure_indices_exist(self, embedding_dim: int = DOC_EMBEDDING_DIM) -> None:
+    def ensure_indices_exist(
+        self,
+        index_embedding_dim: int,
+        secondary_index_embedding_dim: int | None,
+    ) -> None:
         raise NotImplementedError

@@ -60,7 +70,6 @@ class Indexable(abc.ABC):
     def index(
         self,
         chunks: list[DocMetadataAwareIndexChunk],
-        index_name: str,
     ) -> set[DocumentInsertionRecord]:
         """Indexes document chunks into the Document Index and return the IDs of all the documents indexed"""
         raise NotImplementedError
@@ -68,14 +77,14 @@ class Indexable(abc.ABC):

 class Deletable(abc.ABC):
     @abc.abstractmethod
-    def delete(self, doc_ids: list[str], index_name: str) -> None:
+    def delete(self, doc_ids: list[str]) -> None:
         """Removes the specified documents from the Index"""
         raise NotImplementedError


 class Updatable(abc.ABC):
     @abc.abstractmethod
-    def update(self, update_requests: list[UpdateRequest], index_name: str) -> None:
+    def update(self, update_requests: list[UpdateRequest]) -> None:
         """Updates metadata for the specified document sets in the Index"""
         raise NotImplementedError

@@ -87,7 +96,6 @@ class IdRetrievalCapable(abc.ABC):
         document_id: str,
         chunk_ind: int | None,
         filters: IndexFilters,
-        index_name: str,
     ) -> list[InferenceChunk]:
         raise NotImplementedError

@@ -99,7 +107,6 @@ class KeywordCapable(abc.ABC):
         query: str,
         filters: IndexFilters,
         time_decay_multiplier: float,
-        index_name: str,
         num_to_retrieve: int,
         offset: int = 0,
     ) -> list[InferenceChunk]:
@@ -113,7 +120,6 @@ class VectorCapable(abc.ABC):
         query: str,
         filters: IndexFilters,
         time_decay_multiplier: float,
-        index_name: str,
         num_to_retrieve: int,
         offset: int = 0,
     ) -> list[InferenceChunk]:
@@ -128,7 +134,6 @@ class HybridCapable(abc.ABC):
         filters: IndexFilters,
         time_decay_multiplier: float,
         num_to_retrieve: int,
-        index_name: str,
         offset: int = 0,
         hybrid_alpha: float | None = None,
     ) -> list[InferenceChunk]:
@@ -141,7 +146,6 @@ class AdminCapable(abc.ABC):
         self,
         query: str,
         filters: IndexFilters,
-        index_name: str,
         num_to_retrieve: int,
         offset: int = 0,
     ) -> list[InferenceChunk]:
@@ -20,7 +20,6 @@ import requests
 from retry import retry

 from danswer.configs.app_configs import LOG_VESPA_TIMING_INFORMATION
-from danswer.configs.app_configs import VESPA_DEPLOYMENT_ZIP
 from danswer.configs.app_configs import VESPA_HOST
 from danswer.configs.app_configs import VESPA_PORT
 from danswer.configs.app_configs import VESPA_TENANT_PORT
@@ -34,8 +33,6 @@ from danswer.configs.constants import BLURB
 from danswer.configs.constants import BOOST
 from danswer.configs.constants import CHUNK_ID
 from danswer.configs.constants import CONTENT
-from danswer.configs.constants import CURRENT_EMBEDDING_DIM
-from danswer.configs.constants import CURRENT_EMBEDDING_MODEL
 from danswer.configs.constants import DOC_UPDATED_AT
 from danswer.configs.constants import DOCUMENT_ID
 from danswer.configs.constants import DOCUMENT_SETS
@@ -55,21 +52,15 @@ from danswer.configs.constants import SOURCE_TYPE
 from danswer.configs.constants import TITLE
 from danswer.configs.constants import TITLE_EMBEDDING
 from danswer.configs.constants import TITLE_SEPARATOR
-from danswer.configs.constants import UPCOMING_EMBEDDING_DIM
-from danswer.configs.constants import UPCOMING_EMBEDDING_MODEL
-from danswer.configs.model_configs import DOC_EMBEDDING_DIM
 from danswer.configs.model_configs import SEARCH_DISTANCE_CUTOFF
 from danswer.connectors.cross_connector_utils.miscellaneous_utils import (
     get_experts_stores_representations,
 )
-from danswer.document_index.document_index_utils import clean_model_name
 from danswer.document_index.document_index_utils import get_uuid_from_chunk
 from danswer.document_index.interfaces import DocumentIndex
 from danswer.document_index.interfaces import DocumentInsertionRecord
 from danswer.document_index.interfaces import UpdateRequest
 from danswer.document_index.vespa.utils import remove_invalid_unicode_chars
-from danswer.dynamic_configs import get_dynamic_config_store
-from danswer.dynamic_configs.interface import ConfigNotFoundError
 from danswer.indexing.models import DocMetadataAwareIndexChunk
 from danswer.indexing.models import InferenceChunk
 from danswer.search.models import IndexFilters
@@ -621,9 +612,11 @@ def in_memory_zip_from_file_bytes(file_contents: dict[str, bytes]) -> BinaryIO:
     return zip_buffer


-def _create_document_xml_lines(doc_names: list[str]) -> str:
+def _create_document_xml_lines(doc_names: list[str | None]) -> str:
     doc_lines = [
-        f'<document type="{doc_name}" mode="index" />' for doc_name in doc_names
+        f'<document type="{doc_name}" mode="index" />'
+        for doc_name in doc_names
+        if doc_name
     ]
     return "\n".join(doc_lines)

@@ -650,17 +643,15 @@ class VespaIndex(DocumentIndex):
         f"from {{index_name}} where "
     )

-    def __init__(self, deployment_zip: str = VESPA_DEPLOYMENT_ZIP) -> None:
-        # Vespa index name isn't configurable via code alone because of the config .sd file that needs
-        # to be updated + zipped + deployed, not supporting the option for simplicity
-        self.deployment_zip = deployment_zip
+    def __init__(self, index_name: str, secondary_index_name: str | None) -> None:
+        self.index_name = index_name
+        self.secondary_index_name = secondary_index_name

-    def ensure_indices_exist(self, embedding_dim: int = DOC_EMBEDDING_DIM) -> None:
-        """Verifying indices is more involved as there is no good way to
-        verify the deployed app against the zip locally. But deploying the latest app.zip will ensure that
-        the index is up-to-date with the expected schema and this does not erase the existing index.
-        If the changes cannot be applied without conflict with existing data, it will fail with a non 200
-        """
+    def ensure_indices_exist(
+        self,
+        index_embedding_dim: int,
+        secondary_index_embedding_dim: int | None,
+    ) -> None:
         deploy_url = f"{VESPA_APPLICATION_ENDPOINT}/tenant/default/prepareandactivate"
         logger.debug(f"Sending Vespa zip to {deploy_url}")

@@ -670,30 +661,12 @@ class VespaIndex(DocumentIndex):
         schema_file = os.path.join(vespa_schema_path, "schemas", "danswer_chunk.sd")
         services_file = os.path.join(vespa_schema_path, "services.xml")

-        kv_store = get_dynamic_config_store()
-        try:
-            curr_embed_model = cast(str, kv_store.load(CURRENT_EMBEDDING_MODEL))
-            schema_name = f"danswer_chunk_{clean_model_name(curr_embed_model)}"
-            embedding_dim = cast(int, kv_store.load(CURRENT_EMBEDDING_DIM))
-        except ConfigNotFoundError:
-            schema_name = "danswer_chunk"
-
-        doc_names = [schema_name]
-
-        try:
-            upcoming_embed_model = cast(str, kv_store.load(UPCOMING_EMBEDDING_MODEL))
-            upcoming_schema_name = (
-                f"danswer_chunk_{clean_model_name(upcoming_embed_model)}"
-            )
-            upcoming_embedding_dim = cast(int, kv_store.load(UPCOMING_EMBEDDING_DIM))
-            doc_names.append(upcoming_schema_name)
-        except ConfigNotFoundError:
-            upcoming_schema_name = None
-
         with open(services_file, "r") as services_f:
             services_template = services_f.read()

-        doc_lines = _create_document_xml_lines(doc_names)
+        schema_names = [self.index_name, self.secondary_index_name]
+
+        doc_lines = _create_document_xml_lines(schema_names)
         services = services_template.replace(DOCUMENT_REPLACEMENT_PAT, doc_lines)

         zip_dict = {
@@ -704,17 +677,15 @@ class VespaIndex(DocumentIndex):
             schema_template = schema_f.read()

         schema = schema_template.replace(
-            DANSWER_CHUNK_REPLACEMENT_PAT, schema_name
-        ).replace(VESPA_DIM_REPLACEMENT_PAT, str(embedding_dim))
-        zip_dict[f"schemas/{schema_name}.sd"] = schema.encode("utf-8")
+            DANSWER_CHUNK_REPLACEMENT_PAT, self.index_name
+        ).replace(VESPA_DIM_REPLACEMENT_PAT, str(index_embedding_dim))
+        zip_dict[f"schemas/{schema_names[0]}.sd"] = schema.encode("utf-8")

-        if upcoming_schema_name:
+        if self.secondary_index_name:
             upcoming_schema = schema_template.replace(
-                DANSWER_CHUNK_REPLACEMENT_PAT, upcoming_schema_name
-            ).replace(VESPA_DIM_REPLACEMENT_PAT, str(upcoming_embedding_dim))
-            zip_dict[f"schemas/{upcoming_schema_name}.sd"] = upcoming_schema.encode(
-                "utf-8"
-            )
+                DANSWER_CHUNK_REPLACEMENT_PAT, self.secondary_index_name
+            ).replace(VESPA_DIM_REPLACEMENT_PAT, str(secondary_index_embedding_dim))
+            zip_dict[f"schemas/{schema_names[1]}.sd"] = upcoming_schema.encode("utf-8")

         zip_file = in_memory_zip_from_file_bytes(zip_dict)

@@ -728,9 +699,9 @@ class VespaIndex(DocumentIndex):
     def index(
         self,
         chunks: list[DocMetadataAwareIndexChunk],
-        index_name: str,
     ) -> set[DocumentInsertionRecord]:
-        return _clear_and_index_vespa_chunks(chunks=chunks, index_name=index_name)
+        # IMPORTANT: This must be done one index at a time, do not use secondary index here
+        return _clear_and_index_vespa_chunks(chunks=chunks, index_name=self.index_name)

     @staticmethod
     def _apply_updates_batched(
@@ -774,7 +745,7 @@ class VespaIndex(DocumentIndex):
                 failure_msg = f"Failed to update document: {future_to_document_id[future]}"
                 raise requests.HTTPError(failure_msg) from e

-    def update(self, update_requests: list[UpdateRequest], index_name: str) -> None:
+    def update(self, update_requests: list[UpdateRequest]) -> None:
         logger.info(f"Updating {len(update_requests)} documents in Vespa")
         start = time.time()

@@ -802,50 +773,61 @@ class VespaIndex(DocumentIndex):
                 logger.error("Update request received but nothing to update")
                 continue

-            for document_id in update_request.document_ids:
-                for doc_chunk_id in _get_vespa_chunk_ids_by_document_id(
-                    document_id=document_id, index_name=index_name
-                ):
-                    processed_updates_requests.append(
-                        _VespaUpdateRequest(
-                            document_id=document_id,
-                            url=f"{DOCUMENT_ID_ENDPOINT.format(index_name=index_name)}/{doc_chunk_id}",
-                            update_request=update_dict,
-                        )
-                    )
+            index_names = [self.index_name]
+            if self.secondary_index_name:
+                index_names.append(self.secondary_index_name)
+
+            for index_name in index_names:
+                for document_id in update_request.document_ids:
+                    for doc_chunk_id in _get_vespa_chunk_ids_by_document_id(
+                        document_id=document_id, index_name=index_name
+                    ):
+                        processed_updates_requests.append(
+                            _VespaUpdateRequest(
+                                document_id=document_id,
+                                url=f"{DOCUMENT_ID_ENDPOINT.format(index_name=index_name)}/{doc_chunk_id}",
+                                update_request=update_dict,
+                            )
+                        )

         self._apply_updates_batched(processed_updates_requests)
         logger.info(
             "Finished updating Vespa documents in %s seconds", time.time() - start
         )

-    def delete(self, doc_ids: list[str], index_name: str) -> None:
+    def delete(self, doc_ids: list[str]) -> None:
         logger.info(f"Deleting {len(doc_ids)} documents from Vespa")

         # NOTE: using `httpx` here since `requests` doesn't support HTTP2. This is beneficial for
         # indexing / updates / deletes since we have to make a large volume of requests.
         with httpx.Client(http2=True) as http_client:
-            _delete_vespa_docs(
-                document_ids=doc_ids, index_name=index_name, http_client=http_client
-            )
+            index_names = [self.index_name]
+            if self.secondary_index_name:
+                index_names.append(self.secondary_index_name)
+
+            for index_name in index_names:
+                _delete_vespa_docs(
+                    document_ids=doc_ids, index_name=index_name, http_client=http_client
+                )

     def id_based_retrieval(
         self,
         document_id: str,
         chunk_ind: int | None,
         filters: IndexFilters,
-        index_name: str,
     ) -> list[InferenceChunk]:
         if chunk_ind is None:
             vespa_chunk_ids = _get_vespa_chunk_ids_by_document_id(
-                document_id=document_id, index_name=index_name, index_filters=filters
+                document_id=document_id,
+                index_name=self.index_name,
+                index_filters=filters,
             )

             if not vespa_chunk_ids:
                 return []

             functions_with_args: list[tuple[Callable, tuple]] = [
-                (_inference_chunk_by_vespa_id, (vespa_chunk_id, index_name))
+                (_inference_chunk_by_vespa_id, (vespa_chunk_id, self.index_name))
                 for vespa_chunk_id in vespa_chunk_ids
             ]

@@ -861,7 +843,7 @@ class VespaIndex(DocumentIndex):
         else:
             filters_str = _build_vespa_filters(filters=filters, include_hidden=True)
             yql = (
-                VespaIndex.yql_base.format(index_name=index_name)
+                VespaIndex.yql_base.format(index_name=self.index_name)
                 + filters_str
                 + f"({DOCUMENT_ID} contains '{document_id}' and {CHUNK_ID} contains '{chunk_ind}')"
             )
@@ -872,7 +854,6 @@ class VespaIndex(DocumentIndex):
         query: str,
         filters: IndexFilters,
         time_decay_multiplier: float,
-        index_name: str,
         num_to_retrieve: int = NUM_RETURNED_HITS,
         offset: int = 0,
         edit_keyword_query: bool = EDIT_KEYWORD_QUERY,
@@ -880,7 +861,7 @@ class VespaIndex(DocumentIndex):
         # IMPORTANT: THIS FUNCTION IS NOT UP TO DATE, DOES NOT WORK CORRECTLY
         vespa_where_clauses = _build_vespa_filters(filters)
         yql = (
-            VespaIndex.yql_base.format(index_name=index_name)
+            VespaIndex.yql_base.format(index_name=self.index_name)
             + vespa_where_clauses
             # `({defaultIndex: "content_summary"}userInput(@query))` section is
             # needed for highlighting while the N-gram highlighting is broken /
@@ -908,7 +889,6 @@ class VespaIndex(DocumentIndex):
         query: str,
         filters: IndexFilters,
         time_decay_multiplier: float,
-        index_name: str,
         num_to_retrieve: int = NUM_RETURNED_HITS,
         offset: int = 0,
         distance_cutoff: float | None = SEARCH_DISTANCE_CUTOFF,
@@ -917,7 +897,7 @@ class VespaIndex(DocumentIndex):
         # IMPORTANT: THIS FUNCTION IS NOT UP TO DATE, DOES NOT WORK CORRECTLY
         vespa_where_clauses = _build_vespa_filters(filters)
         yql = (
-            VespaIndex.yql_base.format(index_name=index_name)
+            VespaIndex.yql_base.format(index_name=self.index_name)
             + vespa_where_clauses
             + f"(({{targetHits: {10 * num_to_retrieve}}}nearestNeighbor(embeddings, query_embedding)) "
             # `({defaultIndex: "content_summary"}userInput(@query))` section is
@@ -953,7 +933,6 @@ class VespaIndex(DocumentIndex):
         filters: IndexFilters,
         time_decay_multiplier: float,
         num_to_retrieve: int,
-        index_name: str,
         offset: int = 0,
         hybrid_alpha: float | None = HYBRID_ALPHA,
         title_content_ratio: float | None = TITLE_CONTENT_RATIO,
@@ -964,7 +943,7 @@ class VespaIndex(DocumentIndex):
         # Needs to be at least as much as the value set in Vespa schema config
         target_hits = max(10 * num_to_retrieve, 1000)
         yql = (
-            VespaIndex.yql_base.format(index_name=index_name)
+            VespaIndex.yql_base.format(index_name=self.index_name)
            + vespa_where_clauses
             + f"(({{targetHits: {target_hits}}}nearestNeighbor(embeddings, query_embedding)) "
             + f"or ({{targetHits: {target_hits}}}nearestNeighbor(title_embedding, query_embedding)) "
@@ -1003,13 +982,12 @@ class VespaIndex(DocumentIndex):
         self,
         query: str,
         filters: IndexFilters,
-        index_name: str,
         num_to_retrieve: int = NUM_RETURNED_HITS,
         offset: int = 0,
     ) -> list[InferenceChunk]:
         vespa_where_clauses = _build_vespa_filters(filters, include_hidden=True)
         yql = (
-            VespaIndex.yql_base.format(index_name=index_name)
+            VespaIndex.yql_base.format(index_name=self.index_name)
             + vespa_where_clauses
             + '({grammar: "weakAnd"}userInput(@query) '
             # `({defaultIndex: "content_summary"}userInput(@query))` section is
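update() and delete() now share the same internal fan-out: build the list of target schemas, then repeat the per-index work for each. Extracted as a standalone sketch (process_one stands in for the per-index Vespa call):

    from typing import Callable

    def for_each_index(
        index_name: str,
        secondary_index_name: str | None,
        process_one: Callable[[str], None],
    ) -> None:
        # Mirror every mutation into the secondary index while it exists.
        index_names = [index_name]
        if secondary_index_name:
            index_names.append(secondary_index_name)
        for name in index_names:
            process_one(name)

    # Example: a delete fans out to both schemas during a model migration.
    for_each_index(
        "danswer_chunk_model_a",
        "danswer_chunk_model_b",
        lambda name: print(f"deleting chunks from {name}"),
    )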
@@ -19,6 +19,7 @@ from danswer.db.document_set import fetch_document_sets_for_documents
 from danswer.db.engine import get_sqlalchemy_engine
 from danswer.db.tag import create_or_add_document_tag
 from danswer.db.tag import create_or_add_document_tag_list
+from danswer.document_index.document_index_utils import get_index_name
 from danswer.document_index.factory import get_default_document_index
 from danswer.document_index.interfaces import DocumentIndex
 from danswer.document_index.interfaces import DocumentMetadata
@@ -96,7 +97,6 @@ def index_doc_batch(
     chunker: Chunker,
     embedder: Embedder,
     document_index: DocumentIndex,
-    index_name: str,
     documents: list[Document],
     index_attempt_metadata: IndexAttemptMetadata,
     ignore_time_skip: bool = False,
@@ -188,9 +188,7 @@ def index_doc_batch(
     # A document will not be spread across different batches, so all the
     # documents with chunks in this set, are fully represented by the chunks
     # in this set
-    insertion_records = document_index.index(
-        chunks=access_aware_chunks, index_name=index_name
-    )
+    insertion_records = document_index.index(chunks=access_aware_chunks)

     successful_doc_ids = [record.document_id for record in insertion_records]
     successful_docs = [
@@ -218,7 +216,6 @@ def build_indexing_pipeline(
     chunker: Chunker | None = None,
     embedder: Embedder | None = None,
     document_index: DocumentIndex | None = None,
-    index_name: str,
     ignore_time_skip: bool = False,
 ) -> IndexingPipelineProtocol:
     """Builds a pipeline which takes in a list (batch) of docs and indexes them."""
@@ -226,13 +223,16 @@ def build_indexing_pipeline(

     embedder = embedder or DefaultEmbedder()

-    document_index = document_index or get_default_document_index()
+    if not document_index:
+        with Session(get_sqlalchemy_engine()) as db_session:
+            document_index = get_default_document_index(
+                primary_index_name=get_index_name(db_session), secondary_index_name=None
+            )

     return partial(
         index_doc_batch,
         chunker=chunker,
         embedder=embedder,
         document_index=document_index,
-        index_name=index_name,
         ignore_time_skip=ignore_time_skip,
     )
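Callers now have two options: omit document_index and let the pipeline resolve the primary index from the database, or pass one explicitly, as the ingestion endpoint does when it targets the secondary index. A sketch of both, assuming a FUTURE model is registered:

    from sqlalchemy.orm import Session

    from danswer.db.engine import get_sqlalchemy_engine
    from danswer.document_index.document_index_utils import get_both_index_names
    from danswer.document_index.factory import get_default_document_index
    from danswer.indexing.indexing_pipeline import build_indexing_pipeline

    # Default: the pipeline looks up the primary index itself.
    primary_pipeline = build_indexing_pipeline(ignore_time_skip=True)

    with Session(get_sqlalchemy_engine()) as db_session:
        curr_ind_name, sec_ind_name = get_both_index_names(db_session)

    # Explicit: one pipeline per target, since Indexable.index() writes to
    # a single index at a time by design.
    if sec_ind_name:
        secondary_pipeline = build_indexing_pipeline(
            ignore_time_skip=True,
            document_index=get_default_document_index(
                primary_index_name=sec_ind_name, secondary_index_name=None
            ),
        )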
@@ -2,6 +2,8 @@ from dataclasses import dataclass
 from dataclasses import fields
 from datetime import datetime

+from pydantic import BaseModel
+
 from danswer.access.models import DocumentAccess
 from danswer.configs.constants import DocumentSource
 from danswer.connectors.models import Document
@@ -120,3 +122,11 @@ class InferenceChunk(BaseChunk):
             break
         short_blurb += " " + word
     return f"Inference Chunk: {self.document_id} - {short_blurb}..."
+
+
+class EmbeddingModelDetail(BaseModel):
+    model_name: str
+    model_dim: int
+    normalize: bool
+    query_prefix: str | None
+    passage_prefix: str | None
@@ -35,6 +35,7 @@ from danswer.configs.chat_configs import MULTILINGUAL_QUERY_EXPANSION
 from danswer.configs.constants import AuthType
 from danswer.configs.model_configs import ASYM_PASSAGE_PREFIX
 from danswer.configs.model_configs import ASYM_QUERY_PREFIX
+from danswer.configs.model_configs import DOC_EMBEDDING_DIM
 from danswer.configs.model_configs import DOCUMENT_ENCODER_MODEL
 from danswer.configs.model_configs import ENABLE_RERANKING_REAL_TIME_FLOW
 from danswer.configs.model_configs import FAST_GEN_AI_MODEL_VERSION
@@ -44,7 +45,10 @@ from danswer.configs.model_configs import GEN_AI_MODEL_VERSION
 from danswer.db.connector import create_initial_default_connector
 from danswer.db.connector_credential_pair import associate_default_cc_pair
 from danswer.db.credentials import create_initial_public_credential
+from danswer.db.embedding_model import get_latest_embedding_model_by_status
 from danswer.db.engine import get_sqlalchemy_engine
+from danswer.db.models import IndexModelStatus
+from danswer.document_index.document_index_utils import clean_model_name
 from danswer.document_index.factory import get_default_document_index
 from danswer.llm.factory import get_default_llm
 from danswer.search.search_nlp_models import warm_up_models
@@ -278,7 +282,34 @@ def get_application() -> FastAPI:
             load_chat_yamls()

             logger.info("Verifying Document Index(s) is/are available.")
-            get_default_document_index().ensure_indices_exist()
+            primary_embedding_model = get_latest_embedding_model_by_status(
+                status=IndexModelStatus.PRESENT, db_session=db_session
+            )
+            secondary_embedding_model = get_latest_embedding_model_by_status(
+                status=IndexModelStatus.FUTURE, db_session=db_session
+            )
+            primary_index = (
+                f"danswer_chunk_{clean_model_name(primary_embedding_model.model_name)}"
+                if primary_embedding_model
+                else "danswer_chunk"
+            )
+            second_index = (
+                f"danswer_chunk_{clean_model_name(secondary_embedding_model.model_name)}"
+                if secondary_embedding_model
+                else None
+            )
+
+            document_index = get_default_document_index(
+                primary_index_name=primary_index, secondary_index_name=second_index
+            )
+            document_index.ensure_indices_exist(
+                index_embedding_dim=primary_embedding_model.model_dim
+                if primary_embedding_model
+                else DOC_EMBEDDING_DIM,
+                secondary_index_embedding_dim=secondary_embedding_model.model_dim
+                if secondary_embedding_model
+                else None,
+            )

             optional_telemetry(
                 record_type=RecordType.VERSION, data={"version": __version__}
@@ -22,6 +22,7 @@ from danswer.db.chat import get_persona_by_id
 from danswer.db.chat import get_prompt_by_id
 from danswer.db.chat import translate_db_message_to_chat_message_detail
 from danswer.db.models import User
+from danswer.document_index.document_index_utils import get_index_name
 from danswer.document_index.factory import get_default_document_index
 from danswer.indexing.models import InferenceChunk
 from danswer.llm.utils import get_default_llm_token_encode
@@ -89,7 +90,10 @@ def stream_answer_objects(
     )

     llm_tokenizer = get_default_llm_token_encode()
-    document_index = get_default_document_index()
+
+    document_index = get_default_document_index(
+        primary_index_name=get_index_name(db_session), secondary_index_name=None
+    )

     # Create a chat session which will just store the root message, the query, and the AI response
     root_message = get_or_create_root_message(
@@ -17,7 +17,6 @@ from danswer.configs.model_configs import CROSS_ENCODER_RANGE_MAX
 from danswer.configs.model_configs import CROSS_ENCODER_RANGE_MIN
 from danswer.configs.model_configs import SIM_SCORE_RANGE_HIGH
 from danswer.configs.model_configs import SIM_SCORE_RANGE_LOW
-from danswer.document_index.document_index_utils import get_index_name
 from danswer.document_index.document_index_utils import (
     translate_boost_count_to_multiplier,
 )
@@ -143,14 +142,12 @@ def doc_index_retrieval(
     document_index: DocumentIndex,
     hybrid_alpha: float = HYBRID_ALPHA,
 ) -> list[InferenceChunk]:
-    index = get_index_name()
     if query.search_type == SearchType.KEYWORD:
         top_chunks = document_index.keyword_retrieval(
             query=query.query,
             filters=query.filters,
             time_decay_multiplier=query.recency_bias_multiplier,
             num_to_retrieve=query.num_hits,
-            index_name=index,
         )

     elif query.search_type == SearchType.SEMANTIC:
@@ -159,7 +156,6 @@ def doc_index_retrieval(
             filters=query.filters,
             time_decay_multiplier=query.recency_bias_multiplier,
             num_to_retrieve=query.num_hits,
-            index_name=index,
         )

     elif query.search_type == SearchType.HYBRID:
@@ -170,7 +166,6 @@ def doc_index_retrieval(
             num_to_retrieve=query.num_hits,
             offset=query.offset,
             hybrid_alpha=hybrid_alpha,
-            index_name=index,
         )

     else:
@@ -16,6 +16,7 @@ from danswer.db.connector_credential_pair import get_connector_credential_pair
 from danswer.db.credentials import fetch_credential_by_id
 from danswer.db.engine import get_session
+from danswer.document_index.document_index_utils import get_both_index_names
 from danswer.document_index.factory import get_default_document_index
 from danswer.dynamic_configs import get_dynamic_config_store
 from danswer.dynamic_configs.interface import ConfigNotFoundError
 from danswer.indexing.indexing_pipeline import build_indexing_pipeline
@@ -142,10 +143,14 @@ def document_ingestion(
     if document.source == DocumentSource.INGESTION_API:
         document.source = DocumentSource.FILE

-    index_names = get_both_index_names()
+    # Need to index for both the primary and secondary index if possible
+    curr_ind_name, sec_ind_name = get_both_index_names(db_session)
+    curr_doc_index = get_default_document_index(
+        primary_index_name=curr_ind_name, secondary_index_name=None
+    )

     indexing_pipeline = build_indexing_pipeline(
-        ignore_time_skip=True, index_name=index_names[0]
+        ignore_time_skip=True, document_index=curr_doc_index
     )

     new_doc, chunks = indexing_pipeline(
@@ -157,8 +162,16 @@ def document_ingestion(
     )

     # If there's a secondary index being built, index the doc but don't use it for return here
-    if len(index_names) > 1:
-        indexing_pipeline(
+    if sec_ind_name:
+        sec_doc_index = get_default_document_index(
+            primary_index_name=sec_ind_name, secondary_index_name=None
+        )
+
+        sec_ind_pipeline = build_indexing_pipeline(
+            ignore_time_skip=True, document_index=sec_doc_index
+        )
+
+        sec_ind_pipeline(
             documents=[document],
             index_attempt_metadata=IndexAttemptMetadata(
                 connector_id=connector_id,
@@ -27,7 +27,9 @@ def get_document_info(
     user: User | None = Depends(current_user),
     db_session: Session = Depends(get_session),
 ) -> DocumentInfo:
-    document_index = get_default_document_index()
+    document_index = get_default_document_index(
+        primary_index_name=get_index_name(db_session), secondary_index_name=None
+    )

     user_acl_filters = build_access_filters_for_user(user, db_session)
     filters = IndexFilters(access_control_list=user_acl_filters)
@@ -36,7 +38,6 @@ def get_document_info(
         document_id=document_id,
         chunk_ind=None,
         filters=filters,
-        index_name=get_index_name(),
     )

     if not inference_chunks:
@@ -60,7 +61,9 @@ def get_chunk_info(
     user: User | None = Depends(current_user),
     db_session: Session = Depends(get_session),
 ) -> ChunkInfo:
-    document_index = get_default_document_index()
+    document_index = get_default_document_index(
+        primary_index_name=get_index_name(db_session), secondary_index_name=None
+    )

     user_acl_filters = build_access_filters_for_user(user, db_session)
     filters = IndexFilters(access_control_list=user_acl_filters)
@@ -69,7 +72,6 @@ def get_chunk_info(
         document_id=document_id,
         chunk_ind=chunk_id,
         filters=filters,
-        index_name=get_index_name(),
     )

     if not inference_chunks:
@@ -7,6 +7,7 @@ from pydantic import BaseModel
 from sqlalchemy.orm import Session

 from danswer.db.engine import get_session
+from danswer.document_index.document_index_utils import get_index_name
 from danswer.document_index.factory import get_default_document_index
 from danswer.search.access_filters import build_access_filters_for_user
 from danswer.search.models import IndexFilters
@@ -83,9 +84,13 @@ def gpt_search(
         skip_llm_chunk_filter=True,
     )

+    document_index = get_default_document_index(
+        primary_index_name=get_index_name(db_session), secondary_index_name=None
+    )
+
     top_chunks, __ = full_chunk_search(
         query=search_query,
-        document_index=get_default_document_index(),
+        document_index=document_index,
     )

     return GptSearchResponse(
@@ -18,6 +18,7 @@ from danswer.db.feedback import fetch_docs_ranked_by_boost
 from danswer.db.feedback import update_document_boost
 from danswer.db.feedback import update_document_hidden
 from danswer.db.models import User
+from danswer.document_index.document_index_utils import get_both_index_names
 from danswer.document_index.factory import get_default_document_index
 from danswer.dynamic_configs import get_dynamic_config_store
 from danswer.dynamic_configs.interface import ConfigNotFoundError
@@ -68,12 +69,17 @@ def document_boost_update(
     _: User | None = Depends(current_admin_user),
     db_session: Session = Depends(get_session),
 ) -> None:
+    curr_ind_name, sec_ind_name = get_both_index_names(db_session)
+    document_index = get_default_document_index(
+        primary_index_name=curr_ind_name, secondary_index_name=sec_ind_name
+    )
+
     try:
         update_document_boost(
             db_session=db_session,
             document_id=boost_update.document_id,
             boost=boost_update.boost,
-            document_index=get_default_document_index(),
+            document_index=document_index,
         )
     except ValueError as e:
         raise HTTPException(status_code=400, detail=str(e))
@@ -85,12 +91,17 @@ def document_hidden_update(
     _: User | None = Depends(current_admin_user),
     db_session: Session = Depends(get_session),
 ) -> None:
+    curr_ind_name, sec_ind_name = get_both_index_names(db_session)
+    document_index = get_default_document_index(
+        primary_index_name=curr_ind_name, secondary_index_name=sec_ind_name
+    )
+
     try:
         update_document_hidden(
             db_session=db_session,
             document_id=hidden_update.document_id,
             hidden=hidden_update.hidden,
-            document_index=get_default_document_index(),
+            document_index=document_index,
         )
     except ValueError as e:
         raise HTTPException(status_code=400, detail=str(e))
@@ -103,3 +103,7 @@ class SlackBotConfig(BaseModel):
     id: int
     persona: PersonaSnapshot | None
     channel_config: ChannelConfig
+
+
+class ModelVersionResponse(BaseModel):
+    model_name: str | None  # None only applicable to secondary index
backend/danswer/server/manage/secondary_index.py (new file, +33)
@@ -0,0 +1,33 @@
+from fastapi import APIRouter
+from fastapi import Depends
+
+from danswer.auth.users import current_admin_user
+from danswer.db.models import User
+from danswer.indexing.models import EmbeddingModelDetail
+from danswer.server.manage.models import ModelVersionResponse
+from danswer.utils.logger import setup_logger
+
+router = APIRouter(prefix="/secondary-index")
+logger = setup_logger()
+
+
+@router.post("/set-new-embedding-model")
+def set_new_embedding_model(
+    embed_model_details: EmbeddingModelDetail,
+    _: User | None = Depends(current_admin_user),
+) -> None:
+    raise NotImplementedError()
+
+
+@router.get("/get-current-embedding-model")
+def get_current_embedding_model(
+    _: User | None = Depends(current_admin_user),
+) -> ModelVersionResponse:
+    raise NotImplementedError()
+
+
+@router.get("/get-secondary-embedding-model")
+def get_secondary_embedding_model(
+    _: User | None = Depends(current_admin_user),
+) -> ModelVersionResponse:
+    raise NotImplementedError()
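All three handlers are stubs in this commit; only the routes and the request/response shapes are fixed. A sketch of the request body for the POST endpoint, which is just an EmbeddingModelDetail (values illustrative):

    from danswer.indexing.models import EmbeddingModelDetail

    # Body for POST /secondary-index/set-new-embedding-model.
    payload = EmbeddingModelDetail(
        model_name="intfloat/e5-small-v2",
        model_dim=384,
        normalize=True,
        query_prefix="query: ",
        passage_prefix="passage: ",
    ).dict()

    # The two GET endpoints return a ModelVersionResponse, i.e.
    # {"model_name": <str or None>}, once implemented.
    print(payload)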
@@ -20,6 +20,7 @@ from danswer.db.engine import get_session
 from danswer.db.feedback import create_chat_message_feedback
 from danswer.db.feedback import create_doc_retrieval_feedback
 from danswer.db.models import User
+from danswer.document_index.document_index_utils import get_both_index_names
 from danswer.document_index.factory import get_default_document_index
 from danswer.secondary_llm_flows.chat_session_naming import (
     get_renamed_conversation_name,
@@ -228,12 +229,18 @@ def create_search_feedback(
     """This endpoint isn't protected - it does not check if the user has access to the document
     Users could try changing boosts of arbitrary docs but this does not leak any data.
     """

+    curr_ind_name, sec_ind_name = get_both_index_names(db_session)
+    document_index = get_default_document_index(
+        primary_index_name=curr_ind_name, secondary_index_name=sec_ind_name
+    )
+
     create_doc_retrieval_feedback(
         message_id=feedback.message_id,
         document_id=feedback.document_id,
         document_rank=feedback.document_rank,
         clicked=feedback.click,
         feedback=feedback.search_feedback,
-        document_index=get_default_document_index(),
+        document_index=document_index,
         db_session=db_session,
     )
@@ -53,16 +53,18 @@ def admin_search(
         time_cutoff=question.filters.time_cutoff,
         access_control_list=user_acl_filters,
     )
-    document_index = get_default_document_index()
+
+    document_index = get_default_document_index(
+        primary_index_name=get_index_name(db_session), secondary_index_name=None
+    )
+
     if not isinstance(document_index, VespaIndex):
         raise HTTPException(
             status_code=400,
             detail="Cannot use admin-search when using a non-Vespa document index",
         )
-    matching_chunks = document_index.admin_retrieval(
-        query=query, filters=final_filters, index_name=get_index_name()
-    )
+
+    matching_chunks = document_index.admin_retrieval(query=query, filters=final_filters)

     documents = chunks_to_search_docs(matching_chunks)
@@ -32,10 +32,6 @@ def set_acl_for_vespa(should_check_if_already_done: bool = False) -> None:
     except ConfigNotFoundError:
         pass

-    vespa_index = get_default_document_index()
-    if not isinstance(vespa_index, VespaIndex):
-        raise ValueError("This script is only for Vespa indexes")
-
     logger.info("Populating Access Control List fields in Vespa")
     with Session(get_sqlalchemy_engine()) as db_session:
         # for all documents, set the `access_control_list` field appropriately
@@ -46,15 +42,21 @@ def set_acl_for_vespa(should_check_if_already_done: bool = False) -> None:
             document_ids=[document.id for document in documents],
         )

-        for index_name in get_both_index_names():
-            update_requests = [
-                UpdateRequest(
-                    document_ids=[document_id],
-                    access=DocumentAccess.build(user_ids, is_public),
-                )
-                for document_id, user_ids, is_public in document_access_info
-            ]
-            vespa_index.update(update_requests=update_requests, index_name=index_name)
+        curr_ind_name, sec_ind_name = get_both_index_names(db_session)
+        vespa_index = get_default_document_index(
+            primary_index_name=curr_ind_name, secondary_index_name=sec_ind_name
+        )
+        if not isinstance(vespa_index, VespaIndex):
+            raise ValueError("This script is only for Vespa indexes")
+
+        update_requests = [
+            UpdateRequest(
+                document_ids=[document_id],
+                access=DocumentAccess.build(user_ids, is_public),
+            )
+            for document_id, user_ids, is_public in document_access_info
+        ]
+        vespa_index.update(update_requests=update_requests)

     dynamic_config_store.store(_COMPLETED_ACL_UPDATE_KEY, True)
@@ -5,8 +5,11 @@ from contextlib import contextmanager
 from typing import Any
 from typing import TextIO

+from sqlalchemy.orm import Session
+
 from danswer.chat.chat_utils import get_chunks_for_qa
 from danswer.db.engine import get_sqlalchemy_engine
+from danswer.document_index.document_index_utils import get_index_name
 from danswer.document_index.factory import get_default_document_index
 from danswer.indexing.models import InferenceChunk
 from danswer.search.models import IndexFilters
@@ -93,9 +96,16 @@ def get_search_results(
     retrieval_metrics = MetricsHander[RetrievalMetricsContainer]()
     rerank_metrics = MetricsHander[RerankMetricsContainer]()

+    with Session(get_sqlalchemy_engine()) as db_session:
+        ind_name = get_index_name(db_session)
+
+    document_index = get_default_document_index(
+        primary_index_name=ind_name, secondary_index_name=None
+    )
+
     top_chunks, llm_chunk_selection = full_chunk_search(
         query=search_query,
-        document_index=get_default_document_index(),
+        document_index=document_index,
         retrieval_metrics_callback=retrieval_metrics.record_metric,
         rerank_metrics_callback=rerank_metrics.record_metric,
     )