tenant seeding docs

pablodanswer 2024-10-25 18:56:07 -07:00
parent 9b147ae437
commit f85c2059d3
8 changed files with 21 additions and 11 deletions

View File

@@ -78,6 +78,7 @@ tasks_to_schedule = [
     },
 ]
+# Build the celery beat schedule dynamically
 beat_schedule = {}
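Context for this hunk: Celery's beat_schedule is a plain dict mapping an entry name to a task path and a schedule, so it can be assembled in a loop from a list like tasks_to_schedule. A minimal, self-contained sketch of that pattern (entry names, task paths, and dict keys are illustrative assumptions, not necessarily the repo's):

# Sketch of building a Celery beat schedule dynamically.
# Entry names, task paths, and keys here are illustrative assumptions.
from datetime import timedelta

tasks_to_schedule = [
    {
        "name": "check-for-indexing",
        "task": "danswer.background.check_for_indexing",
        "schedule": timedelta(seconds=15),
    },
]

# Build the celery beat schedule dynamically
beat_schedule = {}
for task in tasks_to_schedule:
    task = task.copy()
    entry_name = task.pop("name")
    # Celery expects {entry_name: {"task": <task path>, "schedule": ...}}
    beat_schedule[entry_name] = task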

View File

@@ -311,6 +311,8 @@ class VespaIndex(DocumentIndex):
         with updating the associated permissions. Assumes that a document will not be split into
         multiple chunk batches calling this function multiple times, otherwise only the last set of
         chunks will be kept"""
+        print("INDEXING")
+        print(chunks[0].tenant_id)
         # IMPORTANT: This must be done one index at a time, do not use secondary index here
         cleaned_chunks = [clean_chunk_id_copy(chunk) for chunk in chunks]
@@ -322,10 +324,12 @@ class VespaIndex(DocumentIndex):
             concurrent.futures.ThreadPoolExecutor(max_workers=NUM_THREADS) as executor,
             get_vespa_http_client() as http_client,
         ):
+            print("existing docs")
             # Check for existing documents, existing documents need to have all of their chunks deleted
             # prior to indexing as the document size (num chunks) may have shrunk
             first_chunks = [chunk for chunk in cleaned_chunks if chunk.chunk_id == 0]
             for chunk_batch in batch_generator(first_chunks, BATCH_SIZE):
+                print("batch")
                 existing_docs.update(
                     get_existing_documents_from_chunks(
                         chunks=chunk_batch,
@@ -334,6 +338,7 @@ class VespaIndex(DocumentIndex):
                         executor=executor,
                     )
                 )
+            print("delete docs ")
             for doc_id_batch in batch_generator(existing_docs, BATCH_SIZE):
                 delete_vespa_docs(
@@ -343,6 +348,7 @@ class VespaIndex(DocumentIndex):
                     executor=executor,
                 )
+            print("index chunks")
             for chunk_batch in batch_generator(cleaned_chunks, BATCH_SIZE):
                 batch_index_vespa_chunks(
                     chunks=chunk_batch,
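Aside from the debug prints this commit adds, the surrounding method follows a delete-then-reindex pattern: chunk 0 of each incoming document identifies documents already in the index, all of their old chunks are deleted first (a re-chunked document may have fewer chunks than before, so writing in place could leave stale chunks behind), and only then are the new chunks written in batches. A self-contained sketch of that flow, using stand-in helpers rather than the repo's real ones:

# Sketch of the delete-then-reindex flow above; all names are stand-ins.
from itertools import islice

BATCH_SIZE = 128

def batch_generator(items, batch_size):
    """Yield successive fixed-size batches from an iterable."""
    it = iter(items)
    while batch := list(islice(it, batch_size)):
        yield batch

def index_chunks(chunks, fetch_existing, delete_docs, index_batch):
    # Only chunk 0 of each document is needed to detect prior versions
    first_chunks = [c for c in chunks if c["chunk_id"] == 0]

    existing_doc_ids = set()
    for batch in batch_generator(first_chunks, BATCH_SIZE):
        existing_doc_ids.update(fetch_existing(batch))

    # A re-chunked document may have shrunk, so stale chunks are deleted
    # up front instead of being overwritten in place
    for doc_batch in batch_generator(existing_doc_ids, BATCH_SIZE):
        delete_docs(doc_batch)

    for batch in batch_generator(chunks, BATCH_SIZE):
        index_batch(batch)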

View File

@@ -57,7 +57,7 @@ def _does_document_exist(
     chunk. This checks for whether the chunk exists already in the index"""
     doc_url = f"{DOCUMENT_ID_ENDPOINT.format(index_name=index_name)}/{doc_chunk_id}"
     doc_fetch_response = http_client.get(doc_url)
+    print("doc fetch response")
     if doc_fetch_response.status_code == 404:
         return False
@@ -96,6 +96,7 @@ def get_existing_documents_from_chunks(
     document_ids: set[str] = set()
     try:
+        print("chunk existence future")
         chunk_existence_future = {
             executor.submit(
                 _does_document_exist,
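For context on these two hunks: the existence check is a GET against Vespa's Document V1 API, which returns 404 for a missing document id, and the caller fans those per-chunk probes out over a thread pool. A rough sketch of the same pattern, assuming httpx and illustrative names:

# Sketch of the 404-based existence probe, fanned out across threads.
# Names and the httpx usage are assumptions for illustration.
import concurrent.futures

import httpx

def does_document_exist(http_client: httpx.Client, doc_url: str) -> bool:
    response = http_client.get(doc_url)
    if response.status_code == 404:
        return False
    response.raise_for_status()
    return True

def get_existing_ids(urls_by_doc_id: dict[str, str]) -> set[str]:
    found: set[str] = set()
    with httpx.Client() as client, concurrent.futures.ThreadPoolExecutor() as executor:
        # One existence probe per chunk, keyed by future for lookup later
        future_to_id = {
            executor.submit(does_document_exist, client, url): doc_id
            for doc_id, url in urls_by_doc_id.items()
        }
        for future in concurrent.futures.as_completed(future_to_id):
            if future.result():
                found.add(future_to_id[future])
    return found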

View File

@@ -29,6 +29,7 @@ VESPA_APPLICATION_ENDPOINT = f"{VESPA_CONFIG_SERVER_URL}/application/v2"
 # main search application
 VESPA_APP_CONTAINER_URL = VESPA_CLOUD_URL or f"http://{VESPA_HOST}:{VESPA_PORT}"
+# danswer_chunk below is defined in vespa/app_configs/schemas/danswer_chunk.sd
 DOCUMENT_ID_ENDPOINT = (
     f"{VESPA_APP_CONTAINER_URL}/document/v1/default/{{index_name}}/docid"

View File

@@ -183,7 +183,7 @@ async def lifespan(app: FastAPI) -> AsyncGenerator:
         # If we are multi-tenant, we need to only set up initial public tables
         with Session(engine) as db_session:
-            setup_danswer(db_session)
+            setup_danswer(db_session, None)
     else:
         setup_multitenant_danswer()
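The one-line change here passes an explicit tenant_id of None on the single-tenant startup path, meaning "use the default schema"; in the multi-tenant case only shared tables are prepared at startup, and per-tenant setup happens at tenant creation (the last file in this commit). A schematic of the branch, with MULTI_TENANT as an assumed flag name:

# Schematic of the startup branch; MULTI_TENANT is an assumed flag name.
MULTI_TENANT = False

def on_startup(engine, Session, setup_danswer, setup_multitenant_danswer) -> None:
    if not MULTI_TENANT:
        with Session(engine) as db_session:
            # tenant_id=None -> set up the default (single-tenant) schema
            setup_danswer(db_session, None)
    else:
        # Shared/public tables only; each tenant is set up at creation time
        setup_multitenant_danswer()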

View File

@@ -31,7 +31,6 @@ from danswer.key_value_store.factory import get_kv_store
 from danswer.key_value_store.interface import KvKeyNotFoundError
 from danswer.server.documents.models import ConnectorBase
 from danswer.utils.logger import setup_logger
-from danswer.utils.retry_wrapper import retry_builder
 logger = setup_logger()
@@ -39,6 +38,7 @@ logger = setup_logger()
 def _create_indexable_chunks(
     preprocessed_docs: list[dict],
+    tenant_id: str | None,
 ) -> tuple[list[Document], list[DocMetadataAwareIndexChunk]]:
     ids_to_documents = {}
     chunks = []
@@ -80,7 +80,7 @@ def _create_indexable_chunks(
                 mini_chunk_embeddings=[],
             ),
             title_embedding=preprocessed_doc["title_embedding"],
-            tenant_id=None,
+            tenant_id=tenant_id,
             access=default_public_access,
             document_sets=set(),
             boost=DEFAULT_BOOST,
@@ -90,7 +90,7 @@ def _create_indexable_chunks(
     return list(ids_to_documents.values()), chunks
-def seed_initial_documents(db_session: Session) -> None:
+def seed_initial_documents(db_session: Session, tenant_id: str | None) -> None:
     """
     Seed initial documents so users don't have an empty index to start
@@ -177,7 +177,7 @@ def seed_initial_documents(db_session: Session) -> None:
     )
     processed_docs = json.load(open(initial_docs_path))
-    docs, chunks = _create_indexable_chunks(processed_docs)
+    docs, chunks = _create_indexable_chunks(processed_docs, tenant_id)
     index_doc_batch_prepare(
         document_batch=docs,
@@ -198,8 +198,9 @@ def seed_initial_documents(db_session: Session) -> None:
     # Retries here because the index may take a few seconds to become ready
     # as we just sent over the Vespa schema and there is a slight delay
-    index_with_retries = retry_builder()(document_index.index)
-    index_with_retries(chunks=chunks)
+    document_index.index(chunks=chunks)
+    # index_with_retries(chunks=chunks)
     # Mock a run for the UI even though it did not actually call out to anything
     mock_successful_index_attempt(
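Two things happen in this file: tenant_id is threaded from seed_initial_documents down into chunk construction (replacing the hardcoded tenant_id=None), and the retried index call becomes a direct document_index.index(chunks=chunks), with the retry commented out and its import removed. Per the surviving comment, the retry existed because the index can take a few seconds to become ready after the Vespa schema is deployed, so the direct call assumes the index is already up. For reference, retry_builder()(fn) is a decorator factory applied inline; a toy version of that shape (an assumption, not the repo's retry_wrapper):

# Toy retry decorator factory; an assumption, not the repo's retry_wrapper.
import time
from typing import Callable, TypeVar

T = TypeVar("T")

def retry_builder(tries: int = 5, delay: float = 0.5) -> Callable:
    """retry_builder()(fn) returns a retrying version of fn."""
    def decorator(func: Callable[..., T]) -> Callable[..., T]:
        def wrapped(*args, **kwargs) -> T:
            for attempt in range(tries):
                try:
                    return func(*args, **kwargs)
                except Exception:
                    if attempt == tries - 1:
                        raise
                    # Back off while the freshly deployed schema settles
                    time.sleep(delay * 2**attempt)
        return wrapped
    return decorator

# index_with_retries = retry_builder()(document_index.index)
# index_with_retries(chunks=chunks)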

View File

@@ -58,7 +58,7 @@ from shared_configs.model_server_models import SupportedEmbeddingModel
 logger = setup_logger()
-def setup_danswer(db_session: Session) -> None:
+def setup_danswer(db_session: Session, tenant_id: str | None) -> None:
     """
     Setup Danswer for a particular tenant. In the Single Tenant case, it will set it up for the default schema
     on server startup. In the MT case, it will be called when the tenant is created.
@@ -147,7 +147,7 @@ def setup_danswer(db_session: Session) -> None:
     # update multipass indexing setting based on GPU availability
     update_default_multipass_indexing(db_session)
-    seed_initial_documents(db_session)
+    seed_initial_documents(db_session, tenant_id)
 def translate_saved_search_settings(db_session: Session) -> None:
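Together with the previous file, the net effect is a tenant_id parameter threaded through the entire seeding chain. A condensed view of the call chain after this commit (the signatures mirror the diff; the bodies are placeholders):

# Condensed view of the tenant_id threading; bodies are placeholders.
def _create_indexable_chunks(preprocessed_docs: list[dict], tenant_id: str | None):
    # Each seeded chunk is now tagged with its tenant (None = default schema)
    chunks = [{**doc, "tenant_id": tenant_id} for doc in preprocessed_docs]
    return preprocessed_docs, chunks

def seed_initial_documents(db_session, tenant_id: str | None) -> None:
    docs, chunks = _create_indexable_chunks([{"id": "doc-1"}], tenant_id)

def setup_danswer(db_session, tenant_id: str | None) -> None:
    seed_initial_documents(db_session, tenant_id)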

View File

@@ -59,7 +59,7 @@ def create_tenant(
         run_alembic_migrations(tenant_id)
         with get_session_with_tenant(tenant_id) as db_session:
-            setup_danswer(db_session)
+            setup_danswer(db_session, tenant_id)
         add_users_to_tenant([email], tenant_id)
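This last hunk closes the loop on the multi-tenant path: tenant creation migrates the new schema, opens a session bound to it, and calls setup_danswer with the real tenant_id so seeded documents are tagged for the right tenant. A condensed sketch with stubbed helpers:

# Condensed sketch of the tenant-creation sequence; helpers are stubs.
from contextlib import contextmanager

def run_alembic_migrations(tenant_id: str) -> None: ...
def setup_danswer(db_session, tenant_id: str | None) -> None: ...
def add_users_to_tenant(emails: list[str], tenant_id: str) -> None: ...

@contextmanager
def get_session_with_tenant(tenant_id: str):
    # Real code would bind the session to the tenant's schema
    # (e.g. a Postgres search_path); stubbed here for illustration.
    yield f"session-for-{tenant_id}"

def create_tenant(email: str, tenant_id: str) -> None:
    run_alembic_migrations(tenant_id)  # 1. migrate the new tenant schema
    with get_session_with_tenant(tenant_id) as db_session:
        # 2. first-time setup, now seeding with the real tenant id
        setup_danswer(db_session, tenant_id)
    add_users_to_tenant([email], tenant_id)  # 3. attach the creating user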