diff --git a/backend/.gitignore b/backend/.gitignore index 38ac9f33e7..ead9ceb200 100644 --- a/backend/.gitignore +++ b/backend/.gitignore @@ -4,8 +4,6 @@ site_crawls/ .ipynb_checkpoints/ api_keys.py *ipynb -qdrant-data/ -typesense-data/ .env vespa-app.zip dynamic_config_storage/ diff --git a/backend/danswer/background/update.py b/backend/danswer/background/update.py index 1837a7ad17..5391586a99 100755 --- a/backend/danswer/background/update.py +++ b/backend/danswer/background/update.py @@ -196,7 +196,7 @@ def _run_indexing( ) -> None: """ 1. Get documents which are either new or updated from specified application - 2. Embed and index these documents into the chosen datastores (e.g. Qdrant / Typesense or Vespa) + 2. Embed and index these documents into the chosen datastore (vespa) 3. Updates Postgres to record the indexed documents + the outcome of this run """ diff --git a/backend/danswer/bots/slack/blocks.py b/backend/danswer/bots/slack/blocks.py index ea31461edc..16f5c8c346 100644 --- a/backend/danswer/bots/slack/blocks.py +++ b/backend/danswer/bots/slack/blocks.py @@ -1,3 +1,5 @@ +from datetime import datetime + from slack_sdk.models.blocks import ActionsBlock from slack_sdk.models.blocks import Block from slack_sdk.models.blocks import ButtonElement @@ -180,10 +182,24 @@ def build_qa_response_blocks( query_event_id: int, answer: str | None, quotes: list[DanswerQuote] | None, + time_cutoff: datetime | None, + favor_recent: bool, ) -> list[Block]: quotes_blocks: list[Block] = [] ai_answer_header = HeaderBlock(text="AI Answer") + filter_block: Block | None = None + if time_cutoff or favor_recent: + filter_text = "Filters: " + if time_cutoff is not None: + time_str = time_cutoff.strftime("%b %d, %Y") + filter_text += f"`Docs Updated >= {time_str}` " + if favor_recent: + if time_cutoff is not None: + filter_text += "+ " + filter_text += "`Prioritize Recently Updated Docs`" + + filter_block = SectionBlock(text=f"_{filter_text}_") if not answer: answer_block = SectionBlock( @@ -203,12 +219,14 @@ def build_qa_response_blocks( ] feedback_block = build_qa_feedback_block(query_event_id=query_event_id) - return ( - [ - ai_answer_header, - answer_block, - feedback_block, - ] - + quotes_blocks - + [DividerBlock()] + + response_blocks: list[Block] = [ai_answer_header] + + if filter_block is not None: + response_blocks.append(filter_block) + + response_blocks.extend( + [answer_block, feedback_block] + quotes_blocks + [DividerBlock()] ) + + return response_blocks diff --git a/backend/danswer/bots/slack/handlers/handle_message.py b/backend/danswer/bots/slack/handlers/handle_message.py index 7d7e4e524a..fcbf098a89 100644 --- a/backend/danswer/bots/slack/handlers/handle_message.py +++ b/backend/danswer/bots/slack/handlers/handle_message.py @@ -15,12 +15,12 @@ from danswer.bots.slack.utils import ChannelIdAdapter from danswer.bots.slack.utils import fetch_userids_from_emails from danswer.bots.slack.utils import respond_in_thread from danswer.configs.app_configs import DOCUMENT_INDEX_NAME -from danswer.configs.constants import DOCUMENT_SETS from danswer.configs.danswerbot_configs import DANSWER_BOT_ANSWER_GENERATION_TIMEOUT from danswer.configs.danswerbot_configs import DANSWER_BOT_DISABLE_DOCS_ONLY_ANSWER from danswer.configs.danswerbot_configs import DANSWER_BOT_DISPLAY_ERROR_MSGS from danswer.configs.danswerbot_configs import DANSWER_BOT_NUM_RETRIES from danswer.configs.danswerbot_configs import DANSWER_REACT_EMOJI +from danswer.configs.danswerbot_configs import DISABLE_DANSWER_BOT_FILTER_DETECT from 
danswer.configs.danswerbot_configs import ENABLE_DANSWERBOT_REFLEXION from danswer.connectors.slack.utils import make_slack_api_rate_limited from danswer.db.engine import get_sqlalchemy_engine @@ -28,6 +28,7 @@ from danswer.db.models import SlackBotConfig from danswer.direct_qa.answer_question import answer_qa_query from danswer.server.models import QAResponse from danswer.server.models import QuestionRequest +from danswer.server.models import RequestFilters from danswer.utils.logger import setup_logger logger_base = setup_logger() @@ -72,6 +73,7 @@ def handle_message( answer_generation_timeout: int = DANSWER_BOT_ANSWER_GENERATION_TIMEOUT, should_respond_with_error_msgs: bool = DANSWER_BOT_DISPLAY_ERROR_MSGS, disable_docs_only_answer: bool = DANSWER_BOT_DISABLE_DOCS_ONLY_ANSWER, + disable_auto_detect_filters: bool = DISABLE_DANSWER_BOT_FILTER_DETECT, reflexion: bool = ENABLE_DANSWERBOT_REFLEXION, ) -> bool: """Potentially respond to the user message depending on filters and if an answer was generated @@ -178,15 +180,23 @@ def handle_message( answer_failed = False try: + # By leaving time_cutoff and favor_recent as None, and setting enable_auto_detect_filters + # it allows the slack flow to extract out filters from the user query + filters = RequestFilters( + source_type=None, + document_set=document_set_names, + time_cutoff=None, + ) + # This includes throwing out answer via reflexion answer = _get_answer( QuestionRequest( query=msg, collection=DOCUMENT_INDEX_NAME, use_keyword=False, # always use semantic search when handling Slack messages - filters=[{DOCUMENT_SETS: document_set_names}] - if document_set_names - else None, + enable_auto_detect_filters=not disable_auto_detect_filters, + filters=filters, + favor_recent=None, offset=None, ) ) @@ -251,6 +261,8 @@ def handle_message( query_event_id=answer.query_event_id, answer=answer.answer, quotes=answer.quotes, + time_cutoff=answer.time_cutoff, + favor_recent=answer.favor_recent, ) document_blocks = build_documents_blocks( diff --git a/backend/danswer/chat/chat_llm.py b/backend/danswer/chat/chat_llm.py index bc8497d20b..bea0b1e94f 100644 --- a/backend/danswer/chat/chat_llm.py +++ b/backend/danswer/chat/chat_llm.py @@ -1,12 +1,12 @@ import re from collections.abc import Callable from collections.abc import Iterator -from uuid import UUID from langchain.schema.messages import AIMessage from langchain.schema.messages import BaseMessage from langchain.schema.messages import HumanMessage from langchain.schema.messages import SystemMessage +from sqlalchemy.orm import Session from danswer.chat.chat_prompts import build_combined_query from danswer.chat.chat_prompts import DANSWER_TOOL_NAME @@ -28,6 +28,7 @@ from danswer.configs.model_configs import GEN_AI_MAX_INPUT_TOKENS from danswer.datastores.document_index import get_default_document_index from danswer.db.models import ChatMessage from danswer.db.models import Persona +from danswer.db.models import User from danswer.direct_qa.interfaces import DanswerAnswerPiece from danswer.direct_qa.interfaces import DanswerChatModelOut from danswer.direct_qa.qa_utils import get_usable_chunks @@ -35,6 +36,7 @@ from danswer.llm.build import get_default_llm from danswer.llm.llm import LLM from danswer.llm.utils import get_default_llm_tokenizer from danswer.llm.utils import translate_danswer_msg_to_langchain +from danswer.search.access_filters import build_user_only_filters from danswer.search.semantic_search import chunks_to_search_docs from danswer.search.semantic_search import retrieve_ranked_documents from 
danswer.server.models import RetrievalDocs @@ -118,7 +120,8 @@ def danswer_chat_retrieval( query_message: ChatMessage, history: list[ChatMessage], llm: LLM, - user_id: UUID | None, + user: User | None, + db_session: Session, ) -> list[InferenceChunk]: if history: query_combination_msgs = build_combined_query(query_message, history) @@ -128,9 +131,9 @@ def danswer_chat_retrieval( # Good Debug/Breakpoint ranked_chunks, unranked_chunks = retrieve_ranked_documents( - reworded_query, - user_id=user_id, - filters=None, + query=reworded_query, + filters=build_user_only_filters(user, db_session), + favor_recent=False, datastore=get_default_document_index(), ) if not ranked_chunks: @@ -277,8 +280,9 @@ def extract_citations_from_stream( def llm_contextual_chat_answer( messages: list[ChatMessage], persona: Persona, - user_id: UUID | None, + user: User | None, tokenizer: Callable, + db_session: Session, run_search_system_text: str = REQUIRE_DANSWER_SYSTEM_MSG, ) -> Iterator[str | list[InferenceChunk]]: last_message = messages[-1] @@ -322,7 +326,8 @@ def llm_contextual_chat_answer( query_message=last_message, history=previous_messages, llm=llm, - user_id=user_id, + user=user, + db_session=db_session, ) yield retrieved_chunks tool_result_str = format_danswer_chunks_for_chat(retrieved_chunks) @@ -369,8 +374,9 @@ def llm_contextual_chat_answer( def llm_tools_enabled_chat_answer( messages: list[ChatMessage], persona: Persona, - user_id: UUID | None, + user: User | None, tokenizer: Callable, + db_session: Session, ) -> Iterator[str | list[InferenceChunk]]: retrieval_enabled = persona.retrieval_enabled system_text = build_system_text_from_persona(persona) @@ -447,12 +453,13 @@ def llm_tools_enabled_chat_answer( query_message=last_message, history=previous_messages, llm=llm, - user_id=user_id, + user=user, + db_session=db_session, ) yield retrieved_chunks tool_result_str = format_danswer_chunks_for_chat(retrieved_chunks) else: - tool_result_str = call_tool(final_result, user_id=user_id) + tool_result_str = call_tool(final_result) # The AI's tool calling message tool_call_msg_text = final_result.model_raw @@ -508,8 +515,9 @@ def wrap_chat_package_in_model( def llm_chat_answer( messages: list[ChatMessage], persona: Persona | None, - user_id: UUID | None, tokenizer: Callable, + user: User | None, + db_session: Session, ) -> Iterator[DanswerAnswerPiece | RetrievalDocs]: # Common error cases to keep in mind: # - User asks question about something long ago, due to context limit, the message is dropped @@ -535,13 +543,21 @@ def llm_chat_answer( # Doesn't require tool calling output format (all LLM outputs are therefore valid) elif persona.retrieval_enabled and not persona.tools and not FORCE_TOOL_PROMPT: for package in llm_contextual_chat_answer( - messages=messages, persona=persona, user_id=user_id, tokenizer=tokenizer + messages=messages, + persona=persona, + tokenizer=tokenizer, + user=user, + db_session=db_session, ): yield wrap_chat_package_in_model(package) # Use most flexible/complex prompt format else: for package in llm_tools_enabled_chat_answer( - messages=messages, persona=persona, user_id=user_id, tokenizer=tokenizer + messages=messages, + persona=persona, + tokenizer=tokenizer, + user=user, + db_session=db_session, ): yield wrap_chat_package_in_model(package) diff --git a/backend/danswer/chat/tools.py b/backend/danswer/chat/tools.py index 0928b5a500..ecd3b3a3e1 100644 --- a/backend/danswer/chat/tools.py +++ b/backend/danswer/chat/tools.py @@ -1,10 +1,7 @@ -from uuid import UUID - from 
danswer.direct_qa.interfaces import DanswerChatModelOut def call_tool( model_actions: DanswerChatModelOut, - user_id: UUID | None, ) -> str: raise NotImplementedError("There are no additional tool integrations right now") diff --git a/backend/danswer/configs/app_configs.py b/backend/danswer/configs/app_configs.py index 7cf1d846e1..97824541bb 100644 --- a/backend/danswer/configs/app_configs.py +++ b/backend/danswer/configs/app_configs.py @@ -92,17 +92,6 @@ VESPA_TENANT_PORT = os.environ.get("VESPA_TENANT_PORT") or "19071" VESPA_DEPLOYMENT_ZIP = ( os.environ.get("VESPA_DEPLOYMENT_ZIP") or "/app/danswer/vespa-app.zip" ) -# Qdrant is Semantic Search Vector DB -# Url / Key are used to connect to a remote Qdrant instance -QDRANT_URL = os.environ.get("QDRANT_URL", "") -QDRANT_API_KEY = os.environ.get("QDRANT_API_KEY", "") -# Host / Port are used for connecting to local Qdrant instance -QDRANT_HOST = os.environ.get("QDRANT_HOST") or "localhost" -QDRANT_PORT = 6333 -# Typesense is the Keyword Search Engine -TYPESENSE_HOST = os.environ.get("TYPESENSE_HOST") or "localhost" -TYPESENSE_PORT = 8108 -TYPESENSE_API_KEY = os.environ.get("TYPESENSE_API_KEY", "") # Number of documents in a batch during indexing (further batching done by chunks before passing to bi-encoder) INDEX_BATCH_SIZE = 16 @@ -153,11 +142,15 @@ NUM_DOCUMENT_TOKENS_FED_TO_GENERATIVE_MODEL = int( NUM_DOCUMENT_TOKENS_FED_TO_CHAT = int( os.environ.get("NUM_DOCUMENT_TOKENS_FED_TO_CHAT") or (512 * 3) ) -# 1 / (1 + DOC_TIME_DECAY * doc-age-in-years) +# 1 / (1 + DOC_TIME_DECAY * doc-age-in-years), set to 0 to have no decay # Capped in Vespa at 0.5 DOC_TIME_DECAY = float( os.environ.get("DOC_TIME_DECAY") or 0.5 # Hits limit at 2 years by default ) +FAVOR_RECENT_DECAY_MULTIPLIER = 2 +DISABLE_TIME_FILTER_EXTRACTION = ( + os.environ.get("DISABLE_TIME_FILTER_EXTRACTION", "").lower() == "true" +) # 1 edit per 2 characters, currently unused due to fuzzy match being too slow QUOTE_ALLOWED_ERROR_PERCENT = 0.05 QA_TIMEOUT = int(os.environ.get("QA_TIMEOUT") or "60") # 60 seconds diff --git a/backend/danswer/configs/constants.py b/backend/danswer/configs/constants.py index c291c7b403..dd22608ed9 100644 --- a/backend/danswer/configs/constants.py +++ b/backend/danswer/configs/constants.py @@ -14,6 +14,7 @@ EMBEDDINGS = "embeddings" ALLOWED_USERS = "allowed_users" ACCESS_CONTROL_LIST = "access_control_list" DOCUMENT_SETS = "document_sets" +TIME_FILTER = "time_filter" METADATA = "metadata" MATCH_HIGHLIGHTS = "match_highlights" # stored in the `metadata` of a chunk. 
Used to signify that this chunk should diff --git a/backend/danswer/configs/danswerbot_configs.py b/backend/danswer/configs/danswerbot_configs.py index 97e3a0472e..4b7a1ec752 100644 --- a/backend/danswer/configs/danswerbot_configs.py +++ b/backend/danswer/configs/danswerbot_configs.py @@ -35,6 +35,10 @@ DANSWER_BOT_DISPLAY_ERROR_MSGS = os.environ.get( DANSWER_BOT_RESPOND_EVERY_CHANNEL = ( os.environ.get("DANSWER_BOT_RESPOND_EVERY_CHANNEL", "").lower() == "true" ) +# Auto detect query options like time cutoff or heavily favor recently updated docs +DISABLE_DANSWER_BOT_FILTER_DETECT = ( + os.environ.get("DISABLE_DANSWER_BOT_FILTER_DETECT", "").lower() == "true" +) # Add a second LLM call post Answer to verify if the Answer is valid # Throws out answers that don't directly or fully answer the user query # This is the default for all DanswerBot channels unless the bot is configured individually diff --git a/backend/danswer/datastores/datastore_utils.py b/backend/danswer/datastores/datastore_utils.py index 6956e13da1..a4fbb02f3c 100644 --- a/backend/danswer/datastores/datastore_utils.py +++ b/backend/danswer/datastores/datastore_utils.py @@ -1,7 +1,5 @@ import math import uuid -from datetime import datetime -from datetime import timezone from danswer.chunking.models import IndexChunk from danswer.chunking.models import InferenceChunk @@ -32,13 +30,3 @@ def get_uuid_from_chunk( [doc_str, str(chunk.chunk_id), str(mini_chunk_ind)] ) return uuid.uuid5(uuid.NAMESPACE_X500, unique_identifier_string) - - -def translate_to_epoch_seconds_ensure_tz(t: datetime | None) -> int | None: - if not t: - return None - - if t.tzinfo != timezone.utc: - raise ValueError("Connectors must provide document update time in UTC") - - return int(t.timestamp()) diff --git a/backend/danswer/datastores/document_index.py b/backend/danswer/datastores/document_index.py index 7af973c804..480689e7c1 100644 --- a/backend/danswer/datastores/document_index.py +++ b/backend/danswer/datastores/document_index.py @@ -1,109 +1,12 @@ -from typing import Type -from uuid import UUID - -from danswer.chunking.models import DocMetadataAwareIndexChunk -from danswer.chunking.models import InferenceChunk -from danswer.configs.app_configs import DOCUMENT_INDEX_NAME -from danswer.configs.app_configs import DOCUMENT_INDEX_TYPE -from danswer.configs.app_configs import NUM_RETURNED_HITS -from danswer.configs.constants import DocumentIndexType -from danswer.configs.model_configs import SEARCH_DISTANCE_CUTOFF from danswer.datastores.interfaces import DocumentIndex -from danswer.datastores.interfaces import DocumentInsertionRecord -from danswer.datastores.interfaces import IndexFilter -from danswer.datastores.interfaces import KeywordIndex -from danswer.datastores.interfaces import UpdateRequest -from danswer.datastores.interfaces import VectorIndex -from danswer.datastores.qdrant.store import QdrantIndex -from danswer.datastores.typesense.store import TypesenseIndex from danswer.datastores.vespa.store import VespaIndex from danswer.utils.logger import setup_logger logger = setup_logger() -class SplitDocumentIndex(DocumentIndex): - """A Document index that uses 2 separate storages - one for keyword index and one for vector index""" - - def __init__( - self, - index_name: str | None = DOCUMENT_INDEX_NAME, - keyword_index_cls: Type[KeywordIndex] = TypesenseIndex, - vector_index_cls: Type[VectorIndex] = QdrantIndex, - ) -> None: - index_name = index_name or DOCUMENT_INDEX_NAME - self.keyword_index = keyword_index_cls(index_name) - self.vector_index = 
vector_index_cls(index_name) - - def ensure_indices_exist(self) -> None: - self.keyword_index.ensure_indices_exist() - self.vector_index.ensure_indices_exist() - - def index( - self, - chunks: list[DocMetadataAwareIndexChunk], - ) -> set[DocumentInsertionRecord]: - keyword_index_result = self.keyword_index.index(chunks) - vector_index_result = self.vector_index.index(chunks) - if keyword_index_result != vector_index_result: - logger.error( - f"Inconsistent document indexing:\n" - f"Keyword: {keyword_index_result}\n" - f"Vector: {vector_index_result}" - ) - return keyword_index_result.union(vector_index_result) - - def update(self, update_requests: list[UpdateRequest]) -> None: - self.keyword_index.update(update_requests) - self.vector_index.update(update_requests) - - def delete(self, doc_ids: list[str]) -> None: - self.keyword_index.delete(doc_ids) - self.vector_index.delete(doc_ids) - - def keyword_retrieval( - self, - query: str, - user_id: UUID | None, - filters: list[IndexFilter] | None, - num_to_retrieve: int = NUM_RETURNED_HITS, - ) -> list[InferenceChunk]: - return self.keyword_index.keyword_retrieval( - query, user_id, filters, num_to_retrieve - ) - - def semantic_retrieval( - self, - query: str, - user_id: UUID | None, - filters: list[IndexFilter] | None, - num_to_retrieve: int = NUM_RETURNED_HITS, - distance_cutoff: float | None = SEARCH_DISTANCE_CUTOFF, - ) -> list[InferenceChunk]: - return self.vector_index.semantic_retrieval( - query, user_id, filters, num_to_retrieve, distance_cutoff - ) - - def hybrid_retrieval( - self, - query: str, - user_id: UUID | None, - filters: list[IndexFilter] | None, - num_to_retrieve: int, - ) -> list[InferenceChunk]: - """Currently results from vector and keyword indices are not combined post retrieval. - This may change in the future but for now, the default behavior is to use semantic search - which should be a more flexible/powerful search option""" - return self.semantic_retrieval(query, user_id, filters, num_to_retrieve) - - -def get_default_document_index( - collection: str | None = DOCUMENT_INDEX_NAME, index_type: str = DOCUMENT_INDEX_TYPE -) -> DocumentIndex: - if index_type == DocumentIndexType.COMBINED.value: - return VespaIndex() # Can't specify collection without modifying the deployment - elif index_type == DocumentIndexType.SPLIT.value: - return SplitDocumentIndex(index_name=collection) - else: - raise ValueError("Invalid document index type selected") +def get_default_document_index() -> DocumentIndex: + # Currently only supporting Vespa + # Supporting multiple index types with multiple + # Search-Engines / VectorDBs was too much overhead + return VespaIndex() diff --git a/backend/danswer/datastores/interfaces.py b/backend/danswer/datastores/interfaces.py index bca0005c66..d3f77dea9e 100644 --- a/backend/danswer/datastores/interfaces.py +++ b/backend/danswer/datastores/interfaces.py @@ -2,14 +2,12 @@ import abc from dataclasses import dataclass from datetime import datetime from typing import Any -from uuid import UUID from danswer.access.models import DocumentAccess from danswer.chunking.models import DocMetadataAwareIndexChunk from danswer.chunking.models import InferenceChunk from danswer.configs.model_configs import SEARCH_DISTANCE_CUTOFF - -IndexFilter = dict[str, str | list[str] | None] +from danswer.server.models import IndexFilters @dataclass(frozen=True) @@ -84,8 +82,8 @@ class KeywordCapable(abc.ABC): def keyword_retrieval( self, query: str, - user_id: UUID | None, - filters: list[IndexFilter] | None, + filters: 
IndexFilters, + favor_recent: bool, num_to_retrieve: int, ) -> list[InferenceChunk]: raise NotImplementedError @@ -96,8 +94,8 @@ class VectorCapable(abc.ABC): def semantic_retrieval( self, query: str, - user_id: UUID | None, - filters: list[IndexFilter] | None, + filters: IndexFilters, + favor_recent: bool, num_to_retrieve: int, distance_cutoff: float | None = SEARCH_DISTANCE_CUTOFF, ) -> list[InferenceChunk]: @@ -109,14 +107,25 @@ class HybridCapable(abc.ABC): def hybrid_retrieval( self, query: str, - user_id: UUID | None, - filters: list[IndexFilter] | None, + filters: IndexFilters, + favor_recent: bool, num_to_retrieve: int, ) -> list[InferenceChunk]: raise NotImplementedError -class BaseIndex(Verifiable, Indexable, Updatable, Deletable, abc.ABC): +class AdminCapable(abc.ABC): + @abc.abstractmethod + def admin_retrieval( + self, + query: str, + filters: IndexFilters, + num_to_retrieve: int, + ) -> list[InferenceChunk]: + raise NotImplementedError + + +class BaseIndex(Verifiable, AdminCapable, Indexable, Updatable, Deletable, abc.ABC): """All basic functionalities excluding a specific retrieval approach Indices need to be able to - Check that the index exists with a schema definition diff --git a/backend/danswer/datastores/qdrant/__init__.py b/backend/danswer/datastores/qdrant/__init__.py deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/backend/danswer/datastores/qdrant/indexing.py b/backend/danswer/datastores/qdrant/indexing.py deleted file mode 100644 index c2019d96af..0000000000 --- a/backend/danswer/datastores/qdrant/indexing.py +++ /dev/null @@ -1,152 +0,0 @@ -import json - -from qdrant_client import QdrantClient -from qdrant_client.http import models -from qdrant_client.http.exceptions import ResponseHandlingException -from qdrant_client.http.models.models import UpdateResult -from qdrant_client.models import PointStruct - -from danswer.chunking.models import DocMetadataAwareIndexChunk -from danswer.configs.constants import ALLOWED_USERS -from danswer.configs.constants import BLURB -from danswer.configs.constants import CHUNK_ID -from danswer.configs.constants import CONTENT -from danswer.configs.constants import DOCUMENT_ID -from danswer.configs.constants import METADATA -from danswer.configs.constants import SECTION_CONTINUATION -from danswer.configs.constants import SEMANTIC_IDENTIFIER -from danswer.configs.constants import SOURCE_LINKS -from danswer.configs.constants import SOURCE_TYPE -from danswer.datastores.datastore_utils import DEFAULT_BATCH_SIZE -from danswer.datastores.datastore_utils import get_uuid_from_chunk -from danswer.datastores.interfaces import DocumentInsertionRecord -from danswer.utils.clients import get_qdrant_client -from danswer.utils.logger import setup_logger - - -logger = setup_logger() - - -def _does_document_exist( - doc_chunk_id: str, collection_name: str, q_client: QdrantClient -) -> bool: - """Get whether a document is found and the existing whitelists""" - results = q_client.retrieve( - collection_name=collection_name, - ids=[doc_chunk_id], - ) - if len(results) == 0: - return False - - return True - - -def delete_qdrant_doc_chunks( - document_id: str, collection_name: str, q_client: QdrantClient -) -> bool: - q_client.delete( - collection_name=collection_name, - points_selector=models.FilterSelector( - filter=models.Filter( - must=[ - models.FieldCondition( - key=DOCUMENT_ID, - match=models.MatchValue(value=document_id), - ), - ], - ) - ), - ) - return True - - -def index_qdrant_chunks( - chunks: list[DocMetadataAwareIndexChunk], 
- collection: str, - client: QdrantClient | None = None, - batch_upsert: bool = True, -) -> set[DocumentInsertionRecord]: - # Public documents will have the PUBLIC string in ALLOWED_USERS - # If credential that kicked this off has no user associated, either Auth is off or the doc is public - q_client: QdrantClient = client if client else get_qdrant_client() - - point_structs: list[PointStruct] = [] - insertion_records: set[DocumentInsertionRecord] = set() - # document ids of documents that existed BEFORE this indexing - already_existing_documents: set[str] = set() - for chunk in chunks: - document = chunk.source_document - - # Delete all chunks related to the document if (1) it already exists and - # (2) this is our first time running into it during this indexing attempt - document_exists = _does_document_exist(document.id, collection, q_client) - if document_exists and document.id not in already_existing_documents: - # Processing the first chunk of the doc and the doc exists - delete_qdrant_doc_chunks(document.id, collection, q_client) - already_existing_documents.add(document.id) - - embeddings = chunk.embeddings - vector_list = [embeddings.full_embedding] - vector_list.extend(embeddings.mini_chunk_embeddings) - - for minichunk_ind, embedding in enumerate(vector_list): - qdrant_id = str(get_uuid_from_chunk(chunk, minichunk_ind)) - insertion_records.add( - DocumentInsertionRecord( - document_id=document.id, - already_existed=document.id in already_existing_documents, - ) - ) - point_structs.append( - PointStruct( - id=qdrant_id, - payload={ - DOCUMENT_ID: document.id, - CHUNK_ID: chunk.chunk_id, - BLURB: chunk.blurb, - CONTENT: chunk.content, - SOURCE_TYPE: str(document.source.value), - SOURCE_LINKS: chunk.source_links, - SEMANTIC_IDENTIFIER: document.semantic_identifier, - SECTION_CONTINUATION: chunk.section_continuation, - ALLOWED_USERS: json.dumps(chunk.access.to_acl()), - METADATA: json.dumps(document.metadata), - }, - vector=embedding, - ) - ) - - if batch_upsert: - point_struct_batches = [ - point_structs[x : x + DEFAULT_BATCH_SIZE] - for x in range(0, len(point_structs), DEFAULT_BATCH_SIZE) - ] - for point_struct_batch in point_struct_batches: - - def upsert() -> UpdateResult | None: - for _ in range(5): - try: - return q_client.upsert( - collection_name=collection, points=point_struct_batch - ) - except ResponseHandlingException as e: - logger.warning( - f"Failed to upsert batch into qdrant due to error: {e}" - ) - return None - - index_results = upsert() - log_status = index_results.status if index_results else "Failed" - logger.info( - f"Indexed {len(point_struct_batch)} chunks into Qdrant collection '{collection}', " - f"status: {log_status}" - ) - else: - index_results = q_client.upsert( - collection_name=collection, points=point_structs - ) - logger.info( - f"Document batch of size {len(point_structs)} indexing status: {index_results.status}" - ) - - return insertion_records diff --git a/backend/danswer/datastores/qdrant/store.py b/backend/danswer/datastores/qdrant/store.py deleted file mode 100644 index 14505c685c..0000000000 --- a/backend/danswer/datastores/qdrant/store.py +++ /dev/null @@ -1,221 +0,0 @@ -from uuid import UUID - -from qdrant_client import QdrantClient -from qdrant_client.http.exceptions import ResponseHandlingException -from qdrant_client.http.exceptions import UnexpectedResponse -from qdrant_client.http.models import FieldCondition -from qdrant_client.http.models import Filter -from qdrant_client.http.models import MatchAny -from qdrant_client.http.models 
import MatchValue - -from danswer.chunking.models import DocMetadataAwareIndexChunk -from danswer.chunking.models import InferenceChunk -from danswer.configs.app_configs import DOCUMENT_INDEX_NAME -from danswer.configs.app_configs import NUM_RETURNED_HITS -from danswer.configs.constants import ALLOWED_USERS -from danswer.configs.constants import DOCUMENT_ID -from danswer.configs.constants import PUBLIC_DOC_PAT -from danswer.configs.model_configs import SEARCH_DISTANCE_CUTOFF -from danswer.datastores.datastore_utils import get_uuid_from_chunk -from danswer.datastores.interfaces import DocumentInsertionRecord -from danswer.datastores.interfaces import IndexFilter -from danswer.datastores.interfaces import UpdateRequest -from danswer.datastores.interfaces import VectorIndex -from danswer.datastores.qdrant.indexing import index_qdrant_chunks -from danswer.datastores.qdrant.utils import create_qdrant_collection -from danswer.datastores.qdrant.utils import list_qdrant_collections -from danswer.search.semantic_search import embed_query -from danswer.utils.batching import batch_generator -from danswer.utils.clients import get_qdrant_client -from danswer.utils.logger import setup_logger -from danswer.utils.timing import log_function_time - -logger = setup_logger() - -# how many points we want to delete/update at a time -_BATCH_SIZE = 200 - - -def _build_qdrant_filters( - user_id: UUID | None, filters: list[IndexFilter] | None -) -> list[FieldCondition]: - filter_conditions: list[FieldCondition] = [] - # Permissions filter - if user_id: - filter_conditions.append( - FieldCondition( - key=ALLOWED_USERS, - match=MatchAny(any=[str(user_id), PUBLIC_DOC_PAT]), - ) - ) - else: - filter_conditions.append( - FieldCondition( - key=ALLOWED_USERS, - match=MatchValue(value=PUBLIC_DOC_PAT), - ) - ) - - # Provided query filters - if filters: - for filter_dict in filters: - valid_filters = { - key: value for key, value in filter_dict.items() if value is not None - } - for filter_key, filter_val in valid_filters.items(): - if isinstance(filter_val, str): - filter_conditions.append( - FieldCondition( - key=filter_key, - match=MatchValue(value=filter_val), - ) - ) - elif isinstance(filter_val, list): - filter_conditions.append( - FieldCondition( - key=filter_key, - match=MatchAny(any=filter_val), - ) - ) - else: - raise ValueError("Invalid filters provided") - - return filter_conditions - - -def _get_points_from_document_ids( - document_ids: list[str], - collection: str, - client: QdrantClient, -) -> list[int | str]: - offset: int | str | None = 0 - chunk_ids = [] - while offset is not None: - matches, offset = client.scroll( - collection_name=collection, - scroll_filter=Filter( - must=[FieldCondition(key=DOCUMENT_ID, match=MatchAny(any=document_ids))] - ), - limit=_BATCH_SIZE, - with_payload=False, - with_vectors=False, - offset=offset, - ) - for match in matches: - chunk_ids.append(match.id) - - return chunk_ids - - -class QdrantIndex(VectorIndex): - def __init__(self, index_name: str = DOCUMENT_INDEX_NAME) -> None: - # In Qdrant, the vector index is referred to as a collection - self.collection = index_name - self.client = get_qdrant_client() - - def ensure_indices_exist(self) -> None: - if self.collection not in { - collection.name - for collection in list_qdrant_collections(self.client).collections - }: - logger.info(f"Creating Qdrant collection with name: {self.collection}") - create_qdrant_collection( - collection_name=self.collection, q_client=self.client - ) - - def index( - self, - chunks: 
list[DocMetadataAwareIndexChunk], - ) -> set[DocumentInsertionRecord]: - return index_qdrant_chunks( - chunks=chunks, - collection=self.collection, - client=self.client, - ) - - def update(self, update_requests: list[UpdateRequest]) -> None: - logger.info( - f"Updating {len(update_requests)} documents' allowed_users in Qdrant" - ) - for update_request in update_requests: - for doc_id_batch in batch_generator( - items=update_request.document_ids, - batch_size=_BATCH_SIZE, - ): - if update_request.access is None: - continue - - chunk_ids = _get_points_from_document_ids( - doc_id_batch, self.collection, self.client - ) - self.client.set_payload( - collection_name=self.collection, - payload={ALLOWED_USERS: update_request.access.to_acl()}, - points=chunk_ids, - ) - - def delete(self, doc_ids: list[str]) -> None: - logger.info(f"Deleting {len(doc_ids)} documents from Qdrant") - for doc_id_batch in batch_generator(items=doc_ids, batch_size=_BATCH_SIZE): - chunk_ids = _get_points_from_document_ids( - doc_id_batch, self.collection, self.client - ) - self.client.delete( - collection_name=self.collection, - points_selector=chunk_ids, - ) - - @log_function_time() - def semantic_retrieval( - self, - query: str, - user_id: UUID | None, - filters: list[IndexFilter] | None, - num_to_retrieve: int = NUM_RETURNED_HITS, - distance_cutoff: float | None = SEARCH_DISTANCE_CUTOFF, - page_size: int = NUM_RETURNED_HITS, - ) -> list[InferenceChunk]: - query_embedding = embed_query(query) - - filter_conditions = _build_qdrant_filters(user_id, filters) - - page_offset = 0 - found_inference_chunks: list[InferenceChunk] = [] - found_chunk_uuids: set[UUID] = set() - while len(found_inference_chunks) < num_to_retrieve: - try: - hits = self.client.search( - collection_name=self.collection, - query_vector=query_embedding, - query_filter=Filter(must=list(filter_conditions)), - limit=page_size, - offset=page_offset, - score_threshold=distance_cutoff, - ) - page_offset += page_size - if not hits: - break - except ResponseHandlingException as e: - logger.exception( - f'Qdrant querying failed due to: "{e}", is Qdrant set up?' - ) - break - except UnexpectedResponse as e: - logger.exception( - f'Qdrant querying failed due to: "{e}", has ingestion been run?' 
- ) - break - - inference_chunks_from_hits = [ - InferenceChunk.from_dict(hit.payload) - for hit in hits - if hit.payload is not None - ] - for inf_chunk in inference_chunks_from_hits: - # remove duplicate chunks which happen if minichunks are used - inf_chunk_id = get_uuid_from_chunk(inf_chunk) - if inf_chunk_id not in found_chunk_uuids: - found_inference_chunks.append(inf_chunk) - found_chunk_uuids.add(inf_chunk_id) - - return found_inference_chunks diff --git a/backend/danswer/datastores/qdrant/utils.py b/backend/danswer/datastores/qdrant/utils.py deleted file mode 100644 index 2c2bf95452..0000000000 --- a/backend/danswer/datastores/qdrant/utils.py +++ /dev/null @@ -1,45 +0,0 @@ -from typing import Any - -from qdrant_client import QdrantClient -from qdrant_client.http.models import Record -from qdrant_client.models import CollectionsResponse -from qdrant_client.models import Distance -from qdrant_client.models import VectorParams - -from danswer.configs.model_configs import DOC_EMBEDDING_DIM -from danswer.utils.clients import get_qdrant_client -from danswer.utils.logger import setup_logger - - -logger = setup_logger() - - -def list_qdrant_collections( - q_client: QdrantClient | None = None, -) -> CollectionsResponse: - q_client = q_client or get_qdrant_client() - return q_client.get_collections() - - -def create_qdrant_collection( - collection_name: str, - embedding_dim: int = DOC_EMBEDDING_DIM, - q_client: QdrantClient | None = None, -) -> None: - q_client = q_client or get_qdrant_client() - logger.info(f"Attempting to create collection {collection_name}") - result = q_client.create_collection( - collection_name=collection_name, - vectors_config=VectorParams(size=embedding_dim, distance=Distance.COSINE), - ) - if not result: - raise RuntimeError("Could not create Qdrant collection") - - -def get_payload_from_record(record: Record, is_required: bool = True) -> dict[str, Any]: - if record.payload is None and is_required: - raise RuntimeError( - "Qdrant Index is corrupted, Document found with no metadata." 
- ) - - return record.payload or {} diff --git a/backend/danswer/datastores/typesense/__init__.py b/backend/danswer/datastores/typesense/__init__.py deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/backend/danswer/datastores/typesense/store.py b/backend/danswer/datastores/typesense/store.py deleted file mode 100644 index 23947a7ca2..0000000000 --- a/backend/danswer/datastores/typesense/store.py +++ /dev/null @@ -1,280 +0,0 @@ -import json -from typing import Any -from uuid import UUID - -import typesense # type: ignore -from typesense.exceptions import ObjectNotFound # type: ignore - -from danswer.chunking.models import DocMetadataAwareIndexChunk -from danswer.chunking.models import InferenceChunk -from danswer.configs.app_configs import DOCUMENT_INDEX_NAME -from danswer.configs.app_configs import NUM_RETURNED_HITS -from danswer.configs.constants import ALLOWED_USERS -from danswer.configs.constants import BLURB -from danswer.configs.constants import CHUNK_ID -from danswer.configs.constants import CONTENT -from danswer.configs.constants import DOCUMENT_ID -from danswer.configs.constants import METADATA -from danswer.configs.constants import PUBLIC_DOC_PAT -from danswer.configs.constants import SECTION_CONTINUATION -from danswer.configs.constants import SEMANTIC_IDENTIFIER -from danswer.configs.constants import SOURCE_LINKS -from danswer.configs.constants import SOURCE_TYPE -from danswer.datastores.datastore_utils import DEFAULT_BATCH_SIZE -from danswer.datastores.datastore_utils import get_uuid_from_chunk -from danswer.datastores.interfaces import DocumentInsertionRecord -from danswer.datastores.interfaces import IndexFilter -from danswer.datastores.interfaces import KeywordIndex -from danswer.datastores.interfaces import UpdateRequest -from danswer.utils.batching import batch_generator -from danswer.utils.clients import get_typesense_client -from danswer.utils.logger import setup_logger - - -logger = setup_logger() - -# how many points we want to delete/update at a time -_BATCH_SIZE = 200 - - -def create_typesense_collection( - collection_name: str = DOCUMENT_INDEX_NAME, -) -> None: - ts_client = get_typesense_client() - collection_schema = { - "name": collection_name, - "fields": [ - # Typesense uses "id" type string as a special field - {"name": "id", "type": "string"}, - {"name": DOCUMENT_ID, "type": "string"}, - {"name": CHUNK_ID, "type": "int32"}, - {"name": BLURB, "type": "string"}, - {"name": CONTENT, "type": "string"}, - {"name": SOURCE_TYPE, "type": "string"}, - {"name": SOURCE_LINKS, "type": "string"}, - {"name": SEMANTIC_IDENTIFIER, "type": "string"}, - {"name": SECTION_CONTINUATION, "type": "bool"}, - {"name": ALLOWED_USERS, "type": "string[]"}, - {"name": METADATA, "type": "string"}, - ], - } - ts_client.collections.create(collection_schema) - - -def _check_typesense_collection_exist( - collection_name: str = DOCUMENT_INDEX_NAME, -) -> bool: - client = get_typesense_client() - try: - client.collections[collection_name].retrieve() - except ObjectNotFound: - return False - return True - - -def _does_document_exist( - doc_chunk_id: str, collection_name: str, ts_client: typesense.Client -) -> bool: - """Returns whether the document already exists and the users/group whitelists""" - try: - ts_client.collections[collection_name].documents[doc_chunk_id].retrieve() - except ObjectNotFound: - return False - - return True - - -def _delete_typesense_doc_chunks( - document_id: str, collection_name: str, ts_client: typesense.Client -) -> bool: - doc_id_filter = 
{"filter_by": f"{DOCUMENT_ID}:'{document_id}'"} - - # Typesense doesn't seem to prioritize individual deletions, problem not seen with this approach - # Point to consider if we see instances of number of Typesense and Qdrant docs not matching - del_result = ts_client.collections[collection_name].documents.delete(doc_id_filter) - return del_result["num_deleted"] != 0 - - -def _index_typesense_chunks( - chunks: list[DocMetadataAwareIndexChunk], - collection: str, - client: typesense.Client | None = None, - batch_upsert: bool = True, -) -> set[DocumentInsertionRecord]: - ts_client: typesense.Client = client if client else get_typesense_client() - - insertion_records: set[DocumentInsertionRecord] = set() - new_documents: list[dict[str, Any]] = [] - # document ids of documents that existed BEFORE this indexing - already_existing_documents: set[str] = set() - for chunk in chunks: - document = chunk.source_document - typesense_id = str(get_uuid_from_chunk(chunk)) - - # Delete all chunks related to the document if (1) it already exists and - # (2) this is our first time running into it during this indexing attempt - document_exists = _does_document_exist(typesense_id, collection, ts_client) - if document_exists and document.id not in already_existing_documents: - # Processing the first chunk of the doc and the doc exists - _delete_typesense_doc_chunks(document.id, collection, ts_client) - already_existing_documents.add(document.id) - - insertion_records.add( - DocumentInsertionRecord( - document_id=document.id, - already_existed=document.id in already_existing_documents, - ) - ) - new_documents.append( - { - "id": typesense_id, # No minichunks for typesense - DOCUMENT_ID: document.id, - CHUNK_ID: chunk.chunk_id, - BLURB: chunk.blurb, - CONTENT: chunk.content, - SOURCE_TYPE: str(document.source.value), - SOURCE_LINKS: json.dumps(chunk.source_links), - SEMANTIC_IDENTIFIER: document.semantic_identifier, - SECTION_CONTINUATION: chunk.section_continuation, - ALLOWED_USERS: json.dumps(chunk.access.to_acl()), - METADATA: json.dumps(document.metadata), - } - ) - - if batch_upsert: - doc_batches = [ - new_documents[x : x + DEFAULT_BATCH_SIZE] - for x in range(0, len(new_documents), DEFAULT_BATCH_SIZE) - ] - for doc_batch in doc_batches: - results = ts_client.collections[collection].documents.import_( - doc_batch, {"action": "upsert"} - ) - failures = [ - doc_res["success"] - for doc_res in results - if doc_res["success"] is not True - ] - logger.info( - f"Indexed {len(doc_batch)} chunks into Typesense collection '{collection}', " - f"number failed: {len(failures)}" - ) - else: - [ - ts_client.collections[collection].documents.upsert(document) - for document in new_documents - ] - - return insertion_records - - -def _build_typesense_filters( - user_id: UUID | None, filters: list[IndexFilter] | None -) -> str: - filter_str = "" - - # Permissions filter - if user_id: - filter_str += f"{ALLOWED_USERS}:=[{PUBLIC_DOC_PAT},{user_id}] && " - else: - filter_str += f"{ALLOWED_USERS}:={PUBLIC_DOC_PAT} && " - - # Provided query filters - if filters: - for filter_dict in filters: - valid_filters = { - key: value for key, value in filter_dict.items() if value is not None - } - for filter_key, filter_val in valid_filters.items(): - if isinstance(filter_val, str): - filter_str += f"{filter_key}:={filter_val} && " - elif isinstance(filter_val, list): - filters_or = ",".join([str(f_val) for f_val in filter_val]) - filter_str += f"{filter_key}:=[{filters_or}] && " - else: - raise ValueError("Invalid filters provided") - if 
filter_str[-4:] == " && ": - filter_str = filter_str[:-4] - return filter_str - - -class TypesenseIndex(KeywordIndex): - def __init__(self, index_name: str = DOCUMENT_INDEX_NAME) -> None: - # In Typesense, the document index is referred to as a collection - self.collection = index_name - self.ts_client = get_typesense_client() - - def ensure_indices_exist(self) -> None: - if not _check_typesense_collection_exist(self.collection): - logger.info(f"Creating Typesense collection with name: {self.collection}") - create_typesense_collection(collection_name=self.collection) - - def index( - self, chunks: list[DocMetadataAwareIndexChunk] - ) -> set[DocumentInsertionRecord]: - return _index_typesense_chunks( - chunks=chunks, - collection=self.collection, - client=self.ts_client, - ) - - def update(self, update_requests: list[UpdateRequest]) -> None: - logger.info( - f"Updating {len(update_requests)} documents' allowed_users in Typesense" - ) - for update_request in update_requests: - for id_batch in batch_generator( - items=update_request.document_ids, batch_size=_BATCH_SIZE - ): - if update_request.access is None: - continue - - typesense_updates = [ - { - DOCUMENT_ID: doc_id, - ALLOWED_USERS: update_request.access.to_acl(), - } - for doc_id in id_batch - ] - self.ts_client.collections[self.collection].documents.import_( - typesense_updates, {"action": "update"} - ) - - def delete(self, doc_ids: list[str]) -> None: - logger.info(f"Deleting {len(doc_ids)} documents from Typesense") - for id_batch in batch_generator(items=doc_ids, batch_size=_BATCH_SIZE): - self.ts_client.collections[self.collection].documents.delete( - {"filter_by": f'{DOCUMENT_ID}:[{",".join(id_batch)}]'} - ) - - def keyword_retrieval( - self, - query: str, - user_id: UUID | None, - filters: list[IndexFilter] | None, - num_to_retrieve: int = NUM_RETURNED_HITS, - ) -> list[InferenceChunk]: - filters_str = _build_typesense_filters(user_id, filters) - - search_query = { - "q": query, - "query_by": CONTENT, - "filter_by": filters_str, - "per_page": num_to_retrieve, - "limit_hits": num_to_retrieve, - "num_typos": 2, - "prefix": "false", - # below is required to allow proper partial matching of a query - # (partial matching = only some of the terms in the query match) - # more info here: https://typesense-community.slack.com/archives/C01P749MET0/p1688083239192799 - "exhaustive_search": "true", - } - - search_results = self.ts_client.collections[self.collection].documents.search( - search_query - ) - - hits = search_results["hits"] - inference_chunks = [InferenceChunk.from_dict(hit["document"]) for hit in hits] - - return inference_chunks diff --git a/backend/danswer/datastores/vespa/store.py b/backend/danswer/datastores/vespa/store.py index d8e9642760..be0c219e1f 100644 --- a/backend/danswer/datastores/vespa/store.py +++ b/backend/danswer/datastores/vespa/store.py @@ -4,9 +4,11 @@ import string import time from collections.abc import Mapping from dataclasses import dataclass +from datetime import datetime +from datetime import timedelta +from datetime import timezone from typing import Any from typing import cast -from uuid import UUID import requests from requests import HTTPError @@ -17,6 +19,7 @@ from danswer.chunking.models import InferenceChunk from danswer.configs.app_configs import DOC_TIME_DECAY from danswer.configs.app_configs import DOCUMENT_INDEX_NAME from danswer.configs.app_configs import EDIT_KEYWORD_QUERY +from danswer.configs.app_configs import FAVOR_RECENT_DECAY_MULTIPLIER from danswer.configs.app_configs import 
NUM_RETURNED_HITS from danswer.configs.app_configs import VESPA_DEPLOYMENT_ZIP from danswer.configs.app_configs import VESPA_HOST @@ -46,10 +49,9 @@ from danswer.configs.constants import SOURCE_TYPE from danswer.configs.constants import TITLE from danswer.configs.model_configs import SEARCH_DISTANCE_CUTOFF from danswer.datastores.datastore_utils import get_uuid_from_chunk -from danswer.datastores.datastore_utils import translate_to_epoch_seconds_ensure_tz from danswer.datastores.interfaces import DocumentIndex from danswer.datastores.interfaces import DocumentInsertionRecord -from danswer.datastores.interfaces import IndexFilter +from danswer.datastores.interfaces import IndexFilters from danswer.datastores.interfaces import UpdateRequest from danswer.datastores.vespa.utils import remove_invalid_unicode_chars from danswer.search.keyword_search import remove_stop_words @@ -99,6 +101,16 @@ def _does_document_exist( return True +def _vespa_get_updated_at_attribute(t: datetime | None) -> int | None: + if not t: + return None + + if t.tzinfo != timezone.utc: + raise ValueError("Connectors must provide document update time in UTC") + + return int(t.timestamp()) + + def _get_vespa_chunk_ids_by_document_id( document_id: str, hits_per_page: int = _BATCH_SIZE ) -> list[str]: @@ -178,7 +190,7 @@ def _index_vespa_chunk( METADATA: json.dumps(document.metadata), EMBEDDINGS: embeddings_name_vector_map, BOOST: DEFAULT_BOOST, - DOC_UPDATED_AT: translate_to_epoch_seconds_ensure_tz(document.doc_updated_at), + DOC_UPDATED_AT: _vespa_get_updated_at_attribute(document.doc_updated_at), PRIMARY_OWNERS: document.primary_owners, SECONDARY_OWNERS: document.secondary_owners, # the only `set` vespa has is `weightedset`, so we have to give each @@ -275,37 +287,48 @@ def _index_vespa_chunks( return insertion_records -def _build_vespa_filters( - filters: list[IndexFilter] | None, include_hidden: bool = False -) -> str: - # NOTE: permissions filters are expected to be passed in directly via - # the `filters` arg, which is why they are not considered explicitly here +def _build_vespa_filters(filters: IndexFilters, include_hidden: bool = False) -> str: + def _build_or_filters(key: str, vals: list[str] | None) -> str: + if vals is None: + return "" - # NOTE: document-set filters are also expected to be passed in directly - # via the `filters` arg. These are set either in the Web UI or in the Slack - # listener + valid_vals = [val for val in vals if val] + if not key or not valid_vals: + return "" + + eq_elems = [f'{key} contains "{elem}"' for elem in valid_vals] + or_clause = " or ".join(eq_elems) + return f"({or_clause}) and " + + def _build_time_filter( + cutoff: datetime | None, + untimed_doc_cutoff: timedelta = timedelta(days=62), # Slightly over 2 Months + ) -> str: + if not cutoff: + return "" + + # For Documents that don't have an updated at, filter them out for queries asking for + # very recent documents (2 months) default + include_untimed = datetime.now(timezone.utc) - untimed_doc_cutoff > cutoff + cutoff_secs = int(cutoff.timestamp()) + + if include_untimed: + # Documents without updated_at are assigned -1 as their date + return f"!({DOC_UPDATED_AT} < {cutoff_secs}) and " + + return f"({DOC_UPDATED_AT} >= {cutoff_secs}) and " - # usually ignore hidden docs unless explicitly requested. 
We may want to - # get hidden docs on the admin panel to allow for un-hiding filter_str = f"!({HIDDEN}=true) and " if not include_hidden else "" - # Handle provided query filters - if filters: - for filter_dict in filters: - valid_filters = { - key: value for key, value in filter_dict.items() if value is not None - } - for filter_key, filter_val in valid_filters.items(): - if isinstance(filter_val, str): - filter_str += f'{filter_key} contains "{filter_val}" and ' - elif isinstance(filter_val, list): - eq_elems = [ - f'{filter_key} contains "{elem}"' for elem in filter_val - ] - filters_or = " or ".join(eq_elems) - filter_str += f"({filters_or}) and " - else: - raise ValueError("Invalid filters provided") + # CAREFUL touching this one, currently there is no second ACL double-check post retrieval + filter_str += _build_or_filters(ACCESS_CONTROL_LIST, filters.access_control_list) + + filter_str += _build_or_filters(SOURCE_TYPE, filters.source_type) + + filter_str += _build_or_filters(DOCUMENT_SETS, filters.document_set) + + filter_str += _build_time_filter(filters.time_cutoff) + return filter_str @@ -530,10 +553,11 @@ class VespaIndex(DocumentIndex): def keyword_retrieval( self, query: str, - user_id: UUID | None, - filters: list[IndexFilter] | None, + filters: IndexFilters, + favor_recent: bool, num_to_retrieve: int = NUM_RETURNED_HITS, ) -> list[InferenceChunk]: + decay_multiplier = FAVOR_RECENT_DECAY_MULTIPLIER if favor_recent else 1 vespa_where_clauses = _build_vespa_filters(filters) yql = ( VespaIndex.yql_base @@ -549,7 +573,7 @@ class VespaIndex(DocumentIndex): params: dict[str, str | int] = { "yql": yql, "query": query, - "input.query(decay_factor)": str(DOC_TIME_DECAY), + "input.query(decay_factor)": str(DOC_TIME_DECAY * decay_multiplier), "hits": num_to_retrieve, "num_to_rerank": 10 * num_to_retrieve, "ranking.profile": "keyword_search", @@ -560,11 +584,12 @@ class VespaIndex(DocumentIndex): def semantic_retrieval( self, query: str, - user_id: UUID | None, - filters: list[IndexFilter] | None, - num_to_retrieve: int, + filters: IndexFilters, + favor_recent: bool, + num_to_retrieve: int = NUM_RETURNED_HITS, distance_cutoff: float | None = SEARCH_DISTANCE_CUTOFF, ) -> list[InferenceChunk]: + decay_multiplier = FAVOR_RECENT_DECAY_MULTIPLIER if favor_recent else 1 vespa_where_clauses = _build_vespa_filters(filters) yql = ( VespaIndex.yql_base @@ -587,7 +612,7 @@ class VespaIndex(DocumentIndex): "yql": yql, "query": query_keywords, "input.query(query_embedding)": str(query_embedding), - "input.query(decay_factor)": str(DOC_TIME_DECAY), + "input.query(decay_factor)": str(DOC_TIME_DECAY * decay_multiplier), "ranking.profile": "semantic_search", } @@ -596,8 +621,8 @@ class VespaIndex(DocumentIndex): def hybrid_retrieval( self, query: str, - user_id: UUID | None, - filters: list[IndexFilter] | None, + filters: IndexFilters, + favor_recent: bool, num_to_retrieve: int, ) -> list[InferenceChunk]: vespa_where_clauses = _build_vespa_filters(filters) @@ -628,8 +653,7 @@ class VespaIndex(DocumentIndex): def admin_retrieval( self, query: str, - user_id: UUID | None, - filters: list[IndexFilter] | None, + filters: IndexFilters, num_to_retrieve: int = NUM_RETURNED_HITS, ) -> list[InferenceChunk]: vespa_where_clauses = _build_vespa_filters(filters, include_hidden=True) diff --git a/backend/danswer/direct_qa/answer_question.py b/backend/danswer/direct_qa/answer_question.py index 58ddc2bd6f..d559dd6515 100644 --- a/backend/danswer/direct_qa/answer_question.py +++ 
b/backend/danswer/direct_qa/answer_question.py @@ -26,6 +26,8 @@ from danswer.search.models import SearchType from danswer.search.semantic_search import chunks_to_search_docs from danswer.search.semantic_search import retrieve_ranked_documents from danswer.secondary_llm_flows.answer_validation import get_answer_validity +from danswer.secondary_llm_flows.extract_filters import extract_question_time_filters +from danswer.server.models import IndexFilters from danswer.server.models import QAResponse from danswer.server.models import QuestionRequest from danswer.utils.logger import setup_logger @@ -49,11 +51,14 @@ def answer_qa_query( llm_metrics_callback: Callable[[LLMMetricsContainer], None] | None = None, ) -> QAResponse: query = question.query - filters = question.filters use_keyword = question.use_keyword offset_count = question.offset if question.offset is not None else 0 logger.info(f"Received QA query: {query}") + time_cutoff, favor_recent = extract_question_time_filters(question) + question.filters.time_cutoff = time_cutoff + filters = question.filters + query_event_id = create_query_event( query=query, selected_flow=SearchType.KEYWORD @@ -70,22 +75,27 @@ def answer_qa_query( user_id = None if user is None else user.id user_acl_filters = build_access_filters_for_user(user, db_session) - final_filters = (filters or []) + user_acl_filters + final_filters = IndexFilters( + source_type=filters.source_type, + document_set=filters.document_set, + time_cutoff=time_cutoff, + access_control_list=user_acl_filters, + ) if use_keyword: ranked_chunks: list[InferenceChunk] | None = retrieve_keyword_documents( - query, - user_id, - final_filters, - get_default_document_index(), + query=query, + filters=final_filters, + favor_recent=favor_recent, + datastore=get_default_document_index(), retrieval_metrics_callback=retrieval_metrics_callback, ) unranked_chunks: list[InferenceChunk] | None = [] else: ranked_chunks, unranked_chunks = retrieve_ranked_documents( - query, - user_id, - final_filters, - get_default_document_index(), + query=query, + filters=final_filters, + favor_recent=favor_recent, + datastore=get_default_document_index(), retrieval_metrics_callback=retrieval_metrics_callback, rerank_metrics_callback=rerank_metrics_callback, ) @@ -98,6 +108,8 @@ def answer_qa_query( predicted_flow=predicted_flow, predicted_search=predicted_search, query_event_id=query_event_id, + time_cutoff=time_cutoff, + favor_recent=favor_recent, ) top_docs = chunks_to_search_docs(ranked_chunks) @@ -121,6 +133,8 @@ def answer_qa_query( predicted_flow=QueryFlow.SEARCH, predicted_search=predicted_search, query_event_id=query_event_id, + time_cutoff=time_cutoff, + favor_recent=favor_recent, ) try: @@ -135,8 +149,10 @@ def answer_qa_query( lower_ranked_docs=unranked_top_docs, predicted_flow=predicted_flow, predicted_search=predicted_search, - error_msg=str(e), query_event_id=query_event_id, + time_cutoff=time_cutoff, + favor_recent=favor_recent, + error_msg=str(e), ) # remove chunks marked as not applicable for QA (e.g. 
Google Drive file @@ -179,8 +195,10 @@ def answer_qa_query( predicted_flow=predicted_flow, predicted_search=predicted_search, eval_res_valid=True if valid else False, - error_msg=error_msg, query_event_id=query_event_id, + time_cutoff=time_cutoff, + favor_recent=favor_recent, + error_msg=error_msg, ) return QAResponse( @@ -190,6 +208,8 @@ def answer_qa_query( lower_ranked_docs=unranked_top_docs, predicted_flow=predicted_flow, predicted_search=predicted_search, - error_msg=error_msg, query_event_id=query_event_id, + time_cutoff=time_cutoff, + favor_recent=favor_recent, + error_msg=error_msg, ) diff --git a/backend/danswer/search/access_filters.py b/backend/danswer/search/access_filters.py index e6781d612d..d69a1e61f9 100644 --- a/backend/danswer/search/access_filters.py +++ b/backend/danswer/search/access_filters.py @@ -1,13 +1,20 @@ from sqlalchemy.orm import Session from danswer.access.access import get_acl_for_user -from danswer.configs.constants import ACCESS_CONTROL_LIST -from danswer.datastores.interfaces import IndexFilter from danswer.db.models import User +from danswer.server.models import IndexFilters -def build_access_filters_for_user( - user: User | None, session: Session -) -> list[IndexFilter]: +def build_access_filters_for_user(user: User | None, session: Session) -> list[str]: user_acl = get_acl_for_user(user, session) - return [{ACCESS_CONTROL_LIST: list(user_acl)}] + return list(user_acl) + + +def build_user_only_filters(user: User | None, db_session: Session) -> IndexFilters: + user_acl_filters = build_access_filters_for_user(user, db_session) + return IndexFilters( + source_type=None, + document_set=None, + time_cutoff=None, + access_control_list=user_acl_filters, + ) diff --git a/backend/danswer/search/keyword_search.py b/backend/danswer/search/keyword_search.py index f681fe12c1..25ad1c9114 100644 --- a/backend/danswer/search/keyword_search.py +++ b/backend/danswer/search/keyword_search.py @@ -1,6 +1,4 @@ -import json from collections.abc import Callable -from uuid import UUID from nltk.corpus import stopwords # type:ignore from nltk.stem import WordNetLemmatizer # type:ignore @@ -10,7 +8,7 @@ from danswer.chunking.models import InferenceChunk from danswer.configs.app_configs import EDIT_KEYWORD_QUERY from danswer.configs.app_configs import NUM_RETURNED_HITS from danswer.datastores.interfaces import DocumentIndex -from danswer.datastores.interfaces import IndexFilter +from danswer.datastores.interfaces import IndexFilters from danswer.search.models import ChunkMetric from danswer.search.models import MAX_METRICS_CONTENT from danswer.search.models import RetrievalMetricsContainer @@ -44,8 +42,8 @@ def query_processing( @log_function_time() def retrieve_keyword_documents( query: str, - user_id: UUID | None, - filters: list[IndexFilter] | None, + filters: IndexFilters, + favor_recent: bool, datastore: DocumentIndex, num_hits: int = NUM_RETURNED_HITS, edit_query: bool = EDIT_KEYWORD_QUERY, @@ -54,12 +52,13 @@ def retrieve_keyword_documents( ) -> list[InferenceChunk] | None: edited_query = query_processing(query) if edit_query else query - top_chunks = datastore.keyword_retrieval(edited_query, user_id, filters, num_hits) + top_chunks = datastore.keyword_retrieval( + edited_query, filters, favor_recent, num_hits + ) if not top_chunks: - filters_log_msg = json.dumps(filters, separators=(",", ":")).replace("\n", "") logger.warning( - f"Keyword search returned no results - Filters: {filters_log_msg}\tEdited Query: {edited_query}" + f"Keyword search returned no results - Filters: 
{filters}\tEdited Query: {edited_query}" ) return None diff --git a/backend/danswer/search/semantic_search.py b/backend/danswer/search/semantic_search.py index 632b166d08..6b7083a4f8 100644 --- a/backend/danswer/search/semantic_search.py +++ b/backend/danswer/search/semantic_search.py @@ -1,6 +1,4 @@ -import json from collections.abc import Callable -from uuid import UUID import numpy from sentence_transformers import SentenceTransformer # type: ignore @@ -24,7 +22,7 @@ from danswer.configs.model_configs import SIM_SCORE_RANGE_LOW from danswer.configs.model_configs import SKIP_RERANKING from danswer.datastores.datastore_utils import translate_boost_count_to_multiplier from danswer.datastores.interfaces import DocumentIndex -from danswer.datastores.interfaces import IndexFilter +from danswer.datastores.interfaces import IndexFilters from danswer.search.models import ChunkMetric from danswer.search.models import Embedder from danswer.search.models import MAX_METRICS_CONTENT @@ -179,8 +177,8 @@ def apply_boost( @log_function_time() def retrieve_ranked_documents( query: str, - user_id: UUID | None, - filters: list[IndexFilter] | None, + filters: IndexFilters, + favor_recent: bool, datastore: DocumentIndex, num_hits: int = NUM_RETURNED_HITS, num_rerank: int = NUM_RERANKED_RESULTS, @@ -198,12 +196,9 @@ def retrieve_ranked_documents( files_log_msg = f"Top links from semantic search: {', '.join(doc_links)}" logger.info(files_log_msg) - top_chunks = datastore.semantic_retrieval(query, user_id, filters, num_hits) + top_chunks = datastore.semantic_retrieval(query, filters, favor_recent, num_hits) if not top_chunks: - filters_log_msg = json.dumps(filters, separators=(",", ":")).replace("\n", "") - logger.warning( - f"Semantic search returned no results with filters: {filters_log_msg}" - ) + logger.info(f"Semantic search returned no results with filters: {filters}") return None, None logger.debug(top_chunks) diff --git a/backend/danswer/secondary_llm_flows/extract_filters.py b/backend/danswer/secondary_llm_flows/extract_filters.py new file mode 100644 index 0000000000..8fcc32663f --- /dev/null +++ b/backend/danswer/secondary_llm_flows/extract_filters.py @@ -0,0 +1,196 @@ +import json +from datetime import datetime +from datetime import timedelta +from datetime import timezone + +from dateutil.parser import parse + +from danswer.configs.app_configs import DISABLE_TIME_FILTER_EXTRACTION +from danswer.llm.build import get_default_llm +from danswer.llm.utils import dict_based_prompt_to_langchain_prompt +from danswer.server.models import QuestionRequest +from danswer.utils.logger import setup_logger +from danswer.utils.timing import log_function_time + +logger = setup_logger() + + +def best_match_time(time_str: str) -> datetime | None: + preferred_formats = ["%m/%d/%Y", "%m-%d-%Y"] + + for fmt in preferred_formats: + try: + # As we don't know if the user is interacting with the API server from + # the same timezone as the API server, just assume the queries are UTC time + # the few hours offset (if any) shouldn't make any significant difference + dt = datetime.strptime(time_str, fmt) + return dt.replace(tzinfo=timezone.utc) + except ValueError: + continue + + # If the above formats don't match, try using dateutil's parser + try: + dt = parse(time_str) + return ( + dt.astimezone(timezone.utc) + if dt.tzinfo + else dt.replace(tzinfo=timezone.utc) + ) + except ValueError: + return None + + +@log_function_time() +def extract_time_filter(query: str) -> tuple[datetime | None, bool]: + """Returns a datetime if a 
hard time filter should be applied for the given query + Additionally returns a bool, True if more recently updated Documents should be + heavily favored""" + + def _get_time_filter_messages(query: str) -> list[dict[str, str]]: + messages = [ + { + "role": "system", + "content": "You are a tool to identify time filters to apply to a user query for " + "a downstream search application. The downstream application is able to " + "use a recency bias or apply a hard cutoff to remove all documents " + "before the cutoff. Identify the correct filters to apply for the user " + "query.\n\n" + "Always answer with ONLY a json which contains the keys " + '"filter_type", "filter_value", "value_multiple" and "date".\n\n' + 'The valid values for "filter_type" are "hard cutoff", ' + '"favors recent", or "not time sensitive".\n' + 'The valid values for "filter_value" are "day", "week", "month", ' + '"quarter", "half", or "year".\n' + 'The valid values for "value_multiple" is any number.\n' + 'The valid values for "date" is a date in format MM/DD/YYYY.', + }, + { + "role": "user", + "content": "What documents in Confluence were written in the last two quarters", + }, + { + "role": "assistant", + "content": json.dumps( + { + "filter_type": "hard cutoff", + "filter_value": "quarter", + "value_multiple": 2, + } + ), + }, + {"role": "user", "content": "What's the latest on project Corgies?"}, + { + "role": "assistant", + "content": json.dumps({"filter_type": "favor recent"}), + }, + { + "role": "user", + "content": "Which customer asked about security features in February of 2022?", + }, + { + "role": "assistant", + "content": json.dumps( + {"filter_type": "hard cutoff", "date": "02/01/2022"} + ), + }, + {"role": "user", "content": query}, + ] + return messages + + def _extract_time_filter_from_llm_out( + model_out: str, + ) -> tuple[datetime | None, bool]: + """Returns a datetime for a hard cutoff and a bool for if the""" + try: + model_json = json.loads(model_out, strict=False) + except json.JSONDecodeError: + return None, False + + # If filter type is not present, just assume something has gone wrong + # Potentially model has identified a date and just returned that but + # better to be conservative and not identify the wrong filter. + if "filter_type" not in model_json: + return None, False + + if "hard" in model_json["filter_type"] or "recent" in model_json["filter_type"]: + favor_recent = "recent" in model_json["filter_type"] + + if "date" in model_json: + extracted_time = best_match_time(model_json["date"]) + if extracted_time is not None: + return extracted_time, favor_recent + + time_diff = None + multiplier = 1.0 + + if "value_multiple" in model_json: + try: + multiplier = float(model_json["value_multiple"]) + except ValueError: + pass + + if "filter_value" in model_json: + filter_value = model_json["filter_value"] + if "day" in filter_value: + time_diff = timedelta(days=multiplier) + elif "week" in filter_value: + time_diff = timedelta(weeks=multiplier) + elif "month" in filter_value: + # Have to just use the average here, too complicated to calculate exact day + # based on current day etc. 
+ time_diff = timedelta(days=multiplier * 30.437) + elif "quarter" in filter_value: + time_diff = timedelta(days=multiplier * 91.25) + elif "year" in filter_value: + time_diff = timedelta(days=multiplier * 365) + + if time_diff is not None: + current = datetime.now(timezone.utc) + return current - time_diff, favor_recent + + # If we failed to extract a hard filter, just pass back the value of favor recent + return None, favor_recent + + return None, False + + messages = _get_time_filter_messages(query) + filled_llm_prompt = dict_based_prompt_to_langchain_prompt(messages) + model_output = get_default_llm().invoke(filled_llm_prompt) + logger.debug(model_output) + + return _extract_time_filter_from_llm_out(model_output) + + +def extract_question_time_filters( + question: QuestionRequest, + disable_llm_extraction: bool = DISABLE_TIME_FILTER_EXTRACTION, +) -> tuple[datetime | None, bool]: + time_cutoff = question.filters.time_cutoff + favor_recent = question.favor_recent + # Frontend needs to be able to set this flag so that if user deletes the time filter, + # we don't automatically reapply it. The env variable is a global disable of this feature + # for the sake of latency + if not question.enable_auto_detect_filters or disable_llm_extraction: + if favor_recent is None: + favor_recent = False + return time_cutoff, favor_recent + + llm_cutoff, llm_favor_recent = extract_time_filter(question.query) + + # For all extractable filters, don't overwrite the provided values if any is provided + if time_cutoff is None: + time_cutoff = llm_cutoff + + if favor_recent is None: + favor_recent = llm_favor_recent + + return time_cutoff, favor_recent + + +if __name__ == "__main__": + # Just for testing purposes, too tedious to unit test as it relies on an LLM + while True: + user_input = input("Query to Extract Time: ") + cutoff, recency_bias = extract_time_filter(user_input) + print(f"Time Cutoff: {cutoff}") + print(f"Favor Recent: {recency_bias}") diff --git a/backend/danswer/server/chat_backend.py b/backend/danswer/server/chat_backend.py index d682a824a0..2f706887b5 100644 --- a/backend/danswer/server/chat_backend.py +++ b/backend/danswer/server/chat_backend.py @@ -308,8 +308,9 @@ def handle_new_chat_message( response_packets = llm_chat_answer( messages=mainline_messages, persona=persona, - user_id=user_id, tokenizer=llm_tokenizer, + user=user, + db_session=db_session, ) llm_output = "" fetched_docs: RetrievalDocs | None = None @@ -397,8 +398,9 @@ def regenerate_message_given_parent( response_packets = llm_chat_answer( messages=mainline_messages, persona=persona, - user_id=user_id, tokenizer=llm_tokenizer, + user=user, + db_session=db_session, ) llm_output = "" fetched_docs: RetrievalDocs | None = None diff --git a/backend/danswer/server/models.py b/backend/danswer/server/models.py index aa20b4eebc..2178553735 100644 --- a/backend/danswer/server/models.py +++ b/backend/danswer/server/models.py @@ -18,7 +18,6 @@ from danswer.configs.constants import MessageType from danswer.configs.constants import QAFeedbackType from danswer.configs.constants import SearchFeedbackType from danswer.connectors.models import InputType -from danswer.datastores.interfaces import IndexFilter from danswer.db.models import AllowedAnswerFilters from danswer.db.models import ChannelConfig from danswer.db.models import Connector @@ -166,18 +165,32 @@ class RerankedRetrievalDocs(RetrievalDocs): unranked_top_documents: list[SearchDoc] predicted_flow: QueryFlow predicted_search: SearchType + time_cutoff: datetime | None + favor_recent: 
bool class CreateChatSessionID(BaseModel): chat_session_id: int +class RequestFilters(BaseModel): + source_type: list[str] | None + document_set: list[str] | None + time_cutoff: datetime | None = None + + +class IndexFilters(RequestFilters): + access_control_list: list[str] + + class QuestionRequest(BaseModel): query: str collection: str use_keyword: bool | None - filters: list[IndexFilter] | None + filters: RequestFilters offset: int | None + enable_auto_detect_filters: bool + favor_recent: bool | None = None class QAFeedbackRequest(BaseModel): @@ -266,6 +279,8 @@ class SearchResponse(BaseModel): top_ranked_docs: list[SearchDoc] | None lower_ranked_docs: list[SearchDoc] | None query_event_id: int + time_cutoff: datetime | None + favor_recent: bool class QAResponse(SearchResponse): diff --git a/backend/danswer/server/search_backend.py b/backend/danswer/server/search_backend.py index 1343213ca0..27381b2e04 100644 --- a/backend/danswer/server/search_backend.py +++ b/backend/danswer/server/search_backend.py @@ -14,7 +14,6 @@ from danswer.configs.app_configs import DISABLE_GENERATIVE_AI from danswer.configs.app_configs import NUM_DOCUMENT_TOKENS_FED_TO_GENERATIVE_MODEL from danswer.configs.constants import IGNORE_FOR_QA from danswer.datastores.document_index import get_default_document_index -from danswer.datastores.interfaces import IndexFilter from danswer.datastores.vespa.store import VespaIndex from danswer.db.engine import get_session from danswer.db.feedback import create_doc_retrieval_feedback @@ -36,9 +35,11 @@ from danswer.search.models import QueryFlow from danswer.search.models import SearchType from danswer.search.semantic_search import chunks_to_search_docs from danswer.search.semantic_search import retrieve_ranked_documents +from danswer.secondary_llm_flows.extract_filters import extract_question_time_filters from danswer.secondary_llm_flows.query_validation import get_query_answerability from danswer.secondary_llm_flows.query_validation import stream_query_answerability from danswer.server.models import HelperResponse +from danswer.server.models import IndexFilters from danswer.server.models import QAFeedbackRequest from danswer.server.models import QAResponse from danswer.server.models import QueryValidationResponse @@ -61,7 +62,7 @@ router = APIRouter() class AdminSearchRequest(BaseModel): query: str - filters: list[IndexFilter] | None = None + filters: IndexFilters class AdminSearchResponse(BaseModel): @@ -78,9 +79,13 @@ def admin_search( filters = question.filters logger.info(f"Received admin search query: {query}") - user_id = None if user is None else user.id user_acl_filters = build_access_filters_for_user(user, db_session) - final_filters = (filters or []) + user_acl_filters + final_filters = IndexFilters( + source_type=filters.source_type, + document_set=filters.document_set, + time_cutoff=filters.time_cutoff, + access_control_list=user_acl_filters, + ) document_index = get_default_document_index() if not isinstance(document_index, VespaIndex): raise HTTPException( @@ -88,9 +93,7 @@ def admin_search( detail="Cannot use admin-search when using a non-Vespa document index", ) - matching_chunks = document_index.admin_retrieval( - query=query, user_id=user_id, filters=final_filters - ) + matching_chunks = document_index.admin_retrieval(query=query, filters=final_filters) documents = chunks_to_search_docs(matching_chunks) @@ -142,9 +145,12 @@ def semantic_search( db_session: Session = Depends(get_session), ) -> SearchResponse: query = question.query - filters = 
question.filters logger.info(f"Received semantic search query: {query}") + time_cutoff, favor_recent = extract_question_time_filters(question) + question.filters.time_cutoff = time_cutoff + filters = question.filters + query_event_id = create_query_event( query=query, selected_flow=SearchType.SEMANTIC, @@ -155,13 +161,25 @@ def semantic_search( user_id = None if user is None else user.id user_acl_filters = build_access_filters_for_user(user, db_session) - final_filters = (filters or []) + user_acl_filters + final_filters = IndexFilters( + source_type=filters.source_type, + document_set=filters.document_set, + time_cutoff=filters.time_cutoff, + access_control_list=user_acl_filters, + ) ranked_chunks, unranked_chunks = retrieve_ranked_documents( - query, user_id, final_filters, get_default_document_index() + query=query, + filters=final_filters, + favor_recent=favor_recent, + datastore=get_default_document_index(), ) if not ranked_chunks: return SearchResponse( - top_ranked_docs=None, lower_ranked_docs=None, query_event_id=query_event_id + top_ranked_docs=None, + lower_ranked_docs=None, + query_event_id=query_event_id, + time_cutoff=time_cutoff, + favor_recent=favor_recent, ) top_docs = chunks_to_search_docs(ranked_chunks) @@ -177,6 +195,8 @@ def semantic_search( top_ranked_docs=top_docs, lower_ranked_docs=other_top_docs, query_event_id=query_event_id, + time_cutoff=time_cutoff, + favor_recent=favor_recent, ) @@ -187,9 +207,12 @@ def keyword_search( db_session: Session = Depends(get_session), ) -> SearchResponse: query = question.query - filters = question.filters logger.info(f"Received keyword search query: {query}") + time_cutoff, favor_recent = extract_question_time_filters(question) + question.filters.time_cutoff = time_cutoff + filters = question.filters + query_event_id = create_query_event( query=query, selected_flow=SearchType.KEYWORD, @@ -200,13 +223,25 @@ def keyword_search( user_id = None if user is None else user.id user_acl_filters = build_access_filters_for_user(user, db_session) - final_filters = (filters or []) + user_acl_filters + final_filters = IndexFilters( + source_type=filters.source_type, + document_set=filters.document_set, + time_cutoff=filters.time_cutoff, + access_control_list=user_acl_filters, + ) ranked_chunks = retrieve_keyword_documents( - query, user_id, final_filters, get_default_document_index() + query=query, + filters=final_filters, + favor_recent=favor_recent, + datastore=get_default_document_index(), ) if not ranked_chunks: return SearchResponse( - top_ranked_docs=None, lower_ranked_docs=None, query_event_id=query_event_id + top_ranked_docs=None, + lower_ranked_docs=None, + query_event_id=query_event_id, + time_cutoff=time_cutoff, + favor_recent=favor_recent, ) top_docs = chunks_to_search_docs(ranked_chunks) @@ -218,7 +253,11 @@ def keyword_search( ) return SearchResponse( - top_ranked_docs=top_docs, lower_ranked_docs=None, query_event_id=query_event_id + top_ranked_docs=top_docs, + lower_ranked_docs=None, + query_event_id=query_event_id, + time_cutoff=time_cutoff, + favor_recent=favor_recent, ) @@ -228,6 +267,8 @@ def direct_qa( user: User | None = Depends(current_user), db_session: Session = Depends(get_session), ) -> QAResponse: + # Everything handled via answer_qa_query which is also used by default + # for the DanswerBot flow return answer_qa_query(question=question, user=user, db_session=db_session) @@ -255,31 +296,39 @@ def stream_direct_qa( ) -> Generator[str, None, None]: answer_so_far: str = "" query = question.query - filters = 
question.filters use_keyword = question.use_keyword offset_count = question.offset if question.offset is not None else 0 + time_cutoff, favor_recent = extract_question_time_filters(question) + question.filters.time_cutoff = time_cutoff + filters = question.filters + predicted_search, predicted_flow = query_intent(query) if use_keyword is None: use_keyword = predicted_search == SearchType.KEYWORD user_id = None if user is None else user.id user_acl_filters = build_access_filters_for_user(user, db_session) - final_filters = (filters or []) + user_acl_filters + final_filters = IndexFilters( + source_type=filters.source_type, + document_set=filters.document_set, + time_cutoff=filters.time_cutoff, + access_control_list=user_acl_filters, + ) if use_keyword: ranked_chunks: list[InferenceChunk] | None = retrieve_keyword_documents( - query, - user_id, - final_filters, - get_default_document_index(), + query=query, + filters=final_filters, + favor_recent=favor_recent, + datastore=get_default_document_index(), ) unranked_chunks: list[InferenceChunk] | None = [] else: ranked_chunks, unranked_chunks = retrieve_ranked_documents( - query, - user_id, - final_filters, - get_default_document_index(), + query=query, + filters=final_filters, + favor_recent=favor_recent, + datastore=get_default_document_index(), ) if not ranked_chunks: logger.debug("No Documents Found") @@ -304,6 +353,8 @@ def stream_direct_qa( if disable_generative_answer else predicted_flow, predicted_search=predicted_search, + time_cutoff=time_cutoff, + favor_recent=favor_recent, ).dict() logger.debug(send_packet_debug_msg.format(initial_response)) diff --git a/backend/danswer/utils/clients.py b/backend/danswer/utils/clients.py deleted file mode 100644 index db5d47b9f0..0000000000 --- a/backend/danswer/utils/clients.py +++ /dev/null @@ -1,49 +0,0 @@ -import typesense # type: ignore -from qdrant_client import QdrantClient - -from danswer.configs.app_configs import QDRANT_API_KEY -from danswer.configs.app_configs import QDRANT_HOST -from danswer.configs.app_configs import QDRANT_PORT -from danswer.configs.app_configs import QDRANT_URL -from danswer.configs.app_configs import TYPESENSE_API_KEY -from danswer.configs.app_configs import TYPESENSE_HOST -from danswer.configs.app_configs import TYPESENSE_PORT - - -_qdrant_client: QdrantClient | None = None -_typesense_client: typesense.Client | None = None - - -def get_qdrant_client() -> QdrantClient: - global _qdrant_client - if _qdrant_client is None: - if QDRANT_URL and QDRANT_API_KEY: - _qdrant_client = QdrantClient(url=QDRANT_URL, api_key=QDRANT_API_KEY) - elif QDRANT_HOST and QDRANT_PORT: - _qdrant_client = QdrantClient(host=QDRANT_HOST, port=QDRANT_PORT) - else: - raise Exception("Unable to instantiate QdrantClient") - - return _qdrant_client - - -def get_typesense_client() -> typesense.Client: - global _typesense_client - if _typesense_client is None: - if TYPESENSE_HOST and TYPESENSE_PORT and TYPESENSE_API_KEY: - _typesense_client = typesense.Client( - { - "api_key": TYPESENSE_API_KEY, - "nodes": [ - { - "host": TYPESENSE_HOST, - "port": str(TYPESENSE_PORT), - "protocol": "http", - } - ], - } - ) - else: - raise Exception("Unable to instantiate TypesenseClient") - - return _typesense_client diff --git a/backend/requirements/default.txt b/backend/requirements/default.txt index bf5bf2ac29..de36fc8aba 100644 --- a/backend/requirements/default.txt +++ b/backend/requirements/default.txt @@ -37,7 +37,6 @@ PyGithub==1.58.2 pypdf==3.16.4 pytest-playwright==0.3.2 python-multipart==0.0.6 
-qdrant-client==1.2.0 requests==2.31.0 requests-oauthlib==1.3.1 retry==0.9.2 # This pulls in py which is in CVE-2022-42969, must remove py from image @@ -53,7 +52,6 @@ tiktoken==0.4.0 torch==2.0.1 torchvision==0.15.2 transformers==4.30.1 -typesense==0.15.1 uvicorn==0.21.1 zulip==0.8.2 hubspot-api-client==8.1.0 diff --git a/backend/scripts/dev_run_celery.py b/backend/scripts/dev_run_celery.py index a83ccee09c..da3cb71424 100644 --- a/backend/scripts/dev_run_celery.py +++ b/backend/scripts/dev_run_celery.py @@ -1,3 +1,4 @@ +# This file is purely for development use, not included in any builds import subprocess import threading diff --git a/backend/scripts/list_typesense_docs.py b/backend/scripts/list_typesense_docs.py deleted file mode 100644 index 9cb6823d40..0000000000 --- a/backend/scripts/list_typesense_docs.py +++ /dev/null @@ -1,25 +0,0 @@ -from danswer.configs.app_configs import DOCUMENT_INDEX_NAME -from danswer.utils.clients import get_typesense_client - - -if __name__ == "__main__": - ts_client = get_typesense_client() - - page_number = 1 - per_page = 100 # number of documents to retrieve per page - while True: - params = { - "q": "", - "query_by": "content", - "page": page_number, - "per_page": per_page, - } - response = ts_client.collections[DOCUMENT_INDEX_NAME].documents.search(params) - documents = response.get("hits") - if not documents: - break # if there are no more documents, break out of the loop - - for document in documents: - print(document) - - page_number += 1 # move on to the next page diff --git a/backend/scripts/reset_indexes.py b/backend/scripts/reset_indexes.py index 2158bc18c0..9e8c86aad6 100644 --- a/backend/scripts/reset_indexes.py +++ b/backend/scripts/reset_indexes.py @@ -1,46 +1,13 @@ # This file is purely for development use, not included in any builds import requests -from qdrant_client.http.models import Distance -from qdrant_client.http.models import VectorParams -from typesense.exceptions import ObjectNotFound # type: ignore from danswer.configs.app_configs import DOCUMENT_INDEX_NAME -from danswer.configs.model_configs import DOC_EMBEDDING_DIM -from danswer.datastores.document_index import get_default_document_index -from danswer.datastores.document_index import SplitDocumentIndex -from danswer.datastores.typesense.store import create_typesense_collection from danswer.datastores.vespa.store import DOCUMENT_ID_ENDPOINT -from danswer.datastores.vespa.store import VespaIndex -from danswer.utils.clients import get_qdrant_client -from danswer.utils.clients import get_typesense_client from danswer.utils.logger import setup_logger logger = setup_logger() -def recreate_qdrant_collection( - collection_name: str, embedding_dim: int = DOC_EMBEDDING_DIM -) -> None: - logger.info(f"Attempting to recreate Qdrant collection {collection_name}") - result = get_qdrant_client().recreate_collection( - collection_name=collection_name, - vectors_config=VectorParams(size=embedding_dim, distance=Distance.COSINE), - ) - if not result: - raise RuntimeError("Could not create Qdrant collection") - - -def recreate_typesense_collection(collection_name: str) -> None: - logger.info(f"Attempting to recreate Typesense collection {collection_name}") - ts_client = get_typesense_client() - try: - ts_client.collections[collection_name].delete() - except ObjectNotFound: - logger.debug(f"Collection {collection_name} does not already exist") - - create_typesense_collection(collection_name) - - def wipe_vespa_index() -> None: params = {"selection": "true", "cluster": DOCUMENT_INDEX_NAME} 
response = requests.delete(DOCUMENT_ID_ENDPOINT, params=params) @@ -48,9 +15,4 @@ def wipe_vespa_index() -> None: if __name__ == "__main__": - document_index = get_default_document_index() - if isinstance(document_index, SplitDocumentIndex): - recreate_qdrant_collection("danswer_index") - recreate_typesense_collection("danswer_index") - elif isinstance(document_index, VespaIndex): - wipe_vespa_index() + wipe_vespa_index() diff --git a/backend/scripts/save_load_state.py b/backend/scripts/save_load_state.py index 1b21676078..cfb0c3f7e3 100644 --- a/backend/scripts/save_load_state.py +++ b/backend/scripts/save_load_state.py @@ -3,28 +3,17 @@ import argparse import json import os import subprocess -from datetime import datetime import requests -from qdrant_client.http.models.models import SnapshotDescription -from typesense.exceptions import ObjectNotFound # type: ignore from alembic import command from alembic.config import Config -from danswer.configs.app_configs import DOCUMENT_INDEX_NAME from danswer.configs.app_configs import POSTGRES_DB from danswer.configs.app_configs import POSTGRES_HOST from danswer.configs.app_configs import POSTGRES_PASSWORD from danswer.configs.app_configs import POSTGRES_PORT from danswer.configs.app_configs import POSTGRES_USER -from danswer.configs.app_configs import QDRANT_HOST -from danswer.configs.app_configs import QDRANT_PORT -from danswer.datastores.qdrant.utils import create_qdrant_collection -from danswer.datastores.qdrant.utils import list_qdrant_collections -from danswer.datastores.typesense.store import create_typesense_collection from danswer.datastores.vespa.store import DOCUMENT_ID_ENDPOINT -from danswer.utils.clients import get_qdrant_client -from danswer.utils.clients import get_typesense_client from danswer.utils.logger import setup_logger logger = setup_logger() @@ -51,80 +40,6 @@ def load_postgres(filename: str) -> None: ) -def snapshot_time_compare(snap: SnapshotDescription) -> datetime: - if not hasattr(snap, "creation_time") or snap.creation_time is None: - raise RuntimeError("Qdrant Snapshots Failed") - return datetime.strptime(snap.creation_time, "%Y-%m-%dT%H:%M:%S") - - -def save_qdrant(filename: str) -> None: - logger.info("Attempting to take Qdrant snapshot") - qdrant_client = get_qdrant_client() - qdrant_client.create_snapshot(collection_name=DOCUMENT_INDEX_NAME) - snapshots = qdrant_client.list_snapshots(collection_name=DOCUMENT_INDEX_NAME) - valid_snapshots = [snap for snap in snapshots if snap.creation_time is not None] - - sorted_snapshots = sorted(valid_snapshots, key=snapshot_time_compare) - last_snapshot_name = sorted_snapshots[-1].name - url = f"http://{QDRANT_HOST}:{QDRANT_PORT}/collections/{DOCUMENT_INDEX_NAME}/snapshots/{last_snapshot_name}" - - response = requests.get(url, stream=True) - - if response.status_code != 200: - raise RuntimeError("Qdrant Save Failed") - - with open(filename, "wb") as file: - for chunk in response.iter_content(chunk_size=8192): - file.write(chunk) - - -def load_qdrant(filename: str) -> None: - logger.info("Attempting to load Qdrant snapshot") - if DOCUMENT_INDEX_NAME not in { - collection.name for collection in list_qdrant_collections().collections - }: - create_qdrant_collection(DOCUMENT_INDEX_NAME) - snapshot_url = f"http://{QDRANT_HOST}:{QDRANT_PORT}/collections/{DOCUMENT_INDEX_NAME}/snapshots/" - - with open(filename, "rb") as f: - files = {"snapshot": (os.path.basename(filename), f)} - response = requests.post(snapshot_url + "upload", files=files) - if response.status_code != 200: - raise 
RuntimeError("Qdrant Snapshot Upload Failed") - - data = {"location": snapshot_url + os.path.basename(filename)} - headers = {"Content-Type": "application/json"} - response = requests.put( - snapshot_url + "recover", data=json.dumps(data), headers=headers - ) - if response.status_code != 200: - raise RuntimeError("Loading Qdrant Snapshot Failed") - - -def save_typesense(filename: str) -> None: - logger.info("Attempting to take Typesense snapshot") - ts_client = get_typesense_client() - all_docs = ts_client.collections[DOCUMENT_INDEX_NAME].documents.export() - with open(filename, "w") as f: - f.write(all_docs) - - -def load_typesense(filename: str) -> None: - logger.info("Attempting to load Typesense snapshot") - ts_client = get_typesense_client() - try: - ts_client.collections[DOCUMENT_INDEX_NAME].delete() - except ObjectNotFound: - pass - - create_typesense_collection(DOCUMENT_INDEX_NAME) - - with open(filename) as jsonl_file: - ts_client.collections[DOCUMENT_INDEX_NAME].documents.import_( - jsonl_file.read().encode("utf-8"), {"action": "create"} - ) - - def save_vespa(filename: str) -> None: logger.info("Attempting to take Vespa snapshot") continuation = "" @@ -189,10 +104,6 @@ if __name__ == "__main__": if args.load: load_postgres(os.path.join(checkpoint_dir, "postgres_snapshot.tar")) load_vespa(os.path.join(checkpoint_dir, "vespa_snapshot.jsonl")) - # load_qdrant(os.path.join(checkpoint_dir, "qdrant.snapshot")) - # load_typesense(os.path.join(checkpoint_dir, "typesense_snapshot.jsonl")) else: save_postgres(os.path.join(checkpoint_dir, "postgres_snapshot.tar")) save_vespa(os.path.join(checkpoint_dir, "vespa_snapshot.jsonl")) - # save_qdrant(os.path.join(checkpoint_dir, "qdrant.snapshot")) - # save_typesense(os.path.join(checkpoint_dir, "typesense_snapshot.jsonl")) diff --git a/backend/tests/regression/answer_quality/eval_direct_qa.py b/backend/tests/regression/answer_quality/eval_direct_qa.py index ea7875b1c1..f08c9e3d77 100644 --- a/backend/tests/regression/answer_quality/eval_direct_qa.py +++ b/backend/tests/regression/answer_quality/eval_direct_qa.py @@ -8,11 +8,13 @@ from typing import TextIO import yaml from sqlalchemy.orm import Session +from danswer.access.access import get_acl_for_user from danswer.db.engine import get_sqlalchemy_engine from danswer.direct_qa.answer_question import answer_qa_query from danswer.direct_qa.models import LLMMetricsContainer from danswer.search.models import RerankMetricsContainer from danswer.search.models import RetrievalMetricsContainer +from danswer.server.models import IndexFilters from danswer.server.models import QuestionRequest from danswer.utils.callbacks import MetricsHander @@ -74,11 +76,18 @@ def get_answer_for_question( RerankMetricsContainer | None, LLMMetricsContainer | None, ]: + filters = IndexFilters( + source_type=None, + document_set=None, + time_cutoff=None, + access_control_list=list(get_acl_for_user(user=None)), + ) question = QuestionRequest( query=query, collection="danswer_index", use_keyword=False, - filters=None, + filters=filters, + enable_auto_detect_filters=False, offset=None, ) diff --git a/deployment/README.md b/deployment/README.md index 297bd12b26..a9dcbc429a 100644 --- a/deployment/README.md +++ b/deployment/README.md @@ -27,7 +27,6 @@ Requirements: Docker and docker compose - `docker compose -f docker-compose.dev.yml -p danswer-stack up -d --pull always --force-recreate` - or run: `docker compose -f docker-compose.dev.yml -p danswer-stack up -d --build --force-recreate` to build from source - - This will start 
Web/API servers, Postgres (backend DB), Qdrant (vector DB), and the background indexing job. - Downloading images or packages/requirements may take 15+ minutes depending on your internet connection. diff --git a/deployment/docker_compose/docker-compose.dev.legacy.yml b/deployment/docker_compose/docker-compose.dev.legacy.yml deleted file mode 100644 index 56bfa572cc..0000000000 --- a/deployment/docker_compose/docker-compose.dev.legacy.yml +++ /dev/null @@ -1,154 +0,0 @@ -# This legacy version runs the app with typesense and qdrant together as the document indices -# Danswer is moving forward with Vespa to offer a consolidated index and a better search experience -version: '3' -services: - api_server: - image: danswer/danswer-backend:latest - build: - context: ../../backend - dockerfile: Dockerfile - command: > - /bin/sh -c "alembic upgrade head && - echo \"Starting Danswer Api Server\" && - uvicorn danswer.main:app --host 0.0.0.0 --port 8080" - depends_on: - - relational_db - - vector_db - - search_engine - restart: always - ports: - - "8080:8080" - environment: - - DOCUMENT_INDEX_TYPE=split - - INTERNAL_MODEL_VERSION=${INTERNAL_MODEL_VERSION:-openai-chat-completion} - - GEN_AI_MODEL_VERSION=${GEN_AI_MODEL_VERSION:-gpt-3.5-turbo} - - GEN_AI_API_KEY=${GEN_AI_API_KEY:-} - - GEN_AI_ENDPOINT=${GEN_AI_ENDPOINT:-} - - GEN_AI_HOST_TYPE=${GEN_AI_HOST_TYPE:-} - - POSTGRES_HOST=relational_db - - QDRANT_HOST=vector_db - - TYPESENSE_HOST=search_engine - - TYPESENSE_API_KEY=${TYPESENSE_API_KEY:-typesense_api_key} - - LOG_LEVEL=${LOG_LEVEL:-info} - - AUTH_TYPE=${AUTH_TYPE:-disabled} - - QA_TIMEOUT=${QA_TIMEOUT:-} - - GOOGLE_OAUTH_CLIENT_ID=${GOOGLE_OAUTH_CLIENT_ID:-} - - GOOGLE_OAUTH_CLIENT_SECRET=${GOOGLE_OAUTH_CLIENT_SECRET:-} - - DISABLE_GENERATIVE_AI=${DISABLE_GENERATIVE_AI:-} - - API_BASE_OPENAI=${API_BASE_OPENAI:-} - - API_TYPE_OPENAI=${API_TYPE_OPENAI:-} - - API_VERSION_OPENAI=${API_VERSION_OPENAI:-} - - AZURE_DEPLOYMENT_ID=${AZURE_DEPLOYMENT_ID:-} - volumes: - - local_dynamic_storage:/home/storage - - file_connector_tmp_storage:/home/file_connector_storage - - model_cache_torch:/root/.cache/torch/ - - model_cache_nltk:/root/nltk_data/ - - model_cache_huggingface:/root/.cache/huggingface/ - background: - image: danswer/danswer-backend:latest - build: - context: ../../backend - dockerfile: Dockerfile - command: /usr/bin/supervisord - depends_on: - - relational_db - - vector_db - restart: always - environment: - - DOCUMENT_INDEX_TYPE=split - - INTERNAL_MODEL_VERSION=${INTERNAL_MODEL_VERSION:-openai-chat-completion} - - GEN_AI_MODEL_VERSION=${GEN_AI_MODEL_VERSION:-gpt-3.5-turbo} - - GEN_AI_API_KEY=${GEN_AI_API_KEY:-} - - GEN_AI_ENDPOINT=${GEN_AI_ENDPOINT:-} - - GEN_AI_HOST_TYPE=${GEN_AI_HOST_TYPE:-} - - NUM_DOCUMENT_TOKENS_FED_TO_GENERATIVE_MODEL=${NUM_DOCUMENT_TOKENS_FED_TO_GENERATIVE_MODEL:-} - - POSTGRES_HOST=relational_db - - QDRANT_HOST=vector_db - - TYPESENSE_HOST=search_engine - - TYPESENSE_API_KEY=${TYPESENSE_API_KEY:-typesense_api_key} - - API_BASE_OPENAI=${API_BASE_OPENAI:-} - - API_TYPE_OPENAI=${API_TYPE_OPENAI:-} - - API_VERSION_OPENAI=${API_VERSION_OPENAI:-} - - AZURE_DEPLOYMENT_ID=${AZURE_DEPLOYMENT_ID:-} - - CONTINUE_ON_CONNECTOR_FAILURE=${CONTINUE_ON_CONNECTOR_FAILURE:-} - - NUM_INDEXING_WORKERS=${NUM_INDEXING_WORKERS:-} - - DANSWER_BOT_SLACK_APP_TOKEN=${DANSWER_BOT_SLACK_APP_TOKEN:-} - - DANSWER_BOT_SLACK_BOT_TOKEN=${DANSWER_BOT_SLACK_BOT_TOKEN:-} - - LOG_LEVEL=${LOG_LEVEL:-info} - volumes: - - local_dynamic_storage:/home/storage - - 
file_connector_tmp_storage:/home/file_connector_storage - - model_cache_torch:/root/.cache/torch/ - - model_cache_nltk:/root/nltk_data/ - - model_cache_huggingface:/root/.cache/huggingface/ - web_server: - image: danswer/danswer-web-server:latest - build: - context: ../../web - dockerfile: Dockerfile - args: - - NEXT_PUBLIC_DISABLE_STREAMING=${NEXT_PUBLIC_DISABLE_STREAMING:-false} - depends_on: - - api_server - restart: always - environment: - - INTERNAL_URL=http://api_server:8080 - - WEB_DOMAIN=${WEB_DOMAIN:-} - - OAUTH_NAME=${OAUTH_NAME:-} - relational_db: - image: postgres:15.2-alpine - restart: always - environment: - - POSTGRES_USER=${POSTGRES_USER:-postgres} - - POSTGRES_PASSWORD=${POSTGRES_PASSWORD:-password} - ports: - - "5432:5432" - volumes: - - db_volume:/var/lib/postgresql/data - vector_db: - image: qdrant/qdrant:v1.3.0 - restart: always - environment: - - QDRANT__TELEMETRY_DISABLED=true - ports: - - "6333:6333" - volumes: - - qdrant_volume:/qdrant/storage - search_engine: - image: typesense/typesense:0.24.1 - restart: always - environment: - - TYPESENSE_API_KEY=${TYPESENSE_API_KEY:-typesense_api_key} - - TYPESENSE_DATA_DIR=/typesense/storage - ports: - - "8108:8108" - volumes: - - typesense_volume:/typesense/storage - nginx: - image: nginx:1.23.4-alpine - restart: always - # nginx will immediately crash with `nginx: [emerg] host not found in upstream` - # if api_server / web_server are not up - depends_on: - - api_server - - web_server - environment: - - DOMAIN=localhost - ports: - - "80:80" - - "3000:80" # allow for localhost:3000 usage, since that is the norm - volumes: - - ../data/nginx:/etc/nginx/conf.d - command: > - /bin/sh -c "envsubst '$$\{DOMAIN\}' < /etc/nginx/conf.d/app.conf.template.dev > /etc/nginx/conf.d/app.conf && - while :; do sleep 6h & wait $${!}; nginx -s reload; done & nginx -g \"daemon off;\"" -volumes: - local_dynamic_storage: - file_connector_tmp_storage: # used to store files uploaded by the user temporarily while we are indexing them - db_volume: - qdrant_volume: - typesense_volume: - model_cache_torch: - model_cache_nltk: - model_cache_huggingface: diff --git a/deployment/docker_compose/docker-compose.dev.yml b/deployment/docker_compose/docker-compose.dev.yml index 63f4fad42d..f9c24bf53e 100644 --- a/deployment/docker_compose/docker-compose.dev.yml +++ b/deployment/docker_compose/docker-compose.dev.yml @@ -34,6 +34,8 @@ services: - API_TYPE_OPENAI=${API_TYPE_OPENAI:-} - API_VERSION_OPENAI=${API_VERSION_OPENAI:-} - AZURE_DEPLOYMENT_ID=${AZURE_DEPLOYMENT_ID:-} + - NOTION_CONNECTOR_ENABLE_RECURSIVE_PAGE_LOOKUP=${NOTION_CONNECTOR_ENABLE_RECURSIVE_PAGE_LOOKUP:-} + - DISABLE_TIME_FILTER_EXTRACTION=${DISABLE_TIME_FILTER_EXTRACTION:-} # Don't change the NLP model configs unless you know what you're doing - DOCUMENT_ENCODER_MODEL=${DOCUMENT_ENCODER_MODEL:-} - NORMALIZE_EMBEDDINGS=${NORMALIZE_EMBEDDINGS:-} @@ -44,7 +46,6 @@ services: - SKIP_RERANKING=${SKIP_RERANKING:-} # Set to debug to get more fine-grained logs - LOG_LEVEL=${LOG_LEVEL:-info} - - NOTION_CONNECTOR_ENABLE_RECURSIVE_PAGE_LOOKUP=${NOTION_CONNECTOR_ENABLE_RECURSIVE_PAGE_LOOKUP:-} volumes: - local_dynamic_storage:/home/storage - file_connector_tmp_storage:/home/file_connector_storage diff --git a/deployment/docker_compose/docker-compose.prod.legacy.yml b/deployment/docker_compose/docker-compose.prod.legacy.yml deleted file mode 100644 index f69290ab77..0000000000 --- a/deployment/docker_compose/docker-compose.prod.legacy.yml +++ /dev/null @@ -1,132 +0,0 @@ -version: '3' -services: - api_server: - 
image: danswer/danswer-backend:latest - build: - context: ../../backend - dockerfile: Dockerfile - command: > - /bin/sh -c "alembic upgrade head && - echo \"Starting Danswer Api Server\" && - uvicorn danswer.main:app --host 0.0.0.0 --port 8080" - depends_on: - - relational_db - - vector_db - - search_engine - restart: always - env_file: - - .env - environment: - - AUTH_TYPE=${AUTH_TYPE:-google_oauth} - - DOCUMENT_INDEX_TYPE=split - - POSTGRES_HOST=relational_db - - QDRANT_HOST=vector_db - - TYPESENSE_HOST=search_engine - volumes: - - local_dynamic_storage:/home/storage - - file_connector_tmp_storage:/home/file_connector_storage - - model_cache_torch:/root/.cache/torch/ - - model_cache_nltk:/root/nltk_data/ - - model_cache_huggingface:/root/.cache/huggingface/ - background: - image: danswer/danswer-backend:latest - build: - context: ../../backend - dockerfile: Dockerfile - command: /usr/bin/supervisord - depends_on: - - relational_db - - vector_db - restart: always - env_file: - - .env - environment: - - AUTH_TYPE=${AUTH_TYPE:-google_oauth} - - DOCUMENT_INDEX_TYPE=split - - POSTGRES_HOST=relational_db - - QDRANT_HOST=vector_db - - TYPESENSE_HOST=search_engine - volumes: - - local_dynamic_storage:/home/storage - - file_connector_tmp_storage:/home/file_connector_storage - - model_cache_torch:/root/.cache/torch/ - - model_cache_nltk:/root/nltk_data/ - - model_cache_huggingface:/root/.cache/huggingface/ - web_server: - image: danswer/danswer-web-server:latest - build: - context: ../../web - dockerfile: Dockerfile - args: - - NEXT_PUBLIC_DISABLE_STREAMING=${NEXT_PUBLIC_DISABLE_STREAMING:-false} - depends_on: - - api_server - restart: always - env_file: - - .env - environment: - - INTERNAL_URL=http://api_server:8080 - relational_db: - image: postgres:15.2-alpine - restart: always - # POSTGRES_USER and POSTGRES_PASSWORD should be set in .env file - env_file: - - .env - volumes: - - db_volume:/var/lib/postgresql/data - vector_db: - image: qdrant/qdrant:v1.3.0 - restart: always - env_file: - - .env - environment: - - QDRANT__TELEMETRY_DISABLED=true - volumes: - - qdrant_volume:/qdrant/storage - search_engine: - image: typesense/typesense:0.24.1 - restart: always - # TYPESENSE_API_KEY must be set in .env file - environment: - - TYPESENSE_DATA_DIR=/typesense/storage - env_file: - - .env - volumes: - - typesense_volume:/typesense/storage - nginx: - image: nginx:1.23.4-alpine - restart: always - # nginx will immediately crash with `nginx: [emerg] host not found in upstream` - # if api_server / web_server are not up - depends_on: - - api_server - - web_server - ports: - - "80:80" - - "443:443" - volumes: - - ../data/nginx:/etc/nginx/conf.d - - ../data/certbot/conf:/etc/letsencrypt - - ../data/certbot/www:/var/www/certbot - command: > - /bin/sh -c "envsubst '$$\{DOMAIN\}' < /etc/nginx/conf.d/app.conf.template > /etc/nginx/conf.d/app.conf - && while :; do sleep 6h & wait $${!}; nginx -s reload; done & nginx -g \"daemon off;\"" - env_file: - - .env.nginx - # follows https://pentacent.medium.com/nginx-and-lets-encrypt-with-docker-in-less-than-5-minutes-b4b8a60d3a71 - certbot: - image: certbot/certbot - restart: always - volumes: - - ../data/certbot/conf:/etc/letsencrypt - - ../data/certbot/www:/var/www/certbot - entrypoint: "/bin/sh -c 'trap exit TERM; while :; do certbot renew; sleep 12h & wait $${!}; done;'" -volumes: - local_dynamic_storage: - file_connector_tmp_storage: # used to store files uploaded by the user temporarily while we are indexing them - db_volume: - qdrant_volume: - typesense_volume: - 
model_cache_torch: - model_cache_nltk: - model_cache_huggingface: diff --git a/web/src/lib/search/qa.ts b/web/src/lib/search/qa.ts index c4b1b8b818..d594bccbdf 100644 --- a/web/src/lib/search/qa.ts +++ b/web/src/lib/search/qa.ts @@ -36,11 +36,8 @@ export const searchRequest = async ({ query, collection: "danswer_index", use_keyword: useKeyword, - ...(filters.length > 0 - ? { - filters, - } - : {}), + filters, + enable_auto_detect_filters: false, offset: offset, }), headers: { diff --git a/web/src/lib/search/streamingQa.ts b/web/src/lib/search/streamingQa.ts index bf767f4c95..92815db1e2 100644 --- a/web/src/lib/search/streamingQa.ts +++ b/web/src/lib/search/streamingQa.ts @@ -82,11 +82,8 @@ export const searchRequestStreamed = async ({ query, collection: "danswer_index", use_keyword: useKeyword, - ...(filters.length > 0 - ? { - filters, - } - : {}), + filters, + enable_auto_detect_filters: false, offset: offset, }), headers: { diff --git a/web/src/lib/search/streamingQuestionValidation.ts b/web/src/lib/search/streamingQuestionValidation.ts index 703c8a0db2..4fc92da7c7 100644 --- a/web/src/lib/search/streamingQuestionValidation.ts +++ b/web/src/lib/search/streamingQuestionValidation.ts @@ -10,13 +10,20 @@ export const questionValidationStreamed = async ({ query, update, }: QuestionValidationArgs) => { + const emptyFilters = { + source_type: null, + document_set: null, + time_cutoff: null, + }; + const response = await fetch("/api/stream-query-validation", { method: "POST", body: JSON.stringify({ query, collection: "danswer_index", use_keyword: null, - filters: null, + filters: emptyFilters, + enable_auto_detect_filters: false, offset: null, }), headers: { diff --git a/web/src/lib/search/utils.ts b/web/src/lib/search/utils.ts index d1d5e4ed63..a179025a57 100644 --- a/web/src/lib/search/utils.ts +++ b/web/src/lib/search/utils.ts @@ -1,16 +1,13 @@ import { Source } from "./interfaces"; export const buildFilters = (sources: Source[], documentSets: string[]) => { - const filters = []; - if (sources.length > 0) { - filters.push({ - source_type: sources.map((source) => source.internalName), - }); - } - if (documentSets.length > 0) { - filters.push({ - document_sets: documentSets, - }); - } + const filters = { + source_type: + sources.length > 0 ? sources.map((source) => source.internalName) : null, + document_set: documentSets.length > 0 ? documentSets : null, + // TODO make this a date selector + time_cutoff: null, + }; + return filters; };
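
Note on the filter flow introduced above: the backend hunks replace the old list-of-dict `IndexFilter` values with typed Pydantic models (`RequestFilters`, `IndexFilters`) and add an LLM-driven time-filter extraction step (`extract_question_time_filters`) that only fills in values the caller did not supply. The sketch below is a minimal, self-contained illustration of that merge logic under stated assumptions: it re-declares simplified stand-ins for the models rather than importing Danswer's, and `fake_extract_time_filter`, `resolve_filters`, and the placeholder ACL entries are hypothetical names introduced only for this example, not part of the patch.

    from datetime import datetime, timedelta, timezone

    from pydantic import BaseModel


    # Simplified stand-ins for the models added in danswer/server/models.py above.
    class RequestFilters(BaseModel):
        source_type: list[str] | None
        document_set: list[str] | None
        time_cutoff: datetime | None = None


    class IndexFilters(RequestFilters):
        access_control_list: list[str]


    def fake_extract_time_filter(query: str) -> tuple[datetime | None, bool]:
        # Stand-in for the LLM call in extract_filters.py: pretend the model
        # detected a one-week hard cutoff and no recency bias for this query.
        if "last week" in query.lower():
            return datetime.now(timezone.utc) - timedelta(weeks=1), False
        return None, False


    def resolve_filters(
        request_filters: RequestFilters,
        favor_recent: bool | None,
        enable_auto_detect_filters: bool,
        user_acl: list[str],
        query: str,
    ) -> tuple[IndexFilters, bool]:
        # Mirrors extract_question_time_filters plus the IndexFilters construction
        # in search_backend.py: user-provided values win, LLM-extracted values
        # only fill in the gaps, and ACL entries are always attached.
        time_cutoff = request_filters.time_cutoff
        if enable_auto_detect_filters:
            llm_cutoff, llm_favor_recent = fake_extract_time_filter(query)
            if time_cutoff is None:
                time_cutoff = llm_cutoff
            if favor_recent is None:
                favor_recent = llm_favor_recent

        final_filters = IndexFilters(
            source_type=request_filters.source_type,
            document_set=request_filters.document_set,
            time_cutoff=time_cutoff,
            access_control_list=user_acl,
        )
        return final_filters, bool(favor_recent)


    if __name__ == "__main__":
        filters, favor_recent = resolve_filters(
            RequestFilters(source_type=["confluence"], document_set=None),
            favor_recent=None,
            enable_auto_detect_filters=True,
            user_acl=["PUBLIC"],  # placeholder ACL entry, not Danswer's real format
            query="What changed in the onboarding docs last week?",
        )
        print(filters)
        print(favor_recent)

The property this sketch highlights, and which the patch relies on, is that explicitly supplied filters always take precedence over LLM-extracted ones; that is why the frontend can send `enable_auto_detect_filters: false` (or a user can clear a time filter) without the cutoff being silently reapplied.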