Related permission docs to cc_pair to prevent orphan docs ()

* Related permission docs to cc_pair to prevent orphan docs

* added script

* group sync deduping

* logging
hagen-danswer 2024-12-04 13:00:54 -08:00 committed by GitHub
parent 993acec5e9
commit ef9942b751
9 changed files with 152 additions and 9 deletions
backend
danswer
access
background/celery/tasks
doc_permission_syncing
external_group_syncing
connectors/confluence
redis
ee/danswer
scripts

@@ -18,6 +18,11 @@ class ExternalAccess:
@dataclass(frozen=True)
class DocExternalAccess:
"""
This is just a class to wrap the external access and the document ID
together. It's used for syncing document permissions to Redis.
"""
external_access: ExternalAccess
# The document ID
doc_id: str
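
A minimal sketch of how this wrapper is used when queuing permission updates (the module path and any ExternalAccess field beyond external_user_emails and is_public are assumptions; to_dict/from_dict appear in the hunks below):

    from danswer.access.models import DocExternalAccess, ExternalAccess

    # Bundle a document's resolved permissions with its ID so the pair can
    # travel through Redis/Celery as one serialized unit.
    access = ExternalAccess(
        external_user_emails={"alice@example.com"},
        external_user_group_ids={"eng-team"},  # field name assumed
        is_public=False,
    )
    doc_access = DocExternalAccess(external_access=access, doc_id="confluence__12345")

    payload = doc_access.to_dict()                   # what gets enqueued
    restored = DocExternalAccess.from_dict(payload)  # what the worker rebuilds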

@@ -22,6 +22,7 @@ from danswer.configs.constants import DanswerCeleryTask
from danswer.configs.constants import DanswerRedisLocks
from danswer.configs.constants import DocumentSource
from danswer.db.connector_credential_pair import get_connector_credential_pair_from_id
from danswer.db.document import upsert_document_by_connector_credential_pair
from danswer.db.engine import get_session_with_tenant
from danswer.db.enums import AccessType
from danswer.db.enums import ConnectorCredentialPairStatus
@@ -262,7 +263,12 @@ def connector_permission_sync_generator_task(
f"RedisConnector.permissions.generate_tasks starting. cc_pair={cc_pair_id}"
)
tasks_generated = redis_connector.permissions.generate_tasks(
self.app, lock, document_external_accesses, source_type
celery_app=self.app,
lock=lock,
new_permissions=document_external_accesses,
source_string=source_type,
connector_id=cc_pair.connector.id,
credential_id=cc_pair.credential.id,
)
if tasks_generated is None:
return None
@@ -298,6 +304,8 @@ def update_external_document_permissions_task(
tenant_id: str | None,
serialized_doc_external_access: dict,
source_string: str,
connector_id: int,
credential_id: int,
) -> bool:
document_external_access = DocExternalAccess.from_dict(
serialized_doc_external_access
@@ -306,18 +314,28 @@ def update_external_document_permissions_task(
external_access = document_external_access.external_access
try:
with get_session_with_tenant(tenant_id) as db_session:
# Then we build the update requests to update vespa
# Add the users to the DB if they don't exist
batch_add_ext_perm_user_if_not_exists(
db_session=db_session,
emails=list(external_access.external_user_emails),
)
upsert_document_external_perms(
# Then we upsert the document's external permissions in postgres
created_new_doc = upsert_document_external_perms(
db_session=db_session,
doc_id=doc_id,
external_access=external_access,
source_type=DocumentSource(source_string),
)
if created_new_doc:
# If a new document was created, we associate it with the cc_pair
upsert_document_by_connector_credential_pair(
db_session=db_session,
connector_id=connector_id,
credential_id=credential_id,
document_ids=[doc_id],
)
logger.debug(
f"Successfully synced postgres document permissions for {doc_id}"
)
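
Condensed, the worker-side flow in this hunk is now the following (a sketch of the code above; session setup, logging, and error handling elided):

    # 1. Make sure every referenced external user exists in Postgres.
    batch_add_ext_perm_user_if_not_exists(
        db_session=db_session,
        emails=list(external_access.external_user_emails),
    )

    # 2. Upsert the document's external permissions; True means the
    #    document row itself had to be created.
    created_new_doc = upsert_document_external_perms(
        db_session=db_session,
        doc_id=doc_id,
        external_access=external_access,
        source_type=DocumentSource(source_string),
    )

    # 3. A freshly created document has no cc_pair linkage yet, so record
    #    it in document_by_connector_credential_pair to prevent orphans.
    if created_new_doc:
        upsert_document_by_connector_credential_pair(
            db_session=db_session,
            connector_id=connector_id,
            credential_id=credential_id,
            document_ids=[doc_id],
        )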

@@ -32,10 +32,14 @@ from danswer.redis.redis_connector_ext_group_sync import (
from danswer.redis.redis_pool import get_redis_client
from danswer.utils.logger import setup_logger
from ee.danswer.db.connector_credential_pair import get_all_auto_sync_cc_pairs
from ee.danswer.db.connector_credential_pair import get_cc_pairs_by_source
from ee.danswer.db.external_perm import ExternalUserGroup
from ee.danswer.db.external_perm import replace_user__ext_group_for_cc_pair
from ee.danswer.external_permissions.sync_params import EXTERNAL_GROUP_SYNC_PERIODS
from ee.danswer.external_permissions.sync_params import GROUP_PERMISSIONS_FUNC_MAP
from ee.danswer.external_permissions.sync_params import (
GROUP_PERMISSIONS_IS_CC_PAIR_AGNOSTIC,
)
logger = setup_logger()
@@ -107,6 +111,22 @@ def check_for_external_group_sync(self: Task, *, tenant_id: str | None) -> None:
with get_session_with_tenant(tenant_id) as db_session:
cc_pairs = get_all_auto_sync_cc_pairs(db_session)
# We only want to sync one cc_pair per source type in
# GROUP_PERMISSIONS_IS_CC_PAIR_AGNOSTIC
for source in GROUP_PERMISSIONS_IS_CC_PAIR_AGNOSTIC:
# These are ordered by cc_pair id so the first one is the one we want
cc_pairs_to_dedupe = get_cc_pairs_by_source(
db_session, source, only_sync=True
)
# We only want to sync one cc_pair per source type
# in GROUP_PERMISSIONS_IS_CC_PAIR_AGNOSTIC so we dedupe here
for cc_pair_to_remove in cc_pairs_to_dedupe[1:]:
cc_pairs = [
cc_pair
for cc_pair in cc_pairs
if cc_pair.id != cc_pair_to_remove.id
]
for cc_pair in cc_pairs:
if _is_external_group_sync_due(cc_pair):
cc_pair_ids_to_sync.append(cc_pair.id)
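
The dedupe above keeps only the lowest-id cc_pair for each source in GROUP_PERMISSIONS_IS_CC_PAIR_AGNOSTIC (get_cc_pairs_by_source returns results ordered by cc_pair id, per the hunk further down). An equivalent formulation, shown only as a sketch and not part of this commit:

    # Collect the one cc_pair id to keep per cc_pair-agnostic source.
    ids_to_keep: set[int] = set()
    for source in GROUP_PERMISSIONS_IS_CC_PAIR_AGNOSTIC:
        ordered = get_cc_pairs_by_source(db_session, source, only_sync=True)
        if ordered:
            ids_to_keep.add(ordered[0].id)

    # Drop every other cc_pair belonging to those sources.
    cc_pairs = [
        cc_pair
        for cc_pair in cc_pairs
        if cc_pair.connector.source not in GROUP_PERMISSIONS_IS_CC_PAIR_AGNOSTIC
        or cc_pair.id in ids_to_keep
    ]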

@@ -32,7 +32,11 @@ def get_user_email_from_username__server(
response = confluence_client.get_mobile_parameters(user_name)
email = response.get("email")
except Exception:
email = None
# For now, we'll just return a string that indicates failure
# We may want to revert to returning None in the future
# email = None
email = f"FAILED TO GET CONFLUENCE EMAIL FOR {user_name}"
logger.warning(f"failed to get confluence email for {user_name}")
_USER_EMAIL_CACHE[user_name] = email
return _USER_EMAIL_CACHE[user_name]
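
Since failures are now cached as a sentinel string rather than None, any consumer that needs a real address has to screen for it. A hypothetical guard (no such check exists in this diff):

    # Prefix must match the sentinel written by get_user_email_from_username__server.
    _FAILURE_PREFIX = "FAILED TO GET CONFLUENCE EMAIL FOR"

    def is_resolved_email(email: str) -> bool:
        return not email.startswith(_FAILURE_PREFIX)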

@@ -133,6 +133,8 @@ class RedisConnectorPermissionSync:
lock: RedisLock | None,
new_permissions: list[DocExternalAccess],
source_string: str,
connector_id: int,
credential_id: int,
) -> int | None:
last_lock_time = time.monotonic()
async_results = []
@@ -155,6 +157,8 @@ class RedisConnectorPermissionSync:
tenant_id=self.tenant_id,
serialized_doc_external_access=doc_perm.to_dict(),
source_string=source_string,
connector_id=connector_id,
credential_id=credential_id,
),
queue=DanswerCeleryQueues.DOC_PERMISSIONS_UPSERT,
task_id=custom_task_id,

@@ -37,10 +37,15 @@ def get_cc_pairs_by_source(
source_type: DocumentSource,
only_sync: bool,
) -> list[ConnectorCredentialPair]:
"""
Get all cc_pairs for a given source type (and optionally only sync)
result is sorted by cc_pair id
"""
query = (
db_session.query(ConnectorCredentialPair)
.join(ConnectorCredentialPair.connector)
.filter(Connector.source == source_type)
.order_by(ConnectorCredentialPair.id)
)
if only_sync:

@@ -55,9 +55,10 @@ def upsert_document_external_perms(
doc_id: str,
external_access: ExternalAccess,
source_type: DocumentSource,
) -> None:
) -> bool:
"""
This sets the permissions for a document in postgres.
This sets the permissions for a document in postgres. Returns True if the
a new document was created, False otherwise.
NOTE: this will replace any existing external access, it will not do a union
"""
document = db_session.scalars(
@@ -85,7 +86,7 @@ def upsert_document_external_perms(
)
db_session.add(document)
db_session.commit()
return
return True
# If the document exists, we need to check if the external access has changed
if (
@@ -98,3 +99,5 @@ def upsert_document_external_perms(
document.is_public = external_access.is_public
document.last_modified = datetime.now(timezone.utc)
db_session.commit()
return False

@@ -48,6 +48,11 @@ GROUP_PERMISSIONS_FUNC_MAP: dict[DocumentSource, GroupSyncFuncType] = {
}
GROUP_PERMISSIONS_IS_CC_PAIR_AGNOSTIC: set[DocumentSource] = {
DocumentSource.CONFLUENCE,
}
# If nothing is specified here, we run the doc_sync every time the celery beat runs
DOC_PERMISSION_SYNC_PERIODS: dict[DocumentSource, int] = {
# Polling is not supported so we fetch all doc permissions every 5 minutes
@@ -57,9 +62,9 @@ DOC_PERMISSION_SYNC_PERIODS: dict[DocumentSource, int] = {
# If nothing is specified here, we run the doc_sync every time the celery beat runs
EXTERNAL_GROUP_SYNC_PERIODS: dict[DocumentSource, int] = {
# Polling is not supported so we fetch all group permissions every 5 minutes
# Polling is not supported so we fetch all group permissions every 30 minutes
DocumentSource.GOOGLE_DRIVE: 5 * 60,
DocumentSource.CONFLUENCE: 5 * 60,
DocumentSource.CONFLUENCE: 30 * 60,
}
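
These period maps feed the beat-time due checks (e.g. _is_external_group_sync_due used earlier). A plausible sketch of that gating, with the last-sync attribute name assumed since the check's body is not part of this diff:

    from datetime import datetime, timedelta, timezone

    def _is_external_group_sync_due(cc_pair) -> bool:
        # No entry means the source syncs on every celery beat run.
        period = EXTERNAL_GROUP_SYNC_PERIODS.get(cc_pair.connector.source)
        if period is None:
            return True
        last_sync = cc_pair.last_time_external_group_sync  # attribute name assumed
        if last_sync is None:
            return True
        return datetime.now(timezone.utc) - last_sync >= timedelta(seconds=period)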

@@ -0,0 +1,79 @@
import os
import sys
from sqlalchemy import text
from sqlalchemy.orm import Session
# makes it so `PYTHONPATH=.` is not required when running this script
parent_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(parent_dir)
from danswer.db.engine import get_session_context_manager # noqa: E402
from danswer.db.document import delete_documents_complete__no_commit # noqa: E402
from danswer.db.search_settings import get_current_search_settings # noqa: E402
from danswer.document_index.vespa.index import VespaIndex # noqa: E402
from danswer.background.celery.tasks.shared.RetryDocumentIndex import ( # noqa: E402
RetryDocumentIndex,
)


def _get_orphaned_document_ids(db_session: Session) -> list[str]:
"""Get document IDs that don't have any entries in document_by_connector_credential_pair"""
query = text(
"""
SELECT d.id
FROM document d
LEFT JOIN document_by_connector_credential_pair dbcc ON d.id = dbcc.id
WHERE dbcc.id IS NULL
"""
)
orphaned_ids = [doc_id[0] for doc_id in db_session.execute(query)]
print(f"Found {len(orphaned_ids)} orphaned documents")
return orphaned_ids


def main() -> None:
with get_session_context_manager() as db_session:
# Get orphaned document IDs
orphaned_ids = _get_orphaned_document_ids(db_session)
if not orphaned_ids:
print("No orphaned documents found")
return
# Setup Vespa index
search_settings = get_current_search_settings(db_session)
index_name = search_settings.index_name
vespa_index = VespaIndex(index_name=index_name, secondary_index_name=None)
retry_index = RetryDocumentIndex(vespa_index)
# Delete chunks from Vespa first
print("Deleting orphaned document chunks from Vespa")
successfully_vespa_deleted_doc_ids = []
for doc_id in orphaned_ids:
try:
chunks_deleted = retry_index.delete_single(doc_id)
successfully_vespa_deleted_doc_ids.append(doc_id)
if chunks_deleted > 0:
print(f"Deleted {chunks_deleted} chunks for document {doc_id}")
except Exception as e:
print(
f"Error deleting document {doc_id} in Vespa and will not delete from Postgres: {e}"
)
# Delete documents from Postgres
print("Deleting orphaned documents from Postgres")
try:
delete_documents_complete__no_commit(
db_session, successfully_vespa_deleted_doc_ids
)
db_session.commit()
except Exception as e:
print(f"Error deleting documents from Postgres: {e}")
print(
f"Successfully cleaned up {len(successfully_vespa_deleted_doc_ids)} orphaned documents"
)
if __name__ == "__main__":
main()
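
After the script finishes, the orphan query can be re-run to confirm the cleanup (a sketch reusing the helpers above):

    with get_session_context_manager() as db_session:
        remaining = _get_orphaned_document_ids(db_session)
        assert not remaining, f"{len(remaining)} orphaned documents still present"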