Files
danswer/backend/onyx/redis/redis_connector_utils.py
rkuo-danswer 392b87fb4f Bugfix/limit permission size (#4695)
* add utility function

* add utility functions to DocExternalAccess

* refactor db access out of individual celery tasks and put it directly into the heavy task

* code review and remove leftovers

* fix circular imports

---------

Co-authored-by: Richard Kuo (Onyx) <rkuo@onyx.app>
2025-05-13 00:46:31 +00:00

61 lines
1.8 KiB
Python

from sqlalchemy.orm import Session
from onyx.db.connector_credential_pair import get_connector_credential_pair
from onyx.db.enums import ConnectorCredentialPairStatus
from onyx.db.enums import TaskStatus
from onyx.db.models import TaskQueueState
from onyx.redis.redis_connector import RedisConnector
from onyx.server.documents.models import DeletionAttemptSnapshot
def _get_deletion_status(
connector_id: int,
credential_id: int,
db_session: Session,
tenant_id: str,
) -> TaskQueueState | None:
"""We no longer store TaskQueueState in the DB for a deletion attempt.
This function populates TaskQueueState by just checking redis.
"""
cc_pair = get_connector_credential_pair(
connector_id=connector_id, credential_id=credential_id, db_session=db_session
)
if not cc_pair:
return None
redis_connector = RedisConnector(tenant_id, cc_pair.id)
if redis_connector.delete.fenced:
return TaskQueueState(
task_id="",
task_name=redis_connector.delete.fence_key,
status=TaskStatus.STARTED,
)
if cc_pair.status == ConnectorCredentialPairStatus.DELETING:
return TaskQueueState(
task_id="",
task_name=redis_connector.delete.fence_key,
status=TaskStatus.PENDING,
)
return None
def get_deletion_attempt_snapshot(
connector_id: int,
credential_id: int,
db_session: Session,
tenant_id: str,
) -> DeletionAttemptSnapshot | None:
deletion_task = _get_deletion_status(
connector_id, credential_id, db_session, tenant_id
)
if not deletion_task:
return None
return DeletionAttemptSnapshot(
connector_id=connector_id,
credential_id=credential_id,
status=deletion_task.status,
)