From f85c2059d3f80a425f229868533f740a12e40af9 Mon Sep 17 00:00:00 2001
From: pablodanswer
Date: Fri, 25 Oct 2024 18:56:07 -0700
Subject: [PATCH] Seed initial documents with tenant_id

Thread tenant_id from tenant creation through setup_danswer and
seed_initial_documents so that seeded chunks are stamped with the
tenant they belong to (None in the single-tenant case). Also add
debug logging around the Vespa indexing path.
---
 backend/danswer/background/celery/apps/beat.py     |  1 +
 backend/danswer/document_index/vespa/index.py      |  6 ++++++
 .../danswer/document_index/vespa/indexing_utils.py |  3 ++-
 backend/danswer/document_index/vespa_constants.py  |  1 +
 backend/danswer/main.py                            |  2 +-
 backend/danswer/seeding/load_docs.py               |  7 ++++---
 backend/danswer/setup.py                           |  4 ++--
 backend/ee/danswer/server/tenants/api.py           |  2 +-
 8 files changed, 18 insertions(+), 8 deletions(-)

diff --git a/backend/danswer/background/celery/apps/beat.py b/backend/danswer/background/celery/apps/beat.py
index 8ddc17efc..f88295fa1 100644
--- a/backend/danswer/background/celery/apps/beat.py
+++ b/backend/danswer/background/celery/apps/beat.py
@@ -78,6 +78,7 @@ tasks_to_schedule = [
     },
 ]
 
+
 # Build the celery beat schedule dynamically
 beat_schedule = {}
 
diff --git a/backend/danswer/document_index/vespa/index.py b/backend/danswer/document_index/vespa/index.py
index 86bc481e5..595221577 100644
--- a/backend/danswer/document_index/vespa/index.py
+++ b/backend/danswer/document_index/vespa/index.py
@@ -311,6 +311,8 @@ class VespaIndex(DocumentIndex):
         with updating the associated permissions. Assumes that a document will not be split
         into multiple chunk batches calling this function multiple times, otherwise only the
         last set of chunks will be kept"""
+        logger.debug("Indexing %d chunks", len(chunks))
+        logger.debug("tenant_id of first chunk: %s", chunks[0].tenant_id if chunks else None)
         # IMPORTANT: This must be done one index at a time, do not use secondary index here
         cleaned_chunks = [clean_chunk_id_copy(chunk) for chunk in chunks]
 
@@ -322,10 +324,12 @@ class VespaIndex(DocumentIndex):
             concurrent.futures.ThreadPoolExecutor(max_workers=NUM_THREADS) as executor,
             get_vespa_http_client() as http_client,
         ):
+            logger.debug("Checking for existing documents")
             # Check for existing documents, existing documents need to have all of their chunks deleted
             # prior to indexing as the document size (num chunks) may have shrunk
             first_chunks = [chunk for chunk in cleaned_chunks if chunk.chunk_id == 0]
             for chunk_batch in batch_generator(first_chunks, BATCH_SIZE):
+                logger.debug("Checking existence for a batch of chunks")
                 existing_docs.update(
                     get_existing_documents_from_chunks(
                         chunks=chunk_batch,
@@ -334,6 +338,7 @@ class VespaIndex(DocumentIndex):
                         executor=executor,
                     )
                 )
+            logger.debug("Deleting existing documents prior to re-indexing")
 
             for doc_id_batch in batch_generator(existing_docs, BATCH_SIZE):
                 delete_vespa_docs(
@@ -343,6 +348,7 @@ class VespaIndex(DocumentIndex):
                     executor=executor,
                 )
 
+            logger.debug("Indexing chunk batches into Vespa")
             for chunk_batch in batch_generator(cleaned_chunks, BATCH_SIZE):
                 batch_index_vespa_chunks(
                     chunks=chunk_batch,
diff --git a/backend/danswer/document_index/vespa/indexing_utils.py b/backend/danswer/document_index/vespa/indexing_utils.py
index 8ecdc2267..1e7d7a6d2 100644
--- a/backend/danswer/document_index/vespa/indexing_utils.py
+++ b/backend/danswer/document_index/vespa/indexing_utils.py
@@ -57,7 +57,7 @@ def _does_document_exist(
     chunk. This checks for whether the chunk exists already in the index"""
     doc_url = f"{DOCUMENT_ID_ENDPOINT.format(index_name=index_name)}/{doc_chunk_id}"
     doc_fetch_response = http_client.get(doc_url)
-
+    logger.debug("Document fetch for %s returned status %s", doc_chunk_id, doc_fetch_response.status_code)
     if doc_fetch_response.status_code == 404:
         return False
 
@@ -96,6 +96,7 @@ def get_existing_documents_from_chunks(
     document_ids: set[str] = set()
 
     try:
+        logger.debug("Submitting document existence checks to the thread pool")
         chunk_existence_future = {
             executor.submit(
                 _does_document_exist,
diff --git a/backend/danswer/document_index/vespa_constants.py b/backend/danswer/document_index/vespa_constants.py
index d4a36ef97..30039922f 100644
--- a/backend/danswer/document_index/vespa_constants.py
+++ b/backend/danswer/document_index/vespa_constants.py
@@ -29,6 +29,7 @@ VESPA_APPLICATION_ENDPOINT = f"{VESPA_CONFIG_SERVER_URL}/application/v2"
 # main search application
 VESPA_APP_CONTAINER_URL = VESPA_CLOUD_URL or f"http://{VESPA_HOST}:{VESPA_PORT}"
 
+
 # danswer_chunk below is defined in vespa/app_configs/schemas/danswer_chunk.sd
 DOCUMENT_ID_ENDPOINT = (
     f"{VESPA_APP_CONTAINER_URL}/document/v1/default/{{index_name}}/docid"
diff --git a/backend/danswer/main.py b/backend/danswer/main.py
index a6a338b4c..5244d5f1a 100644
--- a/backend/danswer/main.py
+++ b/backend/danswer/main.py
@@ -183,7 +183,7 @@ async def lifespan(app: FastAPI) -> AsyncGenerator:
 
         # If we are multi-tenant, we need to only set up initial public tables
         with Session(engine) as db_session:
-            setup_danswer(db_session)
+            setup_danswer(db_session, None)
     else:
         setup_multitenant_danswer()
 
diff --git a/backend/danswer/seeding/load_docs.py b/backend/danswer/seeding/load_docs.py
index 2756e7ddf..edd52d9b5 100644
--- a/backend/danswer/seeding/load_docs.py
+++ b/backend/danswer/seeding/load_docs.py
@@ -39,6 +39,7 @@ logger = setup_logger()
 
 def _create_indexable_chunks(
     preprocessed_docs: list[dict],
+    tenant_id: str | None,
 ) -> tuple[list[Document], list[DocMetadataAwareIndexChunk]]:
     ids_to_documents = {}
     chunks = []
@@ -80,7 +81,7 @@ def _create_indexable_chunks(
                 mini_chunk_embeddings=[],
             ),
             title_embedding=preprocessed_doc["title_embedding"],
-            tenant_id=None,
+            tenant_id=tenant_id,
             access=default_public_access,
             document_sets=set(),
             boost=DEFAULT_BOOST,
@@ -90,7 +91,7 @@ def _create_indexable_chunks(
     return list(ids_to_documents.values()), chunks
 
 
-def seed_initial_documents(db_session: Session) -> None:
+def seed_initial_documents(db_session: Session, tenant_id: str | None) -> None:
     """
     Seed initial documents so users don't have an empty index to start
 
@@ -177,7 +178,7 @@ def seed_initial_documents(db_session: Session) -> None:
     )
     processed_docs = json.load(open(initial_docs_path))
 
-    docs, chunks = _create_indexable_chunks(processed_docs)
+    docs, chunks = _create_indexable_chunks(processed_docs, tenant_id)
 
     index_doc_batch_prepare(
         document_batch=docs,
diff --git a/backend/danswer/setup.py b/backend/danswer/setup.py
index a27580a2b..426dc5e4b 100644
--- a/backend/danswer/setup.py
+++ b/backend/danswer/setup.py
@@ -58,7 +58,7 @@ from shared_configs.model_server_models import SupportedEmbeddingModel
 logger = setup_logger()
 
 
-def setup_danswer(db_session: Session) -> None:
+def setup_danswer(db_session: Session, tenant_id: str | None) -> None:
     """
     Setup Danswer for a particular tenant. In the Single Tenant case, it will set it up for the default schema
     on server startup. In the MT case, it will be called when the tenant is created.
@@ -147,7 +147,7 @@ def setup_danswer(db_session: Session) -> None:
     # update multipass indexing setting based on GPU availability
     update_default_multipass_indexing(db_session)
 
-    seed_initial_documents(db_session)
+    seed_initial_documents(db_session, tenant_id)
 
 
 def translate_saved_search_settings(db_session: Session) -> None:
diff --git a/backend/ee/danswer/server/tenants/api.py b/backend/ee/danswer/server/tenants/api.py
index 342554c1c..3776c726e 100644
--- a/backend/ee/danswer/server/tenants/api.py
+++ b/backend/ee/danswer/server/tenants/api.py
@@ -59,7 +59,7 @@ def create_tenant(
         run_alembic_migrations(tenant_id)
 
         with get_session_with_tenant(tenant_id) as db_session:
-            setup_danswer(db_session)
+            setup_danswer(db_session, tenant_id)
 
         add_users_to_tenant([email], tenant_id)
 
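
Reviewer note: the core of this change is threading tenant_id from tenant
creation down to the seeded chunks. The toy sketch below is not the real
Danswer code (the Chunk dataclass and in-memory seeding are stand-ins), but it
shows the shape of the call chain, with None standing in for the single-tenant
case.

    from dataclasses import dataclass

    @dataclass
    class Chunk:
        content: str
        tenant_id: str | None  # None in single-tenant mode

    def _create_indexable_chunks(
        docs: list[str], tenant_id: str | None
    ) -> list[Chunk]:
        # Every seeded chunk is stamped with the owning tenant.
        return [Chunk(content=doc, tenant_id=tenant_id) for doc in docs]

    def seed_initial_documents(tenant_id: str | None) -> None:
        chunks = _create_indexable_chunks(["intro doc"], tenant_id)
        print(f"seeding {len(chunks)} chunk(s) for tenant {tenant_id!r}")

    def setup_danswer(tenant_id: str | None) -> None:
        # ...schema setup, connector creation, etc. elided...
        seed_initial_documents(tenant_id)

    # Single-tenant startup (main.py) passes None; multi-tenant creation
    # (ee/.../tenants/api.py) passes the new tenant's id.
    setup_danswer(None)
    setup_danswer("tenant_1234")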
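
The seeding path wraps its first Vespa indexing call with retry_builder()
because the index may not be ready right after the schema is pushed. As a
rough approximation only (the real helper lives in
danswer/utils/retry_wrapper.py; the tries/delay/backoff parameters here are
assumptions, not its actual signature), such a decorator factory looks like:

    import time
    from typing import Any, Callable, TypeVar

    T = TypeVar("T")

    def retry_builder(
        tries: int = 5, delay: float = 1.0, backoff: float = 2.0
    ) -> Callable[[Callable[..., T]], Callable[..., T]]:
        """Return a decorator that retries the wrapped callable with
        exponential backoff, re-raising after the final attempt."""

        def decorator(func: Callable[..., T]) -> Callable[..., T]:
            def wrapped(*args: Any, **kwargs: Any) -> T:
                wait = delay
                for attempt in range(tries):
                    try:
                        return func(*args, **kwargs)
                    except Exception:
                        if attempt == tries - 1:
                            raise  # out of attempts, propagate the error
                        time.sleep(wait)
                        wait *= backoff
                raise RuntimeError("unreachable")

            return wrapped

        return decorator

    # Usage mirroring load_docs.py: retry until the Vespa index accepts writes.
    # index_with_retries = retry_builder()(document_index.index)
    # index_with_retries(chunks=chunks)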