valid vespa indexing + search for multi tenant use case

This commit is contained in:
pablodanswer 2024-09-26 12:44:27 -07:00
parent daad96d180
commit f0a5ec223f
8 changed files with 40 additions and 22 deletions

View File

@ -105,7 +105,7 @@ def _run_indexing(
document_index = get_default_document_index(
primary_index_name=index_name, secondary_index_name=None
)
embedding_model = DefaultIndexingEmbedder.from_db_search_settings(
search_settings=search_settings
@ -202,7 +202,7 @@ def _run_indexing(
db_session.refresh(index_attempt)
if index_attempt.status != IndexingStatus.IN_PROGRESS:
# Likely due to user manually disabling it or model swap
raise RuntimeError("Index Attempt was canceled")
raise RuntimeError(f"Index Attempt was canceled, status is {index_attempt.status}")
batch_description = []
for doc in doc_batch:

View File

@ -271,6 +271,7 @@ def cleanup_indexing_jobs(
# batch of documents indexed
current_db_time = get_db_current_time(db_session=db_session)
time_since_update = current_db_time - index_attempt.time_updated
logger.info("ERRORS 1")
if time_since_update.total_seconds() > 60 * 60 * timeout_hours:
existing_jobs[index_attempt.id].cancel()
_mark_run_failed(
@ -280,6 +281,8 @@ def cleanup_indexing_jobs(
"The run will be re-attempted at next scheduled indexing time.",
)
else:
logger.info(f"ERRORS 2 {tenant_id} {len(existing_jobs)}")
continue
# If job isn't known, simply mark it as failed
_mark_run_failed(
db_session=db_session,

View File

@ -335,12 +335,15 @@ def query_vespa(
return inference_chunks
def _get_chunks_via_batch_search(
index_name: str,
chunk_requests: list[VespaChunkRequest],
filters: IndexFilters,
get_large_chunks: bool = False,
) -> list[InferenceChunkUncleaned]:
print("GET CHUNKS")
if not chunk_requests:
return []

View File

@ -178,8 +178,9 @@ def _index_vespa_chunk(
DOCUMENT_SETS: {document_set: 1 for document_set in chunk.document_sets},
}
vespa_url = f"{DOCUMENT_ID_ENDPOINT.format(index_name=index_name)}/{vespa_chunk_id}"
logger.debug(f'Indexing to URL "{vespa_url}"')
logger.debug(f'Indexing to URL "{vespa_url}" with TENANT ID "{chunk.tenant_id}"')
res = http_client.post(
vespa_url, headers=json_header, json={"fields": vespa_document_fields}
)

View File

@ -20,6 +20,8 @@ logger = setup_logger()
def build_vespa_filters(filters: IndexFilters, include_hidden: bool = False) -> str:
print("\n\n\n\n\nzzzzzzzzzzBUILDING VVESPA FILTESR")
def _build_or_filters(key: str, vals: list[str] | None) -> str:
if vals is None:
return ""
@ -55,7 +57,10 @@ def build_vespa_filters(filters: IndexFilters, include_hidden: bool = False) ->
filter_str = f"!({HIDDEN}=true) and " if not include_hidden else ""
if filters.tenant_id:
filter_str += f"({TENANT_ID} contains {filters.tenant_id}) and "
print("TENANT ID")
filter_str += f'({TENANT_ID} contains "{filters.tenant_id}") and '
else:
print("NO TENANT ID")
# CAREFUL touching this one, currently there is no second ACL double-check post retrieval
if filters.access_control_list is not None:

View File

@ -212,6 +212,7 @@ def stream_answer_objects(
answer_style_config=answer_config,
prompt_config=PromptConfig.from_model(prompt),
llm=get_main_llm_from_tuple(get_llms_for_persona(persona=chat_session.persona, db_session=db_session)),
# TODO: change back
single_message_history=history_str,
tools=[search_tool],
force_use_tool=ForceUseTool(

View File

@ -29,7 +29,7 @@ from danswer.utils.logger import setup_logger
from danswer.utils.threadpool_concurrency import FunctionCall
from danswer.utils.threadpool_concurrency import run_functions_in_parallel
from danswer.utils.timing import log_function_time
from danswer.db.engine import current_tenant_id
logger = setup_logger()
@ -151,12 +151,15 @@ def retrieval_preprocessing(
user_acl_filters = (
None if bypass_acl else build_access_filters_for_user(user, db_session)
)
print("building filtes")
print(current_tenant_id.get())
final_filters = IndexFilters(
source_type=preset_filters.source_type or predicted_source_filters,
document_set=preset_filters.document_set,
time_cutoff=preset_filters.time_cutoff or predicted_time_cutoff,
tags=preset_filters.tags, # Tags are never auto-extracted
access_control_list=user_acl_filters,
tenant_id="4990f0d0-8447-4476-b6a0-53f4938654c1" # TODO FIX
)
llm_evaluation_type = LLMEvaluationType.BASIC

View File

@ -30,23 +30,25 @@ const nextConfig = {
if (process.env.NODE_ENV === "production") return defaultRedirects;
return defaultRedirects.concat([
{
source: "/api/chat/send-message:params*",
destination: "http://127.0.0.1:8080/chat/send-message:params*", // Proxy to Backend
permanent: true,
},
{
source: "/api/query/stream-answer-with-quote:params*",
destination:
"http://127.0.0.1:8080/query/stream-answer-with-quote:params*", // Proxy to Backend
permanent: true,
},
{
source: "/api/query/stream-query-validation:params*",
destination:
"http://127.0.0.1:8080/query/stream-query-validation:params*", // Proxy to Backend
permanent: true,
},
// TODO: validate the db sesion in tenancy for loca dev
// {
// source: "/api/chat/send-message:params*",
// destination: "http://127.0.0.1:8080/chat/send-message:params*", // Proxy to Backend
// permanent: true,
// },
// {
// source: "/api/query/stream-answer-with-quote:params*",
// destination:
// "http://127.0.0.1:8080/query/stream-answer-with-quote:params*", // Proxy to Backend
// permanent: true,
// },
// {
// source: "/api/query/stream-query-validation:params*",
// destination:
// "http://127.0.0.1:8080/query/stream-query-validation:params*", // Proxy to Backend
// permanent: true,
// },
]);
},
publicRuntimeConfig: {