EE Connector Deletion Bugfix + Refactor (#2042)

---------

Co-authored-by: Weves <chrisweaver101@gmail.com>
This commit is contained in:
Nathan Schwerdfeger 2024-08-11 20:33:07 -07:00 committed by GitHub
parent 79523f2e0a
commit c7e5b11c63
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
49 changed files with 998 additions and 800 deletions

View File

@ -0,0 +1,80 @@
"""Moved status to connector credential pair
Revision ID: 4a951134c801
Revises: 7477a5f5d728
Create Date: 2024-08-10 19:20:34.527559
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = "4a951134c801"
down_revision = "7477a5f5d728"
branch_labels = None
depends_on = None
def upgrade() -> None:
op.add_column(
"connector_credential_pair",
sa.Column(
"status",
sa.Enum(
"ACTIVE",
"PAUSED",
"DELETING",
name="connectorcredentialpairstatus",
native_enum=False,
),
nullable=True,
),
)
# Update status of connector_credential_pair based on connector's disabled status
op.execute(
"""
UPDATE connector_credential_pair
SET status = CASE
WHEN (
SELECT disabled
FROM connector
WHERE connector.id = connector_credential_pair.connector_id
) = FALSE THEN 'ACTIVE'
ELSE 'PAUSED'
END
"""
)
# Make the status column not nullable after setting values
op.alter_column("connector_credential_pair", "status", nullable=False)
op.drop_column("connector", "disabled")
def downgrade() -> None:
op.add_column(
"connector",
sa.Column("disabled", sa.BOOLEAN(), autoincrement=False, nullable=True),
)
# Update disabled status of connector based on connector_credential_pair's status
op.execute(
"""
UPDATE connector
SET disabled = CASE
WHEN EXISTS (
SELECT 1
FROM connector_credential_pair
WHERE connector_credential_pair.connector_id = connector.id
AND connector_credential_pair.status = 'ACTIVE'
) THEN FALSE
ELSE TRUE
END
"""
)
# Make the disabled column not nullable after setting values
op.alter_column("connector", "disabled", nullable=False)
op.drop_column("connector_credential_pair", "status")

View File

@ -5,19 +5,16 @@ from danswer.access.utils import prefix_user
from danswer.configs.constants import PUBLIC_DOC_PAT
from danswer.db.document import get_acccess_info_for_documents
from danswer.db.models import User
from danswer.server.documents.models import ConnectorCredentialPairIdentifier
from danswer.utils.variable_functionality import fetch_versioned_implementation
def _get_access_for_documents(
document_ids: list[str],
db_session: Session,
cc_pair_to_delete: ConnectorCredentialPairIdentifier | None = None,
) -> dict[str, DocumentAccess]:
document_access_info = get_acccess_info_for_documents(
db_session=db_session,
document_ids=document_ids,
cc_pair_to_delete=cc_pair_to_delete,
)
return {
document_id: DocumentAccess.build(user_ids, [], is_public)
@ -28,14 +25,13 @@ def _get_access_for_documents(
def get_access_for_documents(
document_ids: list[str],
db_session: Session,
cc_pair_to_delete: ConnectorCredentialPairIdentifier | None = None,
) -> dict[str, DocumentAccess]:
"""Fetches all access information for the given documents."""
versioned_get_access_for_documents_fn = fetch_versioned_implementation(
"danswer.access.access", "_get_access_for_documents"
)
return versioned_get_access_for_documents_fn(
document_ids, db_session, cc_pair_to_delete
document_ids, db_session
) # type: ignore

View File

@ -5,6 +5,7 @@ from celery import Celery # type: ignore
from sqlalchemy.orm import Session
from danswer.background.celery.celery_utils import extract_ids_from_runnable_connector
from danswer.background.celery.celery_utils import should_kick_off_deletion_of_cc_pair
from danswer.background.celery.celery_utils import should_prune_cc_pair
from danswer.background.celery.celery_utils import should_sync_doc_set
from danswer.background.connector_deletion import delete_connector_credential_pair
@ -270,6 +271,26 @@ def check_for_document_sets_sync_task() -> None:
)
@celery_app.task(
name="check_for_cc_pair_deletion_task",
soft_time_limit=JOB_TIMEOUT,
)
def check_for_cc_pair_deletion_task() -> None:
"""Runs periodically to check if any deletion tasks should be run"""
with Session(get_sqlalchemy_engine()) as db_session:
# check if any document sets are not synced
cc_pairs = get_connector_credential_pairs(db_session)
for cc_pair in cc_pairs:
if should_kick_off_deletion_of_cc_pair(cc_pair, db_session):
logger.info(f"Deleting the {cc_pair.name} connector credential pair")
cleanup_connector_credential_pair_task.apply_async(
kwargs=dict(
connector_id=cc_pair.connector.id,
credential_id=cc_pair.credential.id,
),
)
@celery_app.task(
name="check_for_prune_task",
soft_time_limit=JOB_TIMEOUT,
@ -305,6 +326,12 @@ celery_app.conf.beat_schedule = {
"task": "check_for_document_sets_sync_task",
"schedule": timedelta(seconds=5),
},
"check-for-cc-pair-deletion": {
"task": "check_for_cc_pair_deletion_task",
# don't need to check too often, since we kick off a deletion initially
# during the API call that actually marks the CC pair for deletion
"schedule": timedelta(minutes=1),
},
}
celery_app.conf.beat_schedule.update(
{

View File

@ -16,10 +16,14 @@ from danswer.connectors.interfaces import IdConnector
from danswer.connectors.interfaces import LoadConnector
from danswer.connectors.interfaces import PollConnector
from danswer.connectors.models import Document
from danswer.db.deletion_attempt import check_deletion_attempt_is_allowed
from danswer.db.engine import get_db_current_time
from danswer.db.enums import ConnectorCredentialPairStatus
from danswer.db.models import Connector
from danswer.db.models import ConnectorCredentialPair
from danswer.db.models import Credential
from danswer.db.models import DocumentSet
from danswer.db.models import TaskQueueState
from danswer.db.tasks import check_task_is_live_and_not_timed_out
from danswer.db.tasks import get_latest_task
from danswer.db.tasks import get_latest_task_by_type
@ -31,22 +35,52 @@ logger = setup_logger()
def get_deletion_status(
connector_id: int, credential_id: int, db_session: Session
) -> DeletionAttemptSnapshot | None:
) -> TaskQueueState | None:
cleanup_task_name = name_cc_cleanup_task(
connector_id=connector_id, credential_id=credential_id
)
task_state = get_latest_task(task_name=cleanup_task_name, db_session=db_session)
return get_latest_task(task_name=cleanup_task_name, db_session=db_session)
if not task_state:
def get_deletion_attempt_snapshot(
connector_id: int, credential_id: int, db_session: Session
) -> DeletionAttemptSnapshot | None:
deletion_task = get_deletion_status(connector_id, credential_id, db_session)
if not deletion_task:
return None
return DeletionAttemptSnapshot(
connector_id=connector_id,
credential_id=credential_id,
status=task_state.status,
status=deletion_task.status,
)
def should_kick_off_deletion_of_cc_pair(
cc_pair: ConnectorCredentialPair, db_session: Session
) -> bool:
if cc_pair.status != ConnectorCredentialPairStatus.DELETING:
return False
if check_deletion_attempt_is_allowed(cc_pair, db_session):
return False
deletion_task = get_deletion_status(
connector_id=cc_pair.connector_id,
credential_id=cc_pair.credential_id,
db_session=db_session,
)
if deletion_task and check_task_is_live_and_not_timed_out(
deletion_task,
db_session,
# 1 hour timeout
timeout=60 * 60,
):
return False
return True
def should_sync_doc_set(document_set: DocumentSet, db_session: Session) -> bool:
if document_set.is_up_to_date:
return False

View File

@ -10,8 +10,6 @@ are multiple connector / credential pairs that have indexed it
connector / credential pair from the access list
(6) delete all relevant entries from postgres
"""
import time
from sqlalchemy.orm import Session
from danswer.access.access import get_access_for_documents
@ -24,10 +22,8 @@ from danswer.db.document import delete_documents_complete__no_commit
from danswer.db.document import get_document_connector_cnts
from danswer.db.document import get_documents_for_connector_credential_pair
from danswer.db.document import prepare_to_modify_documents
from danswer.db.document_set import get_document_sets_by_ids
from danswer.db.document_set import (
mark_cc_pair__document_set_relationships_to_be_deleted__no_commit,
)
from danswer.db.document_set import delete_document_set_cc_pair_relationship__no_commit
from danswer.db.document_set import fetch_document_sets_for_documents
from danswer.db.engine import get_sqlalchemy_engine
from danswer.db.index_attempt import delete_index_attempts
from danswer.db.models import ConnectorCredentialPair
@ -35,6 +31,10 @@ from danswer.document_index.interfaces import DocumentIndex
from danswer.document_index.interfaces import UpdateRequest
from danswer.server.documents.models import ConnectorCredentialPairIdentifier
from danswer.utils.logger import setup_logger
from danswer.utils.variable_functionality import (
fetch_versioned_implementation_with_fallback,
)
from danswer.utils.variable_functionality import noop_fallback
logger = setup_logger()
@ -78,25 +78,37 @@ def delete_connector_credential_pair_batch(
document_ids_to_update = [
document_id for document_id, cnt in document_connector_cnts if cnt > 1
]
# maps document id to list of document set names
new_doc_sets_for_documents: dict[str, set[str]] = {
document_id_and_document_set_names_tuple[0]: set(
document_id_and_document_set_names_tuple[1]
)
for document_id_and_document_set_names_tuple in fetch_document_sets_for_documents(
db_session=db_session,
document_ids=document_ids_to_update,
)
}
# determine future ACLs for documents in batch
access_for_documents = get_access_for_documents(
document_ids=document_ids_to_update,
db_session=db_session,
cc_pair_to_delete=ConnectorCredentialPairIdentifier(
connector_id=connector_id,
credential_id=credential_id,
),
)
# update Vespa
logger.debug(f"Updating documents: {document_ids_to_update}")
update_requests = [
UpdateRequest(
document_ids=[document_id],
access=access,
document_sets=new_doc_sets_for_documents[document_id],
)
for document_id, access in access_for_documents.items()
]
logger.debug(f"Updating documents: {document_ids_to_update}")
document_index.update(update_requests=update_requests)
# clean up Postgres
delete_document_by_connector_credential_pair__no_commit(
db_session=db_session,
document_ids=document_ids_to_update,
@ -108,48 +120,6 @@ def delete_connector_credential_pair_batch(
db_session.commit()
def cleanup_synced_entities(
cc_pair: ConnectorCredentialPair, db_session: Session
) -> None:
"""Updates the document sets associated with the connector / credential pair,
then relies on the document set sync script to kick off Celery jobs which will
sync these updates to Vespa.
Waits until the document sets are synced before returning."""
logger.info(f"Cleaning up Document Sets for CC Pair with ID: '{cc_pair.id}'")
document_sets_ids_to_sync = list(
mark_cc_pair__document_set_relationships_to_be_deleted__no_commit(
cc_pair_id=cc_pair.id,
db_session=db_session,
)
)
db_session.commit()
# wait till all document sets are synced before continuing
while True:
all_synced = True
document_sets = get_document_sets_by_ids(
db_session=db_session, document_set_ids=document_sets_ids_to_sync
)
for document_set in document_sets:
if not document_set.is_up_to_date:
all_synced = False
if all_synced:
break
# wait for 30 seconds before checking again
db_session.commit() # end transaction
logger.info(
f"Document sets '{document_sets_ids_to_sync}' not synced yet, waiting 30s"
)
time.sleep(30)
logger.info(
f"Finished cleaning up Document Sets for CC Pair with ID: '{cc_pair.id}'"
)
def delete_connector_credential_pair(
db_session: Session,
document_index: DocumentIndex,
@ -177,17 +147,33 @@ def delete_connector_credential_pair(
)
num_docs_deleted += len(documents)
# Clean up document sets / access information from Postgres
# and sync these updates to Vespa
# TODO: add user group cleanup with `fetch_versioned_implementation`
cleanup_synced_entities(cc_pair, db_session)
# clean up the rest of the related Postgres entities
# index attempts
delete_index_attempts(
db_session=db_session,
connector_id=connector_id,
credential_id=credential_id,
)
# document sets
delete_document_set_cc_pair_relationship__no_commit(
db_session=db_session,
connector_id=connector_id,
credential_id=credential_id,
)
# user groups
cleanup_user_groups = fetch_versioned_implementation_with_fallback(
"danswer.db.user_group",
"delete_user_group_cc_pair_relationship__no_commit",
noop_fallback,
)
cleanup_user_groups(
cc_pair_id=cc_pair.id,
db_session=db_session,
)
# finally, delete the cc-pair
delete_connector_credential_pair__no_commit(
db_session=db_session,
connector_id=connector_id,

View File

@ -14,10 +14,10 @@ from danswer.connectors.interfaces import LoadConnector
from danswer.connectors.interfaces import PollConnector
from danswer.connectors.models import IndexAttemptMetadata
from danswer.connectors.models import InputType
from danswer.db.connector import disable_connector
from danswer.db.connector_credential_pair import get_last_successful_attempt_time
from danswer.db.connector_credential_pair import update_connector_credential_pair
from danswer.db.engine import get_sqlalchemy_engine
from danswer.db.enums import ConnectorCredentialPairStatus
from danswer.db.index_attempt import get_index_attempt
from danswer.db.index_attempt import mark_attempt_failed
from danswer.db.index_attempt import mark_attempt_in_progress
@ -61,7 +61,14 @@ def _get_document_generator(
)
except Exception as e:
logger.exception(f"Unable to instantiate connector due to {e}")
disable_connector(attempt.connector_credential_pair.connector.id, db_session)
# since we failed to even instantiate the connector, we pause the CCPair since
# it will never succeed
update_connector_credential_pair(
db_session=db_session,
connector_id=attempt.connector_credential_pair.connector.id,
credential_id=attempt.connector_credential_pair.credential.id,
status=ConnectorCredentialPairStatus.PAUSED,
)
raise e
if task == InputType.LOAD_STATE:
@ -130,6 +137,7 @@ def _run_indexing(
db_session=db_session,
)
db_cc_pair = index_attempt.connector_credential_pair
db_connector = index_attempt.connector_credential_pair.connector
db_credential = index_attempt.connector_credential_pair.credential
@ -181,7 +189,7 @@ def _run_indexing(
# contents still need to be initially pulled.
db_session.refresh(db_connector)
if (
db_connector.disabled
db_cc_pair.status == ConnectorCredentialPairStatus.PAUSED
and db_embedding_model.status != IndexModelStatus.FUTURE
):
# let the `except` block handle this
@ -246,7 +254,7 @@ def _run_indexing(
# to give better clarity in the UI, as the next run will never happen.
if (
ind == 0
or db_connector.disabled
or db_cc_pair.status == ConnectorCredentialPairStatus.PAUSED
or index_attempt.status != IndexingStatus.IN_PROGRESS
):
mark_attempt_failed(
@ -258,8 +266,8 @@ def _run_indexing(
if is_primary:
update_connector_credential_pair(
db_session=db_session,
connector_id=index_attempt.connector_credential_pair.connector.id,
credential_id=index_attempt.connector_credential_pair.credential.id,
connector_id=db_connector.id,
credential_id=db_credential.id,
net_docs=net_doc_change,
)
raise e

View File

@ -25,13 +25,14 @@ from danswer.db.embedding_model import get_secondary_db_embedding_model
from danswer.db.engine import get_db_current_time
from danswer.db.engine import get_sqlalchemy_engine
from danswer.db.engine import init_sqlalchemy_engine
from danswer.db.enums import ConnectorCredentialPairStatus
from danswer.db.index_attempt import create_index_attempt
from danswer.db.index_attempt import get_index_attempt
from danswer.db.index_attempt import get_inprogress_index_attempts
from danswer.db.index_attempt import get_last_attempt_for_cc_pair
from danswer.db.index_attempt import get_not_started_index_attempts
from danswer.db.index_attempt import mark_attempt_failed
from danswer.db.models import Connector
from danswer.db.models import ConnectorCredentialPair
from danswer.db.models import EmbeddingModel
from danswer.db.models import IndexAttempt
from danswer.db.models import IndexingStatus
@ -57,12 +58,14 @@ _UNEXPECTED_STATE_FAILURE_REASON = (
def _should_create_new_indexing(
connector: Connector,
cc_pair: ConnectorCredentialPair,
last_index: IndexAttempt | None,
model: EmbeddingModel,
secondary_index_building: bool,
db_session: Session,
) -> bool:
connector = cc_pair.connector
# User can still manually create single indexing attempts via the UI for the
# currently in use index
if DISABLE_INDEX_UPDATE_ON_SWAP:
@ -89,10 +92,10 @@ def _should_create_new_indexing(
return False
return True
# If the connector is disabled or is the ingestion API, don't index
# If the connector is paused or is the ingestion API, don't index
# NOTE: during an embedding model switch over, the following logic
# is bypassed by the above check for a future model
if connector.disabled or connector.id == 0:
if cc_pair.status == ConnectorCredentialPairStatus.PAUSED or connector.id == 0:
return False
if not last_index:
@ -187,7 +190,7 @@ def create_indexing_jobs(existing_jobs: dict[int, Future | SimpleJob]) -> None:
cc_pair.id, model.id, db_session
)
if not _should_create_new_indexing(
connector=cc_pair.connector,
cc_pair=cc_pair,
last_index=last_attempt,
model=model,
secondary_index_building=len(embedding_models) > 1,

View File

@ -1,6 +1,5 @@
from typing import cast
from fastapi import HTTPException
from sqlalchemy import and_
from sqlalchemy import exists
from sqlalchemy import func
@ -34,15 +33,12 @@ def fetch_connectors(
db_session: Session,
sources: list[DocumentSource] | None = None,
input_types: list[InputType] | None = None,
disabled_status: bool | None = None,
) -> list[Connector]:
stmt = select(Connector)
if sources is not None:
stmt = stmt.where(Connector.source.in_(sources))
if input_types is not None:
stmt = stmt.where(Connector.input_type.in_(input_types))
if disabled_status is not None:
stmt = stmt.where(Connector.disabled == disabled_status)
results = db_session.scalars(stmt)
return list(results.all())
@ -97,7 +93,6 @@ def create_connector(
refresh_freq=connector_data.refresh_freq,
indexing_start=connector_data.indexing_start,
prune_freq=connector_data.prune_freq,
disabled=connector_data.disabled,
)
db_session.add(connector)
db_session.commit()
@ -131,33 +126,18 @@ def update_connector(
if connector_data.prune_freq is not None
else DEFAULT_PRUNING_FREQ
)
connector.disabled = connector_data.disabled
db_session.commit()
return connector
def disable_connector(
connector_id: int,
db_session: Session,
) -> StatusResponse[int]:
connector = fetch_connector_by_id(connector_id, db_session)
if connector is None:
raise HTTPException(status_code=404, detail="Connector does not exist")
connector.disabled = True
db_session.commit()
return StatusResponse(
success=True, message="Connector deleted successfully", data=connector_id
)
def delete_connector(
connector_id: int,
db_session: Session,
) -> StatusResponse[int]:
"""Currently unused due to foreign key restriction from IndexAttempt
Use disable_connector instead"""
"""Only used in special cases (e.g. a connector is in a bad state and we need to delete it).
Be VERY careful using this, as it could lead to a bad state if not used correctly.
"""
connector = fetch_connector_by_id(connector_id, db_session)
if connector is None:
return StatusResponse(
@ -188,11 +168,9 @@ def fetch_latest_index_attempt_by_connector(
latest_index_attempts: list[IndexAttempt] = []
if source:
connectors = fetch_connectors(
db_session, sources=[source], disabled_status=False
)
connectors = fetch_connectors(db_session, sources=[source])
else:
connectors = fetch_connectors(db_session, disabled_status=False)
connectors = fetch_connectors(db_session)
if not connectors:
return []
@ -261,7 +239,6 @@ def create_initial_default_connector(db_session: Session) -> None:
default_connector.source != DocumentSource.INGESTION_API
or default_connector.input_type != InputType.LOAD_STATE
or default_connector.refresh_freq is not None
or default_connector.disabled
or default_connector.name != "Ingestion API"
or default_connector.connector_specific_config != {}
or default_connector.prune_freq is not None
@ -273,7 +250,6 @@ def create_initial_default_connector(db_session: Session) -> None:
default_connector.source = DocumentSource.INGESTION_API
default_connector.input_type = InputType.LOAD_STATE
default_connector.refresh_freq = None
default_connector.disabled = False
default_connector.name = "Ingestion API"
default_connector.connector_specific_config = {}
default_connector.prune_freq = None
@ -289,7 +265,6 @@ def create_initial_default_connector(db_session: Session) -> None:
connector_specific_config={},
refresh_freq=None,
prune_freq=None,
disabled=False,
)
db_session.add(connector)
db_session.commit()

View File

@ -9,6 +9,7 @@ from sqlalchemy.orm import Session
from danswer.configs.constants import DocumentSource
from danswer.db.connector import fetch_connector_by_id
from danswer.db.credentials import fetch_credential_by_id
from danswer.db.enums import ConnectorCredentialPairStatus
from danswer.db.models import ConnectorCredentialPair
from danswer.db.models import EmbeddingModel
from danswer.db.models import IndexAttempt
@ -26,7 +27,9 @@ def get_connector_credential_pairs(
) -> list[ConnectorCredentialPair]:
stmt = select(ConnectorCredentialPair)
if not include_disabled:
stmt = stmt.where(ConnectorCredentialPair.connector.disabled == False) # noqa
stmt = stmt.where(
ConnectorCredentialPair.status == ConnectorCredentialPairStatus.ACTIVE
) # noqa
results = db_session.scalars(stmt)
return list(results.all())
@ -109,10 +112,56 @@ def get_last_successful_attempt_time(
return attempt.time_started.timestamp()
"""Updates"""
def _update_connector_credential_pair(
db_session: Session,
cc_pair: ConnectorCredentialPair,
status: ConnectorCredentialPairStatus | None = None,
net_docs: int | None = None,
run_dt: datetime | None = None,
) -> None:
# simply don't update last_successful_index_time if run_dt is not specified
# at worst, this would result in re-indexing documents that were already indexed
if run_dt is not None:
cc_pair.last_successful_index_time = run_dt
if net_docs is not None:
cc_pair.total_docs_indexed += net_docs
if status is not None:
cc_pair.status = status
db_session.commit()
def update_connector_credential_pair_from_id(
db_session: Session,
cc_pair_id: int,
status: ConnectorCredentialPairStatus | None = None,
net_docs: int | None = None,
run_dt: datetime | None = None,
) -> None:
cc_pair = get_connector_credential_pair_from_id(cc_pair_id, db_session)
if not cc_pair:
logger.warning(
f"Attempted to update pair for Connector Credential Pair '{cc_pair_id}'"
f" but it does not exist"
)
return
_update_connector_credential_pair(
db_session=db_session,
cc_pair=cc_pair,
status=status,
net_docs=net_docs,
run_dt=run_dt,
)
def update_connector_credential_pair(
db_session: Session,
connector_id: int,
credential_id: int,
status: ConnectorCredentialPairStatus | None = None,
net_docs: int | None = None,
run_dt: datetime | None = None,
) -> None:
@ -123,13 +172,14 @@ def update_connector_credential_pair(
f"and credential id {credential_id}"
)
return
# simply don't update last_successful_index_time if run_dt is not specified
# at worst, this would result in re-indexing documents that were already indexed
if run_dt is not None:
cc_pair.last_successful_index_time = run_dt
if net_docs is not None:
cc_pair.total_docs_indexed += net_docs
db_session.commit()
_update_connector_credential_pair(
db_session=db_session,
cc_pair=cc_pair,
status=status,
net_docs=net_docs,
run_dt=run_dt,
)
def delete_connector_credential_pair__no_commit(
@ -160,6 +210,8 @@ def associate_default_cc_pair(db_session: Session) -> None:
connector_id=0,
credential_id=0,
name="DefaultCCPair",
status=ConnectorCredentialPairStatus.ACTIVE,
is_public=True,
)
db_session.add(association)
db_session.commit()
@ -204,6 +256,7 @@ def add_credential_to_connector(
connector_id=connector_id,
credential_id=credential_id,
name=cc_pair_name,
status=ConnectorCredentialPairStatus.ACTIVE,
is_public=is_public,
)
db_session.add(association)

View File

@ -1,6 +1,7 @@
from sqlalchemy.orm import Session
from danswer.db.embedding_model import get_current_db_embedding_model
from danswer.db.enums import ConnectorCredentialPairStatus
from danswer.db.index_attempt import get_last_attempt
from danswer.db.models import ConnectorCredentialPair
from danswer.db.models import IndexingStatus
@ -13,7 +14,7 @@ def check_deletion_attempt_is_allowed(
) -> str | None:
"""
To be deletable:
(1) connector should be disabled
(1) connector should be paused
(2) there should be no in-progress/planned index attempts
Returns an error message if the deletion attempt is not allowed, otherwise None.
@ -23,7 +24,10 @@ def check_deletion_attempt_is_allowed(
f"'{connector_credential_pair.credential_id}' is not deletable."
)
if not connector_credential_pair.connector.disabled:
if (
connector_credential_pair.status != ConnectorCredentialPairStatus.PAUSED
and connector_credential_pair.status != ConnectorCredentialPairStatus.DELETING
):
return base_error_msg + " Connector must be paused."
connector_id = connector_credential_pair.connector_id

View File

@ -17,6 +17,7 @@ from sqlalchemy.exc import OperationalError
from sqlalchemy.orm import Session
from danswer.configs.constants import DEFAULT_BOOST
from danswer.db.enums import ConnectorCredentialPairStatus
from danswer.db.feedback import delete_document_feedback_for_documents__no_commit
from danswer.db.models import ConnectorCredentialPair
from danswer.db.models import Credential
@ -110,36 +111,19 @@ def get_document_cnts_for_cc_pairs(
def get_acccess_info_for_documents(
db_session: Session,
document_ids: list[str],
cc_pair_to_delete: ConnectorCredentialPairIdentifier | None = None,
) -> Sequence[tuple[str, list[UUID | None], bool]]:
"""Gets back all relevant access info for the given documents. This includes
the user_ids for cc pairs that the document is associated with + whether any
of the associated cc pairs are intending to make the document globally public.
If `cc_pair_to_delete` is specified, gets the above access info as if that
pair had been deleted. This is needed since we want to delete from the Vespa
before deleting from Postgres to ensure that the state of Postgres never "loses"
documents that still exist in Vespa.
"""
stmt = select(
DocumentByConnectorCredentialPair.id,
func.array_agg(Credential.user_id).label("user_ids"),
func.bool_or(ConnectorCredentialPair.is_public).label("public_doc"),
).where(DocumentByConnectorCredentialPair.id.in_(document_ids))
# pretend that the specified cc pair doesn't exist
if cc_pair_to_delete:
stmt = stmt.where(
and_(
DocumentByConnectorCredentialPair.connector_id
!= cc_pair_to_delete.connector_id,
DocumentByConnectorCredentialPair.credential_id
!= cc_pair_to_delete.credential_id,
)
)
stmt = (
stmt.join(
select(
DocumentByConnectorCredentialPair.id,
func.array_agg(Credential.user_id).label("user_ids"),
func.bool_or(ConnectorCredentialPair.is_public).label("public_doc"),
)
.where(DocumentByConnectorCredentialPair.id.in_(document_ids))
.join(
Credential,
DocumentByConnectorCredentialPair.credential_id == Credential.id,
)
@ -152,6 +136,9 @@ def get_acccess_info_for_documents(
== ConnectorCredentialPair.credential_id,
),
)
# don't include CC pairs that are being deleted
# NOTE: CC pairs can never go from DELETING to any other state -> it's safe to ignore them
.where(ConnectorCredentialPair.status != ConnectorCredentialPairStatus.DELETING)
.group_by(DocumentByConnectorCredentialPair.id)
)
return db_session.execute(stmt).all() # type: ignore

View File

@ -9,6 +9,7 @@ from sqlalchemy import or_
from sqlalchemy import select
from sqlalchemy.orm import Session
from danswer.db.enums import ConnectorCredentialPairStatus
from danswer.db.models import ConnectorCredentialPair
from danswer.db.models import Document
from danswer.db.models import DocumentByConnectorCredentialPair
@ -270,37 +271,20 @@ def mark_document_set_as_to_be_deleted(
raise
def mark_cc_pair__document_set_relationships_to_be_deleted__no_commit(
cc_pair_id: int, db_session: Session
) -> set[int]:
"""Marks all CC Pair -> Document Set relationships for the specified
`cc_pair_id` as not current and returns the list of all document set IDs
affected.
NOTE: raises a `ValueError` if any of the document sets are currently syncing
to avoid getting into a bad state."""
document_set__cc_pair_relationships = db_session.scalars(
select(DocumentSet__ConnectorCredentialPair).where(
def delete_document_set_cc_pair_relationship__no_commit(
connector_id: int, credential_id: int, db_session: Session
) -> None:
"""Deletes all rows from DocumentSet__ConnectorCredentialPair where the
connector_credential_pair_id matches the given cc_pair_id."""
delete_stmt = delete(DocumentSet__ConnectorCredentialPair).where(
and_(
ConnectorCredentialPair.connector_id == connector_id,
ConnectorCredentialPair.credential_id == credential_id,
DocumentSet__ConnectorCredentialPair.connector_credential_pair_id
== cc_pair_id
== ConnectorCredentialPair.id,
)
).all()
document_set_ids_touched: set[int] = set()
for document_set__cc_pair_relationship in document_set__cc_pair_relationships:
document_set__cc_pair_relationship.is_current = False
if not document_set__cc_pair_relationship.document_set.is_up_to_date:
raise ValueError(
"Cannot delete CC pair while it is attached to a document set "
"that is syncing. Please wait for the document set to finish "
"syncing, and then try again."
)
document_set__cc_pair_relationship.document_set.is_up_to_date = False
document_set_ids_touched.add(document_set__cc_pair_relationship.document_set_id)
return document_set_ids_touched
)
db_session.execute(delete_stmt)
def fetch_document_sets(
@ -431,8 +415,10 @@ def fetch_documents_for_document_set_paginated(
def fetch_document_sets_for_documents(
document_ids: list[str], db_session: Session
document_ids: list[str],
db_session: Session,
) -> Sequence[tuple[str, list[str]]]:
"""Gives back a list of (document_id, list[document_set_names]) tuples"""
stmt = (
select(Document.id, func.array_agg(DocumentSetDBModel.name))
.join(
@ -459,6 +445,10 @@ def fetch_document_sets_for_documents(
Document.id == DocumentByConnectorCredentialPair.id,
)
.where(Document.id.in_(document_ids))
# don't include CC pairs that are being deleted
# NOTE: CC pairs can never go from DELETING to any other state -> it's safe to ignore them
# as we can assume their document sets are no longer relevant
.where(ConnectorCredentialPair.status != ConnectorCredentialPairStatus.DELETING)
.where(DocumentSet__ConnectorCredentialPair.is_current == True) # noqa: E712
.group_by(Document.id)
)

View File

@ -33,3 +33,9 @@ class IndexModelStatus(str, PyEnum):
class ChatSessionSharedStatus(str, PyEnum):
PUBLIC = "public"
PRIVATE = "private"
class ConnectorCredentialPairStatus(str, PyEnum):
ACTIVE = "ACTIVE"
PAUSED = "PAUSED"
DELETING = "DELETING"

View File

@ -340,15 +340,14 @@ def expire_index_attempts(
db_session.commit()
def cancel_indexing_attempts_for_connector(
connector_id: int,
def cancel_indexing_attempts_for_ccpair(
cc_pair_id: int,
db_session: Session,
include_secondary_index: bool = False,
) -> None:
stmt = (
delete(IndexAttempt)
.where(IndexAttempt.connector_credential_pair_id == ConnectorCredentialPair.id)
.where(ConnectorCredentialPair.connector_id == connector_id)
.where(IndexAttempt.connector_credential_pair_id == cc_pair_id)
.where(IndexAttempt.status == IndexingStatus.NOT_STARTED)
)
@ -366,6 +365,8 @@ def cancel_indexing_attempts_for_connector(
def cancel_indexing_attempts_past_model(
db_session: Session,
) -> None:
"""Stops all indexing attempts that are in progress or not started for
any embedding model that not present/future"""
db_session.execute(
update(IndexAttempt)
.where(

View File

@ -43,6 +43,7 @@ from danswer.configs.constants import SearchFeedbackType
from danswer.configs.constants import TokenRateLimitScope
from danswer.connectors.models import InputType
from danswer.db.enums import ChatSessionSharedStatus
from danswer.db.enums import ConnectorCredentialPairStatus
from danswer.db.enums import IndexingStatus
from danswer.db.enums import IndexModelStatus
from danswer.db.enums import TaskStatus
@ -361,6 +362,9 @@ class ConnectorCredentialPair(Base):
nullable=False,
)
name: Mapped[str] = mapped_column(String, nullable=False)
status: Mapped[ConnectorCredentialPairStatus] = mapped_column(
Enum(ConnectorCredentialPairStatus, native_enum=False), nullable=False
)
connector_id: Mapped[int] = mapped_column(
ForeignKey("connector.id"), primary_key=True
)
@ -490,7 +494,6 @@ class Connector(Base):
time_updated: Mapped[datetime.datetime] = mapped_column(
DateTime(timezone=True), server_default=func.now(), onupdate=func.now()
)
disabled: Mapped[bool] = mapped_column(Boolean, default=False)
credentials: Mapped[list["ConnectorCredentialPair"]] = relationship(
"ConnectorCredentialPair",

View File

@ -1,17 +1,24 @@
from fastapi import APIRouter
from fastapi import Depends
from fastapi import HTTPException
from pydantic import BaseModel
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session
from danswer.auth.users import current_admin_user
from danswer.auth.users import current_user
from danswer.background.celery.celery_utils import get_deletion_status
from danswer.background.celery.celery_utils import get_deletion_attempt_snapshot
from danswer.db.connector_credential_pair import add_credential_to_connector
from danswer.db.connector_credential_pair import get_connector_credential_pair_from_id
from danswer.db.connector_credential_pair import remove_credential_from_connector
from danswer.db.connector_credential_pair import (
update_connector_credential_pair_from_id,
)
from danswer.db.document import get_document_cnts_for_cc_pairs
from danswer.db.engine import get_session
from danswer.db.enums import ConnectorCredentialPairStatus
from danswer.db.index_attempt import cancel_indexing_attempts_for_ccpair
from danswer.db.index_attempt import cancel_indexing_attempts_past_model
from danswer.db.index_attempt import get_index_attempts_for_connector
from danswer.db.models import User
from danswer.server.documents.models import CCPairFullInfo
@ -58,20 +65,42 @@ def get_cc_pair_full_info(
document_count_info_list[0][-1] if document_count_info_list else 0
)
latest_deletion_attempt = get_deletion_status(
connector_id=cc_pair.connector.id,
credential_id=cc_pair.credential.id,
db_session=db_session,
)
return CCPairFullInfo.from_models(
cc_pair_model=cc_pair,
index_attempt_models=list(index_attempts),
latest_deletion_attempt=latest_deletion_attempt,
latest_deletion_attempt=get_deletion_attempt_snapshot(
connector_id=cc_pair.connector_id,
credential_id=cc_pair.credential_id,
db_session=db_session,
),
num_docs_indexed=documents_indexed,
)
class CCStatusUpdateRequest(BaseModel):
status: ConnectorCredentialPairStatus
@router.put("/admin/cc-pair/{cc_pair_id}/status")
def update_cc_pair_status(
cc_pair_id: int,
status_update_request: CCStatusUpdateRequest,
_: User | None = Depends(current_admin_user),
db_session: Session = Depends(get_session),
) -> None:
if status_update_request.status == ConnectorCredentialPairStatus.PAUSED:
cancel_indexing_attempts_for_ccpair(cc_pair_id, db_session)
# Just for good measure
cancel_indexing_attempts_past_model(db_session)
update_connector_credential_pair_from_id(
db_session=db_session,
cc_pair_id=cc_pair_id,
status=status_update_request.status,
)
@router.put("/admin/cc-pair/{cc_pair_id}/name")
def update_cc_pair_name(
cc_pair_id: int,

View File

@ -13,7 +13,7 @@ from sqlalchemy.orm import Session
from danswer.auth.users import current_admin_user
from danswer.auth.users import current_user
from danswer.background.celery.celery_utils import get_deletion_status
from danswer.background.celery.celery_utils import get_deletion_attempt_snapshot
from danswer.configs.app_configs import ENABLED_CONNECTOR_TYPES
from danswer.configs.constants import DocumentSource
from danswer.configs.constants import FileOrigin
@ -62,8 +62,6 @@ from danswer.db.deletion_attempt import check_deletion_attempt_is_allowed
from danswer.db.document import get_document_cnts_for_cc_pairs
from danswer.db.embedding_model import get_current_db_embedding_model
from danswer.db.engine import get_session
from danswer.db.index_attempt import cancel_indexing_attempts_for_connector
from danswer.db.index_attempt import cancel_indexing_attempts_past_model
from danswer.db.index_attempt import create_index_attempt
from danswer.db.index_attempt import get_index_attempts_for_cc_pair
from danswer.db.index_attempt import get_latest_finished_index_attempt_for_cc_pair
@ -432,6 +430,7 @@ def get_connector_indexing_status(
ConnectorIndexingStatus(
cc_pair_id=cc_pair.id,
name=cc_pair.name,
cc_pair_status=cc_pair.status,
connector=ConnectorSnapshot.from_connector_db_model(connector),
credential=CredentialSnapshot.from_credential_db_model(credential),
public_doc=cc_pair.is_public,
@ -456,7 +455,7 @@ def get_connector_indexing_status(
if latest_index_attempt
else None
),
deletion_attempt=get_deletion_status(
deletion_attempt=get_deletion_attempt_snapshot(
connector_id=connector.id,
credential_id=credential.id,
db_session=db_session,
@ -550,12 +549,6 @@ def update_connector_from_model(
status_code=404, detail=f"Connector {connector_id} does not exist"
)
if updated_connector.disabled:
cancel_indexing_attempts_for_connector(connector_id, db_session)
# Just for good measure
cancel_indexing_attempts_past_model(db_session)
return ConnectorSnapshot(
id=updated_connector.id,
name=updated_connector.name,
@ -570,7 +563,6 @@ def update_connector_from_model(
indexing_start=updated_connector.indexing_start,
time_created=updated_connector.time_created,
time_updated=updated_connector.time_updated,
disabled=updated_connector.disabled,
)
@ -793,7 +785,6 @@ def get_connector_by_id(
],
time_created=connector.time_created,
time_updated=connector.time_updated,
disabled=connector.disabled,
)

View File

@ -7,6 +7,7 @@ from pydantic import BaseModel
from danswer.configs.app_configs import MASK_CREDENTIAL_PREFIX
from danswer.configs.constants import DocumentSource
from danswer.connectors.models import InputType
from danswer.db.enums import ConnectorCredentialPairStatus
from danswer.db.models import Connector
from danswer.db.models import ConnectorCredentialPair
from danswer.db.models import Credential
@ -39,7 +40,6 @@ class ConnectorBase(BaseModel):
connector_specific_config: dict[str, Any]
refresh_freq: int | None # In seconds, None for one time index with no refresh
prune_freq: int | None
disabled: bool
indexing_start: datetime | None
@ -70,7 +70,6 @@ class ConnectorSnapshot(ConnectorBase):
indexing_start=connector.indexing_start,
time_created=connector.time_created,
time_updated=connector.time_updated,
disabled=connector.disabled,
)
@ -151,6 +150,7 @@ class IndexAttemptSnapshot(BaseModel):
class CCPairFullInfo(BaseModel):
id: int
name: str
status: ConnectorCredentialPairStatus
num_docs_indexed: int
connector: ConnectorSnapshot
credential: CredentialSnapshot
@ -168,6 +168,7 @@ class CCPairFullInfo(BaseModel):
return cls(
id=cc_pair_model.id,
name=cc_pair_model.name,
status=cc_pair_model.status,
num_docs_indexed=num_docs_indexed,
connector=ConnectorSnapshot.from_connector_db_model(
cc_pair_model.connector
@ -188,6 +189,7 @@ class ConnectorIndexingStatus(BaseModel):
cc_pair_id: int
name: str | None
cc_pair_status: ConnectorCredentialPairStatus
connector: ConnectorSnapshot
credential: CredentialSnapshot
owner: str

View File

@ -13,12 +13,16 @@ from danswer.configs.app_configs import GENERATIVE_MODEL_ACCESS_CHECK_FREQ
from danswer.configs.constants import DocumentSource
from danswer.configs.constants import KV_GEN_AI_KEY_CHECK_TIME
from danswer.db.connector_credential_pair import get_connector_credential_pair
from danswer.db.connector_credential_pair import (
update_connector_credential_pair_from_id,
)
from danswer.db.deletion_attempt import check_deletion_attempt_is_allowed
from danswer.db.engine import get_session
from danswer.db.enums import ConnectorCredentialPairStatus
from danswer.db.feedback import fetch_docs_ranked_by_boost
from danswer.db.feedback import update_document_boost
from danswer.db.feedback import update_document_hidden
from danswer.db.index_attempt import cancel_indexing_attempts_for_connector
from danswer.db.index_attempt import cancel_indexing_attempts_for_ccpair
from danswer.db.models import User
from danswer.document_index.document_index_utils import get_both_index_names
from danswer.document_index.factory import get_default_document_index
@ -164,8 +168,8 @@ def create_deletion_attempt_for_connector_id(
)
# Cancel any scheduled indexing attempts
cancel_indexing_attempts_for_connector(
connector_id=connector_id, db_session=db_session, include_secondary_index=True
cancel_indexing_attempts_for_ccpair(
cc_pair_id=cc_pair.id, db_session=db_session, include_secondary_index=True
)
# Check if the deletion attempt should be allowed
@ -178,6 +182,13 @@ def create_deletion_attempt_for_connector_id(
detail=deletion_attempt_disallowed_reason,
)
# mark as deleting
update_connector_credential_pair_from_id(
db_session=db_session,
cc_pair_id=cc_pair.id,
status=ConnectorCredentialPairStatus.DELETING,
)
# actually kick off the deletion
cleanup_connector_credential_pair_task.apply_async(
kwargs=dict(connector_id=connector_id, credential_id=credential_id),
)

View File

@ -76,3 +76,7 @@ def fetch_versioned_implementation_with_fallback(
e,
)
return fallback
def noop_fallback(*args: Any, **kwargs: Any) -> None:
pass

View File

@ -7,7 +7,6 @@ from danswer.access.access import _get_acl_for_user as get_acl_for_user_without_
from danswer.access.models import DocumentAccess
from danswer.access.utils import prefix_user_group
from danswer.db.models import User
from danswer.server.documents.models import ConnectorCredentialPairIdentifier
from ee.danswer.db.user_group import fetch_user_groups_for_documents
from ee.danswer.db.user_group import fetch_user_groups_for_user
@ -15,19 +14,16 @@ from ee.danswer.db.user_group import fetch_user_groups_for_user
def _get_access_for_documents(
document_ids: list[str],
db_session: Session,
cc_pair_to_delete: ConnectorCredentialPairIdentifier | None,
) -> dict[str, DocumentAccess]:
non_ee_access_dict = get_access_for_documents_without_groups(
document_ids=document_ids,
db_session=db_session,
cc_pair_to_delete=cc_pair_to_delete,
)
user_group_info = {
document_id: group_names
for document_id, group_names in fetch_user_groups_for_documents(
db_session=db_session,
document_ids=document_ids,
cc_pair_to_delete=cc_pair_to_delete,
)
}

View File

@ -1,13 +1,36 @@
from sqlalchemy import delete
from sqlalchemy.orm import Session
from danswer.configs.constants import DocumentSource
from danswer.db.connector_credential_pair import get_connector_credential_pair
from danswer.db.models import Connector
from danswer.db.models import ConnectorCredentialPair
from danswer.db.models import UserGroup__ConnectorCredentialPair
from danswer.utils.logger import setup_logger
logger = setup_logger()
def _delete_connector_credential_pair_user_groups_relationship__no_commit(
db_session: Session, connector_id: int, credential_id: int
) -> None:
cc_pair = get_connector_credential_pair(
db_session=db_session,
connector_id=connector_id,
credential_id=credential_id,
)
if cc_pair is None:
raise ValueError(
f"ConnectorCredentialPair with connector_id: {connector_id} "
f"and credential_id: {credential_id} not found"
)
stmt = delete(UserGroup__ConnectorCredentialPair).where(
UserGroup__ConnectorCredentialPair.cc_pair_id == cc_pair.id,
)
db_session.execute(stmt)
def get_cc_pairs_by_source(
source_type: DocumentSource,
db_session: Session,

View File

@ -2,10 +2,13 @@ from collections.abc import Sequence
from operator import and_
from uuid import UUID
from sqlalchemy import delete
from sqlalchemy import func
from sqlalchemy import select
from sqlalchemy.orm import Session
from danswer.db.connector_credential_pair import get_connector_credential_pair_from_id
from danswer.db.enums import ConnectorCredentialPairStatus
from danswer.db.models import ConnectorCredentialPair
from danswer.db.models import Document
from danswer.db.models import DocumentByConnectorCredentialPair
@ -15,7 +18,6 @@ from danswer.db.models import User
from danswer.db.models import User__UserGroup
from danswer.db.models import UserGroup
from danswer.db.models import UserGroup__ConnectorCredentialPair
from danswer.server.documents.models import ConnectorCredentialPairIdentifier
from ee.danswer.server.user_group.models import UserGroupCreate
from ee.danswer.server.user_group.models import UserGroupUpdate
@ -90,7 +92,6 @@ def fetch_documents_for_user_group_paginated(
def fetch_user_groups_for_documents(
db_session: Session,
document_ids: list[str],
cc_pair_to_delete: ConnectorCredentialPairIdentifier | None = None,
) -> Sequence[tuple[int, list[str]]]:
stmt = (
select(Document.id, func.array_agg(UserGroup.name))
@ -114,19 +115,12 @@ def fetch_user_groups_for_documents(
.join(Document, Document.id == DocumentByConnectorCredentialPair.id)
.where(Document.id.in_(document_ids))
.where(UserGroup__ConnectorCredentialPair.is_current == True) # noqa: E712
# don't include CC pairs that are being deleted
# NOTE: CC pairs can never go from DELETING to any other state -> it's safe to ignore them
.where(ConnectorCredentialPair.status != ConnectorCredentialPairStatus.DELETING)
.group_by(Document.id)
)
# pretend that the specified cc pair doesn't exist
if cc_pair_to_delete is not None:
stmt = stmt.where(
and_(
ConnectorCredentialPair.connector_id != cc_pair_to_delete.connector_id,
ConnectorCredentialPair.credential_id
!= cc_pair_to_delete.credential_id,
)
)
return db_session.execute(stmt).all() # type: ignore
@ -343,3 +337,25 @@ def delete_user_group(db_session: Session, user_group: UserGroup) -> None:
db_session.delete(user_group)
db_session.commit()
def delete_user_group_cc_pair_relationship__no_commit(
cc_pair_id: int, db_session: Session
) -> None:
"""Deletes all rows from UserGroup__ConnectorCredentialPair where the
connector_credential_pair_id matches the given cc_pair_id.
Should be used very carefully (only for connectors that are being deleted)."""
cc_pair = get_connector_credential_pair_from_id(cc_pair_id, db_session)
if not cc_pair:
raise ValueError(f"Connector Credential Pair '{cc_pair_id}' does not exist")
if cc_pair.status != ConnectorCredentialPairStatus.DELETING:
raise ValueError(
f"Connector Credential Pair '{cc_pair_id}' is not in the DELETING state"
)
delete_stmt = delete(UserGroup__ConnectorCredentialPair).where(
UserGroup__ConnectorCredentialPair.cc_pair_id == cc_pair_id,
)
db_session.execute(delete_stmt)

View File

@ -5,6 +5,8 @@ import sys
from sqlalchemy import delete
from sqlalchemy.orm import Session
from danswer.db.enums import ConnectorCredentialPairStatus
# Modify sys.path
current_dir = os.path.dirname(os.path.abspath(__file__))
parent_dir = os.path.dirname(current_dir)
@ -22,7 +24,7 @@ from danswer.db.connector import fetch_connector_by_id
from danswer.db.document import get_documents_for_connector_credential_pair
from danswer.db.index_attempt import (
delete_index_attempts,
cancel_indexing_attempts_for_connector,
cancel_indexing_attempts_for_ccpair,
)
from danswer.db.models import ConnectorCredentialPair
from danswer.document_index.interfaces import DocumentIndex
@ -141,10 +143,10 @@ def _delete_connector(cc_pair_id: int, db_session: Session) -> None:
logger.error(f"Connector credential pair with ID {cc_pair_id} not found")
return
if not cc_pair.connector.disabled:
if cc_pair.status != ConnectorCredentialPairStatus.PAUSED:
logger.error(
f"Connector {cc_pair.connector.name} is not disabled, cannot continue. \
Please navigate to the connector and disbale before attempting again"
f"Connector {cc_pair.connector.name} is not paused, cannot continue. \
Please navigate to the connector and pause before attempting again"
)
return
@ -159,8 +161,8 @@ def _delete_connector(cc_pair_id: int, db_session: Session) -> None:
return
logger.info("Cancelling indexing attempt for the connector")
cancel_indexing_attempts_for_connector(
connector_id=connector_id, db_session=db_session, include_secondary_index=True
cancel_indexing_attempts_for_ccpair(
cc_pair_id=cc_pair_id, db_session=db_session, include_secondary_index=True
)
validated_cc_pair = get_connector_credential_pair(

View File

@ -0,0 +1,66 @@
import requests
from sqlalchemy.orm import Session
from danswer.db.models import User
def test_create_chat_session_and_send_messages(db_session: Session) -> None:
# Create a test user
test_user = User(email="test@example.com", hashed_password="dummy_hash")
db_session.add(test_user)
db_session.commit()
base_url = "http://localhost:8080" # Adjust this to your API's base URL
headers = {"Authorization": f"Bearer {test_user.id}"}
# Create a new chat session
create_session_response = requests.post(
f"{base_url}/chat/create-chat-session",
json={
"description": "Test Chat",
"persona_id": 1,
}, # Assuming persona_id 1 exists
headers=headers,
)
assert create_session_response.status_code == 200
chat_session_id = create_session_response.json()["chat_session_id"]
# Send first message
first_message = "Hello, this is a test message."
send_message_response = requests.post(
f"{base_url}/chat/send-message",
json={
"chat_session_id": chat_session_id,
"message": first_message,
"prompt_id": None,
"retrieval_options": {"top_k": 3},
"stream_response": False,
},
headers=headers,
)
assert send_message_response.status_code == 200
# Send second message
second_message = "Can you provide more information?"
send_message_response = requests.post(
f"{base_url}/chat/send-message",
json={
"chat_session_id": chat_session_id,
"message": second_message,
"prompt_id": None,
"retrieval_options": {"top_k": 3},
"stream_response": False,
},
headers=headers,
)
assert send_message_response.status_code == 200
# Verify chat session details
get_session_response = requests.get(
f"{base_url}/chat/get-chat-session/{chat_session_id}", headers=headers
)
assert get_session_response.status_code == 200
session_details = get_session_response.json()
assert session_details["chat_session_id"] == chat_session_id
assert session_details["description"] == "Test Chat"
assert len(session_details["messages"]) == 4 # 2 user messages + 2 AI responses

View File

@ -0,0 +1,114 @@
import uuid
from typing import cast
import requests
from pydantic import BaseModel
from danswer.configs.constants import DocumentSource
from danswer.db.enums import ConnectorCredentialPairStatus
from tests.integration.common.constants import API_SERVER_URL
class ConnectorCreationDetails(BaseModel):
connector_id: int
credential_id: int
cc_pair_id: int
class ConnectorClient:
@staticmethod
def create_connector(
name_prefix: str = "test_connector", credential_id: int | None = None
) -> ConnectorCreationDetails:
unique_id = uuid.uuid4()
connector_name = f"{name_prefix}_{unique_id}"
connector_data = {
"name": connector_name,
"source": DocumentSource.NOT_APPLICABLE,
"input_type": "load_state",
"connector_specific_config": {},
"refresh_freq": 60,
"disabled": True,
}
response = requests.post(
f"{API_SERVER_URL}/manage/admin/connector",
json=connector_data,
)
response.raise_for_status()
connector_id = response.json()["id"]
# associate the credential with the connector
if not credential_id:
print("ID not specified, creating new credential")
# Create a new credential
credential_data = {
"credential_json": {},
"admin_public": True,
"source": DocumentSource.NOT_APPLICABLE,
}
response = requests.post(
f"{API_SERVER_URL}/manage/credential",
json=credential_data,
)
response.raise_for_status()
credential_id = cast(int, response.json()["id"])
cc_pair_metadata = {"name": f"test_cc_pair_{unique_id}", "is_public": True}
response = requests.put(
f"{API_SERVER_URL}/manage/connector/{connector_id}/credential/{credential_id}",
json=cc_pair_metadata,
)
response.raise_for_status()
# fetch the conenector credential pair id using the indexing status API
response = requests.get(
f"{API_SERVER_URL}/manage/admin/connector/indexing-status"
)
response.raise_for_status()
indexing_statuses = response.json()
cc_pair_id = None
for status in indexing_statuses:
if (
status["connector"]["id"] == connector_id
and status["credential"]["id"] == credential_id
):
cc_pair_id = status["cc_pair_id"]
break
if cc_pair_id is None:
raise ValueError("Could not find the connector credential pair id")
print(
f"Created connector with connector_id: {connector_id}, credential_id: {credential_id}, cc_pair_id: {cc_pair_id}"
)
return ConnectorCreationDetails(
connector_id=int(connector_id),
credential_id=int(credential_id),
cc_pair_id=int(cc_pair_id),
)
@staticmethod
def update_connector_status(
cc_pair_id: int, status: ConnectorCredentialPairStatus
) -> None:
response = requests.put(
f"{API_SERVER_URL}/manage/admin/cc-pair/{cc_pair_id}/status",
json={"status": status},
)
response.raise_for_status()
@staticmethod
def delete_connector(connector_id: int, credential_id: int) -> None:
response = requests.post(
f"{API_SERVER_URL}/manage/admin/deletion-attempt",
json={"connector_id": connector_id, "credential_id": credential_id},
)
response.raise_for_status()
@staticmethod
def get_connectors() -> list[dict]:
response = requests.get(f"{API_SERVER_URL}/manage/connector")
response.raise_for_status()
return response.json()

View File

@ -1 +1,2 @@
API_SERVER_URL = "http://localhost:8080"
MAX_DELAY = 30

View File

@ -0,0 +1,30 @@
from typing import cast
import requests
from danswer.server.features.document_set.models import DocumentSet
from danswer.server.features.document_set.models import DocumentSetCreationRequest
from tests.integration.common.constants import API_SERVER_URL
class DocumentSetClient:
@staticmethod
def create_document_set(
doc_set_creation_request: DocumentSetCreationRequest,
) -> int:
response = requests.post(
f"{API_SERVER_URL}/manage/admin/document-set",
json=doc_set_creation_request.dict(),
)
response.raise_for_status()
return cast(int, response.json())
@staticmethod
def fetch_document_sets() -> list[DocumentSet]:
response = requests.get(f"{API_SERVER_URL}/manage/admin/document-set")
response.raise_for_status()
document_sets = [
DocumentSet.parse_obj(doc_set_data) for doc_set_data in response.json()
]
return document_sets

View File

@ -82,6 +82,10 @@ def reset_postgres(database: str = "postgres") -> None:
if table_name == "alembic_version":
continue
# Don't touch Kombu
if table_name == "kombu_message" or table_name == "kombu_queue":
continue
cur.execute(f'DELETE FROM "{table_name}"')
# Re-enable triggers

View File

@ -1,10 +1,10 @@
import uuid
from typing import cast
import requests
from pydantic import BaseModel
from danswer.configs.constants import DocumentSource
from tests.integration.common.connectors import ConnectorClient
from tests.integration.common.constants import API_SERVER_URL
@ -15,34 +15,12 @@ class SeedDocumentResponse(BaseModel):
class TestDocumentClient:
@staticmethod
def seed_documents(num_docs: int = 5) -> SeedDocumentResponse:
unique_id = uuid.uuid4()
# Create a connector
connector_name = f"test_connector_{unique_id}"
connector_data = {
"name": connector_name,
"source": DocumentSource.NOT_APPLICABLE,
"input_type": "load_state",
"connector_specific_config": {},
"refresh_freq": 60,
"disabled": True,
}
response = requests.post(
f"{API_SERVER_URL}/manage/admin/connector",
json=connector_data,
)
response.raise_for_status()
connector_id = response.json()["id"]
# Associate the credential with the connector
cc_pair_metadata = {"name": f"test_cc_pair_{unique_id}", "is_public": True}
response = requests.put(
f"{API_SERVER_URL}/manage/connector/{connector_id}/credential/0",
json=cc_pair_metadata,
)
response.raise_for_status()
cc_pair_id = cast(int, response.json()["data"])
def seed_documents(
num_docs: int = 5, cc_pair_id: int | None = None
) -> SeedDocumentResponse:
if not cc_pair_id:
connector_details = ConnectorClient.create_connector()
cc_pair_id = connector_details.cc_pair_id
# Create and ingest some documents
document_ids: list[str] = []

View File

@ -0,0 +1,24 @@
from typing import cast
import requests
from ee.danswer.server.user_group.models import UserGroup
from ee.danswer.server.user_group.models import UserGroupCreate
from tests.integration.common.constants import API_SERVER_URL
class UserGroupClient:
@staticmethod
def create_user_group(user_group_creation_request: UserGroupCreate) -> int:
response = requests.post(
f"{API_SERVER_URL}/manage/admin/user-group",
json=user_group_creation_request.dict(),
)
response.raise_for_status()
return cast(int, response.json()["id"])
@staticmethod
def fetch_user_groups() -> list[UserGroup]:
response = requests.get(f"{API_SERVER_URL}/manage/admin/user-group")
response.raise_for_status()
return [UserGroup(**ug) for ug in response.json()]

View File

@ -0,0 +1,190 @@
import time
from danswer.db.enums import ConnectorCredentialPairStatus
from danswer.server.features.document_set.models import DocumentSetCreationRequest
from tests.integration.common.connectors import ConnectorClient
from tests.integration.common.constants import MAX_DELAY
from tests.integration.common.document_sets import DocumentSetClient
from tests.integration.common.seed_documents import TestDocumentClient
from tests.integration.common.user_groups import UserGroupClient
from tests.integration.common.user_groups import UserGroupCreate
from tests.integration.common.vespa import TestVespaClient
def test_connector_deletion(reset: None, vespa_client: TestVespaClient) -> None:
# create connectors
c1_details = ConnectorClient.create_connector(name_prefix="tc1")
c2_details = ConnectorClient.create_connector(name_prefix="tc2")
c1_seed_res = TestDocumentClient.seed_documents(
num_docs=5, cc_pair_id=c1_details.cc_pair_id
)
c2_seed_res = TestDocumentClient.seed_documents(
num_docs=5, cc_pair_id=c2_details.cc_pair_id
)
# create document sets
doc_set_1_id = DocumentSetClient.create_document_set(
DocumentSetCreationRequest(
name="Test Document Set 1",
description="Intially connector to be deleted, should be empty after test",
cc_pair_ids=[c1_details.cc_pair_id],
is_public=True,
users=[],
groups=[],
)
)
doc_set_2_id = DocumentSetClient.create_document_set(
DocumentSetCreationRequest(
name="Test Document Set 2",
description="Intially both connectors, should contain undeleted connector after test",
cc_pair_ids=[c1_details.cc_pair_id, c2_details.cc_pair_id],
is_public=True,
users=[],
groups=[],
)
)
# wait for document sets to be synced
start = time.time()
while True:
doc_sets = DocumentSetClient.fetch_document_sets()
doc_set_1 = next(
(doc_set for doc_set in doc_sets if doc_set.id == doc_set_1_id), None
)
doc_set_2 = next(
(doc_set for doc_set in doc_sets if doc_set.id == doc_set_2_id), None
)
if not doc_set_1 or not doc_set_2:
raise RuntimeError("Document set not found")
if doc_set_1.is_up_to_date and doc_set_2.is_up_to_date:
break
if time.time() - start > MAX_DELAY:
raise TimeoutError("Document sets were not synced within the max delay")
time.sleep(2)
print("Document sets created and synced")
# if so, create ACLs
user_group_1 = UserGroupClient.create_user_group(
UserGroupCreate(
name="Test User Group 1", user_ids=[], cc_pair_ids=[c1_details.cc_pair_id]
)
)
user_group_2 = UserGroupClient.create_user_group(
UserGroupCreate(
name="Test User Group 2",
user_ids=[],
cc_pair_ids=[c1_details.cc_pair_id, c2_details.cc_pair_id],
)
)
# wait for user groups to be available
start = time.time()
while True:
user_groups = {ug.id: ug for ug in UserGroupClient.fetch_user_groups()}
if not (
user_group_1 in user_groups.keys() and user_group_2 in user_groups.keys()
):
raise RuntimeError("User groups not found")
if (
user_groups[user_group_1].is_up_to_date
and user_groups[user_group_2].is_up_to_date
):
break
if time.time() - start > MAX_DELAY:
raise TimeoutError("User groups were not synced within the max delay")
time.sleep(2)
print("User groups created and synced")
# delete connector 1
ConnectorClient.update_connector_status(
cc_pair_id=c1_details.cc_pair_id, status=ConnectorCredentialPairStatus.PAUSED
)
ConnectorClient.delete_connector(
connector_id=c1_details.connector_id, credential_id=c1_details.credential_id
)
start = time.time()
while True:
connectors = ConnectorClient.get_connectors()
if c1_details.connector_id not in [c["id"] for c in connectors]:
break
if time.time() - start > MAX_DELAY:
raise TimeoutError("Connector 1 was not deleted within the max delay")
time.sleep(2)
print("Connector 1 deleted")
# validate vespa documents
c1_vespa_docs = vespa_client.get_documents_by_id(c1_seed_res.document_ids)[
"documents"
]
c2_vespa_docs = vespa_client.get_documents_by_id(c2_seed_res.document_ids)[
"documents"
]
assert len(c1_vespa_docs) == 0
assert len(c2_vespa_docs) == 5
for doc in c2_vespa_docs:
assert doc["fields"]["access_control_list"] == {
"PUBLIC": 1,
"group:Test User Group 2": 1,
}
assert doc["fields"]["document_sets"] == {"Test Document Set 2": 1}
# check that only connector 1 is deleted
# TODO: check for the CC pair rather than the connector once the refactor is done
all_connectors = ConnectorClient.get_connectors()
assert len(all_connectors) == 1
assert all_connectors[0]["id"] == c2_details.connector_id
# validate document sets
all_doc_sets = DocumentSetClient.fetch_document_sets()
assert len(all_doc_sets) == 2
doc_set_1_found = False
doc_set_2_found = False
for doc_set in all_doc_sets:
if doc_set.id == doc_set_1_id:
doc_set_1_found = True
assert doc_set.cc_pair_descriptors == []
if doc_set.id == doc_set_2_id:
doc_set_2_found = True
assert len(doc_set.cc_pair_descriptors) == 1
assert doc_set.cc_pair_descriptors[0].id == c2_details.cc_pair_id
assert doc_set_1_found
assert doc_set_2_found
# validate user groups
all_user_groups = UserGroupClient.fetch_user_groups()
assert len(all_user_groups) == 2
user_group_1_found = False
user_group_2_found = False
for user_group in all_user_groups:
if user_group.id == user_group_1:
user_group_1_found = True
assert user_group.cc_pairs == []
if user_group.id == user_group_2:
user_group_2_found = True
assert len(user_group.cc_pairs) == 1
assert user_group.cc_pairs[0].id == c2_details.cc_pair_id
assert user_group_1_found
assert user_group_2_found

View File

@ -165,7 +165,6 @@ def create_connector(env_name: str, file_paths: list[str]) -> int:
connector_specific_config={"file_locations": file_paths},
refresh_freq=None,
prune_freq=None,
disabled=False,
indexing_start=None,
)

View File

@ -1,7 +1,7 @@
"use client";
import { Button } from "@tremor/react";
import { CCPairFullInfo } from "./types";
import { CCPairFullInfo, ConnectorCredentialPairStatus } from "./types";
import { usePopup } from "@/components/admin/connectors/Popup";
import { FiTrash } from "react-icons/fi";
import { deleteCCPair } from "@/lib/documentDeletion";
@ -16,7 +16,7 @@ export function DeletionButton({ ccPair }: { ccPair: CCPairFullInfo }) {
ccPair?.latest_deletion_attempt?.status === "STARTED";
let tooltip: string;
if (ccPair.connector.disabled) {
if (ccPair.status !== ConnectorCredentialPairStatus.ACTIVE) {
if (isDeleting) {
tooltip = "This connector is currently being deleted";
} else {
@ -41,7 +41,9 @@ export function DeletionButton({ ccPair }: { ccPair: CCPairFullInfo }) {
)
}
icon={FiTrash}
disabled={!ccPair.connector.disabled || isDeleting}
disabled={
ccPair.status === ConnectorCredentialPairStatus.ACTIVE || isDeleting
}
tooltip={tooltip}
>
Delete

View File

@ -1,11 +1,11 @@
"use client";
import { Button } from "@tremor/react";
import { CCPairFullInfo } from "./types";
import { CCPairFullInfo, ConnectorCredentialPairStatus } from "./types";
import { usePopup } from "@/components/admin/connectors/Popup";
import { disableConnector } from "@/lib/connector";
import { mutate } from "swr";
import { buildCCPairInfoUrl } from "./lib";
import { setCCPairStatus } from "@/lib/ccPair";
export function ModifyStatusButtonCluster({
ccPair,
@ -17,13 +17,16 @@ export function ModifyStatusButtonCluster({
return (
<>
{popup}
{ccPair.connector.disabled ? (
{ccPair.status === ConnectorCredentialPairStatus.PAUSED ? (
<Button
color="green"
size="xs"
onClick={() =>
disableConnector(ccPair.connector, setPopup, () =>
mutate(buildCCPairInfoUrl(ccPair.id))
setCCPairStatus(
ccPair.id,
ConnectorCredentialPairStatus.ACTIVE,
setPopup,
() => mutate(buildCCPairInfoUrl(ccPair.id))
)
}
tooltip="Click to start indexing again!"
@ -35,8 +38,11 @@ export function ModifyStatusButtonCluster({
color="red"
size="xs"
onClick={() =>
disableConnector(ccPair.connector, setPopup, () =>
mutate(buildCCPairInfoUrl(ccPair.id))
setCCPairStatus(
ccPair.id,
ConnectorCredentialPairStatus.PAUSED,
setPopup,
() => mutate(buildCCPairInfoUrl(ccPair.id))
)
}
tooltip={

View File

@ -1,6 +1,6 @@
"use client";
import { CCPairFullInfo } from "./types";
import { CCPairFullInfo, ConnectorCredentialPairStatus } from "./types";
import { HealthCheckBanner } from "@/components/health/healthcheck";
import { CCPairStatus } from "@/components/Status";
import { BackButton } from "@/components/BackButton";
@ -92,7 +92,7 @@ function Main({ ccPairId }: { ccPairId: number }) {
}
const lastIndexAttempt = ccPair.index_attempts[0];
const isDeleting = isCurrentlyDeleting(ccPair.latest_deletion_attempt);
const isDeleting = ccPair.status === ConnectorCredentialPairStatus.DELETING;
// figure out if we need to artificially deflate the number of docs indexed.
// This is required since the total number of docs indexed by a CC Pair is
@ -112,9 +112,6 @@ function Main({ ccPairId }: { ccPairId: number }) {
setEditableName(ccPair.name);
setIsEditing(true);
};
const deleting =
ccPair.latest_deletion_attempt?.status == "PENDING" ||
ccPair.latest_deletion_attempt?.status == "STARTED";
const resetEditing = () => {
setIsEditing(false);
@ -169,16 +166,18 @@ function Main({ ccPairId }: { ccPairId: number }) {
ccPairId={ccPair.id}
connectorId={ccPair.connector.id}
credentialId={ccPair.credential.id}
isDisabled={ccPair.connector.disabled}
isDisabled={
ccPair.status === ConnectorCredentialPairStatus.PAUSED
}
isDeleting={isDeleting}
/>
)}
{!deleting && <ModifyStatusButtonCluster ccPair={ccPair} />}
{!isDeleting && <ModifyStatusButtonCluster ccPair={ccPair} />}
</div>
</div>
<CCPairStatus
status={lastIndexAttempt?.status || "not_started"}
disabled={ccPair.connector.disabled}
disabled={ccPair.status === ConnectorCredentialPairStatus.PAUSED}
isDeleting={isDeleting}
/>
<div className="text-sm mt-1">

View File

@ -2,9 +2,16 @@ import { Connector } from "@/lib/connectors/connectors";
import { Credential } from "@/lib/connectors/credentials";
import { DeletionAttemptSnapshot, IndexAttemptSnapshot } from "@/lib/types";
export enum ConnectorCredentialPairStatus {
ACTIVE = "ACTIVE",
PAUSED = "PAUSED",
DELETING = "DELETING",
}
export interface CCPairFullInfo {
id: number;
name: string;
status: ConnectorCredentialPairStatus;
num_docs_indexed: number;
connector: Connector<any>;
credential: Credential<any>;

View File

@ -189,7 +189,6 @@ export default function AddConnector({
refresh_freq: refreshFreq * 60 || null,
prune_freq: pruneFreq * 60 || null,
indexing_start: indexingStart,
disabled: false,
},
undefined,
credentialActivated ? false : true,

View File

@ -43,7 +43,6 @@ export const submitFiles = async (
refresh_freq: null,
prune_freq: null,
indexing_start: null,
disabled: false,
});
if (connectorErrorMsg || !connector) {
setPopup({

View File

@ -44,7 +44,6 @@ export const submitGoogleSite = async (
refresh_freq: advancedConfig.refreshFreq,
prune_freq: advancedConfig.pruneFreq,
indexing_start: advancedConfig.indexingStart,
disabled: false,
});
if (connectorErrorMsg || !connector) {
setPopup({

View File

@ -32,6 +32,7 @@ import { Warning } from "@phosphor-icons/react";
import Cookies from "js-cookie";
import { TOGGLED_CONNECTORS_COOKIE_NAME } from "@/lib/constants";
import { usePaidEnterpriseFeaturesEnabled } from "@/components/settings/usePaidEnterpriseFeaturesEnabled";
import { ConnectorCredentialPairStatus } from "../../connector/[ccPairId]/types";
const columnWidths = {
first: "20%",
@ -146,20 +147,25 @@ function ConnectorRow({
};
const getActivityBadge = () => {
if (ccPairsIndexingStatus.connector.disabled) {
if (ccPairsIndexingStatus.deletion_attempt) {
return (
<Badge
color="red"
className="w-fit px-2 py-1 rounded-full border border-red-500"
>
<div className="flex text-xs items-center gap-x-1">
<div className="w-3 h-3 rounded-full bg-red-500"></div>
Deleting
</div>
</Badge>
);
}
if (
ccPairsIndexingStatus.cc_pair_status ===
ConnectorCredentialPairStatus.DELETING
) {
return (
<Badge
color="red"
className="w-fit px-2 py-1 rounded-full border border-red-500"
>
<div className="flex text-xs items-center gap-x-1">
<div className="w-3 h-3 rounded-full bg-red-500"></div>
Deleting
</div>
</Badge>
);
} else if (
ccPairsIndexingStatus.cc_pair_status ===
ConnectorCredentialPairStatus.PAUSED
) {
return (
<Badge
color="yellow"
@ -172,6 +178,8 @@ function ConnectorRow({
</Badge>
);
}
// ACTIVE case
switch (ccPairsIndexingStatus.last_status) {
case "in_progress":
return (
@ -302,7 +310,10 @@ export function CCPairIndexingStatusTable({
const statuses = grouped[source];
summaries[source] = {
count: statuses.length,
active: statuses.filter((status) => !status.connector.disabled).length,
active: statuses.filter(
(status) =>
status.cc_pair_status === ConnectorCredentialPairStatus.ACTIVE
).length,
public: statuses.filter((status) => status.public_doc).length,
totalDocsIndexed: statuses.reduce(
(sum, status) => sum + status.docs_indexed,
@ -362,6 +373,7 @@ export function CCPairIndexingStatusTable({
ccPairsIndexingStatus={{
cc_pair_id: 1,
name: "Sample File Connector",
cc_pair_status: ConnectorCredentialPairStatus.ACTIVE,
last_status: "success",
connector: {
name: "Sample File Connector",
@ -373,7 +385,6 @@ export function CCPairIndexingStatusTable({
refresh_freq: 86400,
prune_freq: null,
indexing_start: new Date("2023-07-01T12:00:00Z"),
disabled: false,
id: 1,
credential_ids: [],
time_created: "2023-07-01T12:00:00Z",

View File

@ -194,7 +194,6 @@ export function ConnectorForm<T extends Yup.AnyObject>({
connector_specific_config: connectorConfig,
refresh_freq: refreshFreq || 0,
prune_freq: pruneFreq ?? null,
disabled: false,
indexing_start: indexingStart || null,
});
@ -343,7 +342,6 @@ export function UpdateConnectorForm<T extends Yup.AnyObject>({
connector_specific_config: values,
refresh_freq: existingConnector.refresh_freq,
prune_freq: existingConnector.prune_freq,
disabled: false,
indexing_start: existingConnector.indexing_start,
},
existingConnector.id

View File

@ -1,251 +0,0 @@
import { ConnectorIndexingStatus } from "@/lib/types";
import { PopupSpec, usePopup } from "@/components/admin/connectors/Popup";
import { useState } from "react";
import { LinkBreakIcon, LinkIcon } from "@/components/icons/icons";
import { disableConnector } from "@/lib/connector";
import { AttachCredentialButtonForTable } from "@/components/admin/connectors/buttons/AttachCredentialButtonForTable";
import { DeleteColumn } from "./DeleteColumn";
import {
Table,
TableHead,
TableRow,
TableHeaderCell,
TableBody,
TableCell,
} from "@tremor/react";
import { FiCheck, FiXCircle } from "react-icons/fi";
import { Credential } from "@/lib/connectors/credentials";
interface StatusRowProps<ConnectorConfigType, ConnectorCredentialType> {
connectorIndexingStatus: ConnectorIndexingStatus<
ConnectorConfigType,
ConnectorCredentialType
>;
hasCredentialsIssue: boolean;
setPopup: (popupSpec: PopupSpec | null) => void;
onUpdate: () => void;
}
export function StatusRow<ConnectorConfigType, ConnectorCredentialType>({
connectorIndexingStatus,
hasCredentialsIssue,
setPopup,
onUpdate,
}: StatusRowProps<ConnectorConfigType, ConnectorCredentialType>) {
const [statusHovered, setStatusHovered] = useState<boolean>(false);
const connector = connectorIndexingStatus.connector;
let shouldDisplayDisabledToggle = !hasCredentialsIssue;
let statusDisplay;
switch (connectorIndexingStatus.last_status) {
case "failed":
statusDisplay = <div className="text-error">Failed</div>;
break;
default:
statusDisplay = <div className="text-success flex">Enabled!</div>;
}
if (connector.disabled) {
const deletionAttempt = connectorIndexingStatus.deletion_attempt;
if (!deletionAttempt || deletionAttempt.status === "FAILURE") {
statusDisplay = <div className="text-error">Paused</div>;
} else {
statusDisplay = <div className="text-error">Deleting...</div>;
shouldDisplayDisabledToggle = false;
}
}
return (
<div className="flex">
{statusDisplay}
{shouldDisplayDisabledToggle && (
<div
className="cursor-pointer ml-1 my-auto relative"
onMouseEnter={() => setStatusHovered(true)}
onMouseLeave={() => setStatusHovered(false)}
onClick={() => disableConnector(connector, setPopup, onUpdate)}
>
{statusHovered && (
<div className="flex flex-nowrap absolute top-0 left-0 ml-8 bg-background border border-border px-3 py-2 rounded shadow-lg">
{connector.disabled ? "Enable!" : "Pause!"}
</div>
)}
{connector.disabled ? (
<LinkIcon className="my-auto flex flex-shrink-0 text-error" />
) : (
<LinkBreakIcon
className={`my-auto flex flex-shrink-0 ${
connectorIndexingStatus.last_status === "failed"
? "text-error"
: "text-success"
}`}
/>
)}
</div>
)}
</div>
);
}
export interface ColumnSpecification<
ConnectorConfigType,
ConnectorCredentialType,
> {
header: string;
key: string;
getValue: (
ccPairStatus: ConnectorIndexingStatus<
ConnectorConfigType,
ConnectorCredentialType
>
) => JSX.Element | string | undefined;
}
export interface ConnectorsTableProps<
ConnectorConfigType,
ConnectorCredentialType,
> {
connectorIndexingStatuses: ConnectorIndexingStatus<
ConnectorConfigType,
ConnectorCredentialType
>[];
liveCredential?: Credential<ConnectorCredentialType> | null;
getCredential?: (
credential: Credential<ConnectorCredentialType>
) => JSX.Element | string;
onUpdate: () => void;
onCredentialLink?: (connectorId: number) => void;
specialColumns?: ColumnSpecification<
ConnectorConfigType,
ConnectorCredentialType
>[];
includeName?: boolean;
}
export function ConnectorsTable<ConnectorConfigType, ConnectorCredentialType>({
connectorIndexingStatuses,
liveCredential,
getCredential,
specialColumns,
onUpdate,
onCredentialLink,
includeName = false,
}: ConnectorsTableProps<ConnectorConfigType, ConnectorCredentialType>) {
const { popup, setPopup } = usePopup();
const connectorIncludesCredential =
getCredential !== undefined && onCredentialLink !== undefined;
const columns = [
...(includeName ? [{ header: "Name", key: "name" }] : []),
...(specialColumns ?? []),
{
header: "Status",
key: "status",
},
{
header: "Is Public",
key: "is_public",
},
];
if (connectorIncludesCredential) {
columns.push({
header: "Credential",
key: "credential",
});
}
columns.push({
header: "Remove",
key: "remove",
});
return (
<div>
{popup}
<Table className="overflow-visible">
<TableHead>
<TableRow>
{includeName && <TableHeaderCell>Name</TableHeaderCell>}
{specialColumns?.map(({ header }) => (
<TableHeaderCell key={header}>{header}</TableHeaderCell>
))}
<TableHeaderCell>Status</TableHeaderCell>
<TableHeaderCell>Is Public</TableHeaderCell>
{connectorIncludesCredential && (
<TableHeaderCell>Credential</TableHeaderCell>
)}
<TableHeaderCell>Remove</TableHeaderCell>
</TableRow>
</TableHead>
<TableBody>
{connectorIndexingStatuses.map((connectorIndexingStatus) => {
const connector = connectorIndexingStatus.connector;
// const credential = connectorIndexingStatus.credential;
const hasValidCredentials =
liveCredential &&
connector.credential_ids.includes(liveCredential.id);
const credentialDisplay = connectorIncludesCredential ? (
hasValidCredentials ? (
<div className="max-w-sm truncate">
{getCredential(liveCredential)}
</div>
) : liveCredential ? (
<AttachCredentialButtonForTable
onClick={() => onCredentialLink(connector.id)}
/>
) : (
<p className="text-red-700">N/A</p>
)
) : (
"-"
);
return (
<TableRow key={connectorIndexingStatus.cc_pair_id}>
{includeName && (
<TableCell className="whitespace-normal break-all">
<p className="text font-medium">
{connectorIndexingStatus.name}
</p>
</TableCell>
)}
{specialColumns?.map(({ key, getValue }) => (
<TableCell key={key}>
{getValue(connectorIndexingStatus)}
</TableCell>
))}
<TableCell>
<StatusRow
connectorIndexingStatus={connectorIndexingStatus}
hasCredentialsIssue={
!hasValidCredentials && connectorIncludesCredential
}
setPopup={setPopup}
onUpdate={onUpdate}
/>
</TableCell>
<TableCell>
{connectorIndexingStatus.public_doc ? (
<FiCheck className="my-auto text-success" size="18" />
) : (
<FiXCircle className="my-auto text-error" />
)}
</TableCell>
{connectorIncludesCredential && (
<TableCell>{credentialDisplay}</TableCell>
)}
<TableCell>
<DeleteColumn
connectorIndexingStatus={connectorIndexingStatus}
setPopup={setPopup}
onUpdate={onUpdate}
/>
</TableCell>
</TableRow>
);
})}
</TableBody>
</Table>
</div>
);
}

View File

@ -1,60 +0,0 @@
import { InfoIcon, TrashIcon } from "@/components/icons/icons";
import {
deleteCCPair,
scheduleDeletionJobForConnector,
} from "@/lib/documentDeletion";
import { ConnectorIndexingStatus } from "@/lib/types";
import { PopupSpec } from "../Popup";
import { useState } from "react";
import { DeleteButton } from "@/components/DeleteButton";
interface Props<ConnectorConfigType, ConnectorCredentialType> {
connectorIndexingStatus: ConnectorIndexingStatus<
ConnectorConfigType,
ConnectorCredentialType
>;
setPopup: (popupSpec: PopupSpec | null) => void;
onUpdate: () => void;
}
export function DeleteColumn<ConnectorConfigType, ConnectorCredentialType>({
connectorIndexingStatus,
setPopup,
onUpdate,
}: Props<ConnectorConfigType, ConnectorCredentialType>) {
const [deleteHovered, setDeleteHovered] = useState<boolean>(false);
const connector = connectorIndexingStatus.connector;
const credential = connectorIndexingStatus.credential;
return (
<div
className="relative"
onMouseEnter={() => setDeleteHovered(true)}
onMouseLeave={() => setDeleteHovered(false)}
>
{connectorIndexingStatus.is_deletable ? (
<div className="cursor-pointer mx-auto flex">
<DeleteButton
onClick={() =>
deleteCCPair(connector.id, credential.id, setPopup, onUpdate)
}
/>
</div>
) : (
<div>
{deleteHovered && (
<div className="w-64 z-30 whitespace-normal flex absolute mt-8 top-0 left-0 bg-background border border-border px-3 py-2 rounded shadow-lg text-xs">
<InfoIcon className="flex flex-shrink-0 mr-2" />
In order to delete a connector it must be disabled and have no
ongoing / planned index jobs.
</div>
)}
<div className="flex mx-auto text-xs">
<DeleteButton disabled />
</div>
</div>
)}
</div>
);
}

View File

@ -1,174 +0,0 @@
import { DeletionAttemptSnapshot, ValidStatuses } from "@/lib/types";
import { usePopup } from "@/components/admin/connectors/Popup";
import { updateConnector } from "@/lib/connector";
import { AttachCredentialButtonForTable } from "@/components/admin/connectors/buttons/AttachCredentialButtonForTable";
import { scheduleDeletionJobForConnector } from "@/lib/documentDeletion";
import { ConnectorsTableProps } from "./ConnectorsTable";
import {
Table,
TableHead,
TableRow,
TableHeaderCell,
TableBody,
TableCell,
} from "@tremor/react";
import { DeleteButton } from "@/components/DeleteButton";
const SingleUseConnectorStatus = ({
indexingStatus,
deletionAttempt,
}: {
indexingStatus: ValidStatuses | null;
deletionAttempt: DeletionAttemptSnapshot | null;
}) => {
if (
deletionAttempt &&
(deletionAttempt.status === "PENDING" ||
deletionAttempt.status === "STARTED")
) {
return <div className="text-error">Deleting...</div>;
}
if (!indexingStatus || indexingStatus === "not_started") {
return <div>Not Started</div>;
}
if (indexingStatus === "in_progress") {
return <div>In Progress</div>;
}
if (indexingStatus === "success") {
return <div className="text-success">Success!</div>;
}
return <div className="text-error">Failed</div>;
};
export function SingleUseConnectorsTable<
ConnectorConfigType,
ConnectorCredentialType,
>({
connectorIndexingStatuses,
liveCredential,
getCredential,
specialColumns,
onUpdate,
onCredentialLink,
includeName = false,
}: ConnectorsTableProps<ConnectorConfigType, ConnectorCredentialType>) {
const { popup, setPopup } = usePopup();
const connectorIncludesCredential =
getCredential !== undefined && onCredentialLink !== undefined;
return (
<div>
{popup}
<Table className="overflow-visible">
<TableHead>
<TableRow>
{includeName && <TableHeaderCell>Name</TableHeaderCell>}
{specialColumns?.map(({ header }) => (
<TableHeaderCell key={header}>{header}</TableHeaderCell>
))}
<TableHeaderCell>Status</TableHeaderCell>
{connectorIncludesCredential && (
<TableHeaderCell>Credential</TableHeaderCell>
)}
<TableHeaderCell>Remove</TableHeaderCell>
</TableRow>
</TableHead>
<TableBody>
{connectorIndexingStatuses.map((connectorIndexingStatus) => {
const connector = connectorIndexingStatus.connector;
// const credential = connectorIndexingStatus.credential;
const hasValidCredentials =
liveCredential &&
connector.credential_ids.includes(liveCredential.id);
const credentialDisplay = connectorIncludesCredential ? (
hasValidCredentials ? (
<div className="max-w-sm truncate">
{getCredential(liveCredential)}
</div>
) : liveCredential ? (
<AttachCredentialButtonForTable
onClick={() => onCredentialLink(connector.id)}
/>
) : (
<p className="text-red-700">N/A</p>
)
) : (
"-"
);
return (
<TableRow key={connectorIndexingStatus.cc_pair_id}>
{includeName && (
<TableCell className="whitespace-normal break-all">
<p className="text font-medium">
{connectorIndexingStatus.name}
</p>
</TableCell>
)}
{specialColumns?.map(({ key, getValue }) => (
<TableCell className="max-w-sm" key={key}>
<div className="break-words whitespace-normal">
{getValue(connectorIndexingStatus)}
</div>
</TableCell>
))}
<TableCell>
<SingleUseConnectorStatus
indexingStatus={connectorIndexingStatus.last_status}
deletionAttempt={connectorIndexingStatus.deletion_attempt}
/>
</TableCell>
{connectorIncludesCredential && (
<TableCell>{credentialDisplay}</TableCell>
)}
<TableCell>
<div
className="cursor-pointer mx-auto flex"
onClick={async () => {
// for one-time, just disable the connector at deletion time
// this is required before deletion can happen
await updateConnector({
...connector,
disabled: !connector.disabled,
});
const deletionScheduleError =
await scheduleDeletionJobForConnector(
connector.id,
connectorIndexingStatus.credential.id
);
if (deletionScheduleError) {
setPopup({
message:
"Failed to schedule deletion of connector - " +
deletionScheduleError,
type: "error",
});
} else {
setPopup({
message: "Scheduled deletion of connector!",
type: "success",
});
}
setTimeout(() => {
setPopup(null);
}, 4000);
onUpdate();
}}
>
<DeleteButton />
</div>
</TableCell>
</TableRow>
);
})}
</TableBody>
</Table>
</div>
);
}

49
web/src/lib/ccPair.ts Normal file
View File

@ -0,0 +1,49 @@
import { ConnectorCredentialPairStatus } from "@/app/admin/connector/[ccPairId]/types";
import { PopupSpec } from "@/components/admin/connectors/Popup";
export async function setCCPairStatus(
ccPairId: number,
ccPairStatus: ConnectorCredentialPairStatus,
setPopup?: (popupSpec: PopupSpec | null) => void,
onUpdate?: () => void
) {
try {
const response = await fetch(
`/api/manage/admin/cc-pair/${ccPairId}/status`,
{
method: "PUT",
headers: {
"Content-Type": "application/json",
},
body: JSON.stringify({ status: ccPairStatus }),
}
);
if (!response.ok) {
const errorMessage = (await response.json()).detail;
setPopup &&
setPopup({
message: "Failed to update connector status - " + errorMessage,
type: "error",
});
}
setPopup &&
setPopup({
message:
ccPairStatus === ConnectorCredentialPairStatus.ACTIVE
? "Enabled connector!"
: "Paused connector!",
type: "success",
});
onUpdate && onUpdate();
} catch (error) {
console.error("Error updating CC pair status:", error);
setPopup &&
setPopup({
message: "Failed to update connector status",
type: "error",
});
}
}

View File

@ -52,26 +52,6 @@ export async function updateConnector<T>(
return await response.json();
}
export async function disableConnector(
connector: Connector<any>,
setPopup: (popupSpec: PopupSpec | null) => void,
onUpdate: () => void
) {
updateConnector({
...connector,
disabled: !connector.disabled,
}).then(() => {
setPopup({
message: connector.disabled ? "Enabled connector!" : "Paused connector!",
type: "success",
});
setTimeout(() => {
setPopup(null);
}, 4000);
onUpdate && onUpdate();
});
}
export async function deleteConnector(
connectorId: number
): Promise<string | null> {

View File

@ -774,7 +774,6 @@ export interface ConnectorBase<T> {
refresh_freq: number | null;
prune_freq: number | null;
indexing_start: Date | null;
disabled: boolean;
}
export interface Connector<T> extends ConnectorBase<T> {

View File

@ -1,6 +1,7 @@
import { Persona } from "@/app/admin/assistants/interfaces";
import { Credential } from "./connectors/credentials";
import { Connector } from "./connectors/connectors";
import { ConnectorCredentialPairStatus } from "@/app/admin/connector/[ccPairId]/types";
export interface UserPreferences {
chosen_assistants: number[] | null;
@ -67,6 +68,7 @@ export interface ConnectorIndexingStatus<
> {
cc_pair_id: number;
name: string | null;
cc_pair_status: ConnectorCredentialPairStatus;
connector: Connector<ConnectorConfigType>;
credential: Credential<ConnectorCredentialType>;
public_doc: boolean;