Fix Undeleteable Connectors (#1507)

Yuhong Sun 2024-05-23 23:44:32 -07:00 committed by GitHub
parent 57452b1030
commit 6e5d9f33d2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 78 additions and 92 deletions

View File

@@ -0,0 +1,31 @@
+"""Remove Last Attempt Status from CC Pair
+
+Revision ID: ec85f2b3c544
+Revises: 3879338f8ba1
+Create Date: 2024-05-23 21:39:46.126010
+
+"""
+from alembic import op
+import sqlalchemy as sa
+
+# revision identifiers, used by Alembic.
+revision = "ec85f2b3c544"
+down_revision = "3879338f8ba1"
+branch_labels: None = None
+depends_on: None = None
+
+
+def upgrade() -> None:
+    op.drop_column("connector_credential_pair", "last_attempt_status")
+
+
+def downgrade() -> None:
+    op.add_column(
+        "connector_credential_pair",
+        sa.Column(
+            "last_attempt_status",
+            sa.VARCHAR(),
+            autoincrement=False,
+            nullable=True,
+        ),
+    )
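Note: this new revision only drops (and on downgrade restores) the connector_credential_pair.last_attempt_status column. A minimal sketch of applying it programmatically through Alembic's command API, assuming a standard alembic.ini at the backend root (running `alembic upgrade head` from that directory is the usual route):

from alembic import command
from alembic.config import Config

# Path to the Alembic config is an assumption; adjust to the repo layout.
alembic_cfg = Config("alembic.ini")

# Applies all pending revisions, including ec85f2b3c544, which drops
# connector_credential_pair.last_attempt_status.
command.upgrade(alembic_cfg, "head")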

View File

@@ -68,7 +68,9 @@ def cleanup_connector_credential_pair_task(
            f"{connector_id} and Credential ID: {credential_id} does not exist."
        )

-    deletion_attempt_disallowed_reason = check_deletion_attempt_is_allowed(cc_pair)
+    deletion_attempt_disallowed_reason = check_deletion_attempt_is_allowed(
+        connector_credential_pair=cc_pair, db_session=db_session
+    )
    if deletion_attempt_disallowed_reason:
        raise ValueError(deletion_attempt_disallowed_reason)

View File

@@ -160,6 +160,7 @@ def _run_indexing(
            source_type=db_connector.source,
        )
    ):
+        try:
            window_start = max(
                window_start - timedelta(minutes=POLL_CONNECTOR_OFFSET),
                datetime(1970, 1, 1, tzinfo=timezone.utc),
@@ -172,7 +173,6 @@
                end_time=window_end,
            )

-        try:
            all_connector_doc_ids: set[str] = set()
            for doc_batch in doc_batch_generator:
                # Check if connector is disabled mid run and stop if so unless it's the secondary
@@ -263,7 +263,6 @@
                        db_session=db_session,
                        connector_id=db_connector.id,
                        credential_id=db_credential.id,
-                        attempt_status=IndexingStatus.IN_PROGRESS,
                        net_docs=net_doc_change,
                        run_dt=run_end_dt,
                    )
@@ -294,7 +293,6 @@
                db_session=db_session,
                connector_id=index_attempt.connector.id,
                credential_id=index_attempt.credential.id,
-                attempt_status=IndexingStatus.FAILED,
                net_docs=net_doc_change,
            )
            raise e
@@ -309,7 +307,6 @@
        db_session=db_session,
        connector_id=db_connector.id,
        credential_id=db_credential.id,
-        attempt_status=IndexingStatus.SUCCESS,
        run_dt=run_end_dt,
    )
@@ -343,15 +340,7 @@ def _prepare_index_attempt(db_session: Session, index_attempt_id: int) -> IndexA
    # only commit once, to make sure this all happens in a single transaction
    mark_attempt_in_progress__no_commit(attempt)

-    is_primary = attempt.embedding_model.status == IndexModelStatus.PRESENT
-    if is_primary:
-        update_connector_credential_pair(
-            db_session=db_session,
-            connector_id=attempt.connector.id,
-            credential_id=attempt.credential.id,
-            attempt_status=IndexingStatus.IN_PROGRESS,
-        )
-    else:
+    if attempt.embedding_model.status != IndexModelStatus.PRESENT:
        db_session.commit()

    return attempt

View File

@@ -17,8 +17,6 @@ from danswer.configs.app_configs import DASK_JOB_CLIENT_ENABLED
from danswer.configs.app_configs import DISABLE_INDEX_UPDATE_ON_SWAP
from danswer.configs.app_configs import NUM_INDEXING_WORKERS
from danswer.db.connector import fetch_connectors
-from danswer.db.connector_credential_pair import mark_all_in_progress_cc_pairs_failed
-from danswer.db.connector_credential_pair import update_connector_credential_pair
from danswer.db.embedding_model import get_current_db_embedding_model
from danswer.db.embedding_model import get_secondary_db_embedding_model
from danswer.db.engine import get_db_current_time
@@ -119,17 +117,6 @@ def _mark_run_failed(
        db_session=db_session,
        failure_reason=failure_reason,
    )

-    if (
-        index_attempt.connector_id is not None
-        and index_attempt.credential_id is not None
-        and index_attempt.embedding_model.status == IndexModelStatus.PRESENT
-    ):
-        update_connector_credential_pair(
-            db_session=db_session,
-            connector_id=index_attempt.connector_id,
-            credential_id=index_attempt.credential_id,
-            attempt_status=IndexingStatus.FAILED,
-        )

"""Main funcs"""
@@ -192,16 +179,6 @@ def create_indexing_jobs(existing_jobs: dict[int, Future | SimpleJob]) -> None:
                connector.id, credential.id, model.id, db_session
            )

-            # CC-Pair will have the status that it should for the primary index
-            # Will be re-sync-ed once the indices are swapped
-            if model.status == IndexModelStatus.PRESENT:
-                update_connector_credential_pair(
-                    db_session=db_session,
-                    connector_id=connector.id,
-                    credential_id=credential.id,
-                    attempt_status=IndexingStatus.NOT_STARTED,
-                )

def cleanup_indexing_jobs(
    existing_jobs: dict[int, Future | SimpleJob],
@@ -391,11 +368,6 @@ def update_loop(delay: int = 10, num_workers: int = NUM_INDEXING_WORKERS) -> Non
    existing_jobs: dict[int, Future | SimpleJob] = {}

-    with Session(engine) as db_session:
-        # Previous version did not always clean up cc-pairs well leaving some connectors undeleteable
-        # This ensures that bad states get cleaned up
-        mark_all_in_progress_cc_pairs_failed(db_session)

    while True:
        start = time.time()
        start_time_utc = datetime.utcfromtimestamp(start).strftime("%Y-%m-%d %H:%M:%S")

View File

@@ -193,7 +193,7 @@ class DiscourseConnector(PollConnector):
            start=start_datetime, end=end_datetime
        )

-        return self._yield_discourse_documents(latest_topic_ids)
+        yield from self._yield_discourse_documents(latest_topic_ids)


if __name__ == "__main__":
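On the Discourse change above: if the enclosing poll method is (or later becomes) a generator function, a bare `return <generator>` ends iteration without emitting anything, while `yield from` delegates to the inner generator and works in either case. A standalone illustration of that Python behaviour (toy names only, not Danswer code):

from collections.abc import Iterator

def _inner() -> Iterator[int]:
    yield 1
    yield 2

def broken() -> Iterator[int]:
    # In a generator function, `return <value>` just raises StopIteration;
    # the inner generator's items are never produced.
    return _inner()
    yield  # unreachable; only here to make this a generator function

def fixed() -> Iterator[int]:
    # Delegates to the inner generator, so its items are actually emitted.
    yield from _inner()

print(list(broken()))  # []
print(list(fixed()))   # [1, 2]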

View File

@@ -207,7 +207,7 @@ class ProductboardConnector(PollConnector):
            ):
                return True
            else:
-                logger.error(f"Unable to find updated_at for document '{document.id}'")
+                logger.debug(f"Unable to find updated_at for document '{document.id}'")

        return False

View File

@@ -4,7 +4,6 @@ from fastapi import HTTPException
from sqlalchemy import delete
from sqlalchemy import desc
from sqlalchemy import select
-from sqlalchemy import update
from sqlalchemy.orm import Session

from danswer.db.connector import fetch_connector_by_id
@@ -96,7 +95,6 @@ def update_connector_credential_pair(
    db_session: Session,
    connector_id: int,
    credential_id: int,
-    attempt_status: IndexingStatus,
    net_docs: int | None = None,
    run_dt: datetime | None = None,
) -> None:
@@ -107,13 +105,9 @@
            f"and credential id {credential_id}"
        )
        return
-    cc_pair.last_attempt_status = attempt_status
    # simply don't update last_successful_index_time if run_dt is not specified
    # at worst, this would result in re-indexing documents that were already indexed
-    if (
-        attempt_status == IndexingStatus.SUCCESS
-        or attempt_status == IndexingStatus.IN_PROGRESS
-    ) and run_dt is not None:
+    if run_dt is not None:
        cc_pair.last_successful_index_time = run_dt
    if net_docs is not None:
        cc_pair.total_docs_indexed += net_docs
@@ -132,20 +126,6 @@ def delete_connector_credential_pair__no_commit(
    db_session.execute(stmt)

-def mark_all_in_progress_cc_pairs_failed(
-    db_session: Session,
-) -> None:
-    stmt = (
-        update(ConnectorCredentialPair)
-        .where(
-            ConnectorCredentialPair.last_attempt_status == IndexingStatus.IN_PROGRESS
-        )
-        .values(last_attempt_status=IndexingStatus.FAILED)
-    )
-    db_session.execute(stmt)
-    db_session.commit()

def associate_default_cc_pair(db_session: Session) -> None:
    existing_association = (
        db_session.query(ConnectorCredentialPair)
@@ -297,12 +277,4 @@ def resync_cc_pair(
        last_success.time_started if last_success else None
    )

-    last_run = find_latest_index_attempt(
-        connector_id=cc_pair.connector_id,
-        credential_id=cc_pair.credential_id,
-        only_include_success=False,
-        db_session=db_session,
-    )
-    cc_pair.last_attempt_status = last_run.status if last_run else None

    db_session.commit()

View File

@@ -1,9 +1,14 @@
+from sqlalchemy.orm import Session
+
+from danswer.db.embedding_model import get_current_db_embedding_model
+from danswer.db.index_attempt import get_last_attempt
from danswer.db.models import ConnectorCredentialPair
from danswer.db.models import IndexingStatus


def check_deletion_attempt_is_allowed(
    connector_credential_pair: ConnectorCredentialPair,
+    db_session: Session,
    allow_scheduled: bool = False,
) -> str | None:
    """
@@ -21,9 +26,22 @@ def check_deletion_attempt_is_allowed(
    if not connector_credential_pair.connector.disabled:
        return base_error_msg + " Connector must be paused."

-    if connector_credential_pair.last_attempt_status == IndexingStatus.IN_PROGRESS or (
-        connector_credential_pair.last_attempt_status == IndexingStatus.NOT_STARTED
-        and not allow_scheduled
+    connector_id = connector_credential_pair.connector_id
+    credential_id = connector_credential_pair.credential_id
+    current_embedding_model = get_current_db_embedding_model(db_session)
+
+    last_indexing = get_last_attempt(
+        connector_id=connector_id,
+        credential_id=credential_id,
+        embedding_model_id=current_embedding_model.id,
+        db_session=db_session,
+    )
+
+    if not last_indexing:
+        return None
+
+    if last_indexing.status == IndexingStatus.IN_PROGRESS or (
+        last_indexing.status == IndexingStatus.NOT_STARTED and not allow_scheduled
    ):
        return (
            base_error_msg
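With the last_attempt_status column gone, deletability is now derived from the latest IndexAttempt for the current embedding model, so every caller has to hand in a database session. A minimal sketch of the new call shape, mirroring the celery cleanup task and the deletion endpoint in this commit (the helper name and import path are illustrative assumptions, not part of this change):

from sqlalchemy.orm import Session

# Import path is an assumption about the module layout.
from danswer.db.deletion_attempt import check_deletion_attempt_is_allowed


def ensure_cc_pair_is_deletable(cc_pair, db_session: Session) -> None:
    # New signature: the CC pair plus an open session, so the check can look up
    # the latest index attempt for the current embedding model.
    reason = check_deletion_attempt_is_allowed(
        connector_credential_pair=cc_pair, db_session=db_session
    )
    if reason:
        # Surface the reason instead of scheduling a deletion that can never run.
        raise ValueError(reason)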

View File

@@ -270,9 +270,6 @@ class ConnectorCredentialPair(Base):
    last_successful_index_time: Mapped[datetime.datetime | None] = mapped_column(
        DateTime(timezone=True), default=None
    )
-    last_attempt_status: Mapped[IndexingStatus | None] = mapped_column(
-        Enum(IndexingStatus, native_enum=False)
-    )
    total_docs_indexed: Mapped[int] = mapped_column(Integer, default=0)

    connector: Mapped["Connector"] = relationship(

View File

@@ -418,7 +418,9 @@ def get_connector_indexing_status(
                credential=CredentialSnapshot.from_credential_db_model(credential),
                public_doc=cc_pair.is_public,
                owner=credential.user.email if credential.user else "",
-                last_status=cc_pair.last_attempt_status,
+                last_status=latest_index_attempt.status
+                if latest_index_attempt
+                else None,
                last_success=cc_pair.last_successful_index_time,
                docs_indexed=cc_pair_to_document_cnt.get(
                    (connector.id, credential.id), 0
@@ -438,6 +440,7 @@
                ),
                is_deletable=check_deletion_attempt_is_allowed(
                    connector_credential_pair=cc_pair,
+                    db_session=db_session,
                    # allow scheduled indexing attempts here, since on deletion request we will cancel them
                    allow_scheduled=True,
                )

View File

@@ -175,7 +175,9 @@ def create_deletion_attempt_for_connector_id(
    )

    # Check if the deletion attempt should be allowed
-    deletion_attempt_disallowed_reason = check_deletion_attempt_is_allowed(cc_pair)
+    deletion_attempt_disallowed_reason = check_deletion_attempt_is_allowed(
+        connector_credential_pair=cc_pair, db_session=db_session
+    )
    if deletion_attempt_disallowed_reason:
        raise HTTPException(
            status_code=400,