Support deletion of documents when a connector is deleted (#271)

Chris Weaver 2023-08-09 00:53:42 -07:00 committed by GitHub
parent b6dec6dcdb
commit 89f71ac335
49 changed files with 1961 additions and 390 deletions

View File

@ -153,6 +153,15 @@ _For Windows:_
powershell -Command " $env:PYTHONPATH='.'; $env:TYPESENSE_API_KEY='typesense_api_key'; $env:DYNAMIC_CONFIG_DIR_PATH='./dynamic_config_storage'; python danswer/background/update.py "
```
To run the background job which handles deletion of connectors, navigate to `danswer/backend` and run:
```bash
PYTHONPATH=. TYPESENSE_API_KEY=typesense_api_key DYNAMIC_CONFIG_DIR_PATH=./dynamic_config_storage python danswer/background/connector_deletion.py
```
_For Windows:_
```bash
powershell -Command " $env:PYTHONPATH='.'; $env:TYPESENSE_API_KEY='typesense_api_key'; $env:DYNAMIC_CONFIG_DIR_PATH='./dynamic_config_storage'; python danswer/background/connector_deletion.py "
```
Note: if you need finer logging, add the additional environment variable `LOG_LEVEL=DEBUG` to the relevant services.
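For example, to start the connector deletion job above with debug logging enabled:
```bash
LOG_LEVEL=DEBUG PYTHONPATH=. TYPESENSE_API_KEY=typesense_api_key DYNAMIC_CONFIG_DIR_PATH=./dynamic_config_storage python danswer/background/connector_deletion.py
```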
### Formatting and Linting

View File

@ -0,0 +1,50 @@
"""Make 'last_attempt_status' nullable
Revision ID: b082fec533f0
Revises: df0c7ad8a076
Create Date: 2023-08-06 12:05:47.087325
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql
# revision identifiers, used by Alembic.
revision = "b082fec533f0"
down_revision = "df0c7ad8a076"
branch_labels = None
depends_on = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.alter_column(
"connector_credential_pair",
"last_attempt_status",
existing_type=postgresql.ENUM(
"NOT_STARTED",
"IN_PROGRESS",
"SUCCESS",
"FAILED",
name="indexingstatus",
),
nullable=True,
)
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.alter_column(
"connector_credential_pair",
"last_attempt_status",
existing_type=postgresql.ENUM(
"NOT_STARTED",
"IN_PROGRESS",
"SUCCESS",
"FAILED",
name="indexingstatus",
),
nullable=False,
)
# ### end Alembic commands ###

View File

@ -0,0 +1,115 @@
"""Added deletion_attempt table
Revision ID: df0c7ad8a076
Revises: d7111c1238cd
Create Date: 2023-08-05 13:35:39.609619
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = "df0c7ad8a076"
down_revision = "d7111c1238cd"
branch_labels = None
depends_on = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.create_table(
"document",
sa.Column("id", sa.String(), nullable=False),
sa.PrimaryKeyConstraint("id"),
)
op.create_table(
"chunk",
sa.Column("id", sa.String(), nullable=False),
sa.Column(
"document_store_type",
sa.Enum(
"VECTOR",
"KEYWORD",
name="documentstoretype",
),
nullable=False,
),
sa.Column("document_id", sa.String(), nullable=False),
sa.ForeignKeyConstraint(
["document_id"],
["document.id"],
),
sa.PrimaryKeyConstraint("id", "document_store_type"),
)
op.create_table(
"deletion_attempt",
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("connector_id", sa.Integer(), nullable=False),
sa.Column("credential_id", sa.Integer(), nullable=False),
sa.Column(
"status",
sa.Enum(
"NOT_STARTED",
"IN_PROGRESS",
"SUCCESS",
"FAILED",
name="deletionstatus",
),
nullable=False,
),
sa.Column("num_docs_deleted", sa.Integer(), nullable=False),
sa.Column("error_msg", sa.String(), nullable=True),
sa.Column(
"time_created",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.Column(
"time_updated",
sa.DateTime(timezone=True),
server_default=sa.text("now()"),
nullable=False,
),
sa.ForeignKeyConstraint(
["connector_id"],
["connector.id"],
),
sa.ForeignKeyConstraint(
["credential_id"],
["credential.id"],
),
sa.PrimaryKeyConstraint("id"),
)
op.create_table(
"document_by_connector_credential_pair",
sa.Column("id", sa.String(), nullable=False),
sa.Column("connector_id", sa.Integer(), nullable=False),
sa.Column("credential_id", sa.Integer(), nullable=False),
sa.ForeignKeyConstraint(
["connector_id"],
["connector.id"],
),
sa.ForeignKeyConstraint(
["credential_id"],
["credential.id"],
),
sa.ForeignKeyConstraint(
["id"],
["document.id"],
),
sa.PrimaryKeyConstraint("id", "connector_id", "credential_id"),
)
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_table("document_by_connector_credential_pair")
op.drop_table("deletion_attempt")
op.drop_table("chunk")
op.drop_table("document")
sa.Enum(name="deletionstatus").drop(op.get_bind(), checkfirst=False)
sa.Enum(name="documentstoretype").drop(op.get_bind(), checkfirst=False)
# ### end Alembic commands ###

View File

@ -0,0 +1,296 @@
"""
To delete a connector / credential pair:
(1) find all documents associated with the connector / credential pair where this
is the only connector / credential pair that has indexed it
(2) delete all documents from document stores
(3) delete all entries from postgres
(4) find all documents associated with connector / credential pair where there
are multiple connector / credential pairs that have indexed it
(5) update document store entries to remove access associated with the
connector / credential pair from the access list
(6) delete all relevant entries from postgres
"""
import time
from collections import defaultdict
from datetime import datetime
from sqlalchemy.orm import Session
from danswer.configs.constants import PUBLIC_DOC_PAT
from danswer.datastores.interfaces import KeywordIndex
from danswer.datastores.interfaces import StoreType
from danswer.datastores.interfaces import UpdateRequest
from danswer.datastores.interfaces import VectorIndex
from danswer.datastores.qdrant.store import QdrantIndex
from danswer.datastores.typesense.store import TypesenseIndex
from danswer.db.connector import fetch_connector_by_id
from danswer.db.connector_credential_pair import delete_connector_credential_pair
from danswer.db.connector_credential_pair import get_connector_credential_pair
from danswer.db.deletion_attempt import check_deletion_attempt_is_allowed
from danswer.db.deletion_attempt import delete_deletion_attempts
from danswer.db.deletion_attempt import get_deletion_attempts
from danswer.db.document import delete_document_by_connector_credential_pair
from danswer.db.document import delete_documents_complete
from danswer.db.document import get_chunk_ids_for_document_ids
from danswer.db.document import (
get_chunks_with_single_connector_credential_pair,
)
from danswer.db.document import (
get_document_by_connector_credential_pairs_indexed_by_multiple,
)
from danswer.db.engine import get_sqlalchemy_engine
from danswer.db.index_attempt import delete_index_attempts
from danswer.db.models import Credential
from danswer.db.models import DeletionAttempt
from danswer.db.models import DeletionStatus
from danswer.utils.logger import setup_logger
logger = setup_logger()
def _delete_connector_credential_pair(
db_session: Session,
vector_index: VectorIndex,
keyword_index: KeywordIndex,
deletion_attempt: DeletionAttempt,
) -> int:
connector_id = deletion_attempt.connector_id
credential_id = deletion_attempt.credential_id
def _delete_singly_indexed_docs() -> int:
# if a document store entry is only indexed by this connector_credential_pair, delete it
num_docs_deleted = 0
chunks_to_delete = get_chunks_with_single_connector_credential_pair(
db_session=db_session,
connector_id=connector_id,
credential_id=credential_id,
)
if chunks_to_delete:
document_ids: set[str] = set()
vector_chunk_ids_to_delete: list[str] = []
keyword_chunk_ids_to_delete: list[str] = []
for chunk in chunks_to_delete:
document_ids.add(chunk.document_id)
if chunk.document_store_type == StoreType.KEYWORD:
keyword_chunk_ids_to_delete.append(chunk.id)
else:
vector_chunk_ids_to_delete.append(chunk.id)
vector_index.delete(ids=vector_chunk_ids_to_delete)
keyword_index.delete(ids=keyword_chunk_ids_to_delete)
# removes all `Chunk`, `DocumentByConnectorCredentialPair`, and `Document`
# rows from the DB
delete_documents_complete(
db_session=db_session,
document_ids=list(document_ids),
)
num_docs_deleted += len(document_ids)
return num_docs_deleted
num_docs_deleted = _delete_singly_indexed_docs()
def _update_multi_indexed_docs() -> None:
# if a document is indexed by multiple connector_credential_pairs, we should
# update its access rather than outright delete it
document_by_connector_credential_pairs_to_update = (
get_document_by_connector_credential_pairs_indexed_by_multiple(
db_session=db_session,
connector_id=connector_id,
credential_id=credential_id,
)
)
def _get_user(
credential: Credential,
) -> str:
if credential.public_doc:
return PUBLIC_DOC_PAT
return str(credential.user.id)
# find out which documents need to be updated and what their new allowed_users
# should be. This is a bit slow as it requires looping through all the documents
to_be_deleted_user = _get_user(deletion_attempt.credential)
document_ids_not_needing_update: set[str] = set()
document_id_to_allowed_users: dict[str, list[str]] = defaultdict(list)
for (
document_by_connector_credential_pair
) in document_by_connector_credential_pairs_to_update:
document_id = document_by_connector_credential_pair.id
user = _get_user(document_by_connector_credential_pair.credential)
document_id_to_allowed_users[document_id].append(user)
# if there's another connector / credential pair which has indexed this
# document with the same access, we don't need to update it since removing
# the access from this connector / credential pair won't change anything
if (
document_by_connector_credential_pair.connector_id != connector_id
or document_by_connector_credential_pair.credential_id != credential_id
) and user == to_be_deleted_user:
document_ids_not_needing_update.add(document_id)
# categorize into groups of updates to try and batch them more efficiently
update_groups: dict[tuple[str, ...], list[str]] = {}
for document_id, allowed_users_lst in document_id_to_allowed_users.items():
# if document_id in document_ids_not_needing_update:
# continue
allowed_users_lst.remove(to_be_deleted_user)
allowed_users = tuple(sorted(set(allowed_users_lst)))
update_groups[allowed_users] = update_groups.get(allowed_users, []) + [
document_id
]
# actually perform the updates in the document store
update_requests = [
UpdateRequest(
ids=list(
get_chunk_ids_for_document_ids(
db_session=db_session, document_ids=document_ids
)
),
allowed_users=list(allowed_users),
)
for allowed_users, document_ids in update_groups.items()
]
vector_index.update(update_requests=update_requests)
keyword_index.update(update_requests=update_requests)
# delete the `document_by_connector_credential_pair` rows for the connector / credential pair
delete_document_by_connector_credential_pair(
db_session=db_session,
document_ids=list(document_id_to_allowed_users.keys()),
)
_update_multi_indexed_docs()
def _cleanup() -> None:
# clean everything else up
# we cannot undo the deletion of the document store entries if something
# goes wrong since they happen outside the postgres world. Best we can do
# is keep everything else around and mark the deletion attempt as failed.
# If it's a transient failure, re-deleting the connector / credential pair should
# fix the weird state.
# TODO: lock anything to do with this connector via transaction isolation
# NOTE: we have to delete index_attempts and deletion_attempts since they both
# have foreign key columns to the connector
delete_index_attempts(
db_session=db_session,
connector_id=connector_id,
credential_id=credential_id,
)
delete_deletion_attempts(
db_session=db_session,
connector_id=connector_id,
credential_id=credential_id,
)
delete_connector_credential_pair(
db_session=db_session,
connector_id=connector_id,
credential_id=credential_id,
)
# if there are no credentials left, delete the connector
connector = fetch_connector_by_id(
db_session=db_session,
connector_id=connector_id,
)
if not connector or not len(connector.credentials):
logger.debug("Found no credentials left for connector, deleting connector")
db_session.delete(connector)
db_session.commit()
_cleanup()
logger.info(
"Successfully deleted connector_credential_pair with connector_id:"
f" '{connector_id}' and credential_id: '{credential_id}'. Deleted {num_docs_deleted} docs."
)
return num_docs_deleted
def _run_deletion(db_session: Session) -> None:
# NOTE: makes the assumption that there is only one deletion job running at a time
deletion_attempts = get_deletion_attempts(
db_session, statuses=[DeletionStatus.NOT_STARTED], limit=1
)
if not deletion_attempts:
logger.info("No deletion attempts to run")
return
deletion_attempt = deletion_attempts[0]
# validate that the connector / credential pair is deletable
cc_pair = get_connector_credential_pair(
db_session=db_session,
connector_id=deletion_attempt.connector_id,
credential_id=deletion_attempt.credential_id,
)
if not cc_pair or not check_deletion_attempt_is_allowed(
connector_credential_pair=cc_pair
):
error_msg = (
"Cannot run deletion attempt - connector_credential_pair is not deletable. "
"This is likely because there is an ongoing / planned indexing attempt OR the "
"connector is not disabled."
)
logger.error(error_msg)
deletion_attempt.status = DeletionStatus.FAILED
deletion_attempt.error_msg = error_msg
db_session.commit()
return
# kick off the actual deletion process
deletion_attempt.status = DeletionStatus.IN_PROGRESS
db_session.commit()
try:
num_docs_deleted = _delete_connector_credential_pair(
db_session=db_session,
vector_index=QdrantIndex(),
keyword_index=TypesenseIndex(),
deletion_attempt=deletion_attempt,
)
except Exception as e:
logger.exception(f"Failed to delete connector_credential_pair due to {e}")
deletion_attempt.status = DeletionStatus.FAILED
deletion_attempt.error_msg = str(e)
db_session.commit()
return
deletion_attempt.status = DeletionStatus.SUCCESS
deletion_attempt.num_docs_deleted = num_docs_deleted
db_session.commit()
def _cleanup_deletion_jobs(db_session: Session) -> None:
"""Cleanup any deletion jobs that were in progress but failed to complete
NOTE: makes the assumption that there is only one deletion job running at a time.
If multiple deletion jobs can be run at once, then this behavior no longer makes
sense."""
deletion_attempts = get_deletion_attempts(
db_session,
statuses=[DeletionStatus.IN_PROGRESS],
)
for deletion_attempt in deletion_attempts:
deletion_attempt.status = DeletionStatus.FAILED
db_session.commit()
def _update_loop(delay: int = 10) -> None:
engine = get_sqlalchemy_engine()
while True:
start = time.time()
start_time_utc = datetime.utcfromtimestamp(start).strftime("%Y-%m-%d %H:%M:%S")
logger.info(f"Running connector_deletion, current UTC time: {start_time_utc}")
try:
with Session(engine) as db_session:
_run_deletion(db_session)
_cleanup_deletion_jobs(db_session)
except Exception as e:
logger.exception(f"Failed to run connector_deletion due to {e}")
sleep_time = delay - (time.time() - start)
if sleep_time > 0:
time.sleep(sleep_time)
if __name__ == "__main__":
_update_loop()
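
To make the access-update batching in `_update_multi_indexed_docs` above concrete, here is a minimal, self-contained sketch of the grouping step with hypothetical document IDs and users (plain Python, not the real Danswer models):

```python
# Hypothetical example: documents still indexed by other connector / credential
# pairs, mapped to every user currently allowed to see them.
document_id_to_allowed_users: dict[str, list[str]] = {
    "doc-1": ["user-a", "user-b"],
    "doc-2": ["user-a", "user-b"],
    "doc-3": ["user-a", "user-c"],
}
to_be_deleted_user = "user-a"  # user tied to the pair being deleted

# Group documents by their post-deletion allowed_users so that each distinct
# access list can be sent to the document stores as a single UpdateRequest.
update_groups: dict[tuple[str, ...], list[str]] = {}
for document_id, allowed_users_lst in document_id_to_allowed_users.items():
    allowed_users_lst.remove(to_be_deleted_user)
    allowed_users = tuple(sorted(set(allowed_users_lst)))
    update_groups[allowed_users] = update_groups.get(allowed_users, []) + [document_id]

print(update_groups)
# {('user-b',): ['doc-1', 'doc-2'], ('user-c',): ['doc-3']}
```

Each key then becomes the `allowed_users` of one `UpdateRequest`, with `ids` filled from the chunk IDs of the grouped documents.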

View File

@ -7,6 +7,7 @@ from sqlalchemy.orm import Session
from danswer.connectors.factory import instantiate_connector
from danswer.connectors.interfaces import LoadConnector
from danswer.connectors.interfaces import PollConnector
from danswer.connectors.models import IndexAttemptMetadata
from danswer.connectors.models import InputType
from danswer.datastores.indexing_pipeline import build_indexing_pipeline
from danswer.db.connector import disable_connector
@ -188,7 +189,12 @@ def run_indexing_jobs(db_session: Session) -> None:
None if db_credential.public_doc else db_credential.user_id
)
new_docs, total_batch_chunks = indexing_pipeline(
documents=doc_batch, user_id=index_user_id
documents=doc_batch,
index_attempt_metadata=IndexAttemptMetadata(
user_id=index_user_id,
connector_id=db_connector.id,
credential_id=db_credential.id,
),
)
net_doc_change += new_docs
chunk_count += total_batch_chunks

View File

@ -166,7 +166,7 @@ def get_all_files_batched(
batch_size=batch_size,
)
yield from batch_generator(
generator=valid_files,
items=valid_files,
batch_size=batch_size,
pre_batch_yield=lambda batch_files: logger.info(
f"Parseable Documents in batch: {[file['name'] for file in batch_files]}"

View File

@ -1,6 +1,7 @@
from dataclasses import dataclass
from enum import Enum
from typing import Any
from uuid import UUID
from pydantic import BaseModel
@ -36,13 +37,8 @@ class InputType(str, Enum):
EVENT = "event" # e.g. registered an endpoint as a listener, and processing connector events
class ConnectorDescriptor(BaseModel):
source: DocumentSource
# how the raw data being indexed is procured
input_type: InputType
# what is passed into the __init__ of the connector described by `source`
# and `input_type`
connector_specific_config: dict[str, Any]
class Config:
arbitrary_types_allowed = True
@dataclass
class IndexAttemptMetadata:
user_id: UUID | None
connector_id: int
credential_id: int

View File

@ -1,6 +1,6 @@
from collections.abc import Callable
from collections.abc import Generator
from collections.abc import Iterator
from collections.abc import Iterable
from itertools import islice
from typing import TypeVar
@ -8,12 +8,13 @@ T = TypeVar("T")
def batch_generator(
generator: Iterator[T],
items: Iterable[T],
batch_size: int,
pre_batch_yield: Callable[[list[T]], None] | None = None,
) -> Generator[list[T], None, None]:
iterable = iter(items)
while True:
batch = list(islice(generator, batch_size))
batch = list(islice(iterable, batch_size))
if not batch:
return

View File

@ -1,12 +1,18 @@
import uuid
from collections.abc import Callable
from copy import deepcopy
from typing import TypeVar
from pydantic import BaseModel
from danswer.chunking.models import EmbeddedIndexChunk
from danswer.chunking.models import IndexChunk
from danswer.chunking.models import InferenceChunk
from danswer.configs.constants import ALLOWED_GROUPS
from danswer.configs.constants import ALLOWED_USERS
from danswer.configs.constants import PUBLIC_DOC_PAT
from danswer.connectors.models import Document
from danswer.connectors.models import IndexAttemptMetadata
DEFAULT_BATCH_SIZE = 30
@ -29,44 +35,87 @@ def get_uuid_from_chunk(
return uuid.uuid5(uuid.NAMESPACE_X500, unique_identifier_string)
# Takes the chunk identifier and returns whether the chunk exists and the user/group whitelists
WhitelistCallable = Callable[[str], tuple[bool, list[str], list[str]]]
class CrossConnectorDocumentMetadata(BaseModel):
"""Represents metadata about a single document. This is needed since the
`Document` class represents a document from a single connector, but that same
document may be indexed by multiple connectors."""
allowed_users: list[str]
allowed_user_groups: list[str]
already_in_index: bool
def update_doc_user_map(
# Takes the chunk identifier and returns the existing metadata about that chunk
CrossConnectorDocumentMetadataFetchCallable = Callable[
[str], CrossConnectorDocumentMetadata | None
]
T = TypeVar("T")
def _add_if_not_exists(l: list[T], item: T) -> list[T]:
if item in l:
return l
return l + [item]
def update_cross_connector_document_metadata_map(
chunk: IndexChunk | EmbeddedIndexChunk,
doc_whitelist_map: dict[str, dict[str, list[str]]],
doc_store_whitelist_fnc: WhitelistCallable,
user_str: str,
) -> tuple[dict[str, dict[str, list[str]]], bool]:
"""Returns an updated document id to whitelists mapping and if the document's chunks need to be wiped."""
doc_whitelist_map = deepcopy(doc_whitelist_map)
cross_connector_document_metadata_map: dict[str, CrossConnectorDocumentMetadata],
doc_store_cross_connector_document_metadata_fetch_fn: CrossConnectorDocumentMetadataFetchCallable,
index_attempt_metadata: IndexAttemptMetadata,
) -> tuple[dict[str, CrossConnectorDocumentMetadata], bool]:
"""Returns an updated document_id -> CrossConnectorDocumentMetadata map and
if the document's chunks need to be wiped."""
user_str = (
PUBLIC_DOC_PAT
if index_attempt_metadata.user_id is None
else str(index_attempt_metadata.user_id)
)
cross_connector_document_metadata_map = deepcopy(
cross_connector_document_metadata_map
)
first_chunk_uuid = str(get_uuid_from_chunk(chunk))
document = chunk.source_document
if document.id not in doc_whitelist_map:
first_chunk_found, whitelist_users, whitelist_groups = doc_store_whitelist_fnc(
first_chunk_uuid
if document.id not in cross_connector_document_metadata_map:
document_metadata_in_doc_store = (
doc_store_cross_connector_document_metadata_fetch_fn(first_chunk_uuid)
)
if not first_chunk_found:
doc_whitelist_map[document.id] = {
ALLOWED_USERS: [user_str],
# TODO introduce groups logic here
ALLOWED_GROUPS: whitelist_groups,
}
if not document_metadata_in_doc_store:
cross_connector_document_metadata_map[
document.id
] = CrossConnectorDocumentMetadata(
allowed_users=[user_str],
allowed_user_groups=[],
already_in_index=False,
)
# First chunk does not exist so document does not exist, no need for deletion
return doc_whitelist_map, False
return cross_connector_document_metadata_map, False
else:
if user_str not in whitelist_users:
whitelist_users.append(user_str)
# TODO introduce groups logic here
doc_whitelist_map[document.id] = {
ALLOWED_USERS: whitelist_users,
ALLOWED_GROUPS: whitelist_groups,
}
cross_connector_document_metadata_map[
document.id
] = CrossConnectorDocumentMetadata(
allowed_users=_add_if_not_exists(
document_metadata_in_doc_store.allowed_users, user_str
),
allowed_user_groups=document_metadata_in_doc_store.allowed_user_groups,
already_in_index=True,
)
# First chunk exists, but with the update, there may be fewer total chunks now
# Must delete rest of document chunks
return doc_whitelist_map, True
return cross_connector_document_metadata_map, True
existing_document_metadata = cross_connector_document_metadata_map[document.id]
cross_connector_document_metadata_map[document.id] = CrossConnectorDocumentMetadata(
allowed_users=_add_if_not_exists(
existing_document_metadata.allowed_users, user_str
),
allowed_user_groups=existing_document_metadata.allowed_user_groups,
already_in_index=existing_document_metadata.already_in_index,
)
# If document is already in the mapping, don't delete again
return doc_whitelist_map, False
return cross_connector_document_metadata_map, False

View File

@ -1,15 +1,22 @@
from functools import partial
from itertools import chain
from typing import Protocol
from uuid import UUID
from sqlalchemy.orm import Session
from danswer.chunking.chunk import Chunker
from danswer.chunking.chunk import DefaultChunker
from danswer.connectors.models import Document
from danswer.connectors.models import IndexAttemptMetadata
from danswer.datastores.interfaces import ChunkInsertionRecord
from danswer.datastores.interfaces import ChunkMetadata
from danswer.datastores.interfaces import KeywordIndex
from danswer.datastores.interfaces import StoreType
from danswer.datastores.interfaces import VectorIndex
from danswer.datastores.qdrant.store import QdrantIndex
from danswer.datastores.typesense.store import TypesenseIndex
from danswer.db.document import upsert_documents_complete
from danswer.db.engine import get_sqlalchemy_engine
from danswer.search.models import Embedder
from danswer.search.semantic_search import DefaultEmbedder
from danswer.utils.logger import setup_logger
@ -19,11 +26,44 @@ logger = setup_logger()
class IndexingPipelineProtocol(Protocol):
def __call__(
self, documents: list[Document], user_id: UUID | None
self, documents: list[Document], index_attempt_metadata: IndexAttemptMetadata
) -> tuple[int, int]:
...
def _upsert_insertion_records(
insertion_records: list[ChunkInsertionRecord],
index_attempt_metadata: IndexAttemptMetadata,
document_store_type: StoreType,
) -> None:
with Session(get_sqlalchemy_engine()) as session:
upsert_documents_complete(
db_session=session,
document_metadata_batch=[
ChunkMetadata(
connector_id=index_attempt_metadata.connector_id,
credential_id=index_attempt_metadata.credential_id,
document_id=insertion_record.document_id,
store_id=insertion_record.store_id,
document_store_type=document_store_type,
)
for insertion_record in insertion_records
],
)
def _get_net_new_documents(
insertion_records: list[ChunkInsertionRecord],
) -> int:
net_new_documents = 0
seen_documents: set[str] = set()
for insertion_record in insertion_records:
if insertion_record.document_id not in seen_documents:
net_new_documents += 1
seen_documents.add(insertion_record.document_id)
return net_new_documents
def _indexing_pipeline(
*,
chunker: Chunker,
@ -31,18 +71,37 @@ def _indexing_pipeline(
vector_index: VectorIndex,
keyword_index: KeywordIndex,
documents: list[Document],
user_id: UUID | None,
index_attempt_metadata: IndexAttemptMetadata,
) -> tuple[int, int]:
"""Takes different pieces of the indexing pipeline and applies it to a batch of documents
Note that the documents should already be batched at this point so that it does not inflate the
memory requirements"""
# TODO: make entire indexing pipeline async to not block the entire process
# when running on async endpoints
chunks = list(chain(*[chunker.chunk(document) for document in documents]))
chunks = list(chain(*[chunker.chunk(document=document) for document in documents]))
# TODO keyword indexing can occur at same time as embedding
net_doc_count_keyword = keyword_index.index(chunks, user_id)
chunks_with_embeddings = embedder.embed(chunks)
net_doc_count_vector = vector_index.index(chunks_with_embeddings, user_id)
keyword_store_insertion_records = keyword_index.index(
chunks=chunks, index_attempt_metadata=index_attempt_metadata
)
_upsert_insertion_records(
insertion_records=keyword_store_insertion_records,
index_attempt_metadata=index_attempt_metadata,
document_store_type=StoreType.KEYWORD,
)
net_doc_count_keyword = _get_net_new_documents(
insertion_records=keyword_store_insertion_records
)
chunks_with_embeddings = embedder.embed(chunks=chunks)
vector_store_insertion_records = vector_index.index(
chunks=chunks_with_embeddings, index_attempt_metadata=index_attempt_metadata
)
_upsert_insertion_records(
insertion_records=vector_store_insertion_records,
index_attempt_metadata=index_attempt_metadata,
document_store_type=StoreType.VECTOR,
)
net_doc_count_vector = _get_net_new_documents(
insertion_records=vector_store_insertion_records
)
if net_doc_count_vector != net_doc_count_keyword:
logger.warning("Document count change from keyword/vector indices don't align")
net_new_docs = max(net_doc_count_keyword, net_doc_count_vector)
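
As a small illustration of the bookkeeping above, `_get_net_new_documents` simply counts distinct document IDs across the per-chunk insertion records. A sketch, assuming the `danswer` package is importable and that this module is `danswer.datastores.indexing_pipeline` (as the import in the update job suggests):

```python
from danswer.datastores.indexing_pipeline import _get_net_new_documents
from danswer.datastores.interfaces import ChunkInsertionRecord

# Three chunks, but they belong to only two documents.
records = [
    ChunkInsertionRecord(document_id="doc-1", store_id="chunk-1", already_existed=False),
    ChunkInsertionRecord(document_id="doc-1", store_id="chunk-2", already_existed=False),
    ChunkInsertionRecord(document_id="doc-2", store_id="chunk-3", already_existed=False),
]
assert _get_net_new_documents(insertion_records=records) == 2
```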

View File

@ -1,4 +1,7 @@
import abc
from dataclasses import dataclass
from enum import Enum
from typing import Any
from typing import Generic
from typing import TypeVar
from uuid import UUID
@ -7,20 +10,65 @@ from danswer.chunking.models import BaseChunk
from danswer.chunking.models import EmbeddedIndexChunk
from danswer.chunking.models import IndexChunk
from danswer.chunking.models import InferenceChunk
from danswer.connectors.models import IndexAttemptMetadata
T = TypeVar("T", bound=BaseChunk)
IndexFilter = dict[str, str | list[str] | None]
class DocumentIndex(Generic[T], abc.ABC):
class StoreType(str, Enum):
VECTOR = "vector"
KEYWORD = "keyword"
@dataclass
class ChunkInsertionRecord:
document_id: str
store_id: str
already_existed: bool
@dataclass
class ChunkMetadata:
connector_id: int
credential_id: int
document_id: str
store_id: str
document_store_type: StoreType
@dataclass
class UpdateRequest:
ids: list[str]
# all other fields will be left alone
allowed_users: list[str]
class Indexable(Generic[T], abc.ABC):
@abc.abstractmethod
def index(self, chunks: list[T], user_id: UUID | None) -> int:
"""Indexes document chunks into the Document Index and return the number of new documents"""
def index(
self, chunks: list[T], index_attempt_metadata: IndexAttemptMetadata
) -> list[ChunkInsertionRecord]:
"""Indexes document chunks into the Document Index and return the IDs of all the documents indexed"""
raise NotImplementedError
class VectorIndex(DocumentIndex[EmbeddedIndexChunk], abc.ABC):
class Deletable(abc.ABC):
@abc.abstractmethod
def delete(self, ids: list[str]) -> None:
"""Removes the specified documents from the Index"""
raise NotImplementedError
class Updatable(abc.ABC):
@abc.abstractmethod
def update(self, update_requests: list[UpdateRequest]) -> None:
"""Updates metadata for the specified documents in the Index"""
raise NotImplementedError
class VectorIndex(Indexable[EmbeddedIndexChunk], Deletable, Updatable, abc.ABC):
@abc.abstractmethod
def semantic_retrieval(
self,
@ -32,7 +80,7 @@ class VectorIndex(DocumentIndex[EmbeddedIndexChunk], abc.ABC):
raise NotImplementedError
class KeywordIndex(DocumentIndex[IndexChunk], abc.ABC):
class KeywordIndex(Indexable[IndexChunk], Deletable, Updatable, abc.ABC):
@abc.abstractmethod
def keyword_search(
self,
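
The split into `Indexable` / `Deletable` / `Updatable` above keeps the surface area needed for connector deletion small: a store only has to know how to delete chunks by ID and how to overwrite their `allowed_users`. A minimal sketch of a hypothetical in-memory store implementing the two new capabilities (assuming the `danswer` package is importable; this class is illustrative only, not part of the commit):

```python
from danswer.datastores.interfaces import Deletable, Updatable, UpdateRequest


class InMemoryIndex(Deletable, Updatable):
    """Toy store mapping chunk id -> payload dict."""

    def __init__(self) -> None:
        self.chunks: dict[str, dict] = {}

    def delete(self, ids: list[str]) -> None:
        # drop the specified chunks, ignoring ids that are already gone
        for chunk_id in ids:
            self.chunks.pop(chunk_id, None)

    def update(self, update_requests: list[UpdateRequest]) -> None:
        # overwrite allowed_users on every chunk named in each request
        for request in update_requests:
            for chunk_id in request.ids:
                if chunk_id in self.chunks:
                    self.chunks[chunk_id]["allowed_users"] = request.allowed_users


index = InMemoryIndex()
index.chunks["chunk-1"] = {"allowed_users": ["user-a", "user-b"]}
index.update([UpdateRequest(ids=["chunk-1"], allowed_users=["user-b"])])
index.delete(["chunk-1"])
```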

View File

@ -19,15 +19,20 @@ from danswer.configs.constants import CHUNK_ID
from danswer.configs.constants import CONTENT
from danswer.configs.constants import DOCUMENT_ID
from danswer.configs.constants import METADATA
from danswer.configs.constants import PUBLIC_DOC_PAT
from danswer.configs.constants import SECTION_CONTINUATION
from danswer.configs.constants import SEMANTIC_IDENTIFIER
from danswer.configs.constants import SOURCE_LINKS
from danswer.configs.constants import SOURCE_TYPE
from danswer.configs.model_configs import DOC_EMBEDDING_DIM
from danswer.connectors.models import IndexAttemptMetadata
from danswer.datastores.datastore_utils import CrossConnectorDocumentMetadata
from danswer.datastores.datastore_utils import DEFAULT_BATCH_SIZE
from danswer.datastores.datastore_utils import get_uuid_from_chunk
from danswer.datastores.datastore_utils import update_doc_user_map
from danswer.datastores.datastore_utils import (
update_cross_connector_document_metadata_map,
)
from danswer.datastores.interfaces import ChunkInsertionRecord
from danswer.datastores.qdrant.utils import get_payload_from_record
from danswer.utils.clients import get_qdrant_client
from danswer.utils.logger import setup_logger
@ -51,9 +56,9 @@ def create_qdrant_collection(
raise RuntimeError("Could not create Qdrant collection")
def get_qdrant_document_whitelists(
def get_qdrant_document_cross_connector_metadata(
doc_chunk_id: str, collection_name: str, q_client: QdrantClient
) -> tuple[bool, list[str], list[str]]:
) -> CrossConnectorDocumentMetadata | None:
"""Get whether a document is found and the existing whitelists"""
results = q_client.retrieve(
collection_name=collection_name,
@ -61,13 +66,13 @@ def get_qdrant_document_whitelists(
with_payload=[ALLOWED_USERS, ALLOWED_GROUPS],
)
if len(results) == 0:
return False, [], []
payload = results[0].payload
if not payload:
raise RuntimeError(
"Qdrant Index is corrupted, Document found with no access lists."
)
return True, payload[ALLOWED_USERS], payload[ALLOWED_GROUPS]
return None
payload = get_payload_from_record(results[0])
return CrossConnectorDocumentMetadata(
allowed_users=payload[ALLOWED_USERS],
allowed_user_groups=payload[ALLOWED_GROUPS],
already_in_index=True,
)
def delete_qdrant_doc_chunks(
@ -91,42 +96,53 @@ def delete_qdrant_doc_chunks(
def index_qdrant_chunks(
chunks: list[EmbeddedIndexChunk],
user_id: UUID | None,
index_attempt_metadata: IndexAttemptMetadata,
collection: str,
client: QdrantClient | None = None,
batch_upsert: bool = True,
) -> int:
) -> list[ChunkInsertionRecord]:
# Public documents will have the PUBLIC string in ALLOWED_USERS
# If credential that kicked this off has no user associated, either Auth is off or the doc is public
user_str = PUBLIC_DOC_PAT if user_id is None else str(user_id)
q_client: QdrantClient = client if client else get_qdrant_client()
point_structs: list[PointStruct] = []
insertion_records: list[ChunkInsertionRecord] = []
# Maps document id to dict of whitelists for users/groups each containing list of users/groups as strings
doc_user_map: dict[str, dict[str, list[str]]] = {}
docs_deleted = 0
cross_connector_document_metadata_map: dict[
str, CrossConnectorDocumentMetadata
] = {}
for chunk in chunks:
document = chunk.source_document
doc_user_map, delete_doc = update_doc_user_map(
chunk,
doc_user_map,
partial(
get_qdrant_document_whitelists,
(
cross_connector_document_metadata_map,
should_delete_doc,
) = update_cross_connector_document_metadata_map(
chunk=chunk,
cross_connector_document_metadata_map=cross_connector_document_metadata_map,
doc_store_cross_connector_document_metadata_fetch_fn=partial(
get_qdrant_document_cross_connector_metadata,
collection_name=collection,
q_client=q_client,
),
user_str,
index_attempt_metadata=index_attempt_metadata,
)
if delete_doc:
if should_delete_doc:
# Processing the first chunk of the doc and the doc exists
docs_deleted += 1
delete_qdrant_doc_chunks(document.id, collection, q_client)
point_structs.extend(
[
for minichunk_ind, embedding in enumerate(chunk.embeddings):
qdrant_id = str(get_uuid_from_chunk(chunk, minichunk_ind))
insertion_records.append(
ChunkInsertionRecord(
document_id=document.id,
store_id=qdrant_id,
already_existed=should_delete_doc,
)
)
point_structs.append(
PointStruct(
id=str(get_uuid_from_chunk(chunk, minichunk_ind)),
id=qdrant_id,
payload={
DOCUMENT_ID: document.id,
CHUNK_ID: chunk.chunk_id,
@ -136,15 +152,17 @@ def index_qdrant_chunks(
SOURCE_LINKS: chunk.source_links,
SEMANTIC_IDENTIFIER: document.semantic_identifier,
SECTION_CONTINUATION: chunk.section_continuation,
ALLOWED_USERS: doc_user_map[document.id][ALLOWED_USERS],
ALLOWED_GROUPS: doc_user_map[document.id][ALLOWED_GROUPS],
ALLOWED_USERS: cross_connector_document_metadata_map[
document.id
].allowed_users,
ALLOWED_GROUPS: cross_connector_document_metadata_map[
document.id
].allowed_user_groups,
METADATA: json.dumps(document.metadata),
},
vector=embedding,
)
for minichunk_ind, embedding in enumerate(chunk.embeddings)
]
)
)
if batch_upsert:
point_struct_batches = [
@ -179,4 +197,4 @@ def index_qdrant_chunks(
f"Document batch of size {len(point_structs)} indexing status: {index_results.status}"
)
return len(doc_user_map.keys()) - docs_deleted
return insertion_records

View File

@ -1,3 +1,4 @@
from typing import Any
from uuid import UUID
from qdrant_client.http.exceptions import ResponseHandlingException
@ -14,8 +15,12 @@ from danswer.configs.app_configs import QDRANT_DEFAULT_COLLECTION
from danswer.configs.constants import ALLOWED_USERS
from danswer.configs.constants import PUBLIC_DOC_PAT
from danswer.configs.model_configs import SEARCH_DISTANCE_CUTOFF
from danswer.connectors.models import IndexAttemptMetadata
from danswer.connectors.utils import batch_generator
from danswer.datastores.datastore_utils import get_uuid_from_chunk
from danswer.datastores.interfaces import ChunkInsertionRecord
from danswer.datastores.interfaces import IndexFilter
from danswer.datastores.interfaces import UpdateRequest
from danswer.datastores.interfaces import VectorIndex
from danswer.datastores.qdrant.indexing import index_qdrant_chunks
from danswer.search.search_utils import get_default_embedding_model
@ -25,6 +30,9 @@ from danswer.utils.timing import log_function_time
logger = setup_logger()
# how many points we want to delete/update at a time
_BATCH_SIZE = 200
def _build_qdrant_filters(
user_id: UUID | None, filters: list[IndexFilter] | None
@ -78,10 +86,14 @@ class QdrantIndex(VectorIndex):
self.collection = collection
self.client = get_qdrant_client()
def index(self, chunks: list[EmbeddedIndexChunk], user_id: UUID | None) -> int:
def index(
self,
chunks: list[EmbeddedIndexChunk],
index_attempt_metadata: IndexAttemptMetadata,
) -> list[ChunkInsertionRecord]:
return index_qdrant_chunks(
chunks=chunks,
user_id=user_id,
index_attempt_metadata=index_attempt_metadata,
collection=self.collection,
client=self.client,
)
@ -145,6 +157,29 @@ class QdrantIndex(VectorIndex):
return found_inference_chunks
def delete(self, ids: list[str]) -> None:
logger.info(f"Deleting {len(ids)} documents from Qdrant")
for id_batch in batch_generator(items=ids, batch_size=_BATCH_SIZE):
self.client.delete(
collection_name=self.collection,
points_selector=id_batch,
)
def update(self, update_requests: list[UpdateRequest]) -> None:
logger.info(
f"Updating {len(update_requests)} documents' allowed_users in Qdrant"
)
for update_request in update_requests:
for id_batch in batch_generator(
items=update_request.ids,
batch_size=_BATCH_SIZE,
):
self.client.set_payload(
collection_name=self.collection,
payload={ALLOWED_USERS: update_request.allowed_users},
points=id_batch,
)
def get_from_id(self, object_id: str) -> InferenceChunk | None:
matches, _ = self.client.scroll(
collection_name=self.collection,

View File

@ -0,0 +1,12 @@
from typing import Any
from qdrant_client.http.models import Record
def get_payload_from_record(record: Record, is_required: bool = True) -> dict[str, Any]:
if record.payload is None and is_required:
raise RuntimeError(
"Qdrant Index is corrupted, Document found with no metadata."
)
return record.payload or {}

View File

@ -1,6 +1,7 @@
import json
from functools import partial
from typing import Any
from typing import cast
from uuid import UUID
import typesense # type: ignore
@ -22,17 +23,27 @@ from danswer.configs.constants import SECTION_CONTINUATION
from danswer.configs.constants import SEMANTIC_IDENTIFIER
from danswer.configs.constants import SOURCE_LINKS
from danswer.configs.constants import SOURCE_TYPE
from danswer.connectors.models import IndexAttemptMetadata
from danswer.connectors.utils import batch_generator
from danswer.datastores.datastore_utils import CrossConnectorDocumentMetadata
from danswer.datastores.datastore_utils import DEFAULT_BATCH_SIZE
from danswer.datastores.datastore_utils import get_uuid_from_chunk
from danswer.datastores.datastore_utils import update_doc_user_map
from danswer.datastores.datastore_utils import (
update_cross_connector_document_metadata_map,
)
from danswer.datastores.interfaces import ChunkInsertionRecord
from danswer.datastores.interfaces import IndexFilter
from danswer.datastores.interfaces import KeywordIndex
from danswer.datastores.interfaces import UpdateRequest
from danswer.utils.clients import get_typesense_client
from danswer.utils.logger import setup_logger
logger = setup_logger()
# how many points we want to delete/update at a time
_BATCH_SIZE = 200
def check_typesense_collection_exist(
collection_name: str = TYPESENSE_DEFAULT_COLLECTION,
@ -70,21 +81,26 @@ def create_typesense_collection(
ts_client.collections.create(collection_schema)
def get_typesense_document_whitelists(
def get_typesense_document_cross_connector_metadata(
doc_chunk_id: str, collection_name: str, ts_client: typesense.Client
) -> tuple[bool, list[str], list[str]]:
) -> CrossConnectorDocumentMetadata | None:
"""Returns whether the document already exists and the users/group whitelists"""
try:
document = (
ts_client.collections[collection_name].documents[doc_chunk_id].retrieve()
document = cast(
dict[str, Any],
ts_client.collections[collection_name].documents[doc_chunk_id].retrieve(),
)
except ObjectNotFound:
return False, [], []
return None
if document[ALLOWED_USERS] is None or document[ALLOWED_GROUPS] is None:
raise RuntimeError(
"Typesense Index is corrupted, Document found with no access lists."
)
return True, document[ALLOWED_USERS], document[ALLOWED_GROUPS]
return CrossConnectorDocumentMetadata(
allowed_users=document[ALLOWED_USERS],
allowed_user_groups=document[ALLOWED_GROUPS],
already_in_index=True,
)
def delete_typesense_doc_chunks(
@ -100,38 +116,49 @@ def delete_typesense_doc_chunks(
def index_typesense_chunks(
chunks: list[IndexChunk | EmbeddedIndexChunk],
user_id: UUID | None,
index_attempt_metadata: IndexAttemptMetadata,
collection: str,
client: typesense.Client | None = None,
batch_upsert: bool = True,
) -> int:
user_str = PUBLIC_DOC_PAT if user_id is None else str(user_id)
) -> list[ChunkInsertionRecord]:
ts_client: typesense.Client = client if client else get_typesense_client()
insertion_records: list[ChunkInsertionRecord] = []
new_documents: list[dict[str, Any]] = []
doc_user_map: dict[str, dict[str, list[str]]] = {}
docs_deleted = 0
cross_connector_document_metadata_map: dict[
str, CrossConnectorDocumentMetadata
] = {}
for chunk in chunks:
document = chunk.source_document
doc_user_map, delete_doc = update_doc_user_map(
chunk,
doc_user_map,
partial(
get_typesense_document_whitelists,
(
cross_connector_document_metadata_map,
should_delete_doc,
) = update_cross_connector_document_metadata_map(
chunk=chunk,
cross_connector_document_metadata_map=cross_connector_document_metadata_map,
doc_store_cross_connector_document_metadata_fetch_fn=partial(
get_typesense_document_cross_connector_metadata,
collection_name=collection,
ts_client=ts_client,
),
user_str,
index_attempt_metadata=index_attempt_metadata,
)
if delete_doc:
if should_delete_doc:
# Processing the first chunk of the doc and the doc exists
docs_deleted += 1
delete_typesense_doc_chunks(document.id, collection, ts_client)
typesense_id = str(get_uuid_from_chunk(chunk))
insertion_records.append(
ChunkInsertionRecord(
document_id=document.id,
store_id=typesense_id,
already_existed=should_delete_doc,
)
)
new_documents.append(
{
"id": str(get_uuid_from_chunk(chunk)), # No minichunks for typesense
"id": typesense_id, # No minichunks for typesense
DOCUMENT_ID: document.id,
CHUNK_ID: chunk.chunk_id,
BLURB: chunk.blurb,
@ -140,8 +167,12 @@ def index_typesense_chunks(
SOURCE_LINKS: json.dumps(chunk.source_links),
SEMANTIC_IDENTIFIER: document.semantic_identifier,
SECTION_CONTINUATION: chunk.section_continuation,
ALLOWED_USERS: doc_user_map[document.id][ALLOWED_USERS],
ALLOWED_GROUPS: doc_user_map[document.id][ALLOWED_GROUPS],
ALLOWED_USERS: cross_connector_document_metadata_map[
document.id
].allowed_users,
ALLOWED_GROUPS: cross_connector_document_metadata_map[
document.id
].allowed_user_groups,
METADATA: json.dumps(document.metadata),
}
)
@ -170,7 +201,7 @@ def index_typesense_chunks(
for document in new_documents
]
return len(doc_user_map.keys()) - docs_deleted
return insertion_records
def _build_typesense_filters(
@ -208,14 +239,39 @@ class TypesenseIndex(KeywordIndex):
self.collection = collection
self.ts_client = get_typesense_client()
def index(self, chunks: list[IndexChunk], user_id: UUID | None) -> int:
def index(
self, chunks: list[IndexChunk], index_attempt_metadata: IndexAttemptMetadata
) -> list[ChunkInsertionRecord]:
return index_typesense_chunks(
chunks=chunks,
user_id=user_id,
index_attempt_metadata=index_attempt_metadata,
collection=self.collection,
client=self.ts_client,
)
def delete(self, ids: list[str]) -> None:
logger.info(f"Deleting {len(ids)} documents from Typesense")
for id_batch in batch_generator(items=ids, batch_size=_BATCH_SIZE):
self.ts_client.collections[self.collection].documents.delete(
{"filter_by": f'id:[{",".join(id_batch)}]'}
)
def update(self, update_requests: list[UpdateRequest]) -> None:
logger.info(
f"Updating {len(update_requests)} documents' allowed_users in Typesense"
)
for update_request in update_requests:
for id_batch in batch_generator(
items=update_request.ids, batch_size=_BATCH_SIZE
):
typesense_updates = [
{"id": doc_id, ALLOWED_USERS: update_request.allowed_users}
for doc_id in id_batch
]
self.ts_client.collections[self.collection].documents.import_(
typesense_updates, {"action": "update"}
)
def keyword_search(
self,
query: str,

View File

@ -128,7 +128,6 @@ def delete_connector(
)
db_session.delete(connector)
db_session.commit()
return StatusResponse(
success=True, message="Connector deleted successfully", data=connector_id
)

View File

@ -1,6 +1,7 @@
from datetime import datetime
from fastapi import HTTPException
from sqlalchemy import delete
from sqlalchemy import select
from sqlalchemy.orm import Session
@ -79,6 +80,18 @@ def update_connector_credential_pair(
db_session.commit()
def delete_connector_credential_pair(
db_session: Session,
connector_id: int,
credential_id: int,
) -> None:
stmt = delete(ConnectorCredentialPair).where(
ConnectorCredentialPair.connector_id == connector_id,
ConnectorCredentialPair.credential_id == credential_id,
)
db_session.execute(stmt)
def add_credential_to_connector(
connector_id: int,
credential_id: int,
@ -115,7 +128,6 @@ def add_credential_to_connector(
association = ConnectorCredentialPair(
connector_id=connector_id,
credential_id=credential_id,
last_attempt_status=IndexingStatus.NOT_STARTED,
)
db_session.add(association)
db_session.commit()

View File

@ -15,22 +15,6 @@ from danswer.utils.logger import setup_logger
logger = setup_logger()
def mask_string(sensitive_str: str) -> str:
return "****...**" + sensitive_str[-4:]
def mask_credential_dict(credential_dict: dict[str, Any]) -> dict[str, str]:
masked_creds = {}
for key, val in credential_dict.items():
if not isinstance(val, str):
raise ValueError(
"Unable to mask credentials of type other than string, cannot process request."
)
masked_creds[key] = mask_string(val)
return masked_creds
def fetch_credentials(
user: User | None,
db_session: Session,

View File

@ -0,0 +1,85 @@
from sqlalchemy import and_
from sqlalchemy import delete
from sqlalchemy import desc
from sqlalchemy import select
from sqlalchemy.orm import Session
from danswer.db.models import ConnectorCredentialPair
from danswer.db.models import DeletionAttempt
from danswer.db.models import DeletionStatus
from danswer.db.models import IndexingStatus
def check_deletion_attempt_is_allowed(
connector_credential_pair: ConnectorCredentialPair,
) -> bool:
"""
To be deletable:
(1) connector should be disabled
(2) there should be no in-progress/planned index attempts
"""
return bool(
connector_credential_pair.connector.disabled
and (
connector_credential_pair.last_attempt_status != IndexingStatus.IN_PROGRESS
and connector_credential_pair.last_attempt_status
!= IndexingStatus.NOT_STARTED
)
)
def create_deletion_attempt(
connector_id: int,
credential_id: int,
db_session: Session,
) -> int:
new_attempt = DeletionAttempt(
connector_id=connector_id,
credential_id=credential_id,
status=DeletionStatus.NOT_STARTED,
)
db_session.add(new_attempt)
db_session.commit()
return new_attempt.id
def get_not_started_index_attempts(db_session: Session) -> list[DeletionAttempt]:
stmt = select(DeletionAttempt).where(
DeletionAttempt.status == DeletionStatus.NOT_STARTED
)
not_started_deletion_attempts = db_session.scalars(stmt)
return list(not_started_deletion_attempts.all())
def get_deletion_attempts(
db_session: Session,
connector_ids: list[int] | None = None,
statuses: list[DeletionStatus] | None = None,
ordered_by_time_updated: bool = False,
limit: int | None = None,
) -> list[DeletionAttempt]:
stmt = select(DeletionAttempt)
if connector_ids:
stmt = stmt.where(DeletionAttempt.connector_id.in_(connector_ids))
if statuses:
stmt = stmt.where(DeletionAttempt.status.in_(statuses))
if ordered_by_time_updated:
stmt = stmt.order_by(desc(DeletionAttempt.time_updated))
if limit:
stmt = stmt.limit(limit)
deletion_attempts = db_session.scalars(stmt)
return list(deletion_attempts.all())
def delete_deletion_attempts(
db_session: Session, connector_id: int, credential_id: int
) -> None:
stmt = delete(DeletionAttempt).where(
and_(
DeletionAttempt.connector_id == connector_id,
DeletionAttempt.credential_id == credential_id,
)
)
db_session.execute(stmt)
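
Putting the helpers in this module together, a caller (e.g. the admin API route, for which this excerpt only shows the imports) would roughly validate and enqueue a deletion like this. A sketch, assuming the `danswer` package is importable; `request_deletion` is a hypothetical wrapper, not a function from the commit:

```python
from sqlalchemy.orm import Session

from danswer.db.connector_credential_pair import get_connector_credential_pair
from danswer.db.deletion_attempt import check_deletion_attempt_is_allowed
from danswer.db.deletion_attempt import create_deletion_attempt
from danswer.db.engine import get_sqlalchemy_engine


def request_deletion(connector_id: int, credential_id: int) -> int:
    """Validate that the pair is deletable, then enqueue a DeletionAttempt."""
    with Session(get_sqlalchemy_engine()) as db_session:
        cc_pair = get_connector_credential_pair(
            db_session=db_session,
            connector_id=connector_id,
            credential_id=credential_id,
        )
        if not cc_pair or not check_deletion_attempt_is_allowed(
            connector_credential_pair=cc_pair
        ):
            raise RuntimeError(
                "Connector must be disabled and have no ongoing/planned index attempts"
            )
        # the NOT_STARTED attempt will be picked up by connector_deletion.py's loop
        return create_deletion_attempt(
            connector_id=connector_id,
            credential_id=credential_id,
            db_session=db_session,
        )
```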

View File

@ -0,0 +1,185 @@
from collections.abc import Sequence
from dataclasses import dataclass
from sqlalchemy import and_
from sqlalchemy import delete
from sqlalchemy import func
from sqlalchemy import select
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.orm import Session
from danswer.datastores.interfaces import ChunkMetadata
from danswer.db.models import Chunk
from danswer.db.models import Document
from danswer.db.models import DocumentByConnectorCredentialPair
from danswer.db.utils import model_to_dict
from danswer.utils.logger import setup_logger
logger = setup_logger()
def get_chunks_with_single_connector_credential_pair(
db_session: Session,
connector_id: int,
credential_id: int,
) -> Sequence[Chunk]:
initial_doc_ids_stmt = select(DocumentByConnectorCredentialPair.id).where(
and_(
DocumentByConnectorCredentialPair.connector_id == connector_id,
DocumentByConnectorCredentialPair.credential_id == credential_id,
)
)
trimmed_doc_ids_stmt = (
select(Document.id)
.join(
DocumentByConnectorCredentialPair,
DocumentByConnectorCredentialPair.id == Document.id,
)
.where(Document.id.in_(initial_doc_ids_stmt))
.group_by(Document.id)
.having(func.count(DocumentByConnectorCredentialPair.id) == 1)
)
stmt = select(Chunk).where(Chunk.document_id.in_(trimmed_doc_ids_stmt))
return db_session.scalars(stmt).all()
def get_document_by_connector_credential_pairs_indexed_by_multiple(
db_session: Session,
connector_id: int,
credential_id: int,
) -> Sequence[DocumentByConnectorCredentialPair]:
initial_doc_ids_stmt = select(DocumentByConnectorCredentialPair.id).where(
and_(
DocumentByConnectorCredentialPair.connector_id == connector_id,
DocumentByConnectorCredentialPair.credential_id == credential_id,
)
)
trimmed_doc_ids_stmt = (
select(Document.id)
.join(
DocumentByConnectorCredentialPair,
DocumentByConnectorCredentialPair.id == Document.id,
)
.where(Document.id.in_(initial_doc_ids_stmt))
.group_by(Document.id)
.having(func.count(DocumentByConnectorCredentialPair.id) > 1)
)
stmt = select(DocumentByConnectorCredentialPair).where(
DocumentByConnectorCredentialPair.id.in_(trimmed_doc_ids_stmt)
)
return db_session.execute(stmt).scalars().all()
def get_chunk_ids_for_document_ids(
db_session: Session, document_ids: list[str]
) -> Sequence[str]:
stmt = select(Chunk.id).where(Chunk.document_id.in_(document_ids))
return db_session.execute(stmt).scalars().all()
def upsert_documents(
db_session: Session, document_metadata_batch: list[ChunkMetadata]
) -> None:
"""NOTE: this function is Postgres specific. Not all DBs support the ON CONFLICT clause."""
seen_document_ids: set[str] = set()
for document_metadata in document_metadata_batch:
if document_metadata.document_id not in seen_document_ids:
seen_document_ids.add(document_metadata.document_id)
insert_stmt = insert(Document).values(
[model_to_dict(Document(id=doc_id)) for doc_id in seen_document_ids]
)
# for now, there are no columns to update. If more metadata is added, then this
# needs to change to an `on_conflict_do_update`
on_conflict_stmt = insert_stmt.on_conflict_do_nothing()
db_session.execute(on_conflict_stmt)
db_session.commit()
def upsert_document_by_connector_credential_pair(
db_session: Session, document_metadata_batch: list[ChunkMetadata]
) -> None:
"""NOTE: this function is Postgres specific. Not all DBs support the ON CONFLICT clause."""
insert_stmt = insert(DocumentByConnectorCredentialPair).values(
[
model_to_dict(
DocumentByConnectorCredentialPair(
id=document_metadata.document_id,
connector_id=document_metadata.connector_id,
credential_id=document_metadata.credential_id,
)
)
for document_metadata in document_metadata_batch
]
)
# for now, there are no columns to update. If more metadata is added, then this
# needs to change to an `on_conflict_do_update`
on_conflict_stmt = insert_stmt.on_conflict_do_nothing()
db_session.execute(on_conflict_stmt)
db_session.commit()
def upsert_chunks(
db_session: Session, document_metadata_batch: list[ChunkMetadata]
) -> None:
"""NOTE: this function is Postgres specific. Not all DBs support the ON CONFLICT clause."""
insert_stmt = insert(Chunk).values(
[
model_to_dict(
Chunk(
id=document_metadata.store_id,
document_id=document_metadata.document_id,
document_store_type=document_metadata.document_store_type,
)
)
for document_metadata in document_metadata_batch
]
)
on_conflict_stmt = insert_stmt.on_conflict_do_update(
index_elements=["id", "document_store_type"],
set_=dict(document_id=insert_stmt.excluded.document_id),
)
db_session.execute(on_conflict_stmt)
db_session.commit()
def upsert_documents_complete(
db_session: Session, document_metadata_batch: list[ChunkMetadata]
) -> None:
upsert_documents(db_session, document_metadata_batch)
upsert_document_by_connector_credential_pair(db_session, document_metadata_batch)
upsert_chunks(db_session, document_metadata_batch)
logger.info(
f"Upserted {len(document_metadata_batch)} document store entries into DB"
)
def delete_document_store_entries(db_session: Session, document_ids: list[str]) -> None:
db_session.execute(delete(Chunk).where(Chunk.document_id.in_(document_ids)))
def delete_document_by_connector_credential_pair(
db_session: Session, document_ids: list[str]
) -> None:
db_session.execute(
delete(DocumentByConnectorCredentialPair).where(
DocumentByConnectorCredentialPair.id.in_(document_ids)
)
)
def delete_documents(db_session: Session, document_ids: list[str]) -> None:
db_session.execute(delete(Document).where(Document.id.in_(document_ids)))
def delete_documents_complete(db_session: Session, document_ids: list[str]) -> None:
logger.info(f"Deleting {len(document_ids)} documents from the DB")
delete_document_store_entries(db_session, document_ids)
delete_document_by_connector_credential_pair(db_session, document_ids)
delete_documents(db_session, document_ids)
db_session.commit()

View File

@ -1,3 +1,4 @@
from sqlalchemy import delete
from sqlalchemy import desc
from sqlalchemy import select
from sqlalchemy.orm import Session
@ -86,3 +87,15 @@ def get_last_successful_attempt(
stmt = stmt.order_by(desc(IndexAttempt.time_created))
return db_session.execute(stmt).scalars().first()
def delete_index_attempts(
connector_id: int,
credential_id: int,
db_session: Session,
) -> None:
stmt = delete(IndexAttempt).where(
IndexAttempt.connector_id == connector_id,
IndexAttempt.credential_id == credential_id,
)
db_session.execute(stmt)

View File

@ -2,6 +2,7 @@ import datetime
from enum import Enum as PyEnum
from typing import Any
from typing import List
from typing import Optional
from uuid import UUID
from fastapi_users.db import SQLAlchemyBaseOAuthAccountTableUUID
@ -24,6 +25,7 @@ from sqlalchemy.orm import relationship
from danswer.auth.schemas import UserRole
from danswer.configs.constants import DocumentSource
from danswer.connectors.models import InputType
from danswer.datastores.interfaces import StoreType
class IndexingStatus(str, PyEnum):
@ -33,6 +35,14 @@ class IndexingStatus(str, PyEnum):
FAILED = "failed"
# these may differ in the future, which is why we're okay with this duplication
class DeletionStatus(str, PyEnum):
NOT_STARTED = "not_started"
IN_PROGRESS = "in_progress"
SUCCESS = "success"
FAILED = "failed"
class Base(DeclarativeBase):
pass
@ -75,7 +85,9 @@ class ConnectorCredentialPair(Base):
last_successful_index_time: Mapped[datetime.datetime | None] = mapped_column(
DateTime(timezone=True), default=None
)
last_attempt_status: Mapped[IndexingStatus] = mapped_column(Enum(IndexingStatus))
last_attempt_status: Mapped[IndexingStatus | None] = mapped_column(
Enum(IndexingStatus)
)
total_docs_indexed: Mapped[int] = mapped_column(Integer, default=0)
connector: Mapped["Connector"] = relationship(
@ -112,9 +124,15 @@ class Connector(Base):
back_populates="connector",
cascade="all, delete-orphan",
)
documents_by_connector: Mapped[
List["DocumentByConnectorCredentialPair"]
] = relationship("DocumentByConnectorCredentialPair", back_populates="connector")
index_attempts: Mapped[List["IndexAttempt"]] = relationship(
"IndexAttempt", back_populates="connector"
)
deletion_attempt: Mapped[Optional["DeletionAttempt"]] = relationship(
"DeletionAttempt", back_populates="connector"
)
class Credential(Base):
@ -136,9 +154,15 @@ class Credential(Base):
back_populates="credential",
cascade="all, delete-orphan",
)
documents_by_credential: Mapped[
List["DocumentByConnectorCredentialPair"]
] = relationship("DocumentByConnectorCredentialPair", back_populates="credential")
index_attempts: Mapped[List["IndexAttempt"]] = relationship(
"IndexAttempt", back_populates="credential"
)
deletion_attempt: Mapped[Optional["DeletionAttempt"]] = relationship(
"DeletionAttempt", back_populates="credential"
)
user: Mapped[User] = relationship("User", back_populates="credentials")
@ -190,3 +214,98 @@ class IndexAttempt(Base):
f"time_created={self.time_created!r}, "
f"time_updated={self.time_updated!r}, "
)
class DeletionAttempt(Base):
"""Represents an attempt to delete all documents indexed by a specific
connector / credential pair.
"""
__tablename__ = "deletion_attempt"
id: Mapped[int] = mapped_column(primary_key=True)
connector_id: Mapped[int] = mapped_column(
ForeignKey("connector.id"),
)
credential_id: Mapped[int] = mapped_column(
ForeignKey("credential.id"),
)
status: Mapped[DeletionStatus] = mapped_column(Enum(DeletionStatus))
num_docs_deleted: Mapped[int] = mapped_column(Integer, default=0)
error_msg: Mapped[str | None] = mapped_column(
String(), default=None
) # only filled if status = "failed"
time_created: Mapped[datetime.datetime] = mapped_column(
DateTime(timezone=True),
server_default=func.now(),
)
time_updated: Mapped[datetime.datetime] = mapped_column(
DateTime(timezone=True),
server_default=func.now(),
onupdate=func.now(),
)
connector: Mapped[Connector] = relationship(
"Connector", back_populates="deletion_attempt"
)
credential: Mapped[Credential] = relationship(
"Credential", back_populates="deletion_attempt"
)
class Document(Base):
"""Represents a single documents from a source. This is used to store
document level metadata so we don't need to duplicate it in a bunch of
DocumentByConnectorCredentialPair's/Chunk's for documents
that are split into many chunks and/or indexed by many connector / credential
pairs."""
__tablename__ = "document"
# this should correspond to the ID of the document (as is passed around
# in Danswer)
id: Mapped[str] = mapped_column(String, primary_key=True)
document_store_entries: Mapped["Chunk"] = relationship(
"Chunk", back_populates="document"
)
class DocumentByConnectorCredentialPair(Base):
"""Represents an indexing of a document by a specific connector / credential
pair"""
__tablename__ = "document_by_connector_credential_pair"
id: Mapped[str] = mapped_column(ForeignKey("document.id"), primary_key=True)
connector_id: Mapped[int] = mapped_column(
ForeignKey("connector.id"), primary_key=True
)
credential_id: Mapped[int] = mapped_column(
ForeignKey("credential.id"), primary_key=True
)
connector: Mapped[Connector] = relationship(
"Connector", back_populates="documents_by_connector"
)
credential: Mapped[Credential] = relationship(
"Credential", back_populates="documents_by_credential"
)
class Chunk(Base):
"""A row represents a single entry in a document store (e.g. a single chunk
in Qdrant/Typesense)"""
__tablename__ = "chunk"
# this should correspond to the ID in the document store
id: Mapped[str] = mapped_column(String, primary_key=True)
document_store_type: Mapped[StoreType] = mapped_column(
Enum(StoreType), primary_key=True
)
document_id: Mapped[str] = mapped_column(ForeignKey("document.id"))
document: Mapped[Document] = relationship(
"Document", back_populates="document_store_entries"
)
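Taken together, Document, DocumentByConnectorCredentialPair, and Chunk give the deletion job enough information to decide what is safe to remove: documents indexed only by the pair being deleted can go entirely, while shared documents only lose that pair's mapping row. A rough sketch of that query follows; the helper name and exact filtering are illustrative, not the shipped code.
```python
from sqlalchemy import select
from sqlalchemy.orm import Session

from danswer.db.models import Document
from danswer.db.models import DocumentByConnectorCredentialPair


def fetch_documents_only_indexed_by_pair(  # hypothetical helper name
    connector_id: int,
    credential_id: int,
    db_session: Session,
) -> list[str]:
    # all documents touched by this connector / credential pair
    candidate_ids = select(DocumentByConnectorCredentialPair.id).where(
        DocumentByConnectorCredentialPair.connector_id == connector_id,
        DocumentByConnectorCredentialPair.credential_id == credential_id,
    )
    # of those, the ones some *other* pair also indexes must be kept
    shared_ids = select(DocumentByConnectorCredentialPair.id).where(
        DocumentByConnectorCredentialPair.id.in_(candidate_ids),
        (DocumentByConnectorCredentialPair.connector_id != connector_id)
        | (DocumentByConnectorCredentialPair.credential_id != credential_id),
    )
    stmt = select(Document.id).where(
        Document.id.in_(candidate_ids),
        Document.id.notin_(shared_ids),
    )
    return list(db_session.scalars(stmt))
```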

View File

@ -0,0 +1,9 @@
from typing import Any
from sqlalchemy import inspect
from danswer.db.models import Base
def model_to_dict(model: Base) -> dict[str, Any]:
return {c.key: getattr(model, c.key) for c in inspect(model).mapper.column_attrs} # type: ignore
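A quick illustration of what this helper produces; the import path for model_to_dict is assumed here since the file's location is not visible in this view.
```python
from typing import Any

from danswer.db.models import Connector
from danswer.db.utils import model_to_dict  # import path assumed


def connector_as_dict(connector: Connector) -> dict[str, Any]:
    # for a Connector row this yields something like:
    # {"id": 1, "name": "My Web Connector", "source": DocumentSource.WEB, ...}
    return model_to_dict(connector)
```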

View File

@ -9,6 +9,7 @@ from fastapi import Request
from fastapi import Response
from fastapi import UploadFile
from fastapi_users.db import SQLAlchemyUserDatabase
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import Session
@ -36,17 +37,23 @@ from danswer.db.connector import fetch_connectors
from danswer.db.connector import get_connector_credential_ids
from danswer.db.connector import update_connector
from danswer.db.connector_credential_pair import add_credential_to_connector
from danswer.db.connector_credential_pair import get_connector_credential_pair
from danswer.db.connector_credential_pair import get_connector_credential_pairs
from danswer.db.connector_credential_pair import remove_credential_from_connector
from danswer.db.credentials import create_credential
from danswer.db.credentials import delete_credential
from danswer.db.credentials import fetch_credential_by_id
from danswer.db.credentials import fetch_credentials
from danswer.db.credentials import mask_credential_dict
from danswer.db.credentials import update_credential
from danswer.db.deletion_attempt import check_deletion_attempt_is_allowed
from danswer.db.deletion_attempt import create_deletion_attempt
from danswer.db.deletion_attempt import get_deletion_attempts
from danswer.db.engine import get_session
from danswer.db.engine import get_sqlalchemy_async_engine
from danswer.db.index_attempt import create_index_attempt
from danswer.db.models import DeletionAttempt
from danswer.db.models import DeletionStatus
from danswer.db.models import IndexingStatus
from danswer.db.models import User
from danswer.direct_qa import check_model_api_key_is_valid
from danswer.direct_qa import get_default_backend_qa_model
@ -58,10 +65,12 @@ from danswer.server.models import ApiKey
from danswer.server.models import AuthStatus
from danswer.server.models import AuthUrl
from danswer.server.models import ConnectorBase
from danswer.server.models import ConnectorCredentialPairIdentifier
from danswer.server.models import ConnectorIndexingStatus
from danswer.server.models import ConnectorSnapshot
from danswer.server.models import CredentialBase
from danswer.server.models import CredentialSnapshot
from danswer.server.models import DeletionAttemptSnapshot
from danswer.server.models import FileUploadResponse
from danswer.server.models import GDriveCallback
from danswer.server.models import GoogleAppCredentials
@ -179,18 +188,42 @@ def get_connector_indexing_status(
) -> list[ConnectorIndexingStatus]:
indexing_statuses: list[ConnectorIndexingStatus] = []
# TODO: make this one query
cc_pairs = get_connector_credential_pairs(db_session)
deletion_attempts_by_connector: dict[int, list[DeletionAttempt]] = {
cc_pair.connector.id: [] for cc_pair in cc_pairs
}
for deletion_attempt in get_deletion_attempts(
db_session=db_session,
connector_ids=[cc_pair.connector.id for cc_pair in cc_pairs],
ordered_by_time_updated=True,
):
deletion_attempts_by_connector[deletion_attempt.connector_id].append(
deletion_attempt
)
for cc_pair in cc_pairs:
connector = cc_pair.connector
credential = cc_pair.credential
deletion_attempts = deletion_attempts_by_connector.get(connector.id, [])
indexing_statuses.append(
ConnectorIndexingStatus(
connector=ConnectorSnapshot.from_connector_db_model(connector),
credential=CredentialSnapshot.from_credential_db_model(credential),
public_doc=credential.public_doc,
owner=credential.user.email if credential.user else "",
last_status=cc_pair.last_attempt_status,
last_success=cc_pair.last_successful_index_time,
docs_indexed=cc_pair.total_docs_indexed,
deletion_attempts=[
DeletionAttemptSnapshot.from_deletion_attempt_db_model(
deletion_attempt
)
for deletion_attempt in deletion_attempts
],
is_deletable=check_deletion_attempt_is_allowed(
connector_credential_pair=cc_pair
),
)
)
@ -244,7 +277,8 @@ def delete_connector_by_id(
_: User = Depends(current_admin_user),
db_session: Session = Depends(get_session),
) -> StatusResponse[int]:
return delete_connector(connector_id, db_session)
with db_session.begin():
return delete_connector(db_session=db_session, connector_id=connector_id)
@router.post("/admin/connector/run-once")
@ -373,6 +407,62 @@ def delete_genai_api_key(
get_dynamic_config_store().delete(GEN_AI_API_KEY_STORAGE_KEY)
@router.post("/admin/deletion-attempt")
def create_deletion_attempt_for_connector_id(
connector_credential_pair_identifier: ConnectorCredentialPairIdentifier,
_: User = Depends(current_admin_user),
db_session: Session = Depends(get_session),
) -> None:
connector_id = connector_credential_pair_identifier.connector_id
credential_id = connector_credential_pair_identifier.credential_id
cc_pair = get_connector_credential_pair(
db_session=db_session,
connector_id=connector_id,
credential_id=credential_id,
)
if cc_pair is None:
raise HTTPException(
status_code=404,
detail=f"Connector with ID '{connector_id}' and credential ID "
f"'{credential_id}' does not exist. Has it already been deleted?",
)
if not check_deletion_attempt_is_allowed(connector_credential_pair=cc_pair):
raise HTTPException(
status_code=400,
detail=f"Connector with ID '{connector_id}' and credential ID "
f"'{credential_id}' is not deletable. It must be both disabled AND have"
"no ongoing / planned indexing attempts.",
)
create_deletion_attempt(
connector_id=connector_id,
credential_id=credential_id,
db_session=db_session,
)
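The 400 branch above delegates to check_deletion_attempt_is_allowed, whose body is not part of this hunk. Judging from the error message, it amounts to roughly the following sketch (not the verbatim implementation):
```python
from danswer.db.models import ConnectorCredentialPair
from danswer.db.models import IndexingStatus


def check_deletion_attempt_is_allowed(
    connector_credential_pair: ConnectorCredentialPair,
) -> bool:
    # deletion is only allowed once the connector is disabled and no indexing
    # attempt for this pair is still running or queued up
    return bool(
        connector_credential_pair.connector.disabled
        and connector_credential_pair.last_attempt_status
        not in (IndexingStatus.IN_PROGRESS, IndexingStatus.NOT_STARTED)
    )
```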
@router.get("/admin/deletion-attempt/{connector_id}")
def get_deletion_attempts_for_connector_id(
connector_id: int,
_: User = Depends(current_admin_user),
db_session: Session = Depends(get_session),
) -> list[DeletionAttemptSnapshot]:
deletion_attempts = get_deletion_attempts(
db_session=db_session, connector_ids=[connector_id]
)
return [
DeletionAttemptSnapshot(
connector_id=connector_id,
status=deletion_attempt.status,
error_msg=deletion_attempt.error_msg,
num_docs_deleted=deletion_attempt.num_docs_deleted,
)
for deletion_attempt in deletion_attempts
]
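For reference, this is roughly how a client drives the two endpoints once they are mounted under the /api/manage prefix the web UI uses; the host, port, and admin-session handling below are placeholders.
```python
import requests

BASE_URL = "http://localhost:8080/api/manage"  # host/port are illustrative

# schedule a deletion attempt for a connector / credential pair
response = requests.post(
    f"{BASE_URL}/admin/deletion-attempt",
    json={"connector_id": 1, "credential_id": 1},
    # cookies / headers for an admin session would normally be attached here
)
response.raise_for_status()

# poll the deletion attempts for that connector
attempts = requests.get(f"{BASE_URL}/admin/deletion-attempt/1").json()
# e.g. [{"connector_id": 1, "status": "not_started",
#        "error_msg": null, "num_docs_deleted": 0}]
```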
"""Endpoints for basic users"""
@ -468,16 +558,7 @@ def get_credentials(
) -> list[CredentialSnapshot]:
credentials = fetch_credentials(user, db_session)
return [
CredentialSnapshot(
id=credential.id,
credential_json=mask_credential_dict(credential.credential_json)
if MASK_CREDENTIAL_PREFIX
else credential.credential_json,
user_id=credential.user_id,
public_doc=credential.public_doc,
time_created=credential.time_created,
time_updated=credential.time_updated,
)
CredentialSnapshot.from_credential_db_model(credential)
for credential in credentials
]
@ -495,16 +576,7 @@ def get_credential_by_id(
detail=f"Credential {credential_id} does not exist or does not belong to user",
)
return CredentialSnapshot(
id=credential.id,
credential_json=mask_credential_dict(credential.credential_json)
if MASK_CREDENTIAL_PREFIX
else credential.credential_json,
user_id=credential.user_id,
public_doc=credential.public_doc,
time_created=credential.time_created,
time_updated=credential.time_updated,
)
return CredentialSnapshot.from_credential_db_model(credential)
@router.post("/credential")

View File

@ -9,13 +9,18 @@ from uuid import UUID
from pydantic import BaseModel
from pydantic.generics import GenericModel
from danswer.configs.app_configs import MASK_CREDENTIAL_PREFIX
from danswer.configs.constants import DocumentSource
from danswer.connectors.models import InputType
from danswer.datastores.interfaces import IndexFilter
from danswer.db.models import Connector
from danswer.db.models import Credential
from danswer.db.models import DeletionAttempt
from danswer.db.models import DeletionStatus
from danswer.db.models import IndexingStatus
from danswer.search.models import QueryFlow
from danswer.search.models import SearchType
from danswer.server.utils import mask_credential_dict
DataT = TypeVar("DataT")
@ -119,6 +124,24 @@ class IndexAttemptRequest(BaseModel):
connector_specific_config: dict[str, Any]
class DeletionAttemptSnapshot(BaseModel):
connector_id: int
status: DeletionStatus
error_msg: str | None
num_docs_deleted: int
@classmethod
def from_deletion_attempt_db_model(
cls, deletion_attempt: DeletionAttempt
) -> "DeletionAttemptSnapshot":
return DeletionAttemptSnapshot(
connector_id=deletion_attempt.connector_id,
status=deletion_attempt.status,
error_msg=deletion_attempt.error_msg,
num_docs_deleted=deletion_attempt.num_docs_deleted,
)
class ConnectorBase(BaseModel):
name: str
source: DocumentSource
@ -152,17 +175,6 @@ class ConnectorSnapshot(ConnectorBase):
)
class ConnectorIndexingStatus(BaseModel):
"""Represents the latest indexing status of a connector"""
connector: ConnectorSnapshot
owner: str
public_doc: bool
last_status: IndexingStatus
last_success: datetime | None
docs_indexed: int
class RunConnectorRequest(BaseModel):
connector_id: int
credential_ids: list[int] | None
@ -179,6 +191,38 @@ class CredentialSnapshot(CredentialBase):
time_created: datetime
time_updated: datetime
@classmethod
def from_credential_db_model(cls, credential: Credential) -> "CredentialSnapshot":
return CredentialSnapshot(
id=credential.id,
credential_json=mask_credential_dict(credential.credential_json)
if MASK_CREDENTIAL_PREFIX
else credential.credential_json,
user_id=credential.user_id,
public_doc=credential.public_doc,
time_created=credential.time_created,
time_updated=credential.time_updated,
)
class ConnectorIndexingStatus(BaseModel):
"""Represents the latest indexing status of a connector"""
connector: ConnectorSnapshot
credential: CredentialSnapshot
owner: str
public_doc: bool
last_status: IndexingStatus | None
last_success: datetime | None
docs_indexed: int
deletion_attempts: list[DeletionAttemptSnapshot]
is_deletable: bool
class ConnectorCredentialPairIdentifier(BaseModel):
connector_id: int
credential_id: int
class ApiKey(BaseModel):
api_key: str

View File

@ -0,0 +1,17 @@
from typing import Any
def mask_string(sensitive_str: str) -> str:
return "****...**" + sensitive_str[-4:]
def mask_credential_dict(credential_dict: dict[str, Any]) -> dict[str, str]:
masked_creds = {}
for key, val in credential_dict.items():
if not isinstance(val, str):
raise ValueError(
"Unable to mask credentials of type other than string, cannot process request."
)
masked_creds[key] = mask_string(val)
return masked_creds
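A brief illustration of the masking behaviour (the token values below are made up):
```python
from danswer.server.utils import mask_credential_dict
from danswer.server.utils import mask_string

print(mask_string("xoxb-12345678-secret-9876"))
# -> ****...**9876

print(mask_credential_dict({"slack_bot_token": "xoxb-12345678-secret-9876"}))
# -> {'slack_bot_token': '****...**9876'}

# non-string values are rejected rather than silently leaked
mask_credential_dict({"port": 5432})  # raises ValueError
```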

View File

@ -29,6 +29,9 @@ def wipe_all_rows(database: str) -> None:
table_names = cur.fetchall()
# have to delete from these first to not run into psycopg2.errors.ForeignKeyViolation
cur.execute(f"DELETE FROM chunk")
cur.execute(f"DELETE FROM document_by_connector_credential_pair")
cur.execute(f"DELETE FROM document")
cur.execute(f"DELETE FROM connector_credential_pair")
cur.execute(f"DELETE FROM index_attempt")
conn.commit()

View File

@ -10,6 +10,13 @@ redirect_stderr=true
stdout_logfile_maxbytes=52428800
autorestart=true
[program:connector_deletion]
command=python danswer/background/connector_deletion.py
stdout_logfile=/var/log/connector_deletion.log
redirect_stderr=true
stdout_logfile_maxbytes=52428800
autorestart=true
[program:file_deletion]
command=python danswer/background/file_deletion.py
stdout_logfile=/var/log/file_deletion.log
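The new [program:connector_deletion] entry runs danswer/background/connector_deletion.py under supervisord. The shipped job also has to remove chunks from Qdrant/Typesense and clean up the connector_credential_pair row, but its control flow is roughly the polling loop sketched below; the engine helper name, polling interval, and the elided deletion work are all assumptions.
```python
import time

from sqlalchemy import select
from sqlalchemy.orm import Session

from danswer.db.engine import get_sqlalchemy_engine  # helper name assumed
from danswer.db.models import DeletionAttempt
from danswer.db.models import DeletionStatus


def _process_pending_deletion_attempts(db_session: Session) -> None:
    pending = db_session.scalars(
        select(DeletionAttempt).where(
            DeletionAttempt.status == DeletionStatus.NOT_STARTED
        )
    ).all()
    for attempt in pending:
        attempt.status = DeletionStatus.IN_PROGRESS
        db_session.commit()
        try:
            # delete document store entries, then the Chunk /
            # DocumentByConnectorCredentialPair / Document rows (elided)
            attempt.num_docs_deleted = 0
            attempt.status = DeletionStatus.SUCCESS
        except Exception as e:
            attempt.status = DeletionStatus.FAILED
            attempt.error_msg = str(e)
        db_session.commit()


if __name__ == "__main__":
    while True:
        with Session(get_sqlalchemy_engine()) as db_session:
            _process_pending_deletion_attempts(db_session)
        time.sleep(30)  # polling interval is a guess
```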

View File

@ -27,7 +27,7 @@ const Main = () => {
data: connectorIndexingStatuses,
isLoading: isConnectorIndexingStatusesLoading,
error: isConnectorIndexingStatusesError,
} = useSWR<ConnectorIndexingStatus<any>[]>(
} = useSWR<ConnectorIndexingStatus<any, any>[]>(
"/api/manage/admin/connector/indexing-status",
fetcher
);
@ -55,7 +55,10 @@ const Main = () => {
return <div>Failed to load credentials</div>;
}
const bookstackConnectorIndexingStatuses = connectorIndexingStatuses.filter(
const bookstackConnectorIndexingStatuses: ConnectorIndexingStatus<
BookstackConfig,
BookstackCredentialJson
>[] = connectorIndexingStatuses.filter(
(connectorIndexingStatus) =>
connectorIndexingStatus.connector.source === "bookstack"
);

View File

@ -27,7 +27,7 @@ const Main = () => {
data: connectorIndexingStatuses,
isLoading: isConnectorIndexingStatusesLoading,
error: isConnectorIndexingStatusesError,
} = useSWR<ConnectorIndexingStatus<any>[]>(
} = useSWR<ConnectorIndexingStatus<any, any>[]>(
"/api/manage/admin/connector/indexing-status",
fetcher
);
@ -55,7 +55,10 @@ const Main = () => {
return <div>Failed to load credentials</div>;
}
const confluenceConnectorIndexingStatuses = connectorIndexingStatuses.filter(
const confluenceConnectorIndexingStatuses: ConnectorIndexingStatus<
ConfluenceConfig,
ConfluenceCredentialJson
>[] = connectorIndexingStatuses.filter(
(connectorIndexingStatus) =>
connectorIndexingStatus.connector.source === "confluence"
);

View File

@ -12,21 +12,16 @@ import { useState } from "react";
import { Button } from "@/components/Button";
import { Popup, PopupSpec } from "@/components/admin/connectors/Popup";
import { createConnector, runConnector } from "@/lib/connector";
import { BasicTable } from "@/components/admin/connectors/BasicTable";
import { CheckCircle, XCircle } from "@phosphor-icons/react";
import { Spinner } from "@/components/Spinner";
const COLUMNS = [
{ header: "File names", key: "fileNames" },
{ header: "Status", key: "status" },
];
import { SingleUseConnectorsTable } from "@/components/admin/connectors/table/SingleUseConnectorsTable";
import { LoadingAnimation } from "@/components/Loading";
const getNameFromPath = (path: string) => {
const pathParts = path.split("/");
return pathParts[pathParts.length - 1];
};
export default function File() {
const Main = () => {
const [selectedFiles, setSelectedFiles] = useState<File[]>([]);
const [filesAreUploading, setFilesAreUploading] = useState<boolean>(false);
const [popup, setPopup] = useState<{
@ -42,42 +37,26 @@ export default function File() {
const { mutate } = useSWRConfig();
const { data: connectorIndexingStatuses } = useSWR<
ConnectorIndexingStatus<any>[]
>("/api/manage/admin/connector/indexing-status", fetcher);
const {
data: connectorIndexingStatuses,
isLoading: isConnectorIndexingStatusesLoading,
} = useSWR<ConnectorIndexingStatus<any, any>[]>(
"/api/manage/admin/connector/indexing-status",
fetcher
);
const fileIndexingStatuses: ConnectorIndexingStatus<FileConfig>[] =
if (!connectorIndexingStatuses && isConnectorIndexingStatusesLoading) {
return <LoadingAnimation text="Loading" />;
}
const fileIndexingStatuses: ConnectorIndexingStatus<FileConfig, {}>[] =
connectorIndexingStatuses?.filter(
(connectorIndexingStatus) =>
connectorIndexingStatus.connector.source === "file"
) ?? [];
const inProgressFileIndexingStatuses =
fileIndexingStatuses.filter(
(connectorIndexingStatus) =>
connectorIndexingStatus.last_status === "in_progress" ||
connectorIndexingStatus.last_status === "not_started"
) ?? [];
const successfulFileIndexingStatuses = fileIndexingStatuses.filter(
(connectorIndexingStatus) =>
connectorIndexingStatus.last_status === "success"
);
const failedFileIndexingStatuses = fileIndexingStatuses.filter(
(connectorIndexingStatus) =>
connectorIndexingStatus.last_status === "failed"
);
return (
<div className="mx-auto container">
<div className="mb-4">
<HealthCheckBanner />
</div>
<div className="border-solid border-gray-600 border-b pb-2 mb-4 flex">
<FileIcon size={32} />
<h1 className="text-3xl font-bold pl-2">File</h1>
</div>
<div>
{popup && <Popup message={popup.message} type={popup.type} />}
{filesAreUploading && <Spinner />}
<h2 className="font-bold mb-2 mt-6 ml-auto mr-auto">Upload Files</h2>
@ -201,81 +180,41 @@ export default function File() {
</div>
</div>
{inProgressFileIndexingStatuses.length > 0 && (
<>
<h2 className="font-bold mb-2 mt-6 ml-auto mr-auto">
In Progress File Indexing
</h2>
<BasicTable
columns={COLUMNS}
data={inProgressFileIndexingStatuses.map(
(connectorIndexingStatus) => {
return {
fileNames:
connectorIndexingStatus.connector.connector_specific_config.file_locations
.map(getNameFromPath)
.join(", "),
status: "In Progress",
};
}
)}
/>
</>
)}
{successfulFileIndexingStatuses.length > 0 && (
<>
<h2 className="font-bold mb-2 mt-6 ml-auto mr-auto">
Successful File Indexing
</h2>
<BasicTable
columns={COLUMNS}
data={successfulFileIndexingStatuses.map(
(connectorIndexingStatus) => {
return {
fileNames:
connectorIndexingStatus.connector.connector_specific_config.file_locations
.map(getNameFromPath)
.join(", "),
status: (
<div className="text-emerald-600 flex">
<CheckCircle className="my-auto mr-1" size="18" /> Success
</div>
),
};
}
)}
/>
</>
)}
{failedFileIndexingStatuses.length > 0 && (
<>
<h2 className="font-bold mb-2 mt-6 ml-auto mr-auto">
Failed File Indexing
</h2>
<p className="text-sm mb-3">
The following files failed to be indexed. Please contact an
administrator to resolve this issue.
</p>
<BasicTable
columns={COLUMNS}
data={failedFileIndexingStatuses.map((connectorIndexingStatus) => {
return {
fileNames:
connectorIndexingStatus.connector.connector_specific_config.file_locations
{fileIndexingStatuses.length > 0 && (
<div className="mt-6">
<SingleUseConnectorsTable<FileConfig, {}>
connectorIndexingStatuses={fileIndexingStatuses}
specialColumns={[
{
header: "File Names",
key: "file_names",
getValue: (connector) =>
connector.connector_specific_config.file_locations
.map(getNameFromPath)
.join(", "),
status: (
<div className="text-red-600 flex">
<XCircle className="my-auto mr-1" size="18" /> Failed
</div>
),
};
})}
},
]}
onUpdate={() =>
mutate("/api/manage/admin/connector/indexing-status")
}
/>
</>
</div>
)}
</div>
);
};
export default function File() {
return (
<div className="mx-auto container">
<div className="mb-4">
<HealthCheckBanner />
</div>
<div className="border-solid border-gray-600 border-b pb-2 mb-4 flex">
<FileIcon size={32} />
<h1 className="text-3xl font-bold pl-2">File</h1>
</div>
<Main />
</div>
);
}

View File

@ -24,7 +24,7 @@ const Main = () => {
data: connectorIndexingStatuses,
isLoading: isConnectorIndexingStatusesLoading,
error: isConnectorIndexingStatusesError,
} = useSWR<ConnectorIndexingStatus<any>[]>(
} = useSWR<ConnectorIndexingStatus<any, any>[]>(
"/api/manage/admin/connector/indexing-status",
fetcher
);
@ -53,11 +53,13 @@ const Main = () => {
return <div>Failed to load credentials</div>;
}
const githubConnectorIndexingStatuses: ConnectorIndexingStatus<GithubConfig>[] =
connectorIndexingStatuses.filter(
(connectorIndexingStatus) =>
connectorIndexingStatus.connector.source === "github"
);
const githubConnectorIndexingStatuses: ConnectorIndexingStatus<
GithubConfig,
GithubCredentialJson
>[] = connectorIndexingStatuses.filter(
(connectorIndexingStatus) =>
connectorIndexingStatus.connector.source === "github"
);
const githubCredential = credentialsData.filter(
(credential) => credential.credential_json?.github_access_token
)[0];

View File

@ -4,13 +4,21 @@ import { PopupSpec } from "@/components/admin/connectors/Popup";
import { StatusRow } from "@/components/admin/connectors/table/ConnectorsTable";
import { PencilIcon } from "@/components/icons/icons";
import { deleteConnector } from "@/lib/connector";
import { GoogleDriveConfig, ConnectorIndexingStatus } from "@/lib/types";
import {
GoogleDriveConfig,
ConnectorIndexingStatus,
GoogleDriveCredentialJson,
} from "@/lib/types";
import { useSWRConfig } from "swr";
import { useState } from "react";
import { ConnectorEditPopup } from "./ConnectorEditPopup";
import { DeleteColumn } from "@/components/admin/connectors/table/DeleteColumn";
interface EditableColumnProps {
connectorIndexingStatus: ConnectorIndexingStatus<GoogleDriveConfig>;
connectorIndexingStatus: ConnectorIndexingStatus<
GoogleDriveConfig,
GoogleDriveCredentialJson
>;
}
const EditableColumn = ({ connectorIndexingStatus }: EditableColumnProps) => {
@ -45,7 +53,10 @@ const EditableColumn = ({ connectorIndexingStatus }: EditableColumnProps) => {
};
interface TableProps {
googleDriveConnectorIndexingStatuses: ConnectorIndexingStatus<GoogleDriveConfig>[];
googleDriveConnectorIndexingStatuses: ConnectorIndexingStatus<
GoogleDriveConfig,
GoogleDriveCredentialJson
>[];
setPopup: (popupSpec: PopupSpec | null) => void;
}
@ -134,28 +145,13 @@ export const GoogleDriveConnectorsTable = ({
/>
),
delete: (
<Button
onClick={() => {
deleteConnector(connectorIndexingStatus.connector.id).then(
(errorMsg) => {
if (errorMsg) {
setPopup({
message: `Unable to delete existing connector - ${errorMsg}`,
type: "error",
});
} else {
setPopup({
message: "Successfully deleted connector!",
type: "success",
});
mutate("/api/manage/admin/connector/indexing-status");
}
}
);
}}
>
Delete Connector
</Button>
<DeleteColumn
connectorIndexingStatus={connectorIndexingStatus}
setPopup={setPopup}
onUpdate={() =>
mutate("/api/manage/admin/connector/indexing-status")
}
/>
),
})
)}

View File

@ -102,8 +102,14 @@ interface GoogleDriveConnectorManagementProps {
googleDrivePublicCredential:
| Credential<GoogleDriveCredentialJson>
| undefined;
googleDriveConnectorIndexingStatus: ConnectorIndexingStatus<GoogleDriveConfig> | null;
googleDriveConnectorIndexingStatuses: ConnectorIndexingStatus<GoogleDriveConfig>[];
googleDriveConnectorIndexingStatus: ConnectorIndexingStatus<
GoogleDriveConfig,
GoogleDriveCredentialJson
> | null;
googleDriveConnectorIndexingStatuses: ConnectorIndexingStatus<
GoogleDriveConfig,
GoogleDriveCredentialJson
>[];
credentialIsLinked: boolean;
setPopup: (popupSpec: PopupSpec | null) => void;
}
@ -323,7 +329,7 @@ const Main = () => {
data: connectorIndexingStatuses,
isLoading: isConnectorIndexingStatusesLoading,
error: isConnectorIndexingStatusesError,
} = useSWR<ConnectorIndexingStatus<any>[]>(
} = useSWR<ConnectorIndexingStatus<any, any>[]>(
"/api/manage/admin/connector/indexing-status",
fetcher
);
@ -389,11 +395,13 @@ const Main = () => {
(credential) =>
credential.credential_json?.google_drive_tokens && credential.public_doc
);
const googleDriveConnectorIndexingStatuses: ConnectorIndexingStatus<GoogleDriveConfig>[] =
connectorIndexingStatuses.filter(
(connectorIndexingStatus) =>
connectorIndexingStatus.connector.source === "google_drive"
);
const googleDriveConnectorIndexingStatuses: ConnectorIndexingStatus<
GoogleDriveConfig,
GoogleDriveCredentialJson
>[] = connectorIndexingStatuses.filter(
(connectorIndexingStatus) =>
connectorIndexingStatus.connector.source === "google_drive"
);
const googleDriveConnectorIndexingStatus =
googleDriveConnectorIndexingStatuses[0];

View File

@ -7,7 +7,6 @@ import { HealthCheckBanner } from "@/components/health/healthcheck";
import { CredentialForm } from "@/components/admin/connectors/CredentialForm";
import {
Credential,
ProductboardConfig,
ConnectorIndexingStatus,
GuruConfig,
GuruCredentialJson,
@ -28,7 +27,7 @@ const Main = () => {
data: connectorIndexingStatuses,
isLoading: isConnectorIndexingStatusesLoading,
error: isConnectorIndexingStatusesError,
} = useSWR<ConnectorIndexingStatus<any>[]>(
} = useSWR<ConnectorIndexingStatus<any, any>[]>(
"/api/manage/admin/connector/indexing-status",
fetcher
);
@ -55,11 +54,13 @@ const Main = () => {
return <div>Failed to load credentials</div>;
}
const guruConnectorIndexingStatuses: ConnectorIndexingStatus<GuruConfig>[] =
connectorIndexingStatuses.filter(
(connectorIndexingStatus) =>
connectorIndexingStatus.connector.source === "guru"
);
const guruConnectorIndexingStatuses: ConnectorIndexingStatus<
GuruConfig,
GuruCredentialJson
>[] = connectorIndexingStatuses.filter(
(connectorIndexingStatus) =>
connectorIndexingStatus.connector.source === "guru"
);
const guruCredential: Credential<GuruCredentialJson> = credentialsData.filter(
(credential) => credential.credential_json?.guru_user
)[0];

View File

@ -27,7 +27,7 @@ const Main = () => {
data: connectorIndexingStatuses,
isLoading: isConnectorIndexingStatusesLoading,
error: isConnectorIndexingStatusesError,
} = useSWR<ConnectorIndexingStatus<any>[]>(
} = useSWR<ConnectorIndexingStatus<any, any>[]>(
"/api/manage/admin/connector/indexing-status",
fetcher
);
@ -54,7 +54,10 @@ const Main = () => {
return <div>Failed to load credentials</div>;
}
const jiraConnectorIndexingStatuses = connectorIndexingStatuses.filter(
const jiraConnectorIndexingStatuses: ConnectorIndexingStatus<
JiraConfig,
JiraCredentialJson
>[] = connectorIndexingStatuses.filter(
(connectorIndexingStatus) =>
connectorIndexingStatus.connector.source === "jira"
);

View File

@ -27,7 +27,7 @@ const Main = () => {
data: connectorIndexingStatuses,
isLoading: isConnectorIndexingStatusesLoading,
error: isConnectorIndexingStatusesError,
} = useSWR<ConnectorIndexingStatus<any>[]>(
} = useSWR<ConnectorIndexingStatus<any, any>[]>(
"/api/manage/admin/connector/indexing-status",
fetcher
);
@ -55,7 +55,10 @@ const Main = () => {
return <div>Failed to load credentials</div>;
}
const notionConnectorIndexingStatuses = connectorIndexingStatuses.filter(
const notionConnectorIndexingStatuses: ConnectorIndexingStatus<
NotionConfig,
NotionCredentialJson
>[] = connectorIndexingStatuses.filter(
(connectorIndexingStatus) =>
connectorIndexingStatus.connector.source === "notion"
);

View File

@ -27,7 +27,7 @@ const Main = () => {
data: connectorIndexingStatuses,
isLoading: isConnectorIndexingStatusesLoading,
error: isConnectorIndexingStatusesError,
} = useSWR<ConnectorIndexingStatus<any>[]>(
} = useSWR<ConnectorIndexingStatus<any, any>[]>(
"/api/manage/admin/connector/indexing-status",
fetcher
);
@ -54,11 +54,13 @@ const Main = () => {
return <div>Failed to load credentials</div>;
}
const productboardConnectorIndexingStatuses =
connectorIndexingStatuses.filter(
(connectorIndexingStatus) =>
connectorIndexingStatus.connector.source === "productboard"
);
const productboardConnectorIndexingStatuses: ConnectorIndexingStatus<
ProductboardConfig,
ProductboardCredentialJson
>[] = connectorIndexingStatuses.filter(
(connectorIndexingStatus) =>
connectorIndexingStatus.connector.source === "productboard"
);
const productboardCredential = credentialsData.filter(
(credential) => credential.credential_json?.productboard_access_token
)[0];

View File

@ -27,7 +27,7 @@ const Main = () => {
data: connectorIndexingStatuses,
isLoading: isConnectorIndexingStatusesLoading,
error: isConnectorIndexingStatusesError,
} = useSWR<ConnectorIndexingStatus<any>[]>(
} = useSWR<ConnectorIndexingStatus<any, any>[]>(
"/api/manage/admin/connector/indexing-status",
fetcher
);
@ -54,11 +54,13 @@ const Main = () => {
return <div>Failed to load credentials</div>;
}
const slabConnectorIndexingStatuses: ConnectorIndexingStatus<SlabConfig>[] =
connectorIndexingStatuses.filter(
(connectorIndexingStatus) =>
connectorIndexingStatus.connector.source === "slab"
);
const slabConnectorIndexingStatuses: ConnectorIndexingStatus<
SlabConfig,
SlabCredentialJson
>[] = connectorIndexingStatuses.filter(
(connectorIndexingStatus) =>
connectorIndexingStatus.connector.source === "slab"
);
const slabCredential = credentialsData.filter(
(credential) => credential.credential_json?.slab_bot_token
)[0];

View File

@ -27,7 +27,7 @@ const MainSection = () => {
data: connectorIndexingStatuses,
isLoading: isConnectorIndexingStatusesLoading,
error: isConnectorIndexingStatusesError,
} = useSWR<ConnectorIndexingStatus<any>[]>(
} = useSWR<ConnectorIndexingStatus<any, any>[]>(
"/api/manage/admin/connector/indexing-status",
fetcher
);
@ -56,11 +56,13 @@ const MainSection = () => {
return <div>Failed to load credentials</div>;
}
const slackConnectorIndexingStatuses: ConnectorIndexingStatus<SlackConfig>[] =
connectorIndexingStatuses.filter(
(connectorIndexingStatus) =>
connectorIndexingStatus.connector.source === "slack"
);
const slackConnectorIndexingStatuses: ConnectorIndexingStatus<
SlackConfig,
SlackCredentialJson
>[] = connectorIndexingStatuses.filter(
(connectorIndexingStatus) =>
connectorIndexingStatus.connector.source === "slack"
);
const slackCredential = credentialsData.filter(
(credential) => credential.credential_json?.slack_bot_token
)[0];
@ -142,7 +144,7 @@ const MainSection = () => {
<b>10</b> minutes.
</p>
<div className="mb-2">
<ConnectorsTable
<ConnectorsTable<SlackConfig, SlackCredentialJson>
connectorIndexingStatuses={slackConnectorIndexingStatuses}
liveCredential={slackCredential}
getCredential={(credential) =>

View File

@ -20,12 +20,12 @@ export default function Web() {
data: connectorIndexingStatuses,
isLoading: isConnectorIndexingStatusesLoading,
error: isConnectorIndexingStatusesError,
} = useSWR<ConnectorIndexingStatus<any>[]>(
} = useSWR<ConnectorIndexingStatus<any, any>[]>(
"/api/manage/admin/connector/indexing-status",
fetcher
);
const webIndexingStatuses: ConnectorIndexingStatus<WebConfig>[] =
const webIndexingStatuses: ConnectorIndexingStatus<WebConfig, {}>[] =
connectorIndexingStatuses?.filter(
(connectorIndexingStatus) =>
connectorIndexingStatus.connector.source === "web"

View File

@ -13,7 +13,7 @@ import { HealthCheckBanner } from "@/components/health/healthcheck";
import { ConnectorIndexingStatus } from "@/lib/types";
const getSourceDisplay = (
connectorIndexingStatus: ConnectorIndexingStatus<any>
connectorIndexingStatus: ConnectorIndexingStatus<any, any>
) => {
const connector = connectorIndexingStatus.connector;
const sourceMetadata = getSourceMetadata(connector.source);
@ -64,7 +64,7 @@ function Main() {
data: indexAttemptData,
isLoading: indexAttemptIsLoading,
error: indexAttemptIsError,
} = useSWR<ConnectorIndexingStatus<any>[]>(
} = useSWR<ConnectorIndexingStatus<any, any>[]>(
"/api/manage/admin/connector/indexing-status",
fetcher,
{ refreshInterval: 30000 } // 30 seconds

View File

@ -1,5 +1,5 @@
import { HealthCheckBanner } from "@/components/health/healthcheck";
import {DISABLE_AUTH, OAUTH_NAME} from "@/lib/constants";
import { DISABLE_AUTH, OAUTH_NAME } from "@/lib/constants";
import { User } from "@/lib/types";
import { getGoogleOAuthUrlSS, getCurrentUserSS } from "@/lib/userSS";
import { redirect } from "next/navigation";

View File

@ -3,25 +3,31 @@ import { BasicTable } from "@/components/admin/connectors/BasicTable";
import { Popup, PopupSpec } from "@/components/admin/connectors/Popup";
import { useState } from "react";
import { LinkBreakIcon, LinkIcon, TrashIcon } from "@/components/icons/icons";
import { deleteConnector, updateConnector } from "@/lib/connector";
import { updateConnector } from "@/lib/connector";
import { AttachCredentialButtonForTable } from "@/components/admin/connectors/buttons/AttachCredentialButtonForTable";
import { scheduleDeletionJobForConnector } from "@/lib/documentDeletion";
import { DeleteColumn } from "./DeleteColumn";
interface StatusRowProps<ConnectorConfigType> {
connectorIndexingStatus: ConnectorIndexingStatus<ConnectorConfigType>;
interface StatusRowProps<ConnectorConfigType, ConnectorCredentialType> {
connectorIndexingStatus: ConnectorIndexingStatus<
ConnectorConfigType,
ConnectorCredentialType
>;
hasCredentialsIssue: boolean;
setPopup: (popupSpec: PopupSpec | null) => void;
onUpdate: () => void;
}
export function StatusRow<ConnectorConfigType>({
export function StatusRow<ConnectorConfigType, ConnectorCredentialType>({
connectorIndexingStatus,
hasCredentialsIssue,
setPopup,
onUpdate,
}: StatusRowProps<ConnectorConfigType>) {
}: StatusRowProps<ConnectorConfigType, ConnectorCredentialType>) {
const [statusHovered, setStatusHovered] = useState<boolean>(false);
const connector = connectorIndexingStatus.connector;
let shouldDisplayDisabledToggle = !hasCredentialsIssue;
let statusDisplay;
switch (connectorIndexingStatus.last_status) {
case "failed":
@ -32,12 +38,21 @@ export function StatusRow<ConnectorConfigType>({
}
if (connector.disabled) {
statusDisplay = <div className="text-red-700">Disabled</div>;
connectorIndexingStatus.deletion_attempts.forEach((deletionAttempt) => {
if (
deletionAttempt.status === "in_progress" ||
deletionAttempt.status === "not_started"
) {
statusDisplay = <div className="text-red-700">Deleting...</div>;
shouldDisplayDisabledToggle = false;
}
});
}
return (
<div className="flex">
{statusDisplay}
{!hasCredentialsIssue && (
{shouldDisplayDisabledToggle && (
<div
className="cursor-pointer ml-1 my-auto relative"
onMouseEnter={() => setStatusHovered(true)}
@ -89,7 +104,10 @@ interface ColumnSpecification<ConnectorConfigType> {
}
interface ConnectorsTableProps<ConnectorConfigType, ConnectorCredentialType> {
connectorIndexingStatuses: ConnectorIndexingStatus<ConnectorConfigType>[];
connectorIndexingStatuses: ConnectorIndexingStatus<
ConnectorConfigType,
ConnectorCredentialType
>[];
liveCredential?: Credential<ConnectorCredentialType> | null;
getCredential?: (
credential: Credential<ConnectorCredentialType>
@ -140,6 +158,7 @@ export function ConnectorsTable<ConnectorConfigType, ConnectorCredentialType>({
columns={columns}
data={connectorIndexingStatuses.map((connectorIndexingStatus) => {
const connector = connectorIndexingStatus.connector;
// const credential = connectorIndexingStatus.credential;
const hasValidCredentials =
liveCredential &&
connector.credential_ids.includes(liveCredential.id);
@ -170,30 +189,11 @@ export function ConnectorsTable<ConnectorConfigType, ConnectorCredentialType>({
/>
),
remove: (
<div
className="cursor-pointer mx-auto"
onClick={() => {
deleteConnector(connector.id).then((errorMsg) => {
if (errorMsg) {
setPopup({
message: `Unable to delete existing connector - ${errorMsg}`,
type: "error",
});
} else {
setPopup({
message: "Successfully deleted connector",
type: "success",
});
}
setTimeout(() => {
setPopup(null);
}, 4000);
onUpdate();
});
}}
>
<TrashIcon />
</div>
<DeleteColumn
connectorIndexingStatus={connectorIndexingStatus}
setPopup={setPopup}
onUpdate={onUpdate}
/>
),
...credential,
...(specialColumns

View File

@ -0,0 +1,78 @@
import { InfoIcon, TrashIcon } from "@/components/icons/icons";
import { scheduleDeletionJobForConnector } from "@/lib/documentDeletion";
import { ConnectorIndexingStatus } from "@/lib/types";
import { PopupSpec } from "../Popup";
import { useState } from "react";
interface Props<ConnectorConfigType, ConnectorCredentialType> {
connectorIndexingStatus: ConnectorIndexingStatus<
ConnectorConfigType,
ConnectorCredentialType
>;
setPopup: (popupSpec: PopupSpec | null) => void;
onUpdate: () => void;
}
export function DeleteColumn<ConnectorConfigType, ConnectorCredentialType>({
connectorIndexingStatus,
setPopup,
onUpdate,
}: Props<ConnectorConfigType, ConnectorCredentialType>) {
const [deleteHovered, setDeleteHovered] = useState<boolean>(false);
const connector = connectorIndexingStatus.connector;
const credential = connectorIndexingStatus.credential;
return (
<div
className="relative"
onMouseEnter={() => setDeleteHovered(true)}
onMouseLeave={() => setDeleteHovered(false)}
>
{connectorIndexingStatus.is_deletable ? (
<div
className="cursor-pointer mx-auto flex"
onClick={async () => {
const deletionScheduleError = await scheduleDeletionJobForConnector(
connector.id,
credential.id
);
if (deletionScheduleError) {
setPopup({
message:
"Failed to schedule deletion of connector - " +
deletionScheduleError,
type: "error",
});
} else {
setPopup({
message: "Scheduled deletion of connector!",
type: "success",
});
}
setTimeout(() => {
setPopup(null);
}, 4000);
onUpdate();
}}
>
<TrashIcon />
</div>
) : (
<div>
{deleteHovered && (
<div className="flex flex-nowrap absolute mt-8 top-0 left-0 bg-gray-700 px-3 py-2 rounded shadow-lg text-xs">
<InfoIcon className="flex flex-shrink-0 text-blue-300 mr-2" />
In order to delete a connector it must be disabled and have no
ongoing / planned index jobs.
</div>
)}
<div className="flex mx-auto text-xs">
<TrashIcon className="my-auto flex flex-shrink-0 text-gray-600 mr-2" />
</div>
</div>
)}
</div>
);
}

View File

@ -0,0 +1,189 @@
import {
Connector,
ConnectorIndexingStatus,
Credential,
DeletionAttemptSnapshot,
ValidStatuses,
} from "@/lib/types";
import { BasicTable } from "@/components/admin/connectors/BasicTable";
import { Popup } from "@/components/admin/connectors/Popup";
import { useState } from "react";
import { TrashIcon } from "@/components/icons/icons";
import { updateConnector } from "@/lib/connector";
import { AttachCredentialButtonForTable } from "@/components/admin/connectors/buttons/AttachCredentialButtonForTable";
import { scheduleDeletionJobForConnector } from "@/lib/documentDeletion";
const SingleUseConnectorStatus = ({
indexingStatus,
deletionAttempts,
}: {
indexingStatus: ValidStatuses | null;
deletionAttempts: DeletionAttemptSnapshot[];
}) => {
for (let deletionAttempt of deletionAttempts) {
if (
deletionAttempt.status === "in_progress" ||
deletionAttempt.status === "not_started"
) {
return <div className="text-red-500">Deleting...</div>;
}
}
if (!indexingStatus || indexingStatus === "not_started") {
return <div className="text-gray-600">Not Started</div>;
}
if (indexingStatus === "in_progress") {
return <div className="text-gray-400">In Progress</div>;
}
if (indexingStatus === "success") {
return <div className="text-emerald-600">Success!</div>;
}
return <div className="text-red-700">Failed</div>;
};
interface ColumnSpecification<ConnectorConfigType> {
header: string;
key: string;
getValue: (connector: Connector<ConnectorConfigType>) => JSX.Element | string;
}
interface ConnectorsTableProps<ConnectorConfigType, ConnectorCredentialType> {
connectorIndexingStatuses: ConnectorIndexingStatus<
ConnectorConfigType,
ConnectorCredentialType
>[];
liveCredential?: Credential<ConnectorCredentialType> | null;
getCredential?: (
credential: Credential<ConnectorCredentialType>
) => JSX.Element | string;
onUpdate: () => void;
onCredentialLink?: (connectorId: number) => void;
specialColumns?: ColumnSpecification<ConnectorConfigType>[];
}
export function SingleUseConnectorsTable<
ConnectorConfigType,
ConnectorCredentialType
>({
connectorIndexingStatuses,
liveCredential,
getCredential,
specialColumns,
onUpdate,
onCredentialLink,
}: ConnectorsTableProps<ConnectorConfigType, ConnectorCredentialType>) {
const [popup, setPopup] = useState<{
message: string;
type: "success" | "error";
} | null>(null);
const connectorIncludesCredential =
getCredential !== undefined && onCredentialLink !== undefined;
const columns = [
...(specialColumns ?? []),
{
header: "Status",
key: "status",
},
];
if (connectorIncludesCredential) {
columns.push({
header: "Credential",
key: "credential",
});
}
columns.push({
header: "Remove",
key: "remove",
});
return (
<>
{popup && <Popup message={popup.message} type={popup.type} />}
<BasicTable
columns={columns}
data={connectorIndexingStatuses.map((connectorIndexingStatus) => {
const connector = connectorIndexingStatus.connector;
// const credential = connectorIndexingStatus.credential;
const hasValidCredentials =
liveCredential &&
connector.credential_ids.includes(liveCredential.id);
const credential = connectorIncludesCredential
? {
credential: hasValidCredentials ? (
<div className="max-w-sm truncate">
{getCredential(liveCredential)}
</div>
) : liveCredential ? (
<AttachCredentialButtonForTable
onClick={() => onCredentialLink(connector.id)}
/>
) : (
<p className="text-red-700">N/A</p>
),
}
: { credential: "" };
return {
status: (
<SingleUseConnectorStatus
indexingStatus={connectorIndexingStatus.last_status}
deletionAttempts={connectorIndexingStatus.deletion_attempts}
/>
),
remove: (
<div
className="cursor-pointer mx-auto flex"
onClick={async () => {
// for one-time, just disable the connector at deletion time
// this is required before deletion can happen
await updateConnector({
...connector,
disabled: true,
});
const deletionScheduleError =
await scheduleDeletionJobForConnector(
connector.id,
connectorIndexingStatus.credential.id
);
if (deletionScheduleError) {
setPopup({
message:
"Failed to schedule deletion of connector - " +
deletionScheduleError,
type: "error",
});
} else {
setPopup({
message: "Scheduled deletion of connector!",
type: "success",
});
}
setTimeout(() => {
setPopup(null);
}, 4000);
onUpdate();
}}
>
<TrashIcon />
</div>
),
...credential,
...(specialColumns
? Object.fromEntries(
specialColumns.map(({ key, getValue }, i) => [
key,
getValue(connector),
])
)
: {}),
};
})}
/>
</>
);
}

View File

@ -70,7 +70,6 @@ export const SearchResultsDisplay: React.FC<SearchResultsDisplayProps> = ({
const shouldDisplayQA =
searchResponse.suggestedFlowType === FlowType.QUESTION_ANSWER ||
defaultOverrides.forceDisplayQA;
console.log(shouldDisplayQA);
return (
<>

View File

@ -0,0 +1,22 @@
export const scheduleDeletionJobForConnector = async (
connectorId: number,
credentialId: number
) => {
// Will schedule a background job which will:
// 1. Remove all documents indexed by the connector / credential pair
// 2. Remove the connector (if this is the only pair using the connector)
const response = await fetch(`/api/manage/admin/deletion-attempt`, {
method: "POST",
headers: {
"Content-Type": "application/json",
},
body: JSON.stringify({
connector_id: connectorId,
credential_id: credentialId,
}),
});
if (response.ok) {
return null;
}
return (await response.json()).detail;
};

View File

@ -21,6 +21,11 @@ export type ValidSources =
| "guru"
| "file";
export type ValidInputTypes = "load_state" | "poll" | "event";
export type ValidStatuses =
| "success"
| "failed"
| "in_progress"
| "not_started";
// CONNECTORS
export interface ConnectorBase<T> {
@ -82,13 +87,19 @@ export interface FileConfig {
export interface NotionConfig {}
export interface ConnectorIndexingStatus<T> {
connector: Connector<T>;
export interface ConnectorIndexingStatus<
ConnectorConfigType,
ConnectorCredentialType
> {
connector: Connector<ConnectorConfigType>;
credential: Credential<ConnectorCredentialType>;
public_doc: boolean;
owner: string;
last_status: "success" | "failed" | "in_progress" | "not_started";
last_status: ValidStatuses | null;
last_success: string | null;
docs_indexed: number;
deletion_attempts: DeletionAttemptSnapshot[];
is_deletable: boolean;
}
// CREDENTIALS
@ -147,3 +158,12 @@ export interface GuruCredentialJson {
guru_user: string;
guru_user_token: string;
}
// DELETION
export interface DeletionAttemptSnapshot {
connector_id: number;
status: ValidStatuses;
error_msg?: string;
num_docs_deleted: number;
}