Mirror of https://github.com/danswer-ai/danswer.git (synced 2025-05-28 12:39:54 +02:00)

Introduce Time Filters (#610)

This commit is contained in:
parent 8e3258981e
commit e279918f95

backend/.gitignore (vendored): 2 changes
@ -4,8 +4,6 @@ site_crawls/
.ipynb_checkpoints/
api_keys.py
*ipynb
qdrant-data/
typesense-data/
.env
vespa-app.zip
dynamic_config_storage/
@ -196,7 +196,7 @@ def _run_indexing(
) -> None:
"""
1. Get documents which are either new or updated from specified application
2. Embed and index these documents into the chosen datastores (e.g. Qdrant / Typesense or Vespa)
2. Embed and index these documents into the chosen datastore (vespa)
3. Updates Postgres to record the indexed documents + the outcome of this run
"""
@ -1,3 +1,5 @@
from datetime import datetime

from slack_sdk.models.blocks import ActionsBlock
from slack_sdk.models.blocks import Block
from slack_sdk.models.blocks import ButtonElement
@ -180,10 +182,24 @@ def build_qa_response_blocks(
query_event_id: int,
answer: str | None,
quotes: list[DanswerQuote] | None,
time_cutoff: datetime | None,
favor_recent: bool,
) -> list[Block]:
quotes_blocks: list[Block] = []

ai_answer_header = HeaderBlock(text="AI Answer")
filter_block: Block | None = None
if time_cutoff or favor_recent:
filter_text = "Filters: "
if time_cutoff is not None:
time_str = time_cutoff.strftime("%b %d, %Y")
filter_text += f"`Docs Updated >= {time_str}` "
if favor_recent:
if time_cutoff is not None:
filter_text += "+ "
filter_text += "`Prioritize Recently Updated Docs`"

filter_block = SectionBlock(text=f"_{filter_text}_")

if not answer:
answer_block = SectionBlock(
@ -203,12 +219,14 @@ def build_qa_response_blocks(
]

feedback_block = build_qa_feedback_block(query_event_id=query_event_id)
return (
[
ai_answer_header,
answer_block,
feedback_block,
]
+ quotes_blocks
+ [DividerBlock()]

response_blocks: list[Block] = [ai_answer_header]

if filter_block is not None:
response_blocks.append(filter_block)

response_blocks.extend(
[answer_block, feedback_block] + quotes_blocks + [DividerBlock()]
)

return response_blocks
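To make the new Slack filter line concrete, here is a minimal sketch of the string assembly above, assuming a hypothetical time_cutoff of Oct 1, 2023 and favor_recent=True (build_qa_response_blocks then wraps the text in underscores for italics):

from datetime import datetime

# Mirrors the filter_text assembly in build_qa_response_blocks above
time_cutoff = datetime(2023, 10, 1)  # hypothetical value, for illustration only
favor_recent = True
filter_text = "Filters: "
if time_cutoff is not None:
    filter_text += f"`Docs Updated >= {time_cutoff.strftime('%b %d, %Y')}` "
if favor_recent:
    if time_cutoff is not None:
        filter_text += "+ "
    filter_text += "`Prioritize Recently Updated Docs`"
# filter_text == "Filters: `Docs Updated >= Oct 01, 2023` + `Prioritize Recently Updated Docs`"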
@ -15,12 +15,12 @@ from danswer.bots.slack.utils import ChannelIdAdapter
from danswer.bots.slack.utils import fetch_userids_from_emails
from danswer.bots.slack.utils import respond_in_thread
from danswer.configs.app_configs import DOCUMENT_INDEX_NAME
from danswer.configs.constants import DOCUMENT_SETS
from danswer.configs.danswerbot_configs import DANSWER_BOT_ANSWER_GENERATION_TIMEOUT
from danswer.configs.danswerbot_configs import DANSWER_BOT_DISABLE_DOCS_ONLY_ANSWER
from danswer.configs.danswerbot_configs import DANSWER_BOT_DISPLAY_ERROR_MSGS
from danswer.configs.danswerbot_configs import DANSWER_BOT_NUM_RETRIES
from danswer.configs.danswerbot_configs import DANSWER_REACT_EMOJI
from danswer.configs.danswerbot_configs import DISABLE_DANSWER_BOT_FILTER_DETECT
from danswer.configs.danswerbot_configs import ENABLE_DANSWERBOT_REFLEXION
from danswer.connectors.slack.utils import make_slack_api_rate_limited
from danswer.db.engine import get_sqlalchemy_engine
@ -28,6 +28,7 @@ from danswer.db.models import SlackBotConfig
from danswer.direct_qa.answer_question import answer_qa_query
from danswer.server.models import QAResponse
from danswer.server.models import QuestionRequest
from danswer.server.models import RequestFilters
from danswer.utils.logger import setup_logger

logger_base = setup_logger()
@ -72,6 +73,7 @@ def handle_message(
answer_generation_timeout: int = DANSWER_BOT_ANSWER_GENERATION_TIMEOUT,
should_respond_with_error_msgs: bool = DANSWER_BOT_DISPLAY_ERROR_MSGS,
disable_docs_only_answer: bool = DANSWER_BOT_DISABLE_DOCS_ONLY_ANSWER,
disable_auto_detect_filters: bool = DISABLE_DANSWER_BOT_FILTER_DETECT,
reflexion: bool = ENABLE_DANSWERBOT_REFLEXION,
) -> bool:
"""Potentially respond to the user message depending on filters and if an answer was generated
@ -178,15 +180,23 @@ def handle_message(

answer_failed = False
try:
# By leaving time_cutoff and favor_recent as None, and setting enable_auto_detect_filters
# it allows the slack flow to extract out filters from the user query
filters = RequestFilters(
source_type=None,
document_set=document_set_names,
time_cutoff=None,
)

# This includes throwing out answer via reflexion
answer = _get_answer(
QuestionRequest(
query=msg,
collection=DOCUMENT_INDEX_NAME,
use_keyword=False, # always use semantic search when handling Slack messages
filters=[{DOCUMENT_SETS: document_set_names}]
if document_set_names
else None,
enable_auto_detect_filters=not disable_auto_detect_filters,
filters=filters,
favor_recent=None,
offset=None,
)
)
@ -251,6 +261,8 @@ def handle_message(
query_event_id=answer.query_event_id,
answer=answer.answer,
quotes=answer.quotes,
time_cutoff=answer.time_cutoff,
favor_recent=answer.favor_recent,
)

document_blocks = build_documents_blocks(
@ -1,12 +1,12 @@
import re
from collections.abc import Callable
from collections.abc import Iterator
from uuid import UUID

from langchain.schema.messages import AIMessage
from langchain.schema.messages import BaseMessage
from langchain.schema.messages import HumanMessage
from langchain.schema.messages import SystemMessage
from sqlalchemy.orm import Session

from danswer.chat.chat_prompts import build_combined_query
from danswer.chat.chat_prompts import DANSWER_TOOL_NAME
@ -28,6 +28,7 @@ from danswer.configs.model_configs import GEN_AI_MAX_INPUT_TOKENS
from danswer.datastores.document_index import get_default_document_index
from danswer.db.models import ChatMessage
from danswer.db.models import Persona
from danswer.db.models import User
from danswer.direct_qa.interfaces import DanswerAnswerPiece
from danswer.direct_qa.interfaces import DanswerChatModelOut
from danswer.direct_qa.qa_utils import get_usable_chunks
@ -35,6 +36,7 @@ from danswer.llm.build import get_default_llm
from danswer.llm.llm import LLM
from danswer.llm.utils import get_default_llm_tokenizer
from danswer.llm.utils import translate_danswer_msg_to_langchain
from danswer.search.access_filters import build_user_only_filters
from danswer.search.semantic_search import chunks_to_search_docs
from danswer.search.semantic_search import retrieve_ranked_documents
from danswer.server.models import RetrievalDocs
@ -118,7 +120,8 @@ def danswer_chat_retrieval(
query_message: ChatMessage,
history: list[ChatMessage],
llm: LLM,
user_id: UUID | None,
user: User | None,
db_session: Session,
) -> list[InferenceChunk]:
if history:
query_combination_msgs = build_combined_query(query_message, history)
@ -128,9 +131,9 @@ def danswer_chat_retrieval(

# Good Debug/Breakpoint
ranked_chunks, unranked_chunks = retrieve_ranked_documents(
reworded_query,
user_id=user_id,
filters=None,
query=reworded_query,
filters=build_user_only_filters(user, db_session),
favor_recent=False,
datastore=get_default_document_index(),
)
if not ranked_chunks:
@ -277,8 +280,9 @@ def extract_citations_from_stream(
def llm_contextual_chat_answer(
messages: list[ChatMessage],
persona: Persona,
user_id: UUID | None,
user: User | None,
tokenizer: Callable,
db_session: Session,
run_search_system_text: str = REQUIRE_DANSWER_SYSTEM_MSG,
) -> Iterator[str | list[InferenceChunk]]:
last_message = messages[-1]
@ -322,7 +326,8 @@ def llm_contextual_chat_answer(
query_message=last_message,
history=previous_messages,
llm=llm,
user_id=user_id,
user=user,
db_session=db_session,
)
yield retrieved_chunks
tool_result_str = format_danswer_chunks_for_chat(retrieved_chunks)
@ -369,8 +374,9 @@ def llm_contextual_chat_answer(
def llm_tools_enabled_chat_answer(
messages: list[ChatMessage],
persona: Persona,
user_id: UUID | None,
user: User | None,
tokenizer: Callable,
db_session: Session,
) -> Iterator[str | list[InferenceChunk]]:
retrieval_enabled = persona.retrieval_enabled
system_text = build_system_text_from_persona(persona)
@ -447,12 +453,13 @@ def llm_tools_enabled_chat_answer(
query_message=last_message,
history=previous_messages,
llm=llm,
user_id=user_id,
user=user,
db_session=db_session,
)
yield retrieved_chunks
tool_result_str = format_danswer_chunks_for_chat(retrieved_chunks)
else:
tool_result_str = call_tool(final_result, user_id=user_id)
tool_result_str = call_tool(final_result)

# The AI's tool calling message
tool_call_msg_text = final_result.model_raw
@ -508,8 +515,9 @@ def wrap_chat_package_in_model(
def llm_chat_answer(
messages: list[ChatMessage],
persona: Persona | None,
user_id: UUID | None,
tokenizer: Callable,
user: User | None,
db_session: Session,
) -> Iterator[DanswerAnswerPiece | RetrievalDocs]:
# Common error cases to keep in mind:
# - User asks question about something long ago, due to context limit, the message is dropped
@ -535,13 +543,21 @@ def llm_chat_answer(
# Doesn't require tool calling output format (all LLM outputs are therefore valid)
elif persona.retrieval_enabled and not persona.tools and not FORCE_TOOL_PROMPT:
for package in llm_contextual_chat_answer(
messages=messages, persona=persona, user_id=user_id, tokenizer=tokenizer
messages=messages,
persona=persona,
tokenizer=tokenizer,
user=user,
db_session=db_session,
):
yield wrap_chat_package_in_model(package)

# Use most flexible/complex prompt format
else:
for package in llm_tools_enabled_chat_answer(
messages=messages, persona=persona, user_id=user_id, tokenizer=tokenizer
messages=messages,
persona=persona,
tokenizer=tokenizer,
user=user,
db_session=db_session,
):
yield wrap_chat_package_in_model(package)
@ -1,10 +1,7 @@
from uuid import UUID

from danswer.direct_qa.interfaces import DanswerChatModelOut


def call_tool(
model_actions: DanswerChatModelOut,
user_id: UUID | None,
) -> str:
raise NotImplementedError("There are no additional tool integrations right now")
@ -92,17 +92,6 @@ VESPA_TENANT_PORT = os.environ.get("VESPA_TENANT_PORT") or "19071"
VESPA_DEPLOYMENT_ZIP = (
os.environ.get("VESPA_DEPLOYMENT_ZIP") or "/app/danswer/vespa-app.zip"
)
# Qdrant is Semantic Search Vector DB
# Url / Key are used to connect to a remote Qdrant instance
QDRANT_URL = os.environ.get("QDRANT_URL", "")
QDRANT_API_KEY = os.environ.get("QDRANT_API_KEY", "")
# Host / Port are used for connecting to local Qdrant instance
QDRANT_HOST = os.environ.get("QDRANT_HOST") or "localhost"
QDRANT_PORT = 6333
# Typesense is the Keyword Search Engine
TYPESENSE_HOST = os.environ.get("TYPESENSE_HOST") or "localhost"
TYPESENSE_PORT = 8108
TYPESENSE_API_KEY = os.environ.get("TYPESENSE_API_KEY", "")
# Number of documents in a batch during indexing (further batching done by chunks before passing to bi-encoder)
INDEX_BATCH_SIZE = 16

@ -153,11 +142,15 @@ NUM_DOCUMENT_TOKENS_FED_TO_GENERATIVE_MODEL = int(
NUM_DOCUMENT_TOKENS_FED_TO_CHAT = int(
os.environ.get("NUM_DOCUMENT_TOKENS_FED_TO_CHAT") or (512 * 3)
)
# 1 / (1 + DOC_TIME_DECAY * doc-age-in-years)
# 1 / (1 + DOC_TIME_DECAY * doc-age-in-years), set to 0 to have no decay
# Capped in Vespa at 0.5
DOC_TIME_DECAY = float(
os.environ.get("DOC_TIME_DECAY") or 0.5 # Hits limit at 2 years by default
)
FAVOR_RECENT_DECAY_MULTIPLIER = 2
DISABLE_TIME_FILTER_EXTRACTION = (
os.environ.get("DISABLE_TIME_FILTER_EXTRACTION", "").lower() == "true"
)
# 1 edit per 2 characters, currently unused due to fuzzy match being too slow
QUOTE_ALLOWED_ERROR_PERCENT = 0.05
QA_TIMEOUT = int(os.environ.get("QA_TIMEOUT") or "60") # 60 seconds
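The decay comment above can be read as a weighting function; a minimal sketch, assuming the defaults in this hunk (DOC_TIME_DECAY = 0.5, FAVOR_RECENT_DECAY_MULTIPLIER = 2) and noting that the real computation runs inside Vespa's ranking profile rather than in Python:

def recency_weight(doc_age_years: float, favor_recent: bool = False) -> float:
    # Favoring recent docs doubles the decay factor via FAVOR_RECENT_DECAY_MULTIPLIER
    decay = 0.5 * (2 if favor_recent else 1)
    # 1 / (1 + DOC_TIME_DECAY * doc-age-in-years), with the discount capped at 0.5 in Vespa
    return max(1 / (1 + decay * doc_age_years), 0.5)

# With the defaults, a two-year-old document hits the 0.5 floor: recency_weight(2.0) == 0.5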
@ -14,6 +14,7 @@ EMBEDDINGS = "embeddings"
ALLOWED_USERS = "allowed_users"
ACCESS_CONTROL_LIST = "access_control_list"
DOCUMENT_SETS = "document_sets"
TIME_FILTER = "time_filter"
METADATA = "metadata"
MATCH_HIGHLIGHTS = "match_highlights"
# stored in the `metadata` of a chunk. Used to signify that this chunk should
@ -35,6 +35,10 @@ DANSWER_BOT_DISPLAY_ERROR_MSGS = os.environ.get(
DANSWER_BOT_RESPOND_EVERY_CHANNEL = (
os.environ.get("DANSWER_BOT_RESPOND_EVERY_CHANNEL", "").lower() == "true"
)
# Auto detect query options like time cutoff or heavily favor recently updated docs
DISABLE_DANSWER_BOT_FILTER_DETECT = (
os.environ.get("DISABLE_DANSWER_BOT_FILTER_DETECT", "").lower() == "true"
)
# Add a second LLM call post Answer to verify if the Answer is valid
# Throws out answers that don't directly or fully answer the user query
# This is the default for all DanswerBot channels unless the bot is configured individually
@ -1,7 +1,5 @@
import math
import uuid
from datetime import datetime
from datetime import timezone

from danswer.chunking.models import IndexChunk
from danswer.chunking.models import InferenceChunk
@ -32,13 +30,3 @@ def get_uuid_from_chunk(
[doc_str, str(chunk.chunk_id), str(mini_chunk_ind)]
)
return uuid.uuid5(uuid.NAMESPACE_X500, unique_identifier_string)


def translate_to_epoch_seconds_ensure_tz(t: datetime | None) -> int | None:
if not t:
return None

if t.tzinfo != timezone.utc:
raise ValueError("Connectors must provide document update time in UTC")

return int(t.timestamp())
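A quick usage note for the removed helper above (it reappears later in this diff as _vespa_get_updated_at_attribute in the Vespa store): only timezone-aware UTC datetimes are accepted.

from datetime import datetime, timezone

# Accepted: timezone-aware UTC value, converted to epoch seconds
translate_to_epoch_seconds_ensure_tz(datetime(2023, 10, 1, tzinfo=timezone.utc))  # 1696118400
# Naive or non-UTC datetimes raise ValueError("Connectors must provide document update time in UTC")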
@ -1,109 +1,12 @@
from typing import Type
from uuid import UUID

from danswer.chunking.models import DocMetadataAwareIndexChunk
from danswer.chunking.models import InferenceChunk
from danswer.configs.app_configs import DOCUMENT_INDEX_NAME
from danswer.configs.app_configs import DOCUMENT_INDEX_TYPE
from danswer.configs.app_configs import NUM_RETURNED_HITS
from danswer.configs.constants import DocumentIndexType
from danswer.configs.model_configs import SEARCH_DISTANCE_CUTOFF
from danswer.datastores.interfaces import DocumentIndex
from danswer.datastores.interfaces import DocumentInsertionRecord
from danswer.datastores.interfaces import IndexFilter
from danswer.datastores.interfaces import KeywordIndex
from danswer.datastores.interfaces import UpdateRequest
from danswer.datastores.interfaces import VectorIndex
from danswer.datastores.qdrant.store import QdrantIndex
from danswer.datastores.typesense.store import TypesenseIndex
from danswer.datastores.vespa.store import VespaIndex
from danswer.utils.logger import setup_logger

logger = setup_logger()


class SplitDocumentIndex(DocumentIndex):
"""A Document index that uses 2 separate storages
one for keyword index and one for vector index"""

def __init__(
self,
index_name: str | None = DOCUMENT_INDEX_NAME,
keyword_index_cls: Type[KeywordIndex] = TypesenseIndex,
vector_index_cls: Type[VectorIndex] = QdrantIndex,
) -> None:
index_name = index_name or DOCUMENT_INDEX_NAME
self.keyword_index = keyword_index_cls(index_name)
self.vector_index = vector_index_cls(index_name)

def ensure_indices_exist(self) -> None:
self.keyword_index.ensure_indices_exist()
self.vector_index.ensure_indices_exist()

def index(
self,
chunks: list[DocMetadataAwareIndexChunk],
) -> set[DocumentInsertionRecord]:
keyword_index_result = self.keyword_index.index(chunks)
vector_index_result = self.vector_index.index(chunks)
if keyword_index_result != vector_index_result:
logger.error(
f"Inconsistent document indexing:\n"
f"Keyword: {keyword_index_result}\n"
f"Vector: {vector_index_result}"
)
return keyword_index_result.union(vector_index_result)

def update(self, update_requests: list[UpdateRequest]) -> None:
self.keyword_index.update(update_requests)
self.vector_index.update(update_requests)

def delete(self, doc_ids: list[str]) -> None:
self.keyword_index.delete(doc_ids)
self.vector_index.delete(doc_ids)

def keyword_retrieval(
self,
query: str,
user_id: UUID | None,
filters: list[IndexFilter] | None,
num_to_retrieve: int = NUM_RETURNED_HITS,
) -> list[InferenceChunk]:
return self.keyword_index.keyword_retrieval(
query, user_id, filters, num_to_retrieve
)

def semantic_retrieval(
self,
query: str,
user_id: UUID | None,
filters: list[IndexFilter] | None,
num_to_retrieve: int = NUM_RETURNED_HITS,
distance_cutoff: float | None = SEARCH_DISTANCE_CUTOFF,
) -> list[InferenceChunk]:
return self.vector_index.semantic_retrieval(
query, user_id, filters, num_to_retrieve, distance_cutoff
)

def hybrid_retrieval(
self,
query: str,
user_id: UUID | None,
filters: list[IndexFilter] | None,
num_to_retrieve: int,
) -> list[InferenceChunk]:
"""Currently results from vector and keyword indices are not combined post retrieval.
This may change in the future but for now, the default behavior is to use semantic search
which should be a more flexible/powerful search option"""
return self.semantic_retrieval(query, user_id, filters, num_to_retrieve)


def get_default_document_index(
collection: str | None = DOCUMENT_INDEX_NAME, index_type: str = DOCUMENT_INDEX_TYPE
) -> DocumentIndex:
if index_type == DocumentIndexType.COMBINED.value:
return VespaIndex() # Can't specify collection without modifying the deployment
elif index_type == DocumentIndexType.SPLIT.value:
return SplitDocumentIndex(index_name=collection)
else:
raise ValueError("Invalid document index type selected")
def get_default_document_index() -> DocumentIndex:
# Currently only supporting Vespa
# Supporting multiple index types with multiple
# Search-Engines / VectorDBs was too much overhead
return VespaIndex()
@ -2,14 +2,12 @@ import abc
from dataclasses import dataclass
from datetime import datetime
from typing import Any
from uuid import UUID

from danswer.access.models import DocumentAccess
from danswer.chunking.models import DocMetadataAwareIndexChunk
from danswer.chunking.models import InferenceChunk
from danswer.configs.model_configs import SEARCH_DISTANCE_CUTOFF

IndexFilter = dict[str, str | list[str] | None]
from danswer.server.models import IndexFilters


@dataclass(frozen=True)
@ -84,8 +82,8 @@ class KeywordCapable(abc.ABC):
def keyword_retrieval(
self,
query: str,
user_id: UUID | None,
filters: list[IndexFilter] | None,
filters: IndexFilters,
favor_recent: bool,
num_to_retrieve: int,
) -> list[InferenceChunk]:
raise NotImplementedError
@ -96,8 +94,8 @@ class VectorCapable(abc.ABC):
def semantic_retrieval(
self,
query: str,
user_id: UUID | None,
filters: list[IndexFilter] | None,
filters: IndexFilters,
favor_recent: bool,
num_to_retrieve: int,
distance_cutoff: float | None = SEARCH_DISTANCE_CUTOFF,
) -> list[InferenceChunk]:
@ -109,14 +107,25 @@ class HybridCapable(abc.ABC):
def hybrid_retrieval(
self,
query: str,
user_id: UUID | None,
filters: list[IndexFilter] | None,
filters: IndexFilters,
favor_recent: bool,
num_to_retrieve: int,
) -> list[InferenceChunk]:
raise NotImplementedError


class BaseIndex(Verifiable, Indexable, Updatable, Deletable, abc.ABC):
class AdminCapable(abc.ABC):
@abc.abstractmethod
def admin_retrieval(
self,
query: str,
filters: IndexFilters,
num_to_retrieve: int,
) -> list[InferenceChunk]:
raise NotImplementedError


class BaseIndex(Verifiable, AdminCapable, Indexable, Updatable, Deletable, abc.ABC):
"""All basic functionalities excluding a specific retrieval approach
Indices need to be able to
- Check that the index exists with a schema definition
@ -1,152 +0,0 @@
import json

from qdrant_client import QdrantClient
from qdrant_client.http import models
from qdrant_client.http.exceptions import ResponseHandlingException
from qdrant_client.http.models.models import UpdateResult
from qdrant_client.models import PointStruct

from danswer.chunking.models import DocMetadataAwareIndexChunk
from danswer.configs.constants import ALLOWED_USERS
from danswer.configs.constants import BLURB
from danswer.configs.constants import CHUNK_ID
from danswer.configs.constants import CONTENT
from danswer.configs.constants import DOCUMENT_ID
from danswer.configs.constants import METADATA
from danswer.configs.constants import SECTION_CONTINUATION
from danswer.configs.constants import SEMANTIC_IDENTIFIER
from danswer.configs.constants import SOURCE_LINKS
from danswer.configs.constants import SOURCE_TYPE
from danswer.datastores.datastore_utils import DEFAULT_BATCH_SIZE
from danswer.datastores.datastore_utils import get_uuid_from_chunk
from danswer.datastores.interfaces import DocumentInsertionRecord
from danswer.utils.clients import get_qdrant_client
from danswer.utils.logger import setup_logger


logger = setup_logger()


def _does_document_exist(
doc_chunk_id: str, collection_name: str, q_client: QdrantClient
) -> bool:
"""Get whether a document is found and the existing whitelists"""
results = q_client.retrieve(
collection_name=collection_name,
ids=[doc_chunk_id],
)
if len(results) == 0:
return False

return True


def delete_qdrant_doc_chunks(
document_id: str, collection_name: str, q_client: QdrantClient
) -> bool:
q_client.delete(
collection_name=collection_name,
points_selector=models.FilterSelector(
filter=models.Filter(
must=[
models.FieldCondition(
key=DOCUMENT_ID,
match=models.MatchValue(value=document_id),
),
],
)
),
)
return True


def index_qdrant_chunks(
chunks: list[DocMetadataAwareIndexChunk],
collection: str,
client: QdrantClient | None = None,
batch_upsert: bool = True,
) -> set[DocumentInsertionRecord]:
# Public documents will have the PUBLIC string in ALLOWED_USERS
# If credential that kicked this off has no user associated, either Auth is off or the doc is public
q_client: QdrantClient = client if client else get_qdrant_client()

point_structs: list[PointStruct] = []
insertion_records: set[DocumentInsertionRecord] = set()
# document ids of documents that existed BEFORE this indexing
already_existing_documents: set[str] = set()
for chunk in chunks:
document = chunk.source_document

# Delete all chunks related to the document if (1) it already exists and
# (2) this is our first time running into it during this indexing attempt
document_exists = _does_document_exist(document.id, collection, q_client)
if document_exists and document.id not in already_existing_documents:
# Processing the first chunk of the doc and the doc exists
delete_qdrant_doc_chunks(document.id, collection, q_client)
already_existing_documents.add(document.id)

embeddings = chunk.embeddings
vector_list = [embeddings.full_embedding]
vector_list.extend(embeddings.mini_chunk_embeddings)

for minichunk_ind, embedding in enumerate(vector_list):
qdrant_id = str(get_uuid_from_chunk(chunk, minichunk_ind))
insertion_records.add(
DocumentInsertionRecord(
document_id=document.id,
already_existed=document.id in already_existing_documents,
)
)
point_structs.append(
PointStruct(
id=qdrant_id,
payload={
DOCUMENT_ID: document.id,
CHUNK_ID: chunk.chunk_id,
BLURB: chunk.blurb,
CONTENT: chunk.content,
SOURCE_TYPE: str(document.source.value),
SOURCE_LINKS: chunk.source_links,
SEMANTIC_IDENTIFIER: document.semantic_identifier,
SECTION_CONTINUATION: chunk.section_continuation,
ALLOWED_USERS: json.dumps(chunk.access.to_acl()),
METADATA: json.dumps(document.metadata),
},
vector=embedding,
)
)

if batch_upsert:
point_struct_batches = [
point_structs[x : x + DEFAULT_BATCH_SIZE]
for x in range(0, len(point_structs), DEFAULT_BATCH_SIZE)
]
for point_struct_batch in point_struct_batches:

def upsert() -> UpdateResult | None:
for _ in range(5):
try:
return q_client.upsert(
collection_name=collection, points=point_struct_batch
)
except ResponseHandlingException as e:
logger.warning(
f"Failed to upsert batch into qdrant due to error: {e}"
)
return None

index_results = upsert()
log_status = index_results.status if index_results else "Failed"
logger.info(
f"Indexed {len(point_struct_batch)} chunks into Qdrant collection '{collection}', "
f"status: {log_status}"
)
else:
index_results = q_client.upsert(
collection_name=collection, points=point_structs
)
logger.info(
f"Document batch of size {len(point_structs)} indexing status: {index_results.status}"
)

return insertion_records
@ -1,221 +0,0 @@
from uuid import UUID

from qdrant_client import QdrantClient
from qdrant_client.http.exceptions import ResponseHandlingException
from qdrant_client.http.exceptions import UnexpectedResponse
from qdrant_client.http.models import FieldCondition
from qdrant_client.http.models import Filter
from qdrant_client.http.models import MatchAny
from qdrant_client.http.models import MatchValue

from danswer.chunking.models import DocMetadataAwareIndexChunk
from danswer.chunking.models import InferenceChunk
from danswer.configs.app_configs import DOCUMENT_INDEX_NAME
from danswer.configs.app_configs import NUM_RETURNED_HITS
from danswer.configs.constants import ALLOWED_USERS
from danswer.configs.constants import DOCUMENT_ID
from danswer.configs.constants import PUBLIC_DOC_PAT
from danswer.configs.model_configs import SEARCH_DISTANCE_CUTOFF
from danswer.datastores.datastore_utils import get_uuid_from_chunk
from danswer.datastores.interfaces import DocumentInsertionRecord
from danswer.datastores.interfaces import IndexFilter
from danswer.datastores.interfaces import UpdateRequest
from danswer.datastores.interfaces import VectorIndex
from danswer.datastores.qdrant.indexing import index_qdrant_chunks
from danswer.datastores.qdrant.utils import create_qdrant_collection
from danswer.datastores.qdrant.utils import list_qdrant_collections
from danswer.search.semantic_search import embed_query
from danswer.utils.batching import batch_generator
from danswer.utils.clients import get_qdrant_client
from danswer.utils.logger import setup_logger
from danswer.utils.timing import log_function_time

logger = setup_logger()

# how many points we want to delete/update at a time
_BATCH_SIZE = 200


def _build_qdrant_filters(
user_id: UUID | None, filters: list[IndexFilter] | None
) -> list[FieldCondition]:
filter_conditions: list[FieldCondition] = []
# Permissions filter
if user_id:
filter_conditions.append(
FieldCondition(
key=ALLOWED_USERS,
match=MatchAny(any=[str(user_id), PUBLIC_DOC_PAT]),
)
)
else:
filter_conditions.append(
FieldCondition(
key=ALLOWED_USERS,
match=MatchValue(value=PUBLIC_DOC_PAT),
)
)

# Provided query filters
if filters:
for filter_dict in filters:
valid_filters = {
key: value for key, value in filter_dict.items() if value is not None
}
for filter_key, filter_val in valid_filters.items():
if isinstance(filter_val, str):
filter_conditions.append(
FieldCondition(
key=filter_key,
match=MatchValue(value=filter_val),
)
)
elif isinstance(filter_val, list):
filter_conditions.append(
FieldCondition(
key=filter_key,
match=MatchAny(any=filter_val),
)
)
else:
raise ValueError("Invalid filters provided")

return filter_conditions


def _get_points_from_document_ids(
document_ids: list[str],
collection: str,
client: QdrantClient,
) -> list[int | str]:
offset: int | str | None = 0
chunk_ids = []
while offset is not None:
matches, offset = client.scroll(
collection_name=collection,
scroll_filter=Filter(
must=[FieldCondition(key=DOCUMENT_ID, match=MatchAny(any=document_ids))]
),
limit=_BATCH_SIZE,
with_payload=False,
with_vectors=False,
offset=offset,
)
for match in matches:
chunk_ids.append(match.id)

return chunk_ids


class QdrantIndex(VectorIndex):
def __init__(self, index_name: str = DOCUMENT_INDEX_NAME) -> None:
# In Qdrant, the vector index is referred to as a collection
self.collection = index_name
self.client = get_qdrant_client()

def ensure_indices_exist(self) -> None:
if self.collection not in {
collection.name
for collection in list_qdrant_collections(self.client).collections
}:
logger.info(f"Creating Qdrant collection with name: {self.collection}")
create_qdrant_collection(
collection_name=self.collection, q_client=self.client
)

def index(
self,
chunks: list[DocMetadataAwareIndexChunk],
) -> set[DocumentInsertionRecord]:
return index_qdrant_chunks(
chunks=chunks,
collection=self.collection,
client=self.client,
)

def update(self, update_requests: list[UpdateRequest]) -> None:
logger.info(
f"Updating {len(update_requests)} documents' allowed_users in Qdrant"
)
for update_request in update_requests:
for doc_id_batch in batch_generator(
items=update_request.document_ids,
batch_size=_BATCH_SIZE,
):
if update_request.access is None:
continue

chunk_ids = _get_points_from_document_ids(
doc_id_batch, self.collection, self.client
)
self.client.set_payload(
collection_name=self.collection,
payload={ALLOWED_USERS: update_request.access.to_acl()},
points=chunk_ids,
)

def delete(self, doc_ids: list[str]) -> None:
logger.info(f"Deleting {len(doc_ids)} documents from Qdrant")
for doc_id_batch in batch_generator(items=doc_ids, batch_size=_BATCH_SIZE):
chunk_ids = _get_points_from_document_ids(
doc_id_batch, self.collection, self.client
)
self.client.delete(
collection_name=self.collection,
points_selector=chunk_ids,
)

@log_function_time()
def semantic_retrieval(
self,
query: str,
user_id: UUID | None,
filters: list[IndexFilter] | None,
num_to_retrieve: int = NUM_RETURNED_HITS,
distance_cutoff: float | None = SEARCH_DISTANCE_CUTOFF,
page_size: int = NUM_RETURNED_HITS,
) -> list[InferenceChunk]:
query_embedding = embed_query(query)

filter_conditions = _build_qdrant_filters(user_id, filters)

page_offset = 0
found_inference_chunks: list[InferenceChunk] = []
found_chunk_uuids: set[UUID] = set()
while len(found_inference_chunks) < num_to_retrieve:
try:
hits = self.client.search(
collection_name=self.collection,
query_vector=query_embedding,
query_filter=Filter(must=list(filter_conditions)),
limit=page_size,
offset=page_offset,
score_threshold=distance_cutoff,
)
page_offset += page_size
if not hits:
break
except ResponseHandlingException as e:
logger.exception(
f'Qdrant querying failed due to: "{e}", is Qdrant set up?'
)
break
except UnexpectedResponse as e:
logger.exception(
f'Qdrant querying failed due to: "{e}", has ingestion been run?'
)
break

inference_chunks_from_hits = [
InferenceChunk.from_dict(hit.payload)
for hit in hits
if hit.payload is not None
]
for inf_chunk in inference_chunks_from_hits:
# remove duplicate chunks which happen if minichunks are used
inf_chunk_id = get_uuid_from_chunk(inf_chunk)
if inf_chunk_id not in found_chunk_uuids:
found_inference_chunks.append(inf_chunk)
found_chunk_uuids.add(inf_chunk_id)

return found_inference_chunks
@ -1,45 +0,0 @@
from typing import Any

from qdrant_client import QdrantClient
from qdrant_client.http.models import Record
from qdrant_client.models import CollectionsResponse
from qdrant_client.models import Distance
from qdrant_client.models import VectorParams

from danswer.configs.model_configs import DOC_EMBEDDING_DIM
from danswer.utils.clients import get_qdrant_client
from danswer.utils.logger import setup_logger


logger = setup_logger()


def list_qdrant_collections(
q_client: QdrantClient | None = None,
) -> CollectionsResponse:
q_client = q_client or get_qdrant_client()
return q_client.get_collections()


def create_qdrant_collection(
collection_name: str,
embedding_dim: int = DOC_EMBEDDING_DIM,
q_client: QdrantClient | None = None,
) -> None:
q_client = q_client or get_qdrant_client()
logger.info(f"Attempting to create collection {collection_name}")
result = q_client.create_collection(
collection_name=collection_name,
vectors_config=VectorParams(size=embedding_dim, distance=Distance.COSINE),
)
if not result:
raise RuntimeError("Could not create Qdrant collection")


def get_payload_from_record(record: Record, is_required: bool = True) -> dict[str, Any]:
if record.payload is None and is_required:
raise RuntimeError(
"Qdrant Index is corrupted, Document found with no metadata."
)

return record.payload or {}
@ -1,280 +0,0 @@
import json
from typing import Any
from uuid import UUID

import typesense # type: ignore
from typesense.exceptions import ObjectNotFound # type: ignore

from danswer.chunking.models import DocMetadataAwareIndexChunk
from danswer.chunking.models import InferenceChunk
from danswer.configs.app_configs import DOCUMENT_INDEX_NAME
from danswer.configs.app_configs import NUM_RETURNED_HITS
from danswer.configs.constants import ALLOWED_USERS
from danswer.configs.constants import BLURB
from danswer.configs.constants import CHUNK_ID
from danswer.configs.constants import CONTENT
from danswer.configs.constants import DOCUMENT_ID
from danswer.configs.constants import METADATA
from danswer.configs.constants import PUBLIC_DOC_PAT
from danswer.configs.constants import SECTION_CONTINUATION
from danswer.configs.constants import SEMANTIC_IDENTIFIER
from danswer.configs.constants import SOURCE_LINKS
from danswer.configs.constants import SOURCE_TYPE
from danswer.datastores.datastore_utils import DEFAULT_BATCH_SIZE
from danswer.datastores.datastore_utils import get_uuid_from_chunk
from danswer.datastores.interfaces import DocumentInsertionRecord
from danswer.datastores.interfaces import IndexFilter
from danswer.datastores.interfaces import KeywordIndex
from danswer.datastores.interfaces import UpdateRequest
from danswer.utils.batching import batch_generator
from danswer.utils.clients import get_typesense_client
from danswer.utils.logger import setup_logger


logger = setup_logger()

# how many points we want to delete/update at a time
_BATCH_SIZE = 200


def create_typesense_collection(
collection_name: str = DOCUMENT_INDEX_NAME,
) -> None:
ts_client = get_typesense_client()
collection_schema = {
"name": collection_name,
"fields": [
# Typesense uses "id" type string as a special field
{"name": "id", "type": "string"},
{"name": DOCUMENT_ID, "type": "string"},
{"name": CHUNK_ID, "type": "int32"},
{"name": BLURB, "type": "string"},
{"name": CONTENT, "type": "string"},
{"name": SOURCE_TYPE, "type": "string"},
{"name": SOURCE_LINKS, "type": "string"},
{"name": SEMANTIC_IDENTIFIER, "type": "string"},
{"name": SECTION_CONTINUATION, "type": "bool"},
{"name": ALLOWED_USERS, "type": "string[]"},
{"name": METADATA, "type": "string"},
],
}
ts_client.collections.create(collection_schema)


def _check_typesense_collection_exist(
collection_name: str = DOCUMENT_INDEX_NAME,
) -> bool:
client = get_typesense_client()
try:
client.collections[collection_name].retrieve()
except ObjectNotFound:
return False
return True


def _does_document_exist(
doc_chunk_id: str, collection_name: str, ts_client: typesense.Client
) -> bool:
"""Returns whether the document already exists and the users/group whitelists"""
try:
ts_client.collections[collection_name].documents[doc_chunk_id].retrieve()
except ObjectNotFound:
return False

return True


def _delete_typesense_doc_chunks(
document_id: str, collection_name: str, ts_client: typesense.Client
) -> bool:
doc_id_filter = {"filter_by": f"{DOCUMENT_ID}:'{document_id}'"}

# Typesense doesn't seem to prioritize individual deletions, problem not seen with this approach
# Point to consider if we see instances of number of Typesense and Qdrant docs not matching
del_result = ts_client.collections[collection_name].documents.delete(doc_id_filter)
return del_result["num_deleted"] != 0


def _index_typesense_chunks(
chunks: list[DocMetadataAwareIndexChunk],
collection: str,
client: typesense.Client | None = None,
batch_upsert: bool = True,
) -> set[DocumentInsertionRecord]:
ts_client: typesense.Client = client if client else get_typesense_client()

insertion_records: set[DocumentInsertionRecord] = set()
new_documents: list[dict[str, Any]] = []
# document ids of documents that existed BEFORE this indexing
already_existing_documents: set[str] = set()
for chunk in chunks:
document = chunk.source_document
typesense_id = str(get_uuid_from_chunk(chunk))

# Delete all chunks related to the document if (1) it already exists and
# (2) this is our first time running into it during this indexing attempt
document_exists = _does_document_exist(typesense_id, collection, ts_client)
if document_exists and document.id not in already_existing_documents:
# Processing the first chunk of the doc and the doc exists
_delete_typesense_doc_chunks(document.id, collection, ts_client)
already_existing_documents.add(document.id)

insertion_records.add(
DocumentInsertionRecord(
document_id=document.id,
already_existed=document.id in already_existing_documents,
)
)
new_documents.append(
{
"id": typesense_id, # No minichunks for typesense
DOCUMENT_ID: document.id,
CHUNK_ID: chunk.chunk_id,
BLURB: chunk.blurb,
CONTENT: chunk.content,
SOURCE_TYPE: str(document.source.value),
SOURCE_LINKS: json.dumps(chunk.source_links),
SEMANTIC_IDENTIFIER: document.semantic_identifier,
SECTION_CONTINUATION: chunk.section_continuation,
ALLOWED_USERS: json.dumps(chunk.access.to_acl()),
METADATA: json.dumps(document.metadata),
}
)

if batch_upsert:
doc_batches = [
new_documents[x : x + DEFAULT_BATCH_SIZE]
for x in range(0, len(new_documents), DEFAULT_BATCH_SIZE)
]
for doc_batch in doc_batches:
results = ts_client.collections[collection].documents.import_(
doc_batch, {"action": "upsert"}
)
failures = [
doc_res["success"]
for doc_res in results
if doc_res["success"] is not True
]
logger.info(
f"Indexed {len(doc_batch)} chunks into Typesense collection '{collection}', "
f"number failed: {len(failures)}"
)
else:
[
ts_client.collections[collection].documents.upsert(document)
for document in new_documents
]

return insertion_records


def _build_typesense_filters(
user_id: UUID | None, filters: list[IndexFilter] | None
) -> str:
filter_str = ""

# Permissions filter
if user_id:
filter_str += f"{ALLOWED_USERS}:=[{PUBLIC_DOC_PAT},{user_id}] && "
else:
filter_str += f"{ALLOWED_USERS}:={PUBLIC_DOC_PAT} && "

# Provided query filters
if filters:
for filter_dict in filters:
valid_filters = {
key: value for key, value in filter_dict.items() if value is not None
}
for filter_key, filter_val in valid_filters.items():
if isinstance(filter_val, str):
filter_str += f"{filter_key}:={filter_val} && "
elif isinstance(filter_val, list):
filters_or = ",".join([str(f_val) for f_val in filter_val])
filter_str += f"{filter_key}:=[{filters_or}] && "
else:
raise ValueError("Invalid filters provided")
if filter_str[-4:] == " && ":
filter_str = filter_str[:-4]
return filter_str


class TypesenseIndex(KeywordIndex):
def __init__(self, index_name: str = DOCUMENT_INDEX_NAME) -> None:
# In Typesense, the document index is referred to as a collection
self.collection = index_name
self.ts_client = get_typesense_client()

def ensure_indices_exist(self) -> None:
if not _check_typesense_collection_exist(self.collection):
logger.info(f"Creating Typesense collection with name: {self.collection}")
create_typesense_collection(collection_name=self.collection)

def index(
self, chunks: list[DocMetadataAwareIndexChunk]
) -> set[DocumentInsertionRecord]:
return _index_typesense_chunks(
chunks=chunks,
collection=self.collection,
client=self.ts_client,
)

def update(self, update_requests: list[UpdateRequest]) -> None:
logger.info(
f"Updating {len(update_requests)} documents' allowed_users in Typesense"
)
for update_request in update_requests:
for id_batch in batch_generator(
items=update_request.document_ids, batch_size=_BATCH_SIZE
):
if update_request.access is None:
continue

typesense_updates = [
{
DOCUMENT_ID: doc_id,
ALLOWED_USERS: update_request.access.to_acl(),
}
for doc_id in id_batch
]
self.ts_client.collections[self.collection].documents.import_(
typesense_updates, {"action": "update"}
)

def delete(self, doc_ids: list[str]) -> None:
logger.info(f"Deleting {len(doc_ids)} documents from Typesense")
for id_batch in batch_generator(items=doc_ids, batch_size=_BATCH_SIZE):
self.ts_client.collections[self.collection].documents.delete(
{"filter_by": f'{DOCUMENT_ID}:[{",".join(id_batch)}]'}
)

def keyword_retrieval(
self,
query: str,
user_id: UUID | None,
filters: list[IndexFilter] | None,
num_to_retrieve: int = NUM_RETURNED_HITS,
) -> list[InferenceChunk]:
filters_str = _build_typesense_filters(user_id, filters)

search_query = {
"q": query,
"query_by": CONTENT,
"filter_by": filters_str,
"per_page": num_to_retrieve,
"limit_hits": num_to_retrieve,
"num_typos": 2,
"prefix": "false",
# below is required to allow proper partial matching of a query
# (partial matching = only some of the terms in the query match)
# more info here: https://typesense-community.slack.com/archives/C01P749MET0/p1688083239192799
"exhaustive_search": "true",
}

search_results = self.ts_client.collections[self.collection].documents.search(
search_query
)

hits = search_results["hits"]
inference_chunks = [InferenceChunk.from_dict(hit["document"]) for hit in hits]

return inference_chunks
@ -4,9 +4,11 @@ import string
import time
from collections.abc import Mapping
from dataclasses import dataclass
from datetime import datetime
from datetime import timedelta
from datetime import timezone
from typing import Any
from typing import cast
from uuid import UUID

import requests
from requests import HTTPError
@ -17,6 +19,7 @@ from danswer.chunking.models import InferenceChunk
from danswer.configs.app_configs import DOC_TIME_DECAY
from danswer.configs.app_configs import DOCUMENT_INDEX_NAME
from danswer.configs.app_configs import EDIT_KEYWORD_QUERY
from danswer.configs.app_configs import FAVOR_RECENT_DECAY_MULTIPLIER
from danswer.configs.app_configs import NUM_RETURNED_HITS
from danswer.configs.app_configs import VESPA_DEPLOYMENT_ZIP
from danswer.configs.app_configs import VESPA_HOST
@ -46,10 +49,9 @@ from danswer.configs.constants import SOURCE_TYPE
from danswer.configs.constants import TITLE
from danswer.configs.model_configs import SEARCH_DISTANCE_CUTOFF
from danswer.datastores.datastore_utils import get_uuid_from_chunk
from danswer.datastores.datastore_utils import translate_to_epoch_seconds_ensure_tz
from danswer.datastores.interfaces import DocumentIndex
from danswer.datastores.interfaces import DocumentInsertionRecord
from danswer.datastores.interfaces import IndexFilter
from danswer.datastores.interfaces import IndexFilters
from danswer.datastores.interfaces import UpdateRequest
from danswer.datastores.vespa.utils import remove_invalid_unicode_chars
from danswer.search.keyword_search import remove_stop_words
@ -99,6 +101,16 @@ def _does_document_exist(
return True


def _vespa_get_updated_at_attribute(t: datetime | None) -> int | None:
if not t:
return None

if t.tzinfo != timezone.utc:
raise ValueError("Connectors must provide document update time in UTC")

return int(t.timestamp())


def _get_vespa_chunk_ids_by_document_id(
document_id: str, hits_per_page: int = _BATCH_SIZE
) -> list[str]:
@ -178,7 +190,7 @@ def _index_vespa_chunk(
METADATA: json.dumps(document.metadata),
EMBEDDINGS: embeddings_name_vector_map,
BOOST: DEFAULT_BOOST,
DOC_UPDATED_AT: translate_to_epoch_seconds_ensure_tz(document.doc_updated_at),
DOC_UPDATED_AT: _vespa_get_updated_at_attribute(document.doc_updated_at),
PRIMARY_OWNERS: document.primary_owners,
SECONDARY_OWNERS: document.secondary_owners,
# the only `set` vespa has is `weightedset`, so we have to give each
@ -275,37 +287,48 @@ def _index_vespa_chunks(
return insertion_records


def _build_vespa_filters(
filters: list[IndexFilter] | None, include_hidden: bool = False
) -> str:
# NOTE: permissions filters are expected to be passed in directly via
# the `filters` arg, which is why they are not considered explicitly here
def _build_vespa_filters(filters: IndexFilters, include_hidden: bool = False) -> str:
def _build_or_filters(key: str, vals: list[str] | None) -> str:
if vals is None:
return ""

# NOTE: document-set filters are also expected to be passed in directly
# via the `filters` arg. These are set either in the Web UI or in the Slack
# listener
valid_vals = [val for val in vals if val]
if not key or not valid_vals:
return ""

eq_elems = [f'{key} contains "{elem}"' for elem in valid_vals]
or_clause = " or ".join(eq_elems)
return f"({or_clause}) and "

def _build_time_filter(
cutoff: datetime | None,
untimed_doc_cutoff: timedelta = timedelta(days=62), # Slightly over 2 Months
) -> str:
if not cutoff:
return ""

# For Documents that don't have an updated at, filter them out for queries asking for
# very recent documents (2 months) default
include_untimed = datetime.now(timezone.utc) - untimed_doc_cutoff > cutoff
cutoff_secs = int(cutoff.timestamp())

if include_untimed:
# Documents without updated_at are assigned -1 as their date
return f"!({DOC_UPDATED_AT} < {cutoff_secs}) and "

return f"({DOC_UPDATED_AT} >= {cutoff_secs}) and "

# usually ignore hidden docs unless explicitly requested. We may want to
# get hidden docs on the admin panel to allow for un-hiding
filter_str = f"!({HIDDEN}=true) and " if not include_hidden else ""

# Handle provided query filters
if filters:
for filter_dict in filters:
valid_filters = {
key: value for key, value in filter_dict.items() if value is not None
}
for filter_key, filter_val in valid_filters.items():
if isinstance(filter_val, str):
filter_str += f'{filter_key} contains "{filter_val}" and '
elif isinstance(filter_val, list):
eq_elems = [
f'{filter_key} contains "{elem}"' for elem in filter_val
]
filters_or = " or ".join(eq_elems)
filter_str += f"({filters_or}) and "
else:
raise ValueError("Invalid filters provided")
# CAREFUL touching this one, currently there is no second ACL double-check post retrieval
filter_str += _build_or_filters(ACCESS_CONTROL_LIST, filters.access_control_list)

filter_str += _build_or_filters(SOURCE_TYPE, filters.source_type)

filter_str += _build_or_filters(DOCUMENT_SETS, filters.document_set)

filter_str += _build_time_filter(filters.time_cutoff)

return filter_str
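For a rough sense of the where-clause these helpers assemble (the "document_sets" value is confirmed as a constant earlier in this diff; DOC_UPDATED_AT is left as the constant name since its string value is not shown here):

# Hypothetical outputs of the helpers above, for illustration only:
# _build_or_filters("document_sets", ["Engineering", "Support"])
#   -> '(document_sets contains "Engineering" or document_sets contains "Support") and '
# _build_time_filter(cutoff), when the cutoff is within the last ~2 months:
#   -> f"({DOC_UPDATED_AT} >= {int(cutoff.timestamp())}) and "
# _build_time_filter(cutoff), for an older cutoff, uses the negated form so documents
# without an update time are not dropped:
#   -> f"!({DOC_UPDATED_AT} < {int(cutoff.timestamp())}) and "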
@ -530,10 +553,11 @@ class VespaIndex(DocumentIndex):
def keyword_retrieval(
self,
query: str,
user_id: UUID | None,
filters: list[IndexFilter] | None,
filters: IndexFilters,
favor_recent: bool,
num_to_retrieve: int = NUM_RETURNED_HITS,
) -> list[InferenceChunk]:
decay_multiplier = FAVOR_RECENT_DECAY_MULTIPLIER if favor_recent else 1
vespa_where_clauses = _build_vespa_filters(filters)
yql = (
VespaIndex.yql_base
@ -549,7 +573,7 @@ class VespaIndex(DocumentIndex):
params: dict[str, str | int] = {
"yql": yql,
"query": query,
"input.query(decay_factor)": str(DOC_TIME_DECAY),
"input.query(decay_factor)": str(DOC_TIME_DECAY * decay_multiplier),
"hits": num_to_retrieve,
"num_to_rerank": 10 * num_to_retrieve,
"ranking.profile": "keyword_search",
@ -560,11 +584,12 @@ class VespaIndex(DocumentIndex):
def semantic_retrieval(
self,
query: str,
user_id: UUID | None,
filters: list[IndexFilter] | None,
num_to_retrieve: int,
filters: IndexFilters,
favor_recent: bool,
num_to_retrieve: int = NUM_RETURNED_HITS,
distance_cutoff: float | None = SEARCH_DISTANCE_CUTOFF,
) -> list[InferenceChunk]:
decay_multiplier = FAVOR_RECENT_DECAY_MULTIPLIER if favor_recent else 1
vespa_where_clauses = _build_vespa_filters(filters)
yql = (
VespaIndex.yql_base
@ -587,7 +612,7 @@ class VespaIndex(DocumentIndex):
"yql": yql,
"query": query_keywords,
"input.query(query_embedding)": str(query_embedding),
"input.query(decay_factor)": str(DOC_TIME_DECAY),
"input.query(decay_factor)": str(DOC_TIME_DECAY * decay_multiplier),
"ranking.profile": "semantic_search",
}

@ -596,8 +621,8 @@ class VespaIndex(DocumentIndex):
def hybrid_retrieval(
self,
query: str,
user_id: UUID | None,
filters: list[IndexFilter] | None,
filters: IndexFilters,
favor_recent: bool,
num_to_retrieve: int,
) -> list[InferenceChunk]:
vespa_where_clauses = _build_vespa_filters(filters)
@ -628,8 +653,7 @@ class VespaIndex(DocumentIndex):
def admin_retrieval(
self,
query: str,
user_id: UUID | None,
filters: list[IndexFilter] | None,
filters: IndexFilters,
num_to_retrieve: int = NUM_RETURNED_HITS,
) -> list[InferenceChunk]:
vespa_where_clauses = _build_vespa_filters(filters, include_hidden=True)
@ -26,6 +26,8 @@ from danswer.search.models import SearchType
from danswer.search.semantic_search import chunks_to_search_docs
from danswer.search.semantic_search import retrieve_ranked_documents
from danswer.secondary_llm_flows.answer_validation import get_answer_validity
from danswer.secondary_llm_flows.extract_filters import extract_question_time_filters
from danswer.server.models import IndexFilters
from danswer.server.models import QAResponse
from danswer.server.models import QuestionRequest
from danswer.utils.logger import setup_logger
@ -49,11 +51,14 @@ def answer_qa_query(
llm_metrics_callback: Callable[[LLMMetricsContainer], None] | None = None,
) -> QAResponse:
query = question.query
filters = question.filters
use_keyword = question.use_keyword
offset_count = question.offset if question.offset is not None else 0
logger.info(f"Received QA query: {query}")

time_cutoff, favor_recent = extract_question_time_filters(question)
question.filters.time_cutoff = time_cutoff
filters = question.filters

query_event_id = create_query_event(
query=query,
selected_flow=SearchType.KEYWORD
@ -70,22 +75,27 @@ def answer_qa_query(

user_id = None if user is None else user.id
user_acl_filters = build_access_filters_for_user(user, db_session)
final_filters = (filters or []) + user_acl_filters
final_filters = IndexFilters(
source_type=filters.source_type,
document_set=filters.document_set,
time_cutoff=time_cutoff,
access_control_list=user_acl_filters,
)
if use_keyword:
ranked_chunks: list[InferenceChunk] | None = retrieve_keyword_documents(
query,
user_id,
final_filters,
get_default_document_index(),
query=query,
filters=final_filters,
favor_recent=favor_recent,
datastore=get_default_document_index(),
retrieval_metrics_callback=retrieval_metrics_callback,
)
unranked_chunks: list[InferenceChunk] | None = []
else:
ranked_chunks, unranked_chunks = retrieve_ranked_documents(
query,
user_id,
final_filters,
get_default_document_index(),
query=query,
filters=final_filters,
favor_recent=favor_recent,
datastore=get_default_document_index(),
retrieval_metrics_callback=retrieval_metrics_callback,
rerank_metrics_callback=rerank_metrics_callback,
)
@ -98,6 +108,8 @@ def answer_qa_query(
predicted_flow=predicted_flow,
predicted_search=predicted_search,
query_event_id=query_event_id,
time_cutoff=time_cutoff,
favor_recent=favor_recent,
)

top_docs = chunks_to_search_docs(ranked_chunks)
@ -121,6 +133,8 @@ def answer_qa_query(
predicted_flow=QueryFlow.SEARCH,
predicted_search=predicted_search,
query_event_id=query_event_id,
time_cutoff=time_cutoff,
favor_recent=favor_recent,
)

try:
@ -135,8 +149,10 @@ def answer_qa_query(
lower_ranked_docs=unranked_top_docs,
predicted_flow=predicted_flow,
predicted_search=predicted_search,
error_msg=str(e),
query_event_id=query_event_id,
time_cutoff=time_cutoff,
favor_recent=favor_recent,
error_msg=str(e),
)

# remove chunks marked as not applicable for QA (e.g. Google Drive file
@ -179,8 +195,10 @@ def answer_qa_query(
predicted_flow=predicted_flow,
predicted_search=predicted_search,
eval_res_valid=True if valid else False,
error_msg=error_msg,
query_event_id=query_event_id,
time_cutoff=time_cutoff,
favor_recent=favor_recent,
error_msg=error_msg,
)

return QAResponse(
@ -190,6 +208,8 @@ def answer_qa_query(
lower_ranked_docs=unranked_top_docs,
predicted_flow=predicted_flow,
predicted_search=predicted_search,
error_msg=error_msg,
query_event_id=query_event_id,
time_cutoff=time_cutoff,
favor_recent=favor_recent,
error_msg=error_msg,
)

@ -1,13 +1,20 @@
from sqlalchemy.orm import Session

from danswer.access.access import get_acl_for_user
from danswer.configs.constants import ACCESS_CONTROL_LIST
from danswer.datastores.interfaces import IndexFilter
from danswer.db.models import User
from danswer.server.models import IndexFilters

def build_access_filters_for_user(
user: User | None, session: Session
) -> list[IndexFilter]:
def build_access_filters_for_user(user: User | None, session: Session) -> list[str]:
user_acl = get_acl_for_user(user, session)
return [{ACCESS_CONTROL_LIST: list(user_acl)}]
return list(user_acl)

def build_user_only_filters(user: User | None, db_session: Session) -> IndexFilters:
user_acl_filters = build_access_filters_for_user(user, db_session)
return IndexFilters(
source_type=None,
document_set=None,
time_cutoff=None,
access_control_list=user_acl_filters,
)
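The shape change in these access filters is easiest to see side by side; the ACL strings below are hypothetical placeholders, only the structure comes from the diff.

# Before: the ACL arrived as one entry in a list of {key: values} filter dicts.
old_style = [{"access_control_list": ["PUBLIC", "user_id:1234"]}]  # placeholder values

# After: the ACL is a plain list of strings carried on IndexFilters next to the
# other filters (source_type, document_set, time_cutoff).
new_style = {
    "source_type": None,
    "document_set": None,
    "time_cutoff": None,
    "access_control_list": ["PUBLIC", "user_id:1234"],  # placeholder values
}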
@ -1,6 +1,4 @@
import json
from collections.abc import Callable
from uuid import UUID

from nltk.corpus import stopwords  # type:ignore
from nltk.stem import WordNetLemmatizer  # type:ignore
@ -10,7 +8,7 @@ from danswer.chunking.models import InferenceChunk
from danswer.configs.app_configs import EDIT_KEYWORD_QUERY
from danswer.configs.app_configs import NUM_RETURNED_HITS
from danswer.datastores.interfaces import DocumentIndex
from danswer.datastores.interfaces import IndexFilter
from danswer.datastores.interfaces import IndexFilters
from danswer.search.models import ChunkMetric
from danswer.search.models import MAX_METRICS_CONTENT
from danswer.search.models import RetrievalMetricsContainer
@ -44,8 +42,8 @@ def query_processing(
@log_function_time()
def retrieve_keyword_documents(
query: str,
user_id: UUID | None,
filters: list[IndexFilter] | None,
filters: IndexFilters,
favor_recent: bool,
datastore: DocumentIndex,
num_hits: int = NUM_RETURNED_HITS,
edit_query: bool = EDIT_KEYWORD_QUERY,
@ -54,12 +52,13 @@ def retrieve_keyword_documents(
) -> list[InferenceChunk] | None:
edited_query = query_processing(query) if edit_query else query

top_chunks = datastore.keyword_retrieval(edited_query, user_id, filters, num_hits)
top_chunks = datastore.keyword_retrieval(
edited_query, filters, favor_recent, num_hits
)

if not top_chunks:
filters_log_msg = json.dumps(filters, separators=(",", ":")).replace("\n", "")
logger.warning(
f"Keyword search returned no results - Filters: {filters_log_msg}\tEdited Query: {edited_query}"
f"Keyword search returned no results - Filters: {filters}\tEdited Query: {edited_query}"
)
return None
@ -1,6 +1,4 @@
import json
from collections.abc import Callable
from uuid import UUID

import numpy
from sentence_transformers import SentenceTransformer  # type: ignore
@ -24,7 +22,7 @@ from danswer.configs.model_configs import SIM_SCORE_RANGE_LOW
from danswer.configs.model_configs import SKIP_RERANKING
from danswer.datastores.datastore_utils import translate_boost_count_to_multiplier
from danswer.datastores.interfaces import DocumentIndex
from danswer.datastores.interfaces import IndexFilter
from danswer.datastores.interfaces import IndexFilters
from danswer.search.models import ChunkMetric
from danswer.search.models import Embedder
from danswer.search.models import MAX_METRICS_CONTENT
@ -179,8 +177,8 @@ def apply_boost(
@log_function_time()
def retrieve_ranked_documents(
query: str,
user_id: UUID | None,
filters: list[IndexFilter] | None,
filters: IndexFilters,
favor_recent: bool,
datastore: DocumentIndex,
num_hits: int = NUM_RETURNED_HITS,
num_rerank: int = NUM_RERANKED_RESULTS,
@ -198,12 +196,9 @@ def retrieve_ranked_documents(
files_log_msg = f"Top links from semantic search: {', '.join(doc_links)}"
logger.info(files_log_msg)

top_chunks = datastore.semantic_retrieval(query, user_id, filters, num_hits)
top_chunks = datastore.semantic_retrieval(query, filters, favor_recent, num_hits)
if not top_chunks:
filters_log_msg = json.dumps(filters, separators=(",", ":")).replace("\n", "")
logger.warning(
f"Semantic search returned no results with filters: {filters_log_msg}"
)
logger.info(f"Semantic search returned no results with filters: {filters}")
return None, None
logger.debug(top_chunks)

196
backend/danswer/secondary_llm_flows/extract_filters.py
Normal file
@ -0,0 +1,196 @@
import json
from datetime import datetime
from datetime import timedelta
from datetime import timezone

from dateutil.parser import parse

from danswer.configs.app_configs import DISABLE_TIME_FILTER_EXTRACTION
from danswer.llm.build import get_default_llm
from danswer.llm.utils import dict_based_prompt_to_langchain_prompt
from danswer.server.models import QuestionRequest
from danswer.utils.logger import setup_logger
from danswer.utils.timing import log_function_time

logger = setup_logger()

def best_match_time(time_str: str) -> datetime | None:
preferred_formats = ["%m/%d/%Y", "%m-%d-%Y"]

for fmt in preferred_formats:
try:
# As we don't know if the user is interacting with the API server from
# the same timezone as the API server, just assume the queries are UTC time
# the few hours offset (if any) shouldn't make any significant difference
dt = datetime.strptime(time_str, fmt)
return dt.replace(tzinfo=timezone.utc)
except ValueError:
continue

# If the above formats don't match, try using dateutil's parser
try:
dt = parse(time_str)
return (
dt.astimezone(timezone.utc)
if dt.tzinfo
else dt.replace(tzinfo=timezone.utc)
)
except ValueError:
return None

@log_function_time()
def extract_time_filter(query: str) -> tuple[datetime | None, bool]:
"""Returns a datetime if a hard time filter should be applied for the given query
Additionally returns a bool, True if more recently updated Documents should be
heavily favored"""

def _get_time_filter_messages(query: str) -> list[dict[str, str]]:
messages = [
{
"role": "system",
"content": "You are a tool to identify time filters to apply to a user query for "
"a downstream search application. The downstream application is able to "
"use a recency bias or apply a hard cutoff to remove all documents "
"before the cutoff. Identify the correct filters to apply for the user "
"query.\n\n"
"Always answer with ONLY a json which contains the keys "
'"filter_type", "filter_value", "value_multiple" and "date".\n\n'
'The valid values for "filter_type" are "hard cutoff", '
'"favors recent", or "not time sensitive".\n'
'The valid values for "filter_value" are "day", "week", "month", '
'"quarter", "half", or "year".\n'
'The valid values for "value_multiple" is any number.\n'
'The valid values for "date" is a date in format MM/DD/YYYY.',
},
{
"role": "user",
"content": "What documents in Confluence were written in the last two quarters",
},
{
"role": "assistant",
"content": json.dumps(
{
"filter_type": "hard cutoff",
"filter_value": "quarter",
"value_multiple": 2,
}
),
},
{"role": "user", "content": "What's the latest on project Corgies?"},
{
"role": "assistant",
"content": json.dumps({"filter_type": "favor recent"}),
},
{
"role": "user",
"content": "Which customer asked about security features in February of 2022?",
},
{
"role": "assistant",
"content": json.dumps(
{"filter_type": "hard cutoff", "date": "02/01/2022"}
),
},
{"role": "user", "content": query},
]
return messages

def _extract_time_filter_from_llm_out(
model_out: str,
) -> tuple[datetime | None, bool]:
"""Returns a datetime for a hard cutoff and a bool for if the"""
try:
model_json = json.loads(model_out, strict=False)
except json.JSONDecodeError:
return None, False

# If filter type is not present, just assume something has gone wrong
# Potentially model has identified a date and just returned that but
# better to be conservative and not identify the wrong filter.
if "filter_type" not in model_json:
return None, False

if "hard" in model_json["filter_type"] or "recent" in model_json["filter_type"]:
favor_recent = "recent" in model_json["filter_type"]

if "date" in model_json:
extracted_time = best_match_time(model_json["date"])
if extracted_time is not None:
return extracted_time, favor_recent

time_diff = None
multiplier = 1.0

if "value_multiple" in model_json:
try:
multiplier = float(model_json["value_multiple"])
except ValueError:
pass

if "filter_value" in model_json:
filter_value = model_json["filter_value"]
if "day" in filter_value:
time_diff = timedelta(days=multiplier)
elif "week" in filter_value:
time_diff = timedelta(weeks=multiplier)
elif "month" in filter_value:
# Have to just use the average here, too complicated to calculate exact day
# based on current day etc.
time_diff = timedelta(days=multiplier * 30.437)
elif "quarter" in filter_value:
time_diff = timedelta(days=multiplier * 91.25)
elif "year" in filter_value:
time_diff = timedelta(days=multiplier * 365)

if time_diff is not None:
current = datetime.now(timezone.utc)
return current - time_diff, favor_recent

# If we failed to extract a hard filter, just pass back the value of favor recent
return None, favor_recent

return None, False

messages = _get_time_filter_messages(query)
filled_llm_prompt = dict_based_prompt_to_langchain_prompt(messages)
model_output = get_default_llm().invoke(filled_llm_prompt)
logger.debug(model_output)

return _extract_time_filter_from_llm_out(model_output)

def extract_question_time_filters(
question: QuestionRequest,
disable_llm_extraction: bool = DISABLE_TIME_FILTER_EXTRACTION,
) -> tuple[datetime | None, bool]:
time_cutoff = question.filters.time_cutoff
favor_recent = question.favor_recent
# Frontend needs to be able to set this flag so that if user deletes the time filter,
# we don't automatically reapply it. The env variable is a global disable of this feature
# for the sake of latency
if not question.enable_auto_detect_filters or disable_llm_extraction:
if favor_recent is None:
favor_recent = False
return time_cutoff, favor_recent

llm_cutoff, llm_favor_recent = extract_time_filter(question.query)

# For all extractable filters, don't overwrite the provided values if any is provided
if time_cutoff is None:
time_cutoff = llm_cutoff

if favor_recent is None:
favor_recent = llm_favor_recent

return time_cutoff, favor_recent

if __name__ == "__main__":
# Just for testing purposes, too tedious to unit test as it relies on an LLM
while True:
user_input = input("Query to Extract Time: ")
cutoff, recency_bias = extract_time_filter(user_input)
print(f"Time Cutoff: {cutoff}")
print(f"Favor Recent: {recency_bias}")
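To make the cutoff arithmetic concrete, here is a small sketch of how one of the few-shot outputs above turns into a cutoff; the fixed "today" is illustrative, and the real path goes through the LLM and _extract_time_filter_from_llm_out.

# Sketch only: reproduces the "hard cutoff" + quarter * 2 branch with a fixed
# "now" so the arithmetic is visible; the real code uses datetime.now(timezone.utc).
from datetime import datetime, timedelta, timezone

llm_json = {"filter_type": "hard cutoff", "filter_value": "quarter", "value_multiple": 2}

now = datetime(2023, 10, 1, tzinfo=timezone.utc)  # illustrative "today"
cutoff = now - timedelta(days=float(llm_json["value_multiple"]) * 91.25)
favor_recent = "recent" in llm_json["filter_type"]

print(cutoff, favor_recent)  # roughly two quarters back, recency bias off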
@ -308,8 +308,9 @@ def handle_new_chat_message(
response_packets = llm_chat_answer(
messages=mainline_messages,
persona=persona,
user_id=user_id,
tokenizer=llm_tokenizer,
user=user,
db_session=db_session,
)
llm_output = ""
fetched_docs: RetrievalDocs | None = None
@ -397,8 +398,9 @@ def regenerate_message_given_parent(
response_packets = llm_chat_answer(
messages=mainline_messages,
persona=persona,
user_id=user_id,
tokenizer=llm_tokenizer,
user=user,
db_session=db_session,
)
llm_output = ""
fetched_docs: RetrievalDocs | None = None

@ -18,7 +18,6 @@ from danswer.configs.constants import MessageType
from danswer.configs.constants import QAFeedbackType
from danswer.configs.constants import SearchFeedbackType
from danswer.connectors.models import InputType
from danswer.datastores.interfaces import IndexFilter
from danswer.db.models import AllowedAnswerFilters
from danswer.db.models import ChannelConfig
from danswer.db.models import Connector
@ -166,18 +165,32 @@ class RerankedRetrievalDocs(RetrievalDocs):
unranked_top_documents: list[SearchDoc]
predicted_flow: QueryFlow
predicted_search: SearchType
time_cutoff: datetime | None
favor_recent: bool

class CreateChatSessionID(BaseModel):
chat_session_id: int

class RequestFilters(BaseModel):
source_type: list[str] | None
document_set: list[str] | None
time_cutoff: datetime | None = None

class IndexFilters(RequestFilters):
access_control_list: list[str]

class QuestionRequest(BaseModel):
query: str
collection: str
use_keyword: bool | None
filters: list[IndexFilter] | None
filters: RequestFilters
offset: int | None
enable_auto_detect_filters: bool
favor_recent: bool | None = None

class QAFeedbackRequest(BaseModel):
@ -266,6 +279,8 @@ class SearchResponse(BaseModel):
top_ranked_docs: list[SearchDoc] | None
lower_ranked_docs: list[SearchDoc] | None
query_event_id: int
time_cutoff: datetime | None
favor_recent: bool

class QAResponse(SearchResponse):
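With the new models, a minimal request body for the QA and search endpoints looks roughly like the following; all values are illustrative.

# Illustrative payload matching QuestionRequest with the new RequestFilters shape.
example_question_request = {
    "query": "What changed in the billing flow last month?",
    "collection": "danswer_index",
    "use_keyword": False,
    "filters": {
        "source_type": ["confluence"],  # or None to skip the source filter
        "document_set": None,
        "time_cutoff": None,  # the server may fill this in via LLM extraction
    },
    "enable_auto_detect_filters": True,
    "favor_recent": None,  # None lets the backend decide
    "offset": None,
}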
@ -14,7 +14,6 @@ from danswer.configs.app_configs import DISABLE_GENERATIVE_AI
from danswer.configs.app_configs import NUM_DOCUMENT_TOKENS_FED_TO_GENERATIVE_MODEL
from danswer.configs.constants import IGNORE_FOR_QA
from danswer.datastores.document_index import get_default_document_index
from danswer.datastores.interfaces import IndexFilter
from danswer.datastores.vespa.store import VespaIndex
from danswer.db.engine import get_session
from danswer.db.feedback import create_doc_retrieval_feedback
@ -36,9 +35,11 @@ from danswer.search.models import QueryFlow
from danswer.search.models import SearchType
from danswer.search.semantic_search import chunks_to_search_docs
from danswer.search.semantic_search import retrieve_ranked_documents
from danswer.secondary_llm_flows.extract_filters import extract_question_time_filters
from danswer.secondary_llm_flows.query_validation import get_query_answerability
from danswer.secondary_llm_flows.query_validation import stream_query_answerability
from danswer.server.models import HelperResponse
from danswer.server.models import IndexFilters
from danswer.server.models import QAFeedbackRequest
from danswer.server.models import QAResponse
from danswer.server.models import QueryValidationResponse
@ -61,7 +62,7 @@ router = APIRouter()

class AdminSearchRequest(BaseModel):
query: str
filters: list[IndexFilter] | None = None
filters: IndexFilters

class AdminSearchResponse(BaseModel):
@ -78,9 +79,13 @@ def admin_search(
filters = question.filters
logger.info(f"Received admin search query: {query}")

user_id = None if user is None else user.id
user_acl_filters = build_access_filters_for_user(user, db_session)
final_filters = (filters or []) + user_acl_filters
final_filters = IndexFilters(
source_type=filters.source_type,
document_set=filters.document_set,
time_cutoff=filters.time_cutoff,
access_control_list=user_acl_filters,
)
document_index = get_default_document_index()
if not isinstance(document_index, VespaIndex):
raise HTTPException(
@ -88,9 +93,7 @@ def admin_search(
detail="Cannot use admin-search when using a non-Vespa document index",
)

matching_chunks = document_index.admin_retrieval(
query=query, user_id=user_id, filters=final_filters
)
matching_chunks = document_index.admin_retrieval(query=query, filters=final_filters)

documents = chunks_to_search_docs(matching_chunks)

@ -142,9 +145,12 @@ def semantic_search(
db_session: Session = Depends(get_session),
) -> SearchResponse:
query = question.query
filters = question.filters
logger.info(f"Received semantic search query: {query}")

time_cutoff, favor_recent = extract_question_time_filters(question)
question.filters.time_cutoff = time_cutoff
filters = question.filters

query_event_id = create_query_event(
query=query,
selected_flow=SearchType.SEMANTIC,
@ -155,13 +161,25 @@ def semantic_search(

user_id = None if user is None else user.id
user_acl_filters = build_access_filters_for_user(user, db_session)
final_filters = (filters or []) + user_acl_filters
final_filters = IndexFilters(
source_type=filters.source_type,
document_set=filters.document_set,
time_cutoff=filters.time_cutoff,
access_control_list=user_acl_filters,
)
ranked_chunks, unranked_chunks = retrieve_ranked_documents(
query, user_id, final_filters, get_default_document_index()
query=query,
filters=final_filters,
favor_recent=favor_recent,
datastore=get_default_document_index(),
)
if not ranked_chunks:
return SearchResponse(
top_ranked_docs=None, lower_ranked_docs=None, query_event_id=query_event_id
top_ranked_docs=None,
lower_ranked_docs=None,
query_event_id=query_event_id,
time_cutoff=time_cutoff,
favor_recent=favor_recent,
)

top_docs = chunks_to_search_docs(ranked_chunks)
@ -177,6 +195,8 @@ def semantic_search(
top_ranked_docs=top_docs,
lower_ranked_docs=other_top_docs,
query_event_id=query_event_id,
time_cutoff=time_cutoff,
favor_recent=favor_recent,
)

@ -187,9 +207,12 @@ def keyword_search(
db_session: Session = Depends(get_session),
) -> SearchResponse:
query = question.query
filters = question.filters
logger.info(f"Received keyword search query: {query}")

time_cutoff, favor_recent = extract_question_time_filters(question)
question.filters.time_cutoff = time_cutoff
filters = question.filters

query_event_id = create_query_event(
query=query,
selected_flow=SearchType.KEYWORD,
@ -200,13 +223,25 @@ def keyword_search(

user_id = None if user is None else user.id
user_acl_filters = build_access_filters_for_user(user, db_session)
final_filters = (filters or []) + user_acl_filters
final_filters = IndexFilters(
source_type=filters.source_type,
document_set=filters.document_set,
time_cutoff=filters.time_cutoff,
access_control_list=user_acl_filters,
)
ranked_chunks = retrieve_keyword_documents(
query, user_id, final_filters, get_default_document_index()
query=query,
filters=final_filters,
favor_recent=favor_recent,
datastore=get_default_document_index(),
)
if not ranked_chunks:
return SearchResponse(
top_ranked_docs=None, lower_ranked_docs=None, query_event_id=query_event_id
top_ranked_docs=None,
lower_ranked_docs=None,
query_event_id=query_event_id,
time_cutoff=time_cutoff,
favor_recent=favor_recent,
)

top_docs = chunks_to_search_docs(ranked_chunks)
@ -218,7 +253,11 @@ def keyword_search(
)

return SearchResponse(
top_ranked_docs=top_docs, lower_ranked_docs=None, query_event_id=query_event_id
top_ranked_docs=top_docs,
lower_ranked_docs=None,
query_event_id=query_event_id,
time_cutoff=time_cutoff,
favor_recent=favor_recent,
)

@ -228,6 +267,8 @@ def direct_qa(
user: User | None = Depends(current_user),
db_session: Session = Depends(get_session),
) -> QAResponse:
# Everything handled via answer_qa_query which is also used by default
# for the DanswerBot flow
return answer_qa_query(question=question, user=user, db_session=db_session)

@ -255,31 +296,39 @@ def stream_direct_qa(
) -> Generator[str, None, None]:
answer_so_far: str = ""
query = question.query
filters = question.filters
use_keyword = question.use_keyword
offset_count = question.offset if question.offset is not None else 0

time_cutoff, favor_recent = extract_question_time_filters(question)
question.filters.time_cutoff = time_cutoff
filters = question.filters

predicted_search, predicted_flow = query_intent(query)
if use_keyword is None:
use_keyword = predicted_search == SearchType.KEYWORD

user_id = None if user is None else user.id
user_acl_filters = build_access_filters_for_user(user, db_session)
final_filters = (filters or []) + user_acl_filters
final_filters = IndexFilters(
source_type=filters.source_type,
document_set=filters.document_set,
time_cutoff=filters.time_cutoff,
access_control_list=user_acl_filters,
)
if use_keyword:
ranked_chunks: list[InferenceChunk] | None = retrieve_keyword_documents(
query,
user_id,
final_filters,
get_default_document_index(),
query=query,
filters=final_filters,
favor_recent=favor_recent,
datastore=get_default_document_index(),
)
unranked_chunks: list[InferenceChunk] | None = []
else:
ranked_chunks, unranked_chunks = retrieve_ranked_documents(
query,
user_id,
final_filters,
get_default_document_index(),
query=query,
filters=final_filters,
favor_recent=favor_recent,
datastore=get_default_document_index(),
)
if not ranked_chunks:
logger.debug("No Documents Found")
@ -304,6 +353,8 @@ def stream_direct_qa(
if disable_generative_answer
else predicted_flow,
predicted_search=predicted_search,
time_cutoff=time_cutoff,
favor_recent=favor_recent,
).dict()

logger.debug(send_packet_debug_msg.format(initial_response))
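Each endpoint above repeats the same preamble; here is a condensed sketch of that shared flow, with the helpers stubbed out so the snippet stands alone (the stub return values are placeholders).

# Condensed sketch of the per-endpoint preamble introduced in this change; the
# helpers are stubbed so the flow is runnable in isolation.
from datetime import datetime

def extract_question_time_filters_stub(question: dict) -> tuple[datetime | None, bool]:
    return None, False  # placeholder: the real helper may consult the LLM

def build_access_filters_for_user_stub(user: object | None) -> list[str]:
    return ["PUBLIC"]  # placeholder ACL entry

def handle_search(question: dict, user: object | None) -> dict:
    time_cutoff, favor_recent = extract_question_time_filters_stub(question)
    question["filters"]["time_cutoff"] = time_cutoff
    final_filters = {
        **question["filters"],
        "access_control_list": build_access_filters_for_user_stub(user),
    }
    # retrieval then receives filters=final_filters and favor_recent=favor_recent
    return {"filters": final_filters, "favor_recent": favor_recent}

print(handle_search({"query": "q", "filters": {"source_type": None, "document_set": None, "time_cutoff": None}}, None))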
@ -1,49 +0,0 @@
import typesense  # type: ignore
from qdrant_client import QdrantClient

from danswer.configs.app_configs import QDRANT_API_KEY
from danswer.configs.app_configs import QDRANT_HOST
from danswer.configs.app_configs import QDRANT_PORT
from danswer.configs.app_configs import QDRANT_URL
from danswer.configs.app_configs import TYPESENSE_API_KEY
from danswer.configs.app_configs import TYPESENSE_HOST
from danswer.configs.app_configs import TYPESENSE_PORT

_qdrant_client: QdrantClient | None = None
_typesense_client: typesense.Client | None = None

def get_qdrant_client() -> QdrantClient:
global _qdrant_client
if _qdrant_client is None:
if QDRANT_URL and QDRANT_API_KEY:
_qdrant_client = QdrantClient(url=QDRANT_URL, api_key=QDRANT_API_KEY)
elif QDRANT_HOST and QDRANT_PORT:
_qdrant_client = QdrantClient(host=QDRANT_HOST, port=QDRANT_PORT)
else:
raise Exception("Unable to instantiate QdrantClient")

return _qdrant_client

def get_typesense_client() -> typesense.Client:
global _typesense_client
if _typesense_client is None:
if TYPESENSE_HOST and TYPESENSE_PORT and TYPESENSE_API_KEY:
_typesense_client = typesense.Client(
{
"api_key": TYPESENSE_API_KEY,
"nodes": [
{
"host": TYPESENSE_HOST,
"port": str(TYPESENSE_PORT),
"protocol": "http",
}
],
}
)
else:
raise Exception("Unable to instantiate TypesenseClient")

return _typesense_client

@ -37,7 +37,6 @@ PyGithub==1.58.2
pypdf==3.16.4
pytest-playwright==0.3.2
python-multipart==0.0.6
qdrant-client==1.2.0
requests==2.31.0
requests-oauthlib==1.3.1
retry==0.9.2 # This pulls in py which is in CVE-2022-42969, must remove py from image
@ -53,7 +52,6 @@ tiktoken==0.4.0
torch==2.0.1
torchvision==0.15.2
transformers==4.30.1
typesense==0.15.1
uvicorn==0.21.1
zulip==0.8.2
hubspot-api-client==8.1.0

@ -1,3 +1,4 @@
# This file is purely for development use, not included in any builds
import subprocess
import threading

@ -1,25 +0,0 @@
from danswer.configs.app_configs import DOCUMENT_INDEX_NAME
from danswer.utils.clients import get_typesense_client

if __name__ == "__main__":
ts_client = get_typesense_client()

page_number = 1
per_page = 100 # number of documents to retrieve per page
while True:
params = {
"q": "",
"query_by": "content",
"page": page_number,
"per_page": per_page,
}
response = ts_client.collections[DOCUMENT_INDEX_NAME].documents.search(params)
documents = response.get("hits")
if not documents:
break # if there are no more documents, break out of the loop

for document in documents:
print(document)

page_number += 1 # move on to the next page

@ -1,46 +1,13 @@
# This file is purely for development use, not included in any builds
import requests
from qdrant_client.http.models import Distance
from qdrant_client.http.models import VectorParams
from typesense.exceptions import ObjectNotFound # type: ignore

from danswer.configs.app_configs import DOCUMENT_INDEX_NAME
from danswer.configs.model_configs import DOC_EMBEDDING_DIM
from danswer.datastores.document_index import get_default_document_index
from danswer.datastores.document_index import SplitDocumentIndex
from danswer.datastores.typesense.store import create_typesense_collection
from danswer.datastores.vespa.store import DOCUMENT_ID_ENDPOINT
from danswer.datastores.vespa.store import VespaIndex
from danswer.utils.clients import get_qdrant_client
from danswer.utils.clients import get_typesense_client
from danswer.utils.logger import setup_logger

logger = setup_logger()

def recreate_qdrant_collection(
collection_name: str, embedding_dim: int = DOC_EMBEDDING_DIM
) -> None:
logger.info(f"Attempting to recreate Qdrant collection {collection_name}")
result = get_qdrant_client().recreate_collection(
collection_name=collection_name,
vectors_config=VectorParams(size=embedding_dim, distance=Distance.COSINE),
)
if not result:
raise RuntimeError("Could not create Qdrant collection")

def recreate_typesense_collection(collection_name: str) -> None:
logger.info(f"Attempting to recreate Typesense collection {collection_name}")
ts_client = get_typesense_client()
try:
ts_client.collections[collection_name].delete()
except ObjectNotFound:
logger.debug(f"Collection {collection_name} does not already exist")

create_typesense_collection(collection_name)

def wipe_vespa_index() -> None:
params = {"selection": "true", "cluster": DOCUMENT_INDEX_NAME}
response = requests.delete(DOCUMENT_ID_ENDPOINT, params=params)
@ -48,9 +15,4 @@ def wipe_vespa_index() -> None:

if __name__ == "__main__":
document_index = get_default_document_index()
if isinstance(document_index, SplitDocumentIndex):
recreate_qdrant_collection("danswer_index")
recreate_typesense_collection("danswer_index")
elif isinstance(document_index, VespaIndex):
wipe_vespa_index()
wipe_vespa_index()

@ -3,28 +3,17 @@ import argparse
import json
import os
import subprocess
from datetime import datetime

import requests
from qdrant_client.http.models.models import SnapshotDescription
from typesense.exceptions import ObjectNotFound # type: ignore

from alembic import command
from alembic.config import Config
from danswer.configs.app_configs import DOCUMENT_INDEX_NAME
from danswer.configs.app_configs import POSTGRES_DB
from danswer.configs.app_configs import POSTGRES_HOST
from danswer.configs.app_configs import POSTGRES_PASSWORD
from danswer.configs.app_configs import POSTGRES_PORT
from danswer.configs.app_configs import POSTGRES_USER
from danswer.configs.app_configs import QDRANT_HOST
from danswer.configs.app_configs import QDRANT_PORT
from danswer.datastores.qdrant.utils import create_qdrant_collection
from danswer.datastores.qdrant.utils import list_qdrant_collections
from danswer.datastores.typesense.store import create_typesense_collection
from danswer.datastores.vespa.store import DOCUMENT_ID_ENDPOINT
from danswer.utils.clients import get_qdrant_client
from danswer.utils.clients import get_typesense_client
from danswer.utils.logger import setup_logger

logger = setup_logger()
@ -51,80 +40,6 @@ def load_postgres(filename: str) -> None:
)

def snapshot_time_compare(snap: SnapshotDescription) -> datetime:
if not hasattr(snap, "creation_time") or snap.creation_time is None:
raise RuntimeError("Qdrant Snapshots Failed")
return datetime.strptime(snap.creation_time, "%Y-%m-%dT%H:%M:%S")

def save_qdrant(filename: str) -> None:
logger.info("Attempting to take Qdrant snapshot")
qdrant_client = get_qdrant_client()
qdrant_client.create_snapshot(collection_name=DOCUMENT_INDEX_NAME)
snapshots = qdrant_client.list_snapshots(collection_name=DOCUMENT_INDEX_NAME)
valid_snapshots = [snap for snap in snapshots if snap.creation_time is not None]

sorted_snapshots = sorted(valid_snapshots, key=snapshot_time_compare)
last_snapshot_name = sorted_snapshots[-1].name
url = f"http://{QDRANT_HOST}:{QDRANT_PORT}/collections/{DOCUMENT_INDEX_NAME}/snapshots/{last_snapshot_name}"

response = requests.get(url, stream=True)

if response.status_code != 200:
raise RuntimeError("Qdrant Save Failed")

with open(filename, "wb") as file:
for chunk in response.iter_content(chunk_size=8192):
file.write(chunk)

def load_qdrant(filename: str) -> None:
logger.info("Attempting to load Qdrant snapshot")
if DOCUMENT_INDEX_NAME not in {
collection.name for collection in list_qdrant_collections().collections
}:
create_qdrant_collection(DOCUMENT_INDEX_NAME)
snapshot_url = f"http://{QDRANT_HOST}:{QDRANT_PORT}/collections/{DOCUMENT_INDEX_NAME}/snapshots/"

with open(filename, "rb") as f:
files = {"snapshot": (os.path.basename(filename), f)}
response = requests.post(snapshot_url + "upload", files=files)
if response.status_code != 200:
raise RuntimeError("Qdrant Snapshot Upload Failed")

data = {"location": snapshot_url + os.path.basename(filename)}
headers = {"Content-Type": "application/json"}
response = requests.put(
snapshot_url + "recover", data=json.dumps(data), headers=headers
)
if response.status_code != 200:
raise RuntimeError("Loading Qdrant Snapshot Failed")

def save_typesense(filename: str) -> None:
logger.info("Attempting to take Typesense snapshot")
ts_client = get_typesense_client()
all_docs = ts_client.collections[DOCUMENT_INDEX_NAME].documents.export()
with open(filename, "w") as f:
f.write(all_docs)

def load_typesense(filename: str) -> None:
logger.info("Attempting to load Typesense snapshot")
ts_client = get_typesense_client()
try:
ts_client.collections[DOCUMENT_INDEX_NAME].delete()
except ObjectNotFound:
pass

create_typesense_collection(DOCUMENT_INDEX_NAME)

with open(filename) as jsonl_file:
ts_client.collections[DOCUMENT_INDEX_NAME].documents.import_(
jsonl_file.read().encode("utf-8"), {"action": "create"}
)

def save_vespa(filename: str) -> None:
logger.info("Attempting to take Vespa snapshot")
continuation = ""
@ -189,10 +104,6 @@ if __name__ == "__main__":
if args.load:
load_postgres(os.path.join(checkpoint_dir, "postgres_snapshot.tar"))
load_vespa(os.path.join(checkpoint_dir, "vespa_snapshot.jsonl"))
# load_qdrant(os.path.join(checkpoint_dir, "qdrant.snapshot"))
# load_typesense(os.path.join(checkpoint_dir, "typesense_snapshot.jsonl"))
else:
save_postgres(os.path.join(checkpoint_dir, "postgres_snapshot.tar"))
save_vespa(os.path.join(checkpoint_dir, "vespa_snapshot.jsonl"))
# save_qdrant(os.path.join(checkpoint_dir, "qdrant.snapshot"))
# save_typesense(os.path.join(checkpoint_dir, "typesense_snapshot.jsonl"))

@ -8,11 +8,13 @@ from typing import TextIO
import yaml
from sqlalchemy.orm import Session

from danswer.access.access import get_acl_for_user
from danswer.db.engine import get_sqlalchemy_engine
from danswer.direct_qa.answer_question import answer_qa_query
from danswer.direct_qa.models import LLMMetricsContainer
from danswer.search.models import RerankMetricsContainer
from danswer.search.models import RetrievalMetricsContainer
from danswer.server.models import IndexFilters
from danswer.server.models import QuestionRequest
from danswer.utils.callbacks import MetricsHander

@ -74,11 +76,18 @@ def get_answer_for_question(
RerankMetricsContainer | None,
LLMMetricsContainer | None,
]:
filters = IndexFilters(
source_type=None,
document_set=None,
time_cutoff=None,
access_control_list=list(get_acl_for_user(user=None)),
)
question = QuestionRequest(
query=query,
collection="danswer_index",
use_keyword=False,
filters=None,
filters=filters,
enable_auto_detect_filters=False,
offset=None,
)

@ -27,7 +27,6 @@ Requirements: Docker and docker compose
- `docker compose -f docker-compose.dev.yml -p danswer-stack up -d --pull always --force-recreate`
- or run: `docker compose -f docker-compose.dev.yml -p danswer-stack up -d --build --force-recreate`
to build from source
- This will start Web/API servers, Postgres (backend DB), Qdrant (vector DB), and the background indexing job.
- Downloading images or packages/requirements may take 15+ minutes depending on your internet connection.
@ -1,154 +0,0 @@
# This legacy version runs the app with typesense and qdrant together as the document indices
# Danswer is moving forward with Vespa to offer a consolidated index and a better search experience
version: '3'
services:
api_server:
image: danswer/danswer-backend:latest
build:
context: ../../backend
dockerfile: Dockerfile
command: >
/bin/sh -c "alembic upgrade head &&
echo \"Starting Danswer Api Server\" &&
uvicorn danswer.main:app --host 0.0.0.0 --port 8080"
depends_on:
- relational_db
- vector_db
- search_engine
restart: always
ports:
- "8080:8080"
environment:
- DOCUMENT_INDEX_TYPE=split
- INTERNAL_MODEL_VERSION=${INTERNAL_MODEL_VERSION:-openai-chat-completion}
- GEN_AI_MODEL_VERSION=${GEN_AI_MODEL_VERSION:-gpt-3.5-turbo}
- GEN_AI_API_KEY=${GEN_AI_API_KEY:-}
- GEN_AI_ENDPOINT=${GEN_AI_ENDPOINT:-}
- GEN_AI_HOST_TYPE=${GEN_AI_HOST_TYPE:-}
- POSTGRES_HOST=relational_db
- QDRANT_HOST=vector_db
- TYPESENSE_HOST=search_engine
- TYPESENSE_API_KEY=${TYPESENSE_API_KEY:-typesense_api_key}
- LOG_LEVEL=${LOG_LEVEL:-info}
- AUTH_TYPE=${AUTH_TYPE:-disabled}
- QA_TIMEOUT=${QA_TIMEOUT:-}
- GOOGLE_OAUTH_CLIENT_ID=${GOOGLE_OAUTH_CLIENT_ID:-}
- GOOGLE_OAUTH_CLIENT_SECRET=${GOOGLE_OAUTH_CLIENT_SECRET:-}
- DISABLE_GENERATIVE_AI=${DISABLE_GENERATIVE_AI:-}
- API_BASE_OPENAI=${API_BASE_OPENAI:-}
- API_TYPE_OPENAI=${API_TYPE_OPENAI:-}
- API_VERSION_OPENAI=${API_VERSION_OPENAI:-}
- AZURE_DEPLOYMENT_ID=${AZURE_DEPLOYMENT_ID:-}
volumes:
- local_dynamic_storage:/home/storage
- file_connector_tmp_storage:/home/file_connector_storage
- model_cache_torch:/root/.cache/torch/
- model_cache_nltk:/root/nltk_data/
- model_cache_huggingface:/root/.cache/huggingface/
background:
image: danswer/danswer-backend:latest
build:
context: ../../backend
dockerfile: Dockerfile
command: /usr/bin/supervisord
depends_on:
- relational_db
- vector_db
restart: always
environment:
- DOCUMENT_INDEX_TYPE=split
- INTERNAL_MODEL_VERSION=${INTERNAL_MODEL_VERSION:-openai-chat-completion}
- GEN_AI_MODEL_VERSION=${GEN_AI_MODEL_VERSION:-gpt-3.5-turbo}
- GEN_AI_API_KEY=${GEN_AI_API_KEY:-}
- GEN_AI_ENDPOINT=${GEN_AI_ENDPOINT:-}
- GEN_AI_HOST_TYPE=${GEN_AI_HOST_TYPE:-}
- NUM_DOCUMENT_TOKENS_FED_TO_GENERATIVE_MODEL=${NUM_DOCUMENT_TOKENS_FED_TO_GENERATIVE_MODEL:-}
- POSTGRES_HOST=relational_db
- QDRANT_HOST=vector_db
- TYPESENSE_HOST=search_engine
- TYPESENSE_API_KEY=${TYPESENSE_API_KEY:-typesense_api_key}
- API_BASE_OPENAI=${API_BASE_OPENAI:-}
- API_TYPE_OPENAI=${API_TYPE_OPENAI:-}
- API_VERSION_OPENAI=${API_VERSION_OPENAI:-}
- AZURE_DEPLOYMENT_ID=${AZURE_DEPLOYMENT_ID:-}
- CONTINUE_ON_CONNECTOR_FAILURE=${CONTINUE_ON_CONNECTOR_FAILURE:-}
- NUM_INDEXING_WORKERS=${NUM_INDEXING_WORKERS:-}
- DANSWER_BOT_SLACK_APP_TOKEN=${DANSWER_BOT_SLACK_APP_TOKEN:-}
- DANSWER_BOT_SLACK_BOT_TOKEN=${DANSWER_BOT_SLACK_BOT_TOKEN:-}
- LOG_LEVEL=${LOG_LEVEL:-info}
volumes:
- local_dynamic_storage:/home/storage
- file_connector_tmp_storage:/home/file_connector_storage
- model_cache_torch:/root/.cache/torch/
- model_cache_nltk:/root/nltk_data/
- model_cache_huggingface:/root/.cache/huggingface/
web_server:
image: danswer/danswer-web-server:latest
build:
context: ../../web
dockerfile: Dockerfile
args:
- NEXT_PUBLIC_DISABLE_STREAMING=${NEXT_PUBLIC_DISABLE_STREAMING:-false}
depends_on:
- api_server
restart: always
environment:
- INTERNAL_URL=http://api_server:8080
- WEB_DOMAIN=${WEB_DOMAIN:-}
- OAUTH_NAME=${OAUTH_NAME:-}
relational_db:
image: postgres:15.2-alpine
restart: always
environment:
- POSTGRES_USER=${POSTGRES_USER:-postgres}
- POSTGRES_PASSWORD=${POSTGRES_PASSWORD:-password}
ports:
- "5432:5432"
volumes:
- db_volume:/var/lib/postgresql/data
vector_db:
image: qdrant/qdrant:v1.3.0
restart: always
environment:
- QDRANT__TELEMETRY_DISABLED=true
ports:
- "6333:6333"
volumes:
- qdrant_volume:/qdrant/storage
search_engine:
image: typesense/typesense:0.24.1
restart: always
environment:
- TYPESENSE_API_KEY=${TYPESENSE_API_KEY:-typesense_api_key}
- TYPESENSE_DATA_DIR=/typesense/storage
ports:
- "8108:8108"
volumes:
- typesense_volume:/typesense/storage
nginx:
image: nginx:1.23.4-alpine
restart: always
# nginx will immediately crash with `nginx: [emerg] host not found in upstream`
# if api_server / web_server are not up
depends_on:
- api_server
- web_server
environment:
- DOMAIN=localhost
ports:
- "80:80"
- "3000:80" # allow for localhost:3000 usage, since that is the norm
volumes:
- ../data/nginx:/etc/nginx/conf.d
command: >
/bin/sh -c "envsubst '$$\{DOMAIN\}' < /etc/nginx/conf.d/app.conf.template.dev > /etc/nginx/conf.d/app.conf &&
while :; do sleep 6h & wait $${!}; nginx -s reload; done & nginx -g \"daemon off;\""
volumes:
local_dynamic_storage:
file_connector_tmp_storage: # used to store files uploaded by the user temporarily while we are indexing them
db_volume:
qdrant_volume:
typesense_volume:
model_cache_torch:
model_cache_nltk:
model_cache_huggingface:

@ -34,6 +34,8 @@ services:
- API_TYPE_OPENAI=${API_TYPE_OPENAI:-}
- API_VERSION_OPENAI=${API_VERSION_OPENAI:-}
- AZURE_DEPLOYMENT_ID=${AZURE_DEPLOYMENT_ID:-}
- NOTION_CONNECTOR_ENABLE_RECURSIVE_PAGE_LOOKUP=${NOTION_CONNECTOR_ENABLE_RECURSIVE_PAGE_LOOKUP:-}
- DISABLE_TIME_FILTER_EXTRACTION=${DISABLE_TIME_FILTER_EXTRACTION:-}
# Don't change the NLP model configs unless you know what you're doing
- DOCUMENT_ENCODER_MODEL=${DOCUMENT_ENCODER_MODEL:-}
- NORMALIZE_EMBEDDINGS=${NORMALIZE_EMBEDDINGS:-}
@ -44,7 +46,6 @@ services:
- SKIP_RERANKING=${SKIP_RERANKING:-}
# Set to debug to get more fine-grained logs
- LOG_LEVEL=${LOG_LEVEL:-info}
- NOTION_CONNECTOR_ENABLE_RECURSIVE_PAGE_LOOKUP=${NOTION_CONNECTOR_ENABLE_RECURSIVE_PAGE_LOOKUP:-}
volumes:
- local_dynamic_storage:/home/storage
- file_connector_tmp_storage:/home/file_connector_storage

@ -1,132 +0,0 @@
version: '3'
services:
api_server:
image: danswer/danswer-backend:latest
build:
context: ../../backend
dockerfile: Dockerfile
command: >
/bin/sh -c "alembic upgrade head &&
echo \"Starting Danswer Api Server\" &&
uvicorn danswer.main:app --host 0.0.0.0 --port 8080"
depends_on:
- relational_db
- vector_db
- search_engine
restart: always
env_file:
- .env
environment:
- AUTH_TYPE=${AUTH_TYPE:-google_oauth}
- DOCUMENT_INDEX_TYPE=split
- POSTGRES_HOST=relational_db
- QDRANT_HOST=vector_db
- TYPESENSE_HOST=search_engine
volumes:
- local_dynamic_storage:/home/storage
- file_connector_tmp_storage:/home/file_connector_storage
- model_cache_torch:/root/.cache/torch/
- model_cache_nltk:/root/nltk_data/
- model_cache_huggingface:/root/.cache/huggingface/
background:
image: danswer/danswer-backend:latest
build:
context: ../../backend
dockerfile: Dockerfile
command: /usr/bin/supervisord
depends_on:
- relational_db
- vector_db
restart: always
env_file:
- .env
environment:
- AUTH_TYPE=${AUTH_TYPE:-google_oauth}
- DOCUMENT_INDEX_TYPE=split
- POSTGRES_HOST=relational_db
- QDRANT_HOST=vector_db
- TYPESENSE_HOST=search_engine
volumes:
- local_dynamic_storage:/home/storage
- file_connector_tmp_storage:/home/file_connector_storage
- model_cache_torch:/root/.cache/torch/
- model_cache_nltk:/root/nltk_data/
- model_cache_huggingface:/root/.cache/huggingface/
web_server:
image: danswer/danswer-web-server:latest
build:
context: ../../web
dockerfile: Dockerfile
args:
- NEXT_PUBLIC_DISABLE_STREAMING=${NEXT_PUBLIC_DISABLE_STREAMING:-false}
depends_on:
- api_server
restart: always
env_file:
- .env
environment:
- INTERNAL_URL=http://api_server:8080
relational_db:
image: postgres:15.2-alpine
restart: always
# POSTGRES_USER and POSTGRES_PASSWORD should be set in .env file
env_file:
- .env
volumes:
- db_volume:/var/lib/postgresql/data
vector_db:
image: qdrant/qdrant:v1.3.0
restart: always
env_file:
- .env
environment:
- QDRANT__TELEMETRY_DISABLED=true
volumes:
- qdrant_volume:/qdrant/storage
search_engine:
image: typesense/typesense:0.24.1
restart: always
# TYPESENSE_API_KEY must be set in .env file
environment:
- TYPESENSE_DATA_DIR=/typesense/storage
env_file:
- .env
volumes:
- typesense_volume:/typesense/storage
nginx:
image: nginx:1.23.4-alpine
restart: always
# nginx will immediately crash with `nginx: [emerg] host not found in upstream`
# if api_server / web_server are not up
depends_on:
- api_server
- web_server
ports:
- "80:80"
- "443:443"
volumes:
- ../data/nginx:/etc/nginx/conf.d
- ../data/certbot/conf:/etc/letsencrypt
- ../data/certbot/www:/var/www/certbot
command: >
/bin/sh -c "envsubst '$$\{DOMAIN\}' < /etc/nginx/conf.d/app.conf.template > /etc/nginx/conf.d/app.conf
&& while :; do sleep 6h & wait $${!}; nginx -s reload; done & nginx -g \"daemon off;\""
env_file:
- .env.nginx
# follows https://pentacent.medium.com/nginx-and-lets-encrypt-with-docker-in-less-than-5-minutes-b4b8a60d3a71
certbot:
image: certbot/certbot
restart: always
volumes:
- ../data/certbot/conf:/etc/letsencrypt
- ../data/certbot/www:/var/www/certbot
entrypoint: "/bin/sh -c 'trap exit TERM; while :; do certbot renew; sleep 12h & wait $${!}; done;'"
volumes:
local_dynamic_storage:
file_connector_tmp_storage: # used to store files uploaded by the user temporarily while we are indexing them
db_volume:
qdrant_volume:
typesense_volume:
model_cache_torch:
model_cache_nltk:
model_cache_huggingface:

@ -36,11 +36,8 @@ export const searchRequest = async ({
query,
collection: "danswer_index",
use_keyword: useKeyword,
...(filters.length > 0
? {
filters,
}
: {}),
filters,
enable_auto_detect_filters: false,
offset: offset,
}),
headers: {

@ -82,11 +82,8 @@ export const searchRequestStreamed = async ({
query,
collection: "danswer_index",
use_keyword: useKeyword,
...(filters.length > 0
? {
filters,
}
: {}),
filters,
enable_auto_detect_filters: false,
offset: offset,
}),
headers: {

@ -10,13 +10,20 @@ export const questionValidationStreamed = async <T>({
query,
update,
}: QuestionValidationArgs) => {
const emptyFilters = {
source_type: null,
document_set: null,
time_cutoff: null,
};

const response = await fetch("/api/stream-query-validation", {
method: "POST",
body: JSON.stringify({
query,
collection: "danswer_index",
use_keyword: null,
filters: null,
filters: emptyFilters,
enable_auto_detect_filters: false,
offset: null,
}),
headers: {

@ -1,16 +1,13 @@
import { Source } from "./interfaces";

export const buildFilters = (sources: Source[], documentSets: string[]) => {
const filters = [];
if (sources.length > 0) {
filters.push({
source_type: sources.map((source) => source.internalName),
});
}
if (documentSets.length > 0) {
filters.push({
document_sets: documentSets,
});
}
const filters = {
source_type:
sources.length > 0 ? sources.map((source) => source.internalName) : null,
document_set: documentSets.length > 0 ? documentSets : null,
// TODO make this a date selector
time_cutoff: null,
};

return filters;
};