From 89f71ac33589314205d2af8a239666fdddb12e74 Mon Sep 17 00:00:00 2001
From: Chris Weaver <25087905+Weves@users.noreply.github.com>
Date: Wed, 9 Aug 2023 00:53:42 -0700
Subject: [PATCH] Support deletion of documents when a connector is deleted
(#271)
---
CONTRIBUTING.md | 9 +
...533f0_make_last_attempt_status_nullable.py | 50 +++
...0c7ad8a076_added_deletion_attempt_table.py | 115 +++++++
.../danswer/background/connector_deletion.py | 296 ++++++++++++++++++
backend/danswer/background/update.py | 8 +-
.../connectors/google_drive/connector.py | 2 +-
backend/danswer/connectors/models.py | 16 +-
backend/danswer/connectors/utils.py | 7 +-
backend/danswer/datastores/datastore_utils.py | 103 ++++--
.../danswer/datastores/indexing_pipeline.py | 77 ++++-
backend/danswer/datastores/interfaces.py | 58 +++-
backend/danswer/datastores/qdrant/indexing.py | 84 +++--
backend/danswer/datastores/qdrant/store.py | 39 ++-
backend/danswer/datastores/qdrant/utils.py | 12 +
backend/danswer/datastores/typesense/store.py | 108 +++++--
backend/danswer/db/connector.py | 1 -
.../danswer/db/connector_credential_pair.py | 14 +-
backend/danswer/db/credentials.py | 16 -
backend/danswer/db/deletion_attempt.py | 85 +++++
backend/danswer/db/document.py | 185 +++++++++++
backend/danswer/db/index_attempt.py | 13 +
backend/danswer/db/models.py | 121 ++++++-
backend/danswer/db/utils.py | 9 +
backend/danswer/server/manage.py | 116 +++++--
backend/danswer/server/models.py | 66 +++-
backend/danswer/server/utils.py | 17 +
backend/scripts/reset_postgres.py | 3 +
backend/supervisord.conf | 7 +
.../app/admin/connectors/bookstack/page.tsx | 7 +-
.../app/admin/connectors/confluence/page.tsx | 7 +-
web/src/app/admin/connectors/file/page.tsx | 155 +++------
web/src/app/admin/connectors/github/page.tsx | 14 +-
.../GoogleDriveConnectorsTable.tsx | 46 ++-
.../admin/connectors/google-drive/page.tsx | 24 +-
web/src/app/admin/connectors/guru/page.tsx | 15 +-
web/src/app/admin/connectors/jira/page.tsx | 7 +-
web/src/app/admin/connectors/notion/page.tsx | 7 +-
.../admin/connectors/productboard/page.tsx | 14 +-
web/src/app/admin/connectors/slab/page.tsx | 14 +-
web/src/app/admin/connectors/slack/page.tsx | 16 +-
web/src/app/admin/connectors/web/page.tsx | 4 +-
web/src/app/admin/indexing/status/page.tsx | 4 +-
web/src/app/auth/login/page.tsx | 2 +-
.../connectors/table/ConnectorsTable.tsx | 62 ++--
.../admin/connectors/table/DeleteColumn.tsx | 78 +++++
.../table/SingleUseConnectorsTable.tsx | 189 +++++++++++
.../search/SearchResultsDisplay.tsx | 1 -
web/src/lib/documentDeletion.ts | 22 ++
web/src/lib/types.ts | 26 +-
49 files changed, 1961 insertions(+), 390 deletions(-)
create mode 100644 backend/alembic/versions/b082fec533f0_make_last_attempt_status_nullable.py
create mode 100644 backend/alembic/versions/df0c7ad8a076_added_deletion_attempt_table.py
create mode 100644 backend/danswer/background/connector_deletion.py
create mode 100644 backend/danswer/datastores/qdrant/utils.py
create mode 100644 backend/danswer/db/deletion_attempt.py
create mode 100644 backend/danswer/db/document.py
create mode 100644 backend/danswer/db/utils.py
create mode 100644 backend/danswer/server/utils.py
create mode 100644 web/src/components/admin/connectors/table/DeleteColumn.tsx
create mode 100644 web/src/components/admin/connectors/table/SingleUseConnectorsTable.tsx
create mode 100644 web/src/lib/documentDeletion.ts
diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
index 7d6e00158..b20c7a15e 100644
--- a/CONTRIBUTING.md
+++ b/CONTRIBUTING.md
@@ -153,6 +153,15 @@ _For Windows:_
powershell -Command " $env:PYTHONPATH='.'; $env:TYPESENSE_API_KEY='typesense_api_key'; $env:DYNAMIC_CONFIG_DIR_PATH='./dynamic_config_storage'; python danswer/background/update.py "
```
+To run the background job which handles deletion of connectors, navigate to `danswer/backend` and run:
+```bash
+PYTHONPATH=. TYPESENSE_API_KEY=typesense_api_key DYNAMIC_CONFIG_DIR_PATH=./dynamic_config_storage python danswer/background/connector_deletion.py
+```
+_For Windows:_
+```bash
+powershell -Command " $env:PYTHONPATH='.'; $env:TYPESENSE_API_KEY='typesense_api_key'; $env:DYNAMIC_CONFIG_DIR_PATH='./dynamic_config_storage'; python danswer/background/connector_deletion.py "
+```
+
Note: if you need finer logging, add the additional environment variable `LOG_LEVEL=DEBUG` to the relevant services.
### Formatting and Linting
diff --git a/backend/alembic/versions/b082fec533f0_make_last_attempt_status_nullable.py b/backend/alembic/versions/b082fec533f0_make_last_attempt_status_nullable.py
new file mode 100644
index 000000000..b41cc8917
--- /dev/null
+++ b/backend/alembic/versions/b082fec533f0_make_last_attempt_status_nullable.py
@@ -0,0 +1,50 @@
+"""Make 'last_attempt_status' nullable
+
+Revision ID: b082fec533f0
+Revises: df0c7ad8a076
+Create Date: 2023-08-06 12:05:47.087325
+
+"""
+from alembic import op
+import sqlalchemy as sa
+from sqlalchemy.dialects import postgresql
+
+# revision identifiers, used by Alembic.
+revision = "b082fec533f0"
+down_revision = "df0c7ad8a076"
+branch_labels = None
+depends_on = None
+
+
+def upgrade() -> None:
+ # ### commands auto generated by Alembic - please adjust! ###
+ op.alter_column(
+ "connector_credential_pair",
+ "last_attempt_status",
+ existing_type=postgresql.ENUM(
+ "NOT_STARTED",
+ "IN_PROGRESS",
+ "SUCCESS",
+ "FAILED",
+ name="indexingstatus",
+ ),
+ nullable=True,
+ )
+ # ### end Alembic commands ###
+
+
+def downgrade() -> None:
+ # ### commands auto generated by Alembic - please adjust! ###
+ op.alter_column(
+ "connector_credential_pair",
+ "last_attempt_status",
+ existing_type=postgresql.ENUM(
+ "NOT_STARTED",
+ "IN_PROGRESS",
+ "SUCCESS",
+ "FAILED",
+ name="indexingstatus",
+ ),
+ nullable=False,
+ )
+ # ### end Alembic commands ###
diff --git a/backend/alembic/versions/df0c7ad8a076_added_deletion_attempt_table.py b/backend/alembic/versions/df0c7ad8a076_added_deletion_attempt_table.py
new file mode 100644
index 000000000..41c4a2184
--- /dev/null
+++ b/backend/alembic/versions/df0c7ad8a076_added_deletion_attempt_table.py
@@ -0,0 +1,115 @@
+"""Added deletion_attempt table
+
+Revision ID: df0c7ad8a076
+Revises: d7111c1238cd
+Create Date: 2023-08-05 13:35:39.609619
+
+"""
+from alembic import op
+import sqlalchemy as sa
+
+
+# revision identifiers, used by Alembic.
+revision = "df0c7ad8a076"
+down_revision = "d7111c1238cd"
+branch_labels = None
+depends_on = None
+
+
+def upgrade() -> None:
+ # ### commands auto generated by Alembic - please adjust! ###
+ op.create_table(
+ "document",
+ sa.Column("id", sa.String(), nullable=False),
+ sa.PrimaryKeyConstraint("id"),
+ )
+ op.create_table(
+ "chunk",
+ sa.Column("id", sa.String(), nullable=False),
+ sa.Column(
+ "document_store_type",
+ sa.Enum(
+ "VECTOR",
+ "KEYWORD",
+ name="documentstoretype",
+ ),
+ nullable=False,
+ ),
+ sa.Column("document_id", sa.String(), nullable=False),
+ sa.ForeignKeyConstraint(
+ ["document_id"],
+ ["document.id"],
+ ),
+ sa.PrimaryKeyConstraint("id", "document_store_type"),
+ )
+ op.create_table(
+ "deletion_attempt",
+ sa.Column("id", sa.Integer(), nullable=False),
+ sa.Column("connector_id", sa.Integer(), nullable=False),
+ sa.Column("credential_id", sa.Integer(), nullable=False),
+ sa.Column(
+ "status",
+ sa.Enum(
+ "NOT_STARTED",
+ "IN_PROGRESS",
+ "SUCCESS",
+ "FAILED",
+ name="deletionstatus",
+ ),
+ nullable=False,
+ ),
+ sa.Column("num_docs_deleted", sa.Integer(), nullable=False),
+ sa.Column("error_msg", sa.String(), nullable=True),
+ sa.Column(
+ "time_created",
+ sa.DateTime(timezone=True),
+ server_default=sa.text("now()"),
+ nullable=False,
+ ),
+ sa.Column(
+ "time_updated",
+ sa.DateTime(timezone=True),
+ server_default=sa.text("now()"),
+ nullable=False,
+ ),
+ sa.ForeignKeyConstraint(
+ ["connector_id"],
+ ["connector.id"],
+ ),
+ sa.ForeignKeyConstraint(
+ ["credential_id"],
+ ["credential.id"],
+ ),
+ sa.PrimaryKeyConstraint("id"),
+ )
+ op.create_table(
+ "document_by_connector_credential_pair",
+ sa.Column("id", sa.String(), nullable=False),
+ sa.Column("connector_id", sa.Integer(), nullable=False),
+ sa.Column("credential_id", sa.Integer(), nullable=False),
+ sa.ForeignKeyConstraint(
+ ["connector_id"],
+ ["connector.id"],
+ ),
+ sa.ForeignKeyConstraint(
+ ["credential_id"],
+ ["credential.id"],
+ ),
+ sa.ForeignKeyConstraint(
+ ["id"],
+ ["document.id"],
+ ),
+ sa.PrimaryKeyConstraint("id", "connector_id", "credential_id"),
+ )
+ # ### end Alembic commands ###
+
+
+def downgrade() -> None:
+ # ### commands auto generated by Alembic - please adjust! ###
+ op.drop_table("document_by_connector_credential_pair")
+ op.drop_table("deletion_attempt")
+ op.drop_table("chunk")
+ op.drop_table("document")
+ sa.Enum(name="deletionstatus").drop(op.get_bind(), checkfirst=False)
+ sa.Enum(name="documentstoretype").drop(op.get_bind(), checkfirst=False)
+ # ### end Alembic commands ###
diff --git a/backend/danswer/background/connector_deletion.py b/backend/danswer/background/connector_deletion.py
new file mode 100644
index 000000000..214b7fce4
--- /dev/null
+++ b/backend/danswer/background/connector_deletion.py
@@ -0,0 +1,296 @@
+"""
+To delete a connector / credential pair:
+(1) find all documents associated with the connector / credential pair where this
+is the only connector / credential pair that has indexed it
+(2) delete all documents from document stores
+(3) delete all entries from postgres
+(4) find all documents associated with connector / credential pair where there
+are multiple connector / credential pairs that have indexed it
+(5) update document store entries to remove access associated with the
+connector / credential pair from the access list
+(6) delete all relevant entries from postgres
+"""
+import time
+from collections import defaultdict
+from datetime import datetime
+
+from sqlalchemy.orm import Session
+
+from danswer.configs.constants import PUBLIC_DOC_PAT
+from danswer.datastores.interfaces import KeywordIndex
+from danswer.datastores.interfaces import StoreType
+from danswer.datastores.interfaces import UpdateRequest
+from danswer.datastores.interfaces import VectorIndex
+from danswer.datastores.qdrant.store import QdrantIndex
+from danswer.datastores.typesense.store import TypesenseIndex
+from danswer.db.connector import fetch_connector_by_id
+from danswer.db.connector_credential_pair import delete_connector_credential_pair
+from danswer.db.connector_credential_pair import get_connector_credential_pair
+from danswer.db.deletion_attempt import check_deletion_attempt_is_allowed
+from danswer.db.deletion_attempt import delete_deletion_attempts
+from danswer.db.deletion_attempt import get_deletion_attempts
+from danswer.db.document import delete_document_by_connector_credential_pair
+from danswer.db.document import delete_documents_complete
+from danswer.db.document import get_chunk_ids_for_document_ids
+from danswer.db.document import (
+ get_chunks_with_single_connector_credential_pair,
+)
+from danswer.db.document import (
+ get_document_by_connector_credential_pairs_indexed_by_multiple,
+)
+from danswer.db.engine import get_sqlalchemy_engine
+from danswer.db.index_attempt import delete_index_attempts
+from danswer.db.models import Credential
+from danswer.db.models import DeletionAttempt
+from danswer.db.models import DeletionStatus
+from danswer.utils.logger import setup_logger
+
+logger = setup_logger()
+
+
+def _delete_connector_credential_pair(
+ db_session: Session,
+ vector_index: VectorIndex,
+ keyword_index: KeywordIndex,
+ deletion_attempt: DeletionAttempt,
+) -> int:
+ connector_id = deletion_attempt.connector_id
+ credential_id = deletion_attempt.credential_id
+
+ def _delete_singly_indexed_docs() -> int:
+ # if a document store entry is only indexed by this connector_credential_pair, delete it
+ num_docs_deleted = 0
+ chunks_to_delete = get_chunks_with_single_connector_credential_pair(
+ db_session=db_session,
+ connector_id=connector_id,
+ credential_id=credential_id,
+ )
+ if chunks_to_delete:
+ document_ids: set[str] = set()
+ vector_chunk_ids_to_delete: list[str] = []
+ keyword_chunk_ids_to_delete: list[str] = []
+ for chunk in chunks_to_delete:
+ document_ids.add(chunk.document_id)
+ if chunk.document_store_type == StoreType.KEYWORD:
+ keyword_chunk_ids_to_delete.append(chunk.id)
+ else:
+ vector_chunk_ids_to_delete.append(chunk.id)
+
+ vector_index.delete(ids=vector_chunk_ids_to_delete)
+ keyword_index.delete(ids=keyword_chunk_ids_to_delete)
+ # removes all `Chunk`, `DocumentByConnectorCredentialPair`, and `Document`
+ # rows from the DB
+ delete_documents_complete(
+ db_session=db_session,
+ document_ids=list(document_ids),
+ )
+ num_docs_deleted += len(document_ids)
+ return num_docs_deleted
+
+ num_docs_deleted = _delete_singly_indexed_docs()
+
+ def _update_multi_indexed_docs() -> None:
+ # if a document is indexed by multiple connector_credential_pairs, we should
+ # update its access rather than outright delete it
+ document_by_connector_credential_pairs_to_update = (
+ get_document_by_connector_credential_pairs_indexed_by_multiple(
+ db_session=db_session,
+ connector_id=connector_id,
+ credential_id=credential_id,
+ )
+ )
+
+ def _get_user(
+ credential: Credential,
+ ) -> str:
+ if credential.public_doc:
+ return PUBLIC_DOC_PAT
+
+ return str(credential.user.id)
+
+ # find out which documents need to be updated and what their new allowed_users
+ # should be. This is a bit slow as it requires looping through all the documents
+ to_be_deleted_user = _get_user(deletion_attempt.credential)
+ document_ids_not_needing_update: set[str] = set()
+ document_id_to_allowed_users: dict[str, list[str]] = defaultdict(list)
+ for (
+ document_by_connector_credential_pair
+ ) in document_by_connector_credential_pairs_to_update:
+ document_id = document_by_connector_credential_pair.id
+ user = _get_user(document_by_connector_credential_pair.credential)
+ document_id_to_allowed_users[document_id].append(user)
+
+ # if there's another connector / credential pair which has indexed this
+ # document with the same access, we don't need to update it since removing
+ # the access from this connector / credential pair won't change anything
+ if (
+ document_by_connector_credential_pair.connector_id != connector_id
+ or document_by_connector_credential_pair.credential_id != credential_id
+ ) and user == to_be_deleted_user:
+ document_ids_not_needing_update.add(document_id)
+
+ # categorize into groups of updates to try and batch them more efficiently
+ update_groups: dict[tuple[str, ...], list[str]] = {}
+ for document_id, allowed_users_lst in document_id_to_allowed_users.items():
+ if document_id in document_ids_not_needing_update:
+ continue
+
+ allowed_users_lst.remove(to_be_deleted_user)
+ allowed_users = tuple(sorted(set(allowed_users_lst)))
+ update_groups[allowed_users] = update_groups.get(allowed_users, []) + [
+ document_id
+ ]
+
+ # actually perform the updates in the document store
+ update_requests = [
+ UpdateRequest(
+ ids=list(
+ get_chunk_ids_for_document_ids(
+ db_session=db_session, document_ids=document_ids
+ )
+ ),
+ allowed_users=list(allowed_users),
+ )
+ for allowed_users, document_ids in update_groups.items()
+ ]
+ vector_index.update(update_requests=update_requests)
+ keyword_index.update(update_requests=update_requests)
+
+ # delete the `document_by_connector_credential_pair` rows for the connector / credential pair
+ delete_document_by_connector_credential_pair(
+ db_session=db_session,
+ document_ids=list(document_id_to_allowed_users.keys()),
+ )
+
+ _update_multi_indexed_docs()
+
+ def _cleanup() -> None:
+ # clean up everything else
+ # we cannot undo the deletion of the document store entries if something
+ # goes wrong since they happen outside the postgres world. Best we can do
+ # is keep everything else around and mark the deletion attempt as failed.
+ # If it's a transient failure, re-deleting the connector / credential pair should
+ # fix the weird state.
+ # TODO: lock anything to do with this connector via transaction isolation
+ # NOTE: we have to delete index_attempts and deletion_attempts since they both
+ # have foreign key columns to the connector
+ delete_index_attempts(
+ db_session=db_session,
+ connector_id=connector_id,
+ credential_id=credential_id,
+ )
+ delete_deletion_attempts(
+ db_session=db_session,
+ connector_id=connector_id,
+ credential_id=credential_id,
+ )
+ delete_connector_credential_pair(
+ db_session=db_session,
+ connector_id=connector_id,
+ credential_id=credential_id,
+ )
+ # if there are no credentials left, delete the connector
+ connector = fetch_connector_by_id(
+ db_session=db_session,
+ connector_id=connector_id,
+ )
+ if not connector or not len(connector.credentials):
+ logger.debug("Found no credentials left for connector, deleting connector")
+ db_session.delete(connector)
+ db_session.commit()
+
+ _cleanup()
+
+ logger.info(
+ "Successfully deleted connector_credential_pair with connector_id:"
+ f" '{connector_id}' and credential_id: '{credential_id}'. Deleted {num_docs_deleted} docs."
+ )
+ return num_docs_deleted
+
+
+def _run_deletion(db_session: Session) -> None:
+ # NOTE: makes the assumption that there is only one deletion job running at a time
+ deletion_attempts = get_deletion_attempts(
+ db_session, statuses=[DeletionStatus.NOT_STARTED], limit=1
+ )
+ if not deletion_attempts:
+ logger.info("No deletion attempts to run")
+ return
+
+ deletion_attempt = deletion_attempts[0]
+
+ # validate that the connector / credential pair is deletable
+ cc_pair = get_connector_credential_pair(
+ db_session=db_session,
+ connector_id=deletion_attempt.connector_id,
+ credential_id=deletion_attempt.credential_id,
+ )
+ if not cc_pair or not check_deletion_attempt_is_allowed(
+ connector_credential_pair=cc_pair
+ ):
+ error_msg = (
+ "Cannot run deletion attempt - connector_credential_pair is not deletable. "
+ "This is likely because there is an ongoing / planned indexing attempt OR the "
+ "connector is not disabled."
+ )
+ logger.error(error_msg)
+ deletion_attempt.status = DeletionStatus.FAILED
+ deletion_attempt.error_msg = error_msg
+ db_session.commit()
+ return
+
+ # kick off the actual deletion process
+ deletion_attempt.status = DeletionStatus.IN_PROGRESS
+ db_session.commit()
+ try:
+ num_docs_deleted = _delete_connector_credential_pair(
+ db_session=db_session,
+ vector_index=QdrantIndex(),
+ keyword_index=TypesenseIndex(),
+ deletion_attempt=deletion_attempt,
+ )
+ except Exception as e:
+ logger.exception(f"Failed to delete connector_credential_pair due to {e}")
+ deletion_attempt.status = DeletionStatus.FAILED
+ deletion_attempt.error_msg = str(e)
+ db_session.commit()
+ return
+
+ deletion_attempt.status = DeletionStatus.SUCCESS
+ deletion_attempt.num_docs_deleted = num_docs_deleted
+ db_session.commit()
+
+
+def _cleanup_deletion_jobs(db_session: Session) -> None:
+ """Cleanup any deletion jobs that were in progress but failed to complete
+ NOTE: makes the assumption that there is only one deletion job running at a time.
+ If multiple deletion jobs can be run at once, then this behavior no longer makes
+ sense."""
+ deletion_attempts = get_deletion_attempts(
+ db_session,
+ statuses=[DeletionStatus.IN_PROGRESS],
+ )
+ for deletion_attempt in deletion_attempts:
+ deletion_attempt.status = DeletionStatus.FAILED
+ db_session.commit()
+
+
+def _update_loop(delay: int = 10) -> None:
+ engine = get_sqlalchemy_engine()
+ while True:
+ start = time.time()
+ start_time_utc = datetime.utcfromtimestamp(start).strftime("%Y-%m-%d %H:%M:%S")
+ logger.info(f"Running connector_deletion, current UTC time: {start_time_utc}")
+ try:
+ with Session(engine) as db_session:
+ _run_deletion(db_session)
+ _cleanup_deletion_jobs(db_session)
+ except Exception as e:
+ logger.exception(f"Failed to run connector_deletion due to {e}")
+ sleep_time = delay - (time.time() - start)
+ if sleep_time > 0:
+ time.sleep(sleep_time)
+
+
+if __name__ == "__main__":
+ _update_loop()
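
As a rough illustration of the six-step flow described in the module docstring of `connector_deletion.py`, the sketch below condenses the delete-vs-update decision into plain Python. The `ChunkRow` shape and `plan_deletion` helper are hypothetical stand-ins for this write-up, not the real SQLAlchemy models or functions added by this patch.

```python
# Hypothetical, condensed sketch of the decision made in _delete_connector_credential_pair:
# chunks whose document is indexed only by the pair being deleted are removed outright,
# while chunks of documents shared with other pairs only get their access list updated.
from dataclasses import dataclass


@dataclass
class ChunkRow:
    chunk_id: str
    document_id: str
    num_cc_pairs_indexing_doc: int  # how many connector/credential pairs indexed the doc


def plan_deletion(chunks: list[ChunkRow]) -> tuple[list[str], list[str]]:
    chunk_ids_to_delete: list[str] = []
    chunk_ids_to_update: list[str] = []
    for chunk in chunks:
        if chunk.num_cc_pairs_indexing_doc == 1:
            # only the pair being deleted indexed this document -> delete from doc stores
            chunk_ids_to_delete.append(chunk.chunk_id)
        else:
            # other pairs still reference the document -> just strip this pair's access
            chunk_ids_to_update.append(chunk.chunk_id)
    return chunk_ids_to_delete, chunk_ids_to_update


print(plan_deletion([ChunkRow("c1", "doc-a", 1), ChunkRow("c2", "doc-b", 2)]))
# (['c1'], ['c2'])
```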
diff --git a/backend/danswer/background/update.py b/backend/danswer/background/update.py
index 5a412a69c..18fb5ba93 100755
--- a/backend/danswer/background/update.py
+++ b/backend/danswer/background/update.py
@@ -7,6 +7,7 @@ from sqlalchemy.orm import Session
from danswer.connectors.factory import instantiate_connector
from danswer.connectors.interfaces import LoadConnector
from danswer.connectors.interfaces import PollConnector
+from danswer.connectors.models import IndexAttemptMetadata
from danswer.connectors.models import InputType
from danswer.datastores.indexing_pipeline import build_indexing_pipeline
from danswer.db.connector import disable_connector
@@ -188,7 +189,12 @@ def run_indexing_jobs(db_session: Session) -> None:
None if db_credential.public_doc else db_credential.user_id
)
new_docs, total_batch_chunks = indexing_pipeline(
- documents=doc_batch, user_id=index_user_id
+ documents=doc_batch,
+ index_attempt_metadata=IndexAttemptMetadata(
+ user_id=index_user_id,
+ connector_id=db_connector.id,
+ credential_id=db_credential.id,
+ ),
)
net_doc_change += new_docs
chunk_count += total_batch_chunks
diff --git a/backend/danswer/connectors/google_drive/connector.py b/backend/danswer/connectors/google_drive/connector.py
index b7a5dd540..d7dea8c9d 100644
--- a/backend/danswer/connectors/google_drive/connector.py
+++ b/backend/danswer/connectors/google_drive/connector.py
@@ -166,7 +166,7 @@ def get_all_files_batched(
batch_size=batch_size,
)
yield from batch_generator(
- generator=valid_files,
+ items=valid_files,
batch_size=batch_size,
pre_batch_yield=lambda batch_files: logger.info(
f"Parseable Documents in batch: {[file['name'] for file in batch_files]}"
diff --git a/backend/danswer/connectors/models.py b/backend/danswer/connectors/models.py
index 588dfd08e..92734f7e0 100644
--- a/backend/danswer/connectors/models.py
+++ b/backend/danswer/connectors/models.py
@@ -1,6 +1,7 @@
from dataclasses import dataclass
from enum import Enum
from typing import Any
+from uuid import UUID
from pydantic import BaseModel
@@ -36,13 +37,8 @@ class InputType(str, Enum):
EVENT = "event" # e.g. registered an endpoint as a listener, and processing connector events
-class ConnectorDescriptor(BaseModel):
- source: DocumentSource
- # how the raw data being indexed is procured
- input_type: InputType
- # what is passed into the __init__ of the connector described by `source`
- # and `input_type`
- connector_specific_config: dict[str, Any]
-
- class Config:
- arbitrary_types_allowed = True
+@dataclass
+class IndexAttemptMetadata:
+ user_id: UUID | None
+ connector_id: int
+ credential_id: int
diff --git a/backend/danswer/connectors/utils.py b/backend/danswer/connectors/utils.py
index 699edb7af..0200f7225 100644
--- a/backend/danswer/connectors/utils.py
+++ b/backend/danswer/connectors/utils.py
@@ -1,6 +1,6 @@
from collections.abc import Callable
from collections.abc import Generator
-from collections.abc import Iterator
+from collections.abc import Iterable
from itertools import islice
from typing import TypeVar
@@ -8,12 +8,13 @@ T = TypeVar("T")
def batch_generator(
- generator: Iterator[T],
+ items: Iterable[T],
batch_size: int,
pre_batch_yield: Callable[[list[T]], None] | None = None,
) -> Generator[list[T], None, None]:
+ iterable = iter(items)
while True:
- batch = list(islice(generator, batch_size))
+ batch = list(islice(iterable, batch_size))
if not batch:
return
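
For reference, here is a standalone mirror of `batch_generator` as it looks after this hunk, plus a tiny usage example. The `pre_batch_yield(batch)` call near the end is assumed from the unchanged tail of the function that this hunk does not show.

```python
# Standalone mirror of the patched batch_generator; the pre_batch_yield call is assumed
# from the unchanged remainder of the function (not shown in this hunk).
from collections.abc import Callable, Generator, Iterable
from itertools import islice
from typing import TypeVar

T = TypeVar("T")


def batch_generator(
    items: Iterable[T],
    batch_size: int,
    pre_batch_yield: Callable[[list[T]], None] | None = None,
) -> Generator[list[T], None, None]:
    # iter() is the key change: calling islice() directly on a plain list would restart
    # from index 0 on every loop iteration and never terminate.
    iterable = iter(items)
    while True:
        batch = list(islice(iterable, batch_size))
        if not batch:
            return
        if pre_batch_yield:
            pre_batch_yield(batch)
        yield batch


print(list(batch_generator([1, 2, 3, 4, 5], batch_size=2)))
# [[1, 2], [3, 4], [5]]
```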
diff --git a/backend/danswer/datastores/datastore_utils.py b/backend/danswer/datastores/datastore_utils.py
index 427e850f2..6121ad899 100644
--- a/backend/danswer/datastores/datastore_utils.py
+++ b/backend/danswer/datastores/datastore_utils.py
@@ -1,12 +1,18 @@
import uuid
from collections.abc import Callable
from copy import deepcopy
+from typing import TypeVar
+
+from pydantic import BaseModel
from danswer.chunking.models import EmbeddedIndexChunk
from danswer.chunking.models import IndexChunk
from danswer.chunking.models import InferenceChunk
from danswer.configs.constants import ALLOWED_GROUPS
from danswer.configs.constants import ALLOWED_USERS
+from danswer.configs.constants import PUBLIC_DOC_PAT
+from danswer.connectors.models import Document
+from danswer.connectors.models import IndexAttemptMetadata
DEFAULT_BATCH_SIZE = 30
@@ -29,44 +35,87 @@ def get_uuid_from_chunk(
return uuid.uuid5(uuid.NAMESPACE_X500, unique_identifier_string)
-# Takes the chunk identifier returns whether the chunk exists and the user/group whitelists
-WhitelistCallable = Callable[[str], tuple[bool, list[str], list[str]]]
+class CrossConnectorDocumentMetadata(BaseModel):
+ """Represents metadata about a single document. This is needed since the
+ `Document` class represents a document from a single connector, but that same
+ document may be indexed by multiple connectors."""
+
+ allowed_users: list[str]
+ allowed_user_groups: list[str]
+ already_in_index: bool
-def update_doc_user_map(
+# Takes the chunk identifier and returns the existing metadata about that chunk
+CrossConnectorDocumentMetadataFetchCallable = Callable[
+ [str], CrossConnectorDocumentMetadata | None
+]
+
+
+T = TypeVar("T")
+
+
+def _add_if_not_exists(l: list[T], item: T) -> list[T]:
+ if item in l:
+ return l
+ return l + [item]
+
+
+def update_cross_connector_document_metadata_map(
chunk: IndexChunk | EmbeddedIndexChunk,
- doc_whitelist_map: dict[str, dict[str, list[str]]],
- doc_store_whitelist_fnc: WhitelistCallable,
- user_str: str,
-) -> tuple[dict[str, dict[str, list[str]]], bool]:
- """Returns an updated document id to whitelists mapping and if the document's chunks need to be wiped."""
- doc_whitelist_map = deepcopy(doc_whitelist_map)
+ cross_connector_document_metadata_map: dict[str, CrossConnectorDocumentMetadata],
+ doc_store_cross_connector_document_metadata_fetch_fn: CrossConnectorDocumentMetadataFetchCallable,
+ index_attempt_metadata: IndexAttemptMetadata,
+) -> tuple[dict[str, CrossConnectorDocumentMetadata], bool]:
+ """Returns an updated document_id -> CrossConnectorDocumentMetadata map and
+ if the document's chunks need to be wiped."""
+ user_str = (
+ PUBLIC_DOC_PAT
+ if index_attempt_metadata.user_id is None
+ else str(index_attempt_metadata.user_id)
+ )
+
+ cross_connector_document_metadata_map = deepcopy(
+ cross_connector_document_metadata_map
+ )
first_chunk_uuid = str(get_uuid_from_chunk(chunk))
document = chunk.source_document
- if document.id not in doc_whitelist_map:
- first_chunk_found, whitelist_users, whitelist_groups = doc_store_whitelist_fnc(
- first_chunk_uuid
+ if document.id not in cross_connector_document_metadata_map:
+ document_metadata_in_doc_store = (
+ doc_store_cross_connector_document_metadata_fetch_fn(first_chunk_uuid)
)
- if not first_chunk_found:
- doc_whitelist_map[document.id] = {
- ALLOWED_USERS: [user_str],
- # TODO introduce groups logic here
- ALLOWED_GROUPS: whitelist_groups,
- }
+ if not document_metadata_in_doc_store:
+ cross_connector_document_metadata_map[
+ document.id
+ ] = CrossConnectorDocumentMetadata(
+ allowed_users=[user_str],
+ allowed_user_groups=[],
+ already_in_index=False,
+ )
# First chunk does not exist so document does not exist, no need for deletion
- return doc_whitelist_map, False
+ return cross_connector_document_metadata_map, False
else:
- if user_str not in whitelist_users:
- whitelist_users.append(user_str)
# TODO introduce groups logic here
- doc_whitelist_map[document.id] = {
- ALLOWED_USERS: whitelist_users,
- ALLOWED_GROUPS: whitelist_groups,
- }
+ cross_connector_document_metadata_map[
+ document.id
+ ] = CrossConnectorDocumentMetadata(
+ allowed_users=_add_if_not_exists(
+ document_metadata_in_doc_store.allowed_users, user_str
+ ),
+ allowed_user_groups=document_metadata_in_doc_store.allowed_user_groups,
+ already_in_index=True,
+ )
# First chunk exists, but with update, there may be less total chunks now
# Must delete rest of document chunks
- return doc_whitelist_map, True
+ return cross_connector_document_metadata_map, True
+ existing_document_metadata = cross_connector_document_metadata_map[document.id]
+ cross_connector_document_metadata_map[document.id] = CrossConnectorDocumentMetadata(
+ allowed_users=_add_if_not_exists(
+ existing_document_metadata.allowed_users, user_str
+ ),
+ allowed_user_groups=existing_document_metadata.allowed_user_groups,
+ already_in_index=existing_document_metadata.already_in_index,
+ )
# If document is already in the mapping, don't delete again
- return doc_whitelist_map, False
+ return cross_connector_document_metadata_map, False
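
To make the new map semantics concrete, the toy walk-through below shows how a second connector / credential pair folds its user into an existing document's `allowed_users`. The values are made up; `CrossConnectorDocumentMetadata` and `_add_if_not_exists` are mirrored from this file.

```python
# Toy walk-through of the metadata-map merge behavior (hypothetical values, not the
# real pipeline): an additional indexing pair only extends the document's user list.
from pydantic import BaseModel


class CrossConnectorDocumentMetadata(BaseModel):
    allowed_users: list[str]
    allowed_user_groups: list[str]
    already_in_index: bool


def _add_if_not_exists(l: list[str], item: str) -> list[str]:
    if item in l:
        return l
    return l + [item]


# Document previously indexed by another credential (user "u1"); now user "u2"
# indexes the same document via a different connector / credential pair.
existing = CrossConnectorDocumentMetadata(
    allowed_users=["u1"], allowed_user_groups=[], already_in_index=True
)
merged = CrossConnectorDocumentMetadata(
    allowed_users=_add_if_not_exists(existing.allowed_users, "u2"),
    allowed_user_groups=existing.allowed_user_groups,
    already_in_index=True,
)
print(merged.allowed_users)     # ['u1', 'u2']
print(merged.already_in_index)  # True -> stale chunks for this doc get wiped before re-indexing
```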
diff --git a/backend/danswer/datastores/indexing_pipeline.py b/backend/danswer/datastores/indexing_pipeline.py
index 42275db7b..d0bb79d83 100644
--- a/backend/danswer/datastores/indexing_pipeline.py
+++ b/backend/danswer/datastores/indexing_pipeline.py
@@ -1,15 +1,22 @@
from functools import partial
from itertools import chain
from typing import Protocol
-from uuid import UUID
+
+from sqlalchemy.orm import Session
from danswer.chunking.chunk import Chunker
from danswer.chunking.chunk import DefaultChunker
from danswer.connectors.models import Document
+from danswer.connectors.models import IndexAttemptMetadata
+from danswer.datastores.interfaces import ChunkInsertionRecord
+from danswer.datastores.interfaces import ChunkMetadata
from danswer.datastores.interfaces import KeywordIndex
+from danswer.datastores.interfaces import StoreType
from danswer.datastores.interfaces import VectorIndex
from danswer.datastores.qdrant.store import QdrantIndex
from danswer.datastores.typesense.store import TypesenseIndex
+from danswer.db.document import upsert_documents_complete
+from danswer.db.engine import get_sqlalchemy_engine
from danswer.search.models import Embedder
from danswer.search.semantic_search import DefaultEmbedder
from danswer.utils.logger import setup_logger
@@ -19,11 +26,44 @@ logger = setup_logger()
class IndexingPipelineProtocol(Protocol):
def __call__(
- self, documents: list[Document], user_id: UUID | None
+ self, documents: list[Document], index_attempt_metadata: IndexAttemptMetadata
) -> tuple[int, int]:
...
+def _upsert_insertion_records(
+ insertion_records: list[ChunkInsertionRecord],
+ index_attempt_metadata: IndexAttemptMetadata,
+ document_store_type: StoreType,
+) -> None:
+ with Session(get_sqlalchemy_engine()) as session:
+ upsert_documents_complete(
+ db_session=session,
+ document_metadata_batch=[
+ ChunkMetadata(
+ connector_id=index_attempt_metadata.connector_id,
+ credential_id=index_attempt_metadata.credential_id,
+ document_id=insertion_record.document_id,
+ store_id=insertion_record.store_id,
+ document_store_type=document_store_type,
+ )
+ for insertion_record in insertion_records
+ ],
+ )
+
+
+def _get_net_new_documents(
+ insertion_records: list[ChunkInsertionRecord],
+) -> int:
+ net_new_documents = 0
+ seen_documents: set[str] = set()
+ for insertion_record in insertion_records:
+ if insertion_record.document_id not in seen_documents:
+ net_new_documents += 1
+ seen_documents.add(insertion_record.document_id)
+ return net_new_documents
+
+
def _indexing_pipeline(
*,
chunker: Chunker,
@@ -31,18 +71,37 @@ def _indexing_pipeline(
vector_index: VectorIndex,
keyword_index: KeywordIndex,
documents: list[Document],
- user_id: UUID | None,
+ index_attempt_metadata: IndexAttemptMetadata,
) -> tuple[int, int]:
"""Takes different pieces of the indexing pipeline and applies it to a batch of documents
Note that the documents should already be batched at this point so that it does not inflate the
memory requirements"""
- # TODO: make entire indexing pipeline async to not block the entire process
- # when running on async endpoints
- chunks = list(chain(*[chunker.chunk(document) for document in documents]))
+ chunks = list(chain(*[chunker.chunk(document=document) for document in documents]))
# TODO keyword indexing can occur at same time as embedding
- net_doc_count_keyword = keyword_index.index(chunks, user_id)
- chunks_with_embeddings = embedder.embed(chunks)
- net_doc_count_vector = vector_index.index(chunks_with_embeddings, user_id)
+ keyword_store_insertion_records = keyword_index.index(
+ chunks=chunks, index_attempt_metadata=index_attempt_metadata
+ )
+ _upsert_insertion_records(
+ insertion_records=keyword_store_insertion_records,
+ index_attempt_metadata=index_attempt_metadata,
+ document_store_type=StoreType.KEYWORD,
+ )
+ net_doc_count_keyword = _get_net_new_documents(
+ insertion_records=keyword_store_insertion_records
+ )
+
+ chunks_with_embeddings = embedder.embed(chunks=chunks)
+ vector_store_insertion_records = vector_index.index(
+ chunks=chunks_with_embeddings, index_attempt_metadata=index_attempt_metadata
+ )
+ _upsert_insertion_records(
+ insertion_records=vector_store_insertion_records,
+ index_attempt_metadata=index_attempt_metadata,
+ document_store_type=StoreType.VECTOR,
+ )
+ net_doc_count_vector = _get_net_new_documents(
+ insertion_records=vector_store_insertion_records
+ )
if net_doc_count_vector != net_doc_count_keyword:
logger.warning("Document count change from keyword/vector indices don't align")
net_new_docs = max(net_doc_count_keyword, net_doc_count_vector)
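
A small standalone illustration of the counting above: the pipeline now derives the net-new-document figure by counting distinct document IDs across the insertion records returned by each index call. `ChunkInsertionRecord` is mirrored from `danswer/datastores/interfaces.py`; the sample records are made up.

```python
# Standalone mirror of _get_net_new_documents with a toy input.
from dataclasses import dataclass


@dataclass
class ChunkInsertionRecord:
    document_id: str
    store_id: str
    already_existed: bool


def _get_net_new_documents(insertion_records: list[ChunkInsertionRecord]) -> int:
    # counts distinct document IDs across all chunk insertion records
    net_new_documents = 0
    seen_documents: set[str] = set()
    for insertion_record in insertion_records:
        if insertion_record.document_id not in seen_documents:
            net_new_documents += 1
            seen_documents.add(insertion_record.document_id)
    return net_new_documents


records = [
    ChunkInsertionRecord("doc-a", "chunk-1", already_existed=False),
    ChunkInsertionRecord("doc-a", "chunk-2", already_existed=False),
    ChunkInsertionRecord("doc-b", "chunk-3", already_existed=True),
]
print(_get_net_new_documents(records))  # 2 distinct documents
```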
diff --git a/backend/danswer/datastores/interfaces.py b/backend/danswer/datastores/interfaces.py
index 2340101ca..382a62784 100644
--- a/backend/danswer/datastores/interfaces.py
+++ b/backend/danswer/datastores/interfaces.py
@@ -1,4 +1,7 @@
import abc
+from dataclasses import dataclass
+from enum import Enum
+from typing import Any
from typing import Generic
from typing import TypeVar
from uuid import UUID
@@ -7,20 +10,65 @@ from danswer.chunking.models import BaseChunk
from danswer.chunking.models import EmbeddedIndexChunk
from danswer.chunking.models import IndexChunk
from danswer.chunking.models import InferenceChunk
+from danswer.connectors.models import IndexAttemptMetadata
T = TypeVar("T", bound=BaseChunk)
IndexFilter = dict[str, str | list[str] | None]
-class DocumentIndex(Generic[T], abc.ABC):
+class StoreType(str, Enum):
+ VECTOR = "vector"
+ KEYWORD = "keyword"
+
+
+@dataclass
+class ChunkInsertionRecord:
+ document_id: str
+ store_id: str
+ already_existed: bool
+
+
+@dataclass
+class ChunkMetadata:
+ connector_id: int
+ credential_id: int
+ document_id: str
+ store_id: str
+ document_store_type: StoreType
+
+
+@dataclass
+class UpdateRequest:
+ ids: list[str]
+ # all other fields will be left alone
+ allowed_users: list[str]
+
+
+class Indexable(Generic[T], abc.ABC):
@abc.abstractmethod
- def index(self, chunks: list[T], user_id: UUID | None) -> int:
- """Indexes document chunks into the Document Index and return the number of new documents"""
+ def index(
+ self, chunks: list[T], index_attempt_metadata: IndexAttemptMetadata
+ ) -> list[ChunkInsertionRecord]:
+ """Indexes document chunks into the Document Index and return the IDs of all the documents indexed"""
raise NotImplementedError
-class VectorIndex(DocumentIndex[EmbeddedIndexChunk], abc.ABC):
+class Deletable(abc.ABC):
+ @abc.abstractmethod
+ def delete(self, ids: list[str]) -> None:
+ """Removes the specified documents from the Index"""
+ raise NotImplementedError
+
+
+class Updatable(abc.ABC):
+ @abc.abstractmethod
+ def update(self, update_requests: list[UpdateRequest]) -> None:
+ """Updates metadata for the specified documents in the Index"""
+ raise NotImplementedError
+
+
+class VectorIndex(Indexable[EmbeddedIndexChunk], Deletable, Updatable, abc.ABC):
@abc.abstractmethod
def semantic_retrieval(
self,
@@ -32,7 +80,7 @@ class VectorIndex(DocumentIndex[EmbeddedIndexChunk], abc.ABC):
raise NotImplementedError
-class KeywordIndex(DocumentIndex[IndexChunk], abc.ABC):
+class KeywordIndex(Indexable[IndexChunk], Deletable, Updatable, abc.ABC):
@abc.abstractmethod
def keyword_search(
self,
diff --git a/backend/danswer/datastores/qdrant/indexing.py b/backend/danswer/datastores/qdrant/indexing.py
index db9692323..5e60d5650 100644
--- a/backend/danswer/datastores/qdrant/indexing.py
+++ b/backend/danswer/datastores/qdrant/indexing.py
@@ -19,15 +19,20 @@ from danswer.configs.constants import CHUNK_ID
from danswer.configs.constants import CONTENT
from danswer.configs.constants import DOCUMENT_ID
from danswer.configs.constants import METADATA
-from danswer.configs.constants import PUBLIC_DOC_PAT
from danswer.configs.constants import SECTION_CONTINUATION
from danswer.configs.constants import SEMANTIC_IDENTIFIER
from danswer.configs.constants import SOURCE_LINKS
from danswer.configs.constants import SOURCE_TYPE
from danswer.configs.model_configs import DOC_EMBEDDING_DIM
+from danswer.connectors.models import IndexAttemptMetadata
+from danswer.datastores.datastore_utils import CrossConnectorDocumentMetadata
from danswer.datastores.datastore_utils import DEFAULT_BATCH_SIZE
from danswer.datastores.datastore_utils import get_uuid_from_chunk
-from danswer.datastores.datastore_utils import update_doc_user_map
+from danswer.datastores.datastore_utils import (
+ update_cross_connector_document_metadata_map,
+)
+from danswer.datastores.interfaces import ChunkInsertionRecord
+from danswer.datastores.qdrant.utils import get_payload_from_record
from danswer.utils.clients import get_qdrant_client
from danswer.utils.logger import setup_logger
@@ -51,9 +56,9 @@ def create_qdrant_collection(
raise RuntimeError("Could not create Qdrant collection")
-def get_qdrant_document_whitelists(
+def get_qdrant_document_cross_connector_metadata(
doc_chunk_id: str, collection_name: str, q_client: QdrantClient
-) -> tuple[bool, list[str], list[str]]:
+) -> CrossConnectorDocumentMetadata | None:
"""Get whether a document is found and the existing whitelists"""
results = q_client.retrieve(
collection_name=collection_name,
@@ -61,13 +66,13 @@ def get_qdrant_document_whitelists(
with_payload=[ALLOWED_USERS, ALLOWED_GROUPS],
)
if len(results) == 0:
- return False, [], []
- payload = results[0].payload
- if not payload:
- raise RuntimeError(
- "Qdrant Index is corrupted, Document found with no access lists."
- )
- return True, payload[ALLOWED_USERS], payload[ALLOWED_GROUPS]
+ return None
+ payload = get_payload_from_record(results[0])
+ return CrossConnectorDocumentMetadata(
+ allowed_users=payload[ALLOWED_USERS],
+ allowed_user_groups=payload[ALLOWED_GROUPS],
+ already_in_index=True,
+ )
def delete_qdrant_doc_chunks(
@@ -91,42 +96,53 @@ def delete_qdrant_doc_chunks(
def index_qdrant_chunks(
chunks: list[EmbeddedIndexChunk],
- user_id: UUID | None,
+ index_attempt_metadata: IndexAttemptMetadata,
collection: str,
client: QdrantClient | None = None,
batch_upsert: bool = True,
-) -> int:
+) -> list[ChunkInsertionRecord]:
# Public documents will have the PUBLIC string in ALLOWED_USERS
# If credential that kicked this off has no user associated, either Auth is off or the doc is public
- user_str = PUBLIC_DOC_PAT if user_id is None else str(user_id)
q_client: QdrantClient = client if client else get_qdrant_client()
point_structs: list[PointStruct] = []
+ insertion_records: list[ChunkInsertionRecord] = []
# Maps document id to dict of whitelists for users/groups each containing list of users/groups as strings
- doc_user_map: dict[str, dict[str, list[str]]] = {}
- docs_deleted = 0
+ cross_connector_document_metadata_map: dict[
+ str, CrossConnectorDocumentMetadata
+ ] = {}
for chunk in chunks:
document = chunk.source_document
- doc_user_map, delete_doc = update_doc_user_map(
- chunk,
- doc_user_map,
- partial(
- get_qdrant_document_whitelists,
+ (
+ cross_connector_document_metadata_map,
+ should_delete_doc,
+ ) = update_cross_connector_document_metadata_map(
+ chunk=chunk,
+ cross_connector_document_metadata_map=cross_connector_document_metadata_map,
+ doc_store_cross_connector_document_metadata_fetch_fn=partial(
+ get_qdrant_document_cross_connector_metadata,
collection_name=collection,
q_client=q_client,
),
- user_str,
+ index_attempt_metadata=index_attempt_metadata,
)
- if delete_doc:
+ if should_delete_doc:
# Processing the first chunk of the doc and the doc exists
- docs_deleted += 1
delete_qdrant_doc_chunks(document.id, collection, q_client)
- point_structs.extend(
- [
+ for minichunk_ind, embedding in enumerate(chunk.embeddings):
+ qdrant_id = str(get_uuid_from_chunk(chunk, minichunk_ind))
+ insertion_records.append(
+ ChunkInsertionRecord(
+ document_id=document.id,
+ store_id=qdrant_id,
+ already_existed=should_delete_doc,
+ )
+ )
+ point_structs.append(
PointStruct(
- id=str(get_uuid_from_chunk(chunk, minichunk_ind)),
+ id=qdrant_id,
payload={
DOCUMENT_ID: document.id,
CHUNK_ID: chunk.chunk_id,
@@ -136,15 +152,17 @@ def index_qdrant_chunks(
SOURCE_LINKS: chunk.source_links,
SEMANTIC_IDENTIFIER: document.semantic_identifier,
SECTION_CONTINUATION: chunk.section_continuation,
- ALLOWED_USERS: doc_user_map[document.id][ALLOWED_USERS],
- ALLOWED_GROUPS: doc_user_map[document.id][ALLOWED_GROUPS],
+ ALLOWED_USERS: cross_connector_document_metadata_map[
+ document.id
+ ].allowed_users,
+ ALLOWED_GROUPS: cross_connector_document_metadata_map[
+ document.id
+ ].allowed_user_groups,
METADATA: json.dumps(document.metadata),
},
vector=embedding,
)
- for minichunk_ind, embedding in enumerate(chunk.embeddings)
- ]
- )
+ )
if batch_upsert:
point_struct_batches = [
@@ -179,4 +197,4 @@ def index_qdrant_chunks(
f"Document batch of size {len(point_structs)} indexing status: {index_results.status}"
)
- return len(doc_user_map.keys()) - docs_deleted
+ return insertion_records
diff --git a/backend/danswer/datastores/qdrant/store.py b/backend/danswer/datastores/qdrant/store.py
index d13a293a5..68ea1b6d1 100644
--- a/backend/danswer/datastores/qdrant/store.py
+++ b/backend/danswer/datastores/qdrant/store.py
@@ -1,3 +1,4 @@
+from typing import Any
from uuid import UUID
from qdrant_client.http.exceptions import ResponseHandlingException
@@ -14,8 +15,12 @@ from danswer.configs.app_configs import QDRANT_DEFAULT_COLLECTION
from danswer.configs.constants import ALLOWED_USERS
from danswer.configs.constants import PUBLIC_DOC_PAT
from danswer.configs.model_configs import SEARCH_DISTANCE_CUTOFF
+from danswer.connectors.models import IndexAttemptMetadata
+from danswer.connectors.utils import batch_generator
from danswer.datastores.datastore_utils import get_uuid_from_chunk
+from danswer.datastores.interfaces import ChunkInsertionRecord
from danswer.datastores.interfaces import IndexFilter
+from danswer.datastores.interfaces import UpdateRequest
from danswer.datastores.interfaces import VectorIndex
from danswer.datastores.qdrant.indexing import index_qdrant_chunks
from danswer.search.search_utils import get_default_embedding_model
@@ -25,6 +30,9 @@ from danswer.utils.timing import log_function_time
logger = setup_logger()
+# how many points we want to delete/update at a time
+_BATCH_SIZE = 200
+
def _build_qdrant_filters(
user_id: UUID | None, filters: list[IndexFilter] | None
@@ -78,10 +86,14 @@ class QdrantIndex(VectorIndex):
self.collection = collection
self.client = get_qdrant_client()
- def index(self, chunks: list[EmbeddedIndexChunk], user_id: UUID | None) -> int:
+ def index(
+ self,
+ chunks: list[EmbeddedIndexChunk],
+ index_attempt_metadata: IndexAttemptMetadata,
+ ) -> list[ChunkInsertionRecord]:
return index_qdrant_chunks(
chunks=chunks,
- user_id=user_id,
+ index_attempt_metadata=index_attempt_metadata,
collection=self.collection,
client=self.client,
)
@@ -145,6 +157,29 @@ class QdrantIndex(VectorIndex):
return found_inference_chunks
+ def delete(self, ids: list[str]) -> None:
+ logger.info(f"Deleting {len(ids)} documents from Qdrant")
+ for id_batch in batch_generator(items=ids, batch_size=_BATCH_SIZE):
+ self.client.delete(
+ collection_name=self.collection,
+ points_selector=id_batch,
+ )
+
+ def update(self, update_requests: list[UpdateRequest]) -> None:
+ logger.info(
+ f"Updating {len(update_requests)} documents' allowed_users in Qdrant"
+ )
+ for update_request in update_requests:
+ for id_batch in batch_generator(
+ items=update_request.ids,
+ batch_size=_BATCH_SIZE,
+ ):
+ self.client.set_payload(
+ collection_name=self.collection,
+ payload={ALLOWED_USERS: update_request.allowed_users},
+ points=id_batch,
+ )
+
def get_from_id(self, object_id: str) -> InferenceChunk | None:
matches, _ = self.client.scroll(
collection_name=self.collection,
diff --git a/backend/danswer/datastores/qdrant/utils.py b/backend/danswer/datastores/qdrant/utils.py
new file mode 100644
index 000000000..6d4e7b7ea
--- /dev/null
+++ b/backend/danswer/datastores/qdrant/utils.py
@@ -0,0 +1,12 @@
+from typing import Any
+
+from qdrant_client.http.models import Record
+
+
+def get_payload_from_record(record: Record, is_required: bool = True) -> dict[str, Any]:
+ if record.payload is None and is_required:
+ raise RuntimeError(
+ "Qdrant Index is corrupted, Document found with no metadata."
+ )
+
+ return record.payload or {}
diff --git a/backend/danswer/datastores/typesense/store.py b/backend/danswer/datastores/typesense/store.py
index bdf9ac8ce..d47492e65 100644
--- a/backend/danswer/datastores/typesense/store.py
+++ b/backend/danswer/datastores/typesense/store.py
@@ -1,6 +1,7 @@
import json
from functools import partial
from typing import Any
+from typing import cast
from uuid import UUID
import typesense # type: ignore
@@ -22,17 +23,27 @@ from danswer.configs.constants import SECTION_CONTINUATION
from danswer.configs.constants import SEMANTIC_IDENTIFIER
from danswer.configs.constants import SOURCE_LINKS
from danswer.configs.constants import SOURCE_TYPE
+from danswer.connectors.models import IndexAttemptMetadata
+from danswer.connectors.utils import batch_generator
+from danswer.datastores.datastore_utils import CrossConnectorDocumentMetadata
from danswer.datastores.datastore_utils import DEFAULT_BATCH_SIZE
from danswer.datastores.datastore_utils import get_uuid_from_chunk
-from danswer.datastores.datastore_utils import update_doc_user_map
+from danswer.datastores.datastore_utils import (
+ update_cross_connector_document_metadata_map,
+)
+from danswer.datastores.interfaces import ChunkInsertionRecord
from danswer.datastores.interfaces import IndexFilter
from danswer.datastores.interfaces import KeywordIndex
+from danswer.datastores.interfaces import UpdateRequest
from danswer.utils.clients import get_typesense_client
from danswer.utils.logger import setup_logger
logger = setup_logger()
+# how many points we want to delete/update at a time
+_BATCH_SIZE = 200
+
def check_typesense_collection_exist(
collection_name: str = TYPESENSE_DEFAULT_COLLECTION,
@@ -70,21 +81,26 @@ def create_typesense_collection(
ts_client.collections.create(collection_schema)
-def get_typesense_document_whitelists(
+def get_typesense_document_cross_connector_metadata(
doc_chunk_id: str, collection_name: str, ts_client: typesense.Client
-) -> tuple[bool, list[str], list[str]]:
+) -> CrossConnectorDocumentMetadata | None:
"""Returns whether the document already exists and the users/group whitelists"""
try:
- document = (
- ts_client.collections[collection_name].documents[doc_chunk_id].retrieve()
+ document = cast(
+ dict[str, Any],
+ ts_client.collections[collection_name].documents[doc_chunk_id].retrieve(),
)
except ObjectNotFound:
- return False, [], []
+ return None
if document[ALLOWED_USERS] is None or document[ALLOWED_GROUPS] is None:
raise RuntimeError(
"Typesense Index is corrupted, Document found with no access lists."
)
- return True, document[ALLOWED_USERS], document[ALLOWED_GROUPS]
+ return CrossConnectorDocumentMetadata(
+ allowed_users=document[ALLOWED_USERS],
+ allowed_user_groups=document[ALLOWED_GROUPS],
+ already_in_index=True,
+ )
def delete_typesense_doc_chunks(
@@ -100,38 +116,49 @@ def delete_typesense_doc_chunks(
def index_typesense_chunks(
chunks: list[IndexChunk | EmbeddedIndexChunk],
- user_id: UUID | None,
+ index_attempt_metadata: IndexAttemptMetadata,
collection: str,
client: typesense.Client | None = None,
batch_upsert: bool = True,
-) -> int:
- user_str = PUBLIC_DOC_PAT if user_id is None else str(user_id)
+) -> list[ChunkInsertionRecord]:
ts_client: typesense.Client = client if client else get_typesense_client()
+ insertion_records: list[ChunkInsertionRecord] = []
new_documents: list[dict[str, Any]] = []
- doc_user_map: dict[str, dict[str, list[str]]] = {}
- docs_deleted = 0
+ cross_connector_document_metadata_map: dict[
+ str, CrossConnectorDocumentMetadata
+ ] = {}
for chunk in chunks:
document = chunk.source_document
- doc_user_map, delete_doc = update_doc_user_map(
- chunk,
- doc_user_map,
- partial(
- get_typesense_document_whitelists,
+ (
+ cross_connector_document_metadata_map,
+ should_delete_doc,
+ ) = update_cross_connector_document_metadata_map(
+ chunk=chunk,
+ cross_connector_document_metadata_map=cross_connector_document_metadata_map,
+ doc_store_cross_connector_document_metadata_fetch_fn=partial(
+ get_typesense_document_cross_connector_metadata,
collection_name=collection,
ts_client=ts_client,
),
- user_str,
+ index_attempt_metadata=index_attempt_metadata,
)
- if delete_doc:
+ if should_delete_doc:
# Processing the first chunk of the doc and the doc exists
- docs_deleted += 1
delete_typesense_doc_chunks(document.id, collection, ts_client)
+ typesense_id = str(get_uuid_from_chunk(chunk))
+ insertion_records.append(
+ ChunkInsertionRecord(
+ document_id=document.id,
+ store_id=typesense_id,
+ already_existed=should_delete_doc,
+ )
+ )
new_documents.append(
{
- "id": str(get_uuid_from_chunk(chunk)), # No minichunks for typesense
+ "id": typesense_id, # No minichunks for typesense
DOCUMENT_ID: document.id,
CHUNK_ID: chunk.chunk_id,
BLURB: chunk.blurb,
@@ -140,8 +167,12 @@ def index_typesense_chunks(
SOURCE_LINKS: json.dumps(chunk.source_links),
SEMANTIC_IDENTIFIER: document.semantic_identifier,
SECTION_CONTINUATION: chunk.section_continuation,
- ALLOWED_USERS: doc_user_map[document.id][ALLOWED_USERS],
- ALLOWED_GROUPS: doc_user_map[document.id][ALLOWED_GROUPS],
+ ALLOWED_USERS: cross_connector_document_metadata_map[
+ document.id
+ ].allowed_users,
+ ALLOWED_GROUPS: cross_connector_document_metadata_map[
+ document.id
+ ].allowed_user_groups,
METADATA: json.dumps(document.metadata),
}
)
@@ -170,7 +201,7 @@ def index_typesense_chunks(
for document in new_documents
]
- return len(doc_user_map.keys()) - docs_deleted
+ return insertion_records
def _build_typesense_filters(
@@ -208,14 +239,39 @@ class TypesenseIndex(KeywordIndex):
self.collection = collection
self.ts_client = get_typesense_client()
- def index(self, chunks: list[IndexChunk], user_id: UUID | None) -> int:
+ def index(
+ self, chunks: list[IndexChunk], index_attempt_metadata: IndexAttemptMetadata
+ ) -> list[ChunkInsertionRecord]:
return index_typesense_chunks(
chunks=chunks,
- user_id=user_id,
+ index_attempt_metadata=index_attempt_metadata,
collection=self.collection,
client=self.ts_client,
)
+ def delete(self, ids: list[str]) -> None:
+ logger.info(f"Deleting {len(ids)} documents from Typesense")
+ for id_batch in batch_generator(items=ids, batch_size=_BATCH_SIZE):
+ self.ts_client.collections[self.collection].documents.delete(
+ {"filter_by": f'id:[{",".join(id_batch)}]'}
+ )
+
+ def update(self, update_requests: list[UpdateRequest]) -> None:
+ logger.info(
+ f"Updating {len(update_requests)} documents' allowed_users in Typesense"
+ )
+ for update_request in update_requests:
+ for id_batch in batch_generator(
+ items=update_request.ids, batch_size=_BATCH_SIZE
+ ):
+ typesense_updates = [
+ {"id": doc_id, ALLOWED_USERS: update_request.allowed_users}
+ for doc_id in id_batch
+ ]
+ self.ts_client.collections[self.collection].documents.import_(
+ typesense_updates, {"action": "update"}
+ )
+
def keyword_search(
self,
query: str,
diff --git a/backend/danswer/db/connector.py b/backend/danswer/db/connector.py
index 2113a8a17..ffdcc22c6 100644
--- a/backend/danswer/db/connector.py
+++ b/backend/danswer/db/connector.py
@@ -128,7 +128,6 @@ def delete_connector(
)
db_session.delete(connector)
- db_session.commit()
return StatusResponse(
success=True, message="Connector deleted successfully", data=connector_id
)
diff --git a/backend/danswer/db/connector_credential_pair.py b/backend/danswer/db/connector_credential_pair.py
index 803078c4c..ca8638294 100644
--- a/backend/danswer/db/connector_credential_pair.py
+++ b/backend/danswer/db/connector_credential_pair.py
@@ -1,6 +1,7 @@
from datetime import datetime
from fastapi import HTTPException
+from sqlalchemy import delete
from sqlalchemy import select
from sqlalchemy.orm import Session
@@ -79,6 +80,18 @@ def update_connector_credential_pair(
db_session.commit()
+def delete_connector_credential_pair(
+ db_session: Session,
+ connector_id: int,
+ credential_id: int,
+) -> None:
+ stmt = delete(ConnectorCredentialPair).where(
+ ConnectorCredentialPair.connector_id == connector_id,
+ ConnectorCredentialPair.credential_id == credential_id,
+ )
+ db_session.execute(stmt)
+
+
def add_credential_to_connector(
connector_id: int,
credential_id: int,
@@ -115,7 +128,6 @@ def add_credential_to_connector(
association = ConnectorCredentialPair(
connector_id=connector_id,
credential_id=credential_id,
- last_attempt_status=IndexingStatus.NOT_STARTED,
)
db_session.add(association)
db_session.commit()
diff --git a/backend/danswer/db/credentials.py b/backend/danswer/db/credentials.py
index 61d110c06..30980876d 100644
--- a/backend/danswer/db/credentials.py
+++ b/backend/danswer/db/credentials.py
@@ -15,22 +15,6 @@ from danswer.utils.logger import setup_logger
logger = setup_logger()
-def mask_string(sensitive_str: str) -> str:
- return "****...**" + sensitive_str[-4:]
-
-
-def mask_credential_dict(credential_dict: dict[str, Any]) -> dict[str, str]:
- masked_creds = {}
- for key, val in credential_dict.items():
- if not isinstance(val, str):
- raise ValueError(
- "Unable to mask credentials of type other than string, cannot process request."
- )
-
- masked_creds[key] = mask_string(val)
- return masked_creds
-
-
def fetch_credentials(
user: User | None,
db_session: Session,
diff --git a/backend/danswer/db/deletion_attempt.py b/backend/danswer/db/deletion_attempt.py
new file mode 100644
index 000000000..224b2c44d
--- /dev/null
+++ b/backend/danswer/db/deletion_attempt.py
@@ -0,0 +1,85 @@
+from sqlalchemy import and_
+from sqlalchemy import delete
+from sqlalchemy import desc
+from sqlalchemy import select
+from sqlalchemy.orm import Session
+
+from danswer.db.models import ConnectorCredentialPair
+from danswer.db.models import DeletionAttempt
+from danswer.db.models import DeletionStatus
+from danswer.db.models import IndexingStatus
+
+
+def check_deletion_attempt_is_allowed(
+ connector_credential_pair: ConnectorCredentialPair,
+) -> bool:
+ """
+ To be deletable:
+ (1) connector should be disabled
+ (2) there should be no in-progress/planned index attempts
+ """
+ return bool(
+ connector_credential_pair.connector.disabled
+ and (
+ connector_credential_pair.last_attempt_status != IndexingStatus.IN_PROGRESS
+ and connector_credential_pair.last_attempt_status
+ != IndexingStatus.NOT_STARTED
+ )
+ )
+
+
+def create_deletion_attempt(
+ connector_id: int,
+ credential_id: int,
+ db_session: Session,
+) -> int:
+ new_attempt = DeletionAttempt(
+ connector_id=connector_id,
+ credential_id=credential_id,
+ status=DeletionStatus.NOT_STARTED,
+ )
+ db_session.add(new_attempt)
+ db_session.commit()
+
+ return new_attempt.id
+
+
+def get_not_started_index_attempts(db_session: Session) -> list[DeletionAttempt]:
+ stmt = select(DeletionAttempt).where(
+ DeletionAttempt.status == DeletionStatus.NOT_STARTED
+ )
+ not_started_deletion_attempts = db_session.scalars(stmt)
+ return list(not_started_deletion_attempts.all())
+
+
+def get_deletion_attempts(
+ db_session: Session,
+ connector_ids: list[int] | None = None,
+ statuses: list[DeletionStatus] | None = None,
+ ordered_by_time_updated: bool = False,
+ limit: int | None = None,
+) -> list[DeletionAttempt]:
+ stmt = select(DeletionAttempt)
+ if connector_ids:
+ stmt = stmt.where(DeletionAttempt.connector_id.in_(connector_ids))
+ if statuses:
+ stmt = stmt.where(DeletionAttempt.status.in_(statuses))
+ if ordered_by_time_updated:
+ stmt = stmt.order_by(desc(DeletionAttempt.time_updated))
+ if limit:
+ stmt = stmt.limit(limit)
+
+ deletion_attempts = db_session.scalars(stmt)
+ return list(deletion_attempts.all())
+
+
+def delete_deletion_attempts(
+ db_session: Session, connector_id: int, credential_id: int
+) -> None:
+ stmt = delete(DeletionAttempt).where(
+ and_(
+ DeletionAttempt.connector_id == connector_id,
+ DeletionAttempt.credential_id == credential_id,
+ )
+ )
+ db_session.execute(stmt)
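
The deletability rule in `check_deletion_attempt_is_allowed` can be exercised on its own with toy stand-ins, as sketched below. The `ToyConnector` / `ToyCCPair` shapes are hypothetical; only the boolean logic mirrors the function added above.

```python
# Toy illustration of the deletability rule: a connector / credential pair is deletable
# only if its connector is disabled and no index attempt is queued or running.
from dataclasses import dataclass
from enum import Enum


class IndexingStatus(str, Enum):
    NOT_STARTED = "not_started"
    IN_PROGRESS = "in_progress"
    SUCCESS = "success"
    FAILED = "failed"


@dataclass
class ToyConnector:
    disabled: bool


@dataclass
class ToyCCPair:
    connector: ToyConnector
    last_attempt_status: IndexingStatus | None


def check_deletion_attempt_is_allowed(cc_pair: ToyCCPair) -> bool:
    return bool(
        cc_pair.connector.disabled
        and cc_pair.last_attempt_status != IndexingStatus.IN_PROGRESS
        and cc_pair.last_attempt_status != IndexingStatus.NOT_STARTED
    )


print(check_deletion_attempt_is_allowed(ToyCCPair(ToyConnector(True), IndexingStatus.SUCCESS)))      # True
print(check_deletion_attempt_is_allowed(ToyCCPair(ToyConnector(True), IndexingStatus.IN_PROGRESS)))  # False
print(check_deletion_attempt_is_allowed(ToyCCPair(ToyConnector(False), IndexingStatus.SUCCESS)))     # False
```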
diff --git a/backend/danswer/db/document.py b/backend/danswer/db/document.py
new file mode 100644
index 000000000..4e8c49804
--- /dev/null
+++ b/backend/danswer/db/document.py
@@ -0,0 +1,185 @@
+from collections.abc import Sequence
+from dataclasses import dataclass
+
+from sqlalchemy import and_
+from sqlalchemy import delete
+from sqlalchemy import func
+from sqlalchemy import select
+from sqlalchemy.dialects.postgresql import insert
+from sqlalchemy.orm import Session
+
+from danswer.datastores.interfaces import ChunkMetadata
+from danswer.db.models import Chunk
+from danswer.db.models import Document
+from danswer.db.models import DocumentByConnectorCredentialPair
+from danswer.db.utils import model_to_dict
+from danswer.utils.logger import setup_logger
+
+logger = setup_logger()
+
+
+def get_chunks_with_single_connector_credential_pair(
+ db_session: Session,
+ connector_id: int,
+ credential_id: int,
+) -> Sequence[Chunk]:
+ initial_doc_ids_stmt = select(DocumentByConnectorCredentialPair.id).where(
+ and_(
+ DocumentByConnectorCredentialPair.connector_id == connector_id,
+ DocumentByConnectorCredentialPair.credential_id == credential_id,
+ )
+ )
+
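+    # of those documents, keep only the ones indexed by exactly one
+    # connector / credential pair (i.e. only by the pair passed in)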
+ trimmed_doc_ids_stmt = (
+ select(Document.id)
+ .join(
+ DocumentByConnectorCredentialPair,
+ DocumentByConnectorCredentialPair.id == Document.id,
+ )
+ .where(Document.id.in_(initial_doc_ids_stmt))
+ .group_by(Document.id)
+ .having(func.count(DocumentByConnectorCredentialPair.id) == 1)
+ )
+
+ stmt = select(Chunk).where(Chunk.document_id.in_(trimmed_doc_ids_stmt))
+ return db_session.scalars(stmt).all()
+
+
+def get_document_by_connector_credential_pairs_indexed_by_multiple(
+ db_session: Session,
+ connector_id: int,
+ credential_id: int,
+) -> Sequence[DocumentByConnectorCredentialPair]:
+ initial_doc_ids_stmt = select(DocumentByConnectorCredentialPair.id).where(
+ and_(
+ DocumentByConnectorCredentialPair.connector_id == connector_id,
+ DocumentByConnectorCredentialPair.credential_id == credential_id,
+ )
+ )
+
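+    # of those documents, keep only the ones that are also indexed by at
+    # least one other connector / credential pair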
+ trimmed_doc_ids_stmt = (
+ select(Document.id)
+ .join(
+ DocumentByConnectorCredentialPair,
+ DocumentByConnectorCredentialPair.id == Document.id,
+ )
+ .where(Document.id.in_(initial_doc_ids_stmt))
+ .group_by(Document.id)
+ .having(func.count(DocumentByConnectorCredentialPair.id) > 1)
+ )
+
+ stmt = select(DocumentByConnectorCredentialPair).where(
+ DocumentByConnectorCredentialPair.id.in_(trimmed_doc_ids_stmt)
+ )
+
+ return db_session.execute(stmt).scalars().all()
+
+
+def get_chunk_ids_for_document_ids(
+ db_session: Session, document_ids: list[str]
+) -> Sequence[str]:
+ stmt = select(Chunk.id).where(Chunk.document_id.in_(document_ids))
+ return db_session.execute(stmt).scalars().all()
+
+
+def upsert_documents(
+ db_session: Session, document_metadata_batch: list[ChunkMetadata]
+) -> None:
+ """NOTE: this function is Postgres specific. Not all DBs support the ON CONFLICT clause."""
+ seen_document_ids: set[str] = set()
+ for document_metadata in document_metadata_batch:
+ if document_metadata.document_id not in seen_document_ids:
+ seen_document_ids.add(document_metadata.document_id)
+
+ insert_stmt = insert(Document).values(
+ [model_to_dict(Document(id=doc_id)) for doc_id in seen_document_ids]
+ )
+ # for now, there are no columns to update. If more metadata is added, then this
+ # needs to change to an `on_conflict_do_update`
+ on_conflict_stmt = insert_stmt.on_conflict_do_nothing()
+ db_session.execute(on_conflict_stmt)
+ db_session.commit()
+
+
+def upsert_document_by_connector_credential_pair(
+ db_session: Session, document_metadata_batch: list[ChunkMetadata]
+) -> None:
+ """NOTE: this function is Postgres specific. Not all DBs support the ON CONFLICT clause."""
+ insert_stmt = insert(DocumentByConnectorCredentialPair).values(
+ [
+ model_to_dict(
+ DocumentByConnectorCredentialPair(
+ id=document_metadata.document_id,
+ connector_id=document_metadata.connector_id,
+ credential_id=document_metadata.credential_id,
+ )
+ )
+ for document_metadata in document_metadata_batch
+ ]
+ )
+ # for now, there are no columns to update. If more metadata is added, then this
+ # needs to change to an `on_conflict_do_update`
+ on_conflict_stmt = insert_stmt.on_conflict_do_nothing()
+ db_session.execute(on_conflict_stmt)
+ db_session.commit()
+
+
+def upsert_chunks(
+ db_session: Session, document_metadata_batch: list[ChunkMetadata]
+) -> None:
+ """NOTE: this function is Postgres specific. Not all DBs support the ON CONFLICT clause."""
+ insert_stmt = insert(Chunk).values(
+ [
+ model_to_dict(
+ Chunk(
+ id=document_metadata.store_id,
+ document_id=document_metadata.document_id,
+ document_store_type=document_metadata.document_store_type,
+ )
+ )
+ for document_metadata in document_metadata_batch
+ ]
+ )
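+    # (id, document_store_type) is the composite primary key of Chunk; on
+    # conflict, just re-point the existing chunk row at its document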
+ on_conflict_stmt = insert_stmt.on_conflict_do_update(
+ index_elements=["id", "document_store_type"],
+ set_=dict(document_id=insert_stmt.excluded.document_id),
+ )
+ db_session.execute(on_conflict_stmt)
+ db_session.commit()
+
+
+def upsert_documents_complete(
+ db_session: Session, document_metadata_batch: list[ChunkMetadata]
+) -> None:
+ upsert_documents(db_session, document_metadata_batch)
+ upsert_document_by_connector_credential_pair(db_session, document_metadata_batch)
+ upsert_chunks(db_session, document_metadata_batch)
+ logger.info(
+ f"Upserted {len(document_metadata_batch)} document store entries into DB"
+ )
+
+
+def delete_document_store_entries(db_session: Session, document_ids: list[str]) -> None:
+ db_session.execute(delete(Chunk).where(Chunk.document_id.in_(document_ids)))
+
+
+def delete_document_by_connector_credential_pair(
+ db_session: Session, document_ids: list[str]
+) -> None:
+ db_session.execute(
+ delete(DocumentByConnectorCredentialPair).where(
+ DocumentByConnectorCredentialPair.id.in_(document_ids)
+ )
+ )
+
+
+def delete_documents(db_session: Session, document_ids: list[str]) -> None:
+ db_session.execute(delete(Document).where(Document.id.in_(document_ids)))
+
+
+def delete_documents_complete(db_session: Session, document_ids: list[str]) -> None:
+ logger.info(f"Deleting {len(document_ids)} documents from the DB")
+ delete_document_store_entries(db_session, document_ids)
+ delete_document_by_connector_credential_pair(db_session, document_ids)
+ delete_documents(db_session, document_ids)
+ db_session.commit()
diff --git a/backend/danswer/db/index_attempt.py b/backend/danswer/db/index_attempt.py
index 30502d73c..ee3cdc95c 100644
--- a/backend/danswer/db/index_attempt.py
+++ b/backend/danswer/db/index_attempt.py
@@ -1,3 +1,4 @@
+from sqlalchemy import delete
from sqlalchemy import desc
from sqlalchemy import select
from sqlalchemy.orm import Session
@@ -86,3 +87,15 @@ def get_last_successful_attempt(
stmt = stmt.order_by(desc(IndexAttempt.time_created))
return db_session.execute(stmt).scalars().first()
+
+
+def delete_index_attempts(
+ connector_id: int,
+ credential_id: int,
+ db_session: Session,
+) -> None:
+ stmt = delete(IndexAttempt).where(
+ IndexAttempt.connector_id == connector_id,
+ IndexAttempt.credential_id == credential_id,
+ )
+ db_session.execute(stmt)
diff --git a/backend/danswer/db/models.py b/backend/danswer/db/models.py
index 815904840..d48446c93 100644
--- a/backend/danswer/db/models.py
+++ b/backend/danswer/db/models.py
@@ -2,6 +2,7 @@ import datetime
from enum import Enum as PyEnum
from typing import Any
from typing import List
+from typing import Optional
from uuid import UUID
from fastapi_users.db import SQLAlchemyBaseOAuthAccountTableUUID
@@ -24,6 +25,7 @@ from sqlalchemy.orm import relationship
from danswer.auth.schemas import UserRole
from danswer.configs.constants import DocumentSource
from danswer.connectors.models import InputType
+from danswer.datastores.interfaces import StoreType
class IndexingStatus(str, PyEnum):
@@ -33,6 +35,14 @@ class IndexingStatus(str, PyEnum):
FAILED = "failed"
+# these may differ in the future, which is why we're okay with this duplication
+class DeletionStatus(str, PyEnum):
+ NOT_STARTED = "not_started"
+ IN_PROGRESS = "in_progress"
+ SUCCESS = "success"
+ FAILED = "failed"
+
+
class Base(DeclarativeBase):
pass
@@ -75,7 +85,9 @@ class ConnectorCredentialPair(Base):
last_successful_index_time: Mapped[datetime.datetime | None] = mapped_column(
DateTime(timezone=True), default=None
)
- last_attempt_status: Mapped[IndexingStatus] = mapped_column(Enum(IndexingStatus))
+ last_attempt_status: Mapped[IndexingStatus | None] = mapped_column(
+ Enum(IndexingStatus)
+ )
total_docs_indexed: Mapped[int] = mapped_column(Integer, default=0)
connector: Mapped["Connector"] = relationship(
@@ -112,9 +124,15 @@ class Connector(Base):
back_populates="connector",
cascade="all, delete-orphan",
)
+ documents_by_connector: Mapped[
+ List["DocumentByConnectorCredentialPair"]
+ ] = relationship("DocumentByConnectorCredentialPair", back_populates="connector")
index_attempts: Mapped[List["IndexAttempt"]] = relationship(
"IndexAttempt", back_populates="connector"
)
+ deletion_attempt: Mapped[Optional["DeletionAttempt"]] = relationship(
+ "DeletionAttempt", back_populates="connector"
+ )
class Credential(Base):
@@ -136,9 +154,15 @@ class Credential(Base):
back_populates="credential",
cascade="all, delete-orphan",
)
+ documents_by_credential: Mapped[
+ List["DocumentByConnectorCredentialPair"]
+ ] = relationship("DocumentByConnectorCredentialPair", back_populates="credential")
index_attempts: Mapped[List["IndexAttempt"]] = relationship(
"IndexAttempt", back_populates="credential"
)
+ deletion_attempt: Mapped[Optional["DeletionAttempt"]] = relationship(
+ "DeletionAttempt", back_populates="credential"
+ )
user: Mapped[User] = relationship("User", back_populates="credentials")
@@ -190,3 +214,98 @@ class IndexAttempt(Base):
f"time_created={self.time_created!r}, "
f"time_updated={self.time_updated!r}, "
)
+
+
+class DeletionAttempt(Base):
+ """Represents an attempt to delete all documents indexed by a specific
+ connector / credential pair.
+ """
+
+ __tablename__ = "deletion_attempt"
+
+ id: Mapped[int] = mapped_column(primary_key=True)
+ connector_id: Mapped[int] = mapped_column(
+ ForeignKey("connector.id"),
+ )
+ credential_id: Mapped[int] = mapped_column(
+ ForeignKey("credential.id"),
+ )
+ status: Mapped[DeletionStatus] = mapped_column(Enum(DeletionStatus))
+ num_docs_deleted: Mapped[int] = mapped_column(Integer, default=0)
+ error_msg: Mapped[str | None] = mapped_column(
+ String(), default=None
+ ) # only filled if status = "failed"
+ time_created: Mapped[datetime.datetime] = mapped_column(
+ DateTime(timezone=True),
+ server_default=func.now(),
+ )
+ time_updated: Mapped[datetime.datetime] = mapped_column(
+ DateTime(timezone=True),
+ server_default=func.now(),
+ onupdate=func.now(),
+ )
+
+ connector: Mapped[Connector] = relationship(
+ "Connector", back_populates="deletion_attempt"
+ )
+ credential: Mapped[Credential] = relationship(
+ "Credential", back_populates="deletion_attempt"
+ )
+
+
+class Document(Base):
+ """Represents a single documents from a source. This is used to store
+ document level metadata so we don't need to duplicate it in a bunch of
+ DocumentByConnectorCredentialPair's/Chunk's for documents
+ that are split into many chunks and/or indexed by many connector / credential
+ pairs."""
+
+ __tablename__ = "document"
+
+ # this should correspond to the ID of the document (as is passed around
+ # in Danswer)
+ id: Mapped[str] = mapped_column(String, primary_key=True)
+
+ document_store_entries: Mapped["Chunk"] = relationship(
+ "Chunk", back_populates="document"
+ )
+
+
+class DocumentByConnectorCredentialPair(Base):
+ """Represents an indexing of a document by a specific connector / credential
+ pair"""
+
+ __tablename__ = "document_by_connector_credential_pair"
+
+ id: Mapped[str] = mapped_column(ForeignKey("document.id"), primary_key=True)
+ connector_id: Mapped[int] = mapped_column(
+ ForeignKey("connector.id"), primary_key=True
+ )
+ credential_id: Mapped[int] = mapped_column(
+ ForeignKey("credential.id"), primary_key=True
+ )
+
+ connector: Mapped[Connector] = relationship(
+ "Connector", back_populates="documents_by_connector"
+ )
+ credential: Mapped[Credential] = relationship(
+ "Credential", back_populates="documents_by_credential"
+ )
+
+
+class Chunk(Base):
+ """A row represents a single entry in a document store (e.g. a single chunk
+ in Qdrant/Typesense)"""
+
+ __tablename__ = "chunk"
+
+ # this should correspond to the ID in the document store
+ id: Mapped[str] = mapped_column(String, primary_key=True)
+ document_store_type: Mapped[StoreType] = mapped_column(
+ Enum(StoreType), primary_key=True
+ )
+ document_id: Mapped[str] = mapped_column(ForeignKey("document.id"))
+
+ document: Mapped[Document] = relationship(
+ "Document", back_populates="document_store_entries"
+ )
diff --git a/backend/danswer/db/utils.py b/backend/danswer/db/utils.py
new file mode 100644
index 000000000..c188543c4
--- /dev/null
+++ b/backend/danswer/db/utils.py
@@ -0,0 +1,9 @@
+from typing import Any
+
+from sqlalchemy import inspect
+
+from danswer.db.models import Base
+
+
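+# convert a SQLAlchemy model instance into a plain dict of its column values,
+# suitable for passing to insert().values(...)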
+def model_to_dict(model: Base) -> dict[str, Any]:
+ return {c.key: getattr(model, c.key) for c in inspect(model).mapper.column_attrs} # type: ignore
diff --git a/backend/danswer/server/manage.py b/backend/danswer/server/manage.py
index 0e1a40d62..181c327d4 100644
--- a/backend/danswer/server/manage.py
+++ b/backend/danswer/server/manage.py
@@ -9,6 +9,7 @@ from fastapi import Request
from fastapi import Response
from fastapi import UploadFile
from fastapi_users.db import SQLAlchemyUserDatabase
+from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import Session
@@ -36,17 +37,23 @@ from danswer.db.connector import fetch_connectors
from danswer.db.connector import get_connector_credential_ids
from danswer.db.connector import update_connector
from danswer.db.connector_credential_pair import add_credential_to_connector
+from danswer.db.connector_credential_pair import get_connector_credential_pair
from danswer.db.connector_credential_pair import get_connector_credential_pairs
from danswer.db.connector_credential_pair import remove_credential_from_connector
from danswer.db.credentials import create_credential
from danswer.db.credentials import delete_credential
from danswer.db.credentials import fetch_credential_by_id
from danswer.db.credentials import fetch_credentials
-from danswer.db.credentials import mask_credential_dict
from danswer.db.credentials import update_credential
+from danswer.db.deletion_attempt import check_deletion_attempt_is_allowed
+from danswer.db.deletion_attempt import create_deletion_attempt
+from danswer.db.deletion_attempt import get_deletion_attempts
from danswer.db.engine import get_session
from danswer.db.engine import get_sqlalchemy_async_engine
from danswer.db.index_attempt import create_index_attempt
+from danswer.db.models import DeletionAttempt
+from danswer.db.models import DeletionStatus
+from danswer.db.models import IndexingStatus
from danswer.db.models import User
from danswer.direct_qa import check_model_api_key_is_valid
from danswer.direct_qa import get_default_backend_qa_model
@@ -58,10 +65,12 @@ from danswer.server.models import ApiKey
from danswer.server.models import AuthStatus
from danswer.server.models import AuthUrl
from danswer.server.models import ConnectorBase
+from danswer.server.models import ConnectorCredentialPairIdentifier
from danswer.server.models import ConnectorIndexingStatus
from danswer.server.models import ConnectorSnapshot
from danswer.server.models import CredentialBase
from danswer.server.models import CredentialSnapshot
+from danswer.server.models import DeletionAttemptSnapshot
from danswer.server.models import FileUploadResponse
from danswer.server.models import GDriveCallback
from danswer.server.models import GoogleAppCredentials
@@ -179,18 +188,42 @@ def get_connector_indexing_status(
) -> list[ConnectorIndexingStatus]:
indexing_statuses: list[ConnectorIndexingStatus] = []
+ # TODO: make this one query
cc_pairs = get_connector_credential_pairs(db_session)
+ deletion_attempts_by_connector: dict[int, list[DeletionAttempt]] = {
+ cc_pair.connector.id: [] for cc_pair in cc_pairs
+ }
+ for deletion_attempt in get_deletion_attempts(
+ db_session=db_session,
+ connector_ids=[cc_pair.connector.id for cc_pair in cc_pairs],
+ ordered_by_time_updated=True,
+ ):
+ deletion_attempts_by_connector[deletion_attempt.connector_id].append(
+ deletion_attempt
+ )
+
for cc_pair in cc_pairs:
connector = cc_pair.connector
credential = cc_pair.credential
+        deletion_attempts = deletion_attempts_by_connector.get(connector.id, [])
indexing_statuses.append(
ConnectorIndexingStatus(
connector=ConnectorSnapshot.from_connector_db_model(connector),
+ credential=CredentialSnapshot.from_credential_db_model(credential),
public_doc=credential.public_doc,
owner=credential.user.email if credential.user else "",
last_status=cc_pair.last_attempt_status,
last_success=cc_pair.last_successful_index_time,
docs_indexed=cc_pair.total_docs_indexed,
+ deletion_attempts=[
+ DeletionAttemptSnapshot.from_deletion_attempt_db_model(
+ deletion_attempt
+ )
+                    for deletion_attempt in deletion_attempts
+ ],
+ is_deletable=check_deletion_attempt_is_allowed(
+ connector_credential_pair=cc_pair
+ ),
)
)
@@ -244,7 +277,8 @@ def delete_connector_by_id(
_: User = Depends(current_admin_user),
db_session: Session = Depends(get_session),
) -> StatusResponse[int]:
- return delete_connector(connector_id, db_session)
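+    # run the delete inside an explicit transaction so it commits or rolls back atomically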
+ with db_session.begin():
+ return delete_connector(db_session=db_session, connector_id=connector_id)
@router.post("/admin/connector/run-once")
@@ -373,6 +407,62 @@ def delete_genai_api_key(
get_dynamic_config_store().delete(GEN_AI_API_KEY_STORAGE_KEY)
+@router.post("/admin/deletion-attempt")
+def create_deletion_attempt_for_connector_id(
+ connector_credential_pair_identifier: ConnectorCredentialPairIdentifier,
+ _: User = Depends(current_admin_user),
+ db_session: Session = Depends(get_session),
+) -> None:
+ connector_id = connector_credential_pair_identifier.connector_id
+ credential_id = connector_credential_pair_identifier.credential_id
+
+ cc_pair = get_connector_credential_pair(
+ db_session=db_session,
+ connector_id=connector_id,
+ credential_id=credential_id,
+ )
+ if cc_pair is None:
+ raise HTTPException(
+ status_code=404,
+ detail=f"Connector with ID '{connector_id}' and credential ID "
+ f"'{credential_id}' does not exist. Has it already been deleted?",
+ )
+
+ if not check_deletion_attempt_is_allowed(connector_credential_pair=cc_pair):
+ raise HTTPException(
+ status_code=400,
+ detail=f"Connector with ID '{connector_id}' and credential ID "
+ f"'{credential_id}' is not deletable. It must be both disabled AND have"
+ "no ongoing / planned indexing attempts.",
+ )
+
+ create_deletion_attempt(
+ connector_id=connector_id,
+ credential_id=credential_id,
+ db_session=db_session,
+ )
+
+
+@router.get("/admin/deletion-attempt/{connector_id}")
+def get_deletion_attempts_for_connector_id(
+ connector_id: int,
+ _: User = Depends(current_admin_user),
+ db_session: Session = Depends(get_session),
+) -> list[DeletionAttemptSnapshot]:
+ deletion_attempts = get_deletion_attempts(
+ db_session=db_session, connector_ids=[connector_id]
+ )
+ return [
+ DeletionAttemptSnapshot(
+ connector_id=connector_id,
+ status=deletion_attempt.status,
+ error_msg=deletion_attempt.error_msg,
+ num_docs_deleted=deletion_attempt.num_docs_deleted,
+ )
+ for deletion_attempt in deletion_attempts
+ ]
+
+
"""Endpoints for basic users"""
@@ -468,16 +558,7 @@ def get_credentials(
) -> list[CredentialSnapshot]:
credentials = fetch_credentials(user, db_session)
return [
- CredentialSnapshot(
- id=credential.id,
- credential_json=mask_credential_dict(credential.credential_json)
- if MASK_CREDENTIAL_PREFIX
- else credential.credential_json,
- user_id=credential.user_id,
- public_doc=credential.public_doc,
- time_created=credential.time_created,
- time_updated=credential.time_updated,
- )
+ CredentialSnapshot.from_credential_db_model(credential)
for credential in credentials
]
@@ -495,16 +576,7 @@ def get_credential_by_id(
detail=f"Credential {credential_id} does not exist or does not belong to user",
)
- return CredentialSnapshot(
- id=credential.id,
- credential_json=mask_credential_dict(credential.credential_json)
- if MASK_CREDENTIAL_PREFIX
- else credential.credential_json,
- user_id=credential.user_id,
- public_doc=credential.public_doc,
- time_created=credential.time_created,
- time_updated=credential.time_updated,
- )
+ return CredentialSnapshot.from_credential_db_model(credential)
@router.post("/credential")
diff --git a/backend/danswer/server/models.py b/backend/danswer/server/models.py
index bfb8b94b4..a2735053a 100644
--- a/backend/danswer/server/models.py
+++ b/backend/danswer/server/models.py
@@ -9,13 +9,18 @@ from uuid import UUID
from pydantic import BaseModel
from pydantic.generics import GenericModel
+from danswer.configs.app_configs import MASK_CREDENTIAL_PREFIX
from danswer.configs.constants import DocumentSource
from danswer.connectors.models import InputType
from danswer.datastores.interfaces import IndexFilter
from danswer.db.models import Connector
+from danswer.db.models import Credential
+from danswer.db.models import DeletionAttempt
+from danswer.db.models import DeletionStatus
from danswer.db.models import IndexingStatus
from danswer.search.models import QueryFlow
from danswer.search.models import SearchType
+from danswer.server.utils import mask_credential_dict
DataT = TypeVar("DataT")
@@ -119,6 +124,24 @@ class IndexAttemptRequest(BaseModel):
connector_specific_config: dict[str, Any]
+class DeletionAttemptSnapshot(BaseModel):
+ connector_id: int
+ status: DeletionStatus
+ error_msg: str | None
+ num_docs_deleted: int
+
+ @classmethod
+ def from_deletion_attempt_db_model(
+ cls, deletion_attempt: DeletionAttempt
+ ) -> "DeletionAttemptSnapshot":
+ return DeletionAttemptSnapshot(
+ connector_id=deletion_attempt.connector_id,
+ status=deletion_attempt.status,
+ error_msg=deletion_attempt.error_msg,
+ num_docs_deleted=deletion_attempt.num_docs_deleted,
+ )
+
+
class ConnectorBase(BaseModel):
name: str
source: DocumentSource
@@ -152,17 +175,6 @@ class ConnectorSnapshot(ConnectorBase):
)
-class ConnectorIndexingStatus(BaseModel):
- """Represents the latest indexing status of a connector"""
-
- connector: ConnectorSnapshot
- owner: str
- public_doc: bool
- last_status: IndexingStatus
- last_success: datetime | None
- docs_indexed: int
-
-
class RunConnectorRequest(BaseModel):
connector_id: int
credential_ids: list[int] | None
@@ -179,6 +191,38 @@ class CredentialSnapshot(CredentialBase):
time_created: datetime
time_updated: datetime
+ @classmethod
+ def from_credential_db_model(cls, credential: Credential) -> "CredentialSnapshot":
+ return CredentialSnapshot(
+ id=credential.id,
+ credential_json=mask_credential_dict(credential.credential_json)
+ if MASK_CREDENTIAL_PREFIX
+ else credential.credential_json,
+ user_id=credential.user_id,
+ public_doc=credential.public_doc,
+ time_created=credential.time_created,
+ time_updated=credential.time_updated,
+ )
+
+
+class ConnectorIndexingStatus(BaseModel):
+ """Represents the latest indexing status of a connector"""
+
+ connector: ConnectorSnapshot
+ credential: CredentialSnapshot
+ owner: str
+ public_doc: bool
+ last_status: IndexingStatus | None
+ last_success: datetime | None
+ docs_indexed: int
+ deletion_attempts: list[DeletionAttemptSnapshot]
+ is_deletable: bool
+
+
+class ConnectorCredentialPairIdentifier(BaseModel):
+ connector_id: int
+ credential_id: int
+
class ApiKey(BaseModel):
api_key: str
diff --git a/backend/danswer/server/utils.py b/backend/danswer/server/utils.py
new file mode 100644
index 000000000..4a3dc82f6
--- /dev/null
+++ b/backend/danswer/server/utils.py
@@ -0,0 +1,17 @@
+from typing import Any
+
+
+def mask_string(sensitive_str: str) -> str:
+ return "****...**" + sensitive_str[-4:]
+
+
+def mask_credential_dict(credential_dict: dict[str, Any]) -> dict[str, str]:
+ masked_creds = {}
+ for key, val in credential_dict.items():
+ if not isinstance(val, str):
+ raise ValueError(
+ "Unable to mask credentials of type other than string, cannot process request."
+ )
+
+ masked_creds[key] = mask_string(val)
+ return masked_creds
diff --git a/backend/scripts/reset_postgres.py b/backend/scripts/reset_postgres.py
index ace7cd128..19f1f0ca2 100644
--- a/backend/scripts/reset_postgres.py
+++ b/backend/scripts/reset_postgres.py
@@ -29,6 +29,9 @@ def wipe_all_rows(database: str) -> None:
table_names = cur.fetchall()
# have to delete from these first to not run into psycopg2.errors.ForeignKeyViolation
+ cur.execute(f"DELETE FROM chunk")
+ cur.execute(f"DELETE FROM document_by_connector_credential_pair")
+ cur.execute(f"DELETE FROM document")
cur.execute(f"DELETE FROM connector_credential_pair")
cur.execute(f"DELETE FROM index_attempt")
conn.commit()
diff --git a/backend/supervisord.conf b/backend/supervisord.conf
index 42d4470c8..126e854a7 100644
--- a/backend/supervisord.conf
+++ b/backend/supervisord.conf
@@ -10,6 +10,13 @@ redirect_stderr=true
stdout_logfile_maxbytes=52428800
autorestart=true
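+; background worker that processes queued connector deletion attempts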
+[program:connector_deletion]
+command=python danswer/background/connector_deletion.py
+stdout_logfile=/var/log/connector_deletion.log
+redirect_stderr=true
+stdout_logfile_maxbytes=52428800
+autorestart=true
+
[program:file_deletion]
command=python danswer/background/file_deletion.py
stdout_logfile=/var/log/file_deletion.log
diff --git a/web/src/app/admin/connectors/bookstack/page.tsx b/web/src/app/admin/connectors/bookstack/page.tsx
index 70a7a7f42..bc92d9dac 100644
--- a/web/src/app/admin/connectors/bookstack/page.tsx
+++ b/web/src/app/admin/connectors/bookstack/page.tsx
@@ -27,7 +27,7 @@ const Main = () => {
data: connectorIndexingStatuses,
isLoading: isConnectorIndexingStatusesLoading,
error: isConnectorIndexingStatusesError,
- } = useSWR
- The following files failed to be indexed. Please contact an
- administrator to resolve this issue.
- File
- Upload Files
@@ -201,81 +180,41 @@ export default function File() {
- In Progress File Indexing
-
-
- Successful File Indexing
-
-
- Failed File Indexing
-
- File
+
N/A
+            ),
+          }
+        : { credential: "" };
+      return {
+        status: (
+