From c0f381e47190e33fc06346e10135343a6c82114e Mon Sep 17 00:00:00 2001 From: Chris Weaver <25087905+Weves@users.noreply.github.com> Date: Thu, 13 Feb 2025 00:44:55 -0800 Subject: [PATCH] Add background errors ability (#3982) --- ...39c5794c10a_add_background_errors_table.py | 40 ++++++++ .../confluence/group_sync.py | 27 +++++- .../tasks/external_group_syncing/tasks.py | 93 +++++++++++-------- backend/onyx/background/error_logging.py | 13 +++ backend/onyx/db/background_error.py | 10 ++ backend/onyx/db/models.py | 29 ++++++ 6 files changed, 166 insertions(+), 46 deletions(-) create mode 100644 backend/alembic/versions/f39c5794c10a_add_background_errors_table.py create mode 100644 backend/onyx/background/error_logging.py create mode 100644 backend/onyx/db/background_error.py diff --git a/backend/alembic/versions/f39c5794c10a_add_background_errors_table.py b/backend/alembic/versions/f39c5794c10a_add_background_errors_table.py new file mode 100644 index 000000000..475f1be81 --- /dev/null +++ b/backend/alembic/versions/f39c5794c10a_add_background_errors_table.py @@ -0,0 +1,40 @@ +"""Add background errors table + +Revision ID: f39c5794c10a +Revises: 2cdeff6d8c93 +Create Date: 2025-02-12 17:11:14.527876 + +""" +from alembic import op +import sqlalchemy as sa + +# revision identifiers, used by Alembic. 
+revision = "f39c5794c10a" +down_revision = "2cdeff6d8c93" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + op.create_table( + "background_error", + sa.Column("id", sa.Integer(), nullable=False), + sa.Column("message", sa.String(), nullable=False), + sa.Column( + "time_created", + sa.DateTime(timezone=True), + server_default=sa.text("now()"), + nullable=False, + ), + sa.Column("cc_pair_id", sa.Integer(), nullable=True), + sa.PrimaryKeyConstraint("id"), + sa.ForeignKeyConstraint( + ["cc_pair_id"], + ["connector_credential_pair.id"], + ondelete="CASCADE", + ), + ) + + +def downgrade() -> None: + op.drop_table("background_error") diff --git a/backend/ee/onyx/external_permissions/confluence/group_sync.py b/backend/ee/onyx/external_permissions/confluence/group_sync.py index fdd859deb..f4b17cb1b 100644 --- a/backend/ee/onyx/external_permissions/confluence/group_sync.py +++ b/backend/ee/onyx/external_permissions/confluence/group_sync.py @@ -1,5 +1,6 @@ from ee.onyx.db.external_perm import ExternalUserGroup from ee.onyx.external_permissions.confluence.constants import ALL_CONF_EMAILS_GROUP_NAME +from onyx.background.error_logging import emit_background_error from onyx.connectors.confluence.onyx_confluence import build_confluence_client from onyx.connectors.confluence.onyx_confluence import OnyxConfluence from onyx.connectors.confluence.utils import get_user_email_from_username__server @@ -10,7 +11,7 @@ logger = setup_logger() def _build_group_member_email_map( - confluence_client: OnyxConfluence, + confluence_client: OnyxConfluence, cc_pair_id: int ) -> dict[str, set[str]]: group_member_emails: dict[str, set[str]] = {} for user_result in confluence_client.paginated_cql_user_retrieval(): @@ -18,8 +19,11 @@ def _build_group_member_email_map( user = user_result.get("user", {}) if not user: - logger.warning(f"user result missing user field: {user_result}") + msg = f"user result missing user field: {user_result}" + emit_background_error(msg, 
cc_pair_id=cc_pair_id) + logger.error(msg) continue + email = user.get("email") if not email: # This field is only present in Confluence Server @@ -32,7 +36,12 @@ def _build_group_member_email_map( ) if not email: # If we still don't have an email, skip this user - logger.warning(f"user result missing email field: {user_result}") + msg = f"user result missing email field: {user_result}" + if user.get("type") == "app": + logger.warning(msg) + else: + emit_background_error(msg, cc_pair_id=cc_pair_id) + logger.error(msg) continue all_users_groups: set[str] = set() @@ -42,11 +51,18 @@ def _build_group_member_email_map( group_member_emails.setdefault(group_id, set()).add(email) all_users_groups.add(group_id) - if not group_member_emails: - logger.warning(f"No groups found for user with email: {email}") + if not all_users_groups: + msg = f"No groups found for user with email: {email}" + emit_background_error(msg, cc_pair_id=cc_pair_id) + logger.error(msg) else: logger.debug(f"Found groups {all_users_groups} for user with email {email}") + if not group_member_emails: + msg = "No groups found for any users." 
+ emit_background_error(msg, cc_pair_id=cc_pair_id) + logger.error(msg) + return group_member_emails @@ -61,6 +77,7 @@ def confluence_group_sync( group_member_email_map = _build_group_member_email_map( confluence_client=confluence_client, + cc_pair_id=cc_pair.id, ) onyx_groups: list[ExternalUserGroup] = [] all_found_emails = set() diff --git a/backend/onyx/background/celery/tasks/external_group_syncing/tasks.py b/backend/onyx/background/celery/tasks/external_group_syncing/tasks.py index 3c5dd700a..b72fd7e65 100644 --- a/backend/onyx/background/celery/tasks/external_group_syncing/tasks.py +++ b/backend/onyx/background/celery/tasks/external_group_syncing/tasks.py @@ -26,11 +26,11 @@ from ee.onyx.external_permissions.sync_params import ( from onyx.background.celery.apps.app_base import task_logger from onyx.background.celery.celery_redis import celery_find_task from onyx.background.celery.celery_redis import celery_get_unacked_task_ids +from onyx.background.error_logging import emit_background_error from onyx.configs.app_configs import JOB_TIMEOUT from onyx.configs.constants import CELERY_EXTERNAL_GROUP_SYNC_LOCK_TIMEOUT from onyx.configs.constants import CELERY_GENERIC_BEAT_LOCK_TIMEOUT from onyx.configs.constants import CELERY_TASK_WAIT_FOR_FENCE_TIMEOUT -from onyx.configs.constants import DANSWER_REDIS_FUNCTION_LOCK_PREFIX from onyx.configs.constants import OnyxCeleryPriority from onyx.configs.constants import OnyxCeleryQueues from onyx.configs.constants import OnyxCeleryTask @@ -72,18 +72,26 @@ def _is_external_group_sync_due(cc_pair: ConnectorCredentialPair) -> bool: """Returns boolean indicating if external group sync is due.""" if cc_pair.access_type != AccessType.SYNC: - return False - - # skip external group sync if not active - if cc_pair.status != ConnectorCredentialPairStatus.ACTIVE: + task_logger.error( + f"Received non-sync CC Pair {cc_pair.id} for external " + f"group sync. 
Actual access type: {cc_pair.access_type}" + ) return False if cc_pair.status == ConnectorCredentialPairStatus.DELETING: + task_logger.debug( + f"Skipping group sync for CC Pair {cc_pair.id} - " + f"CC Pair is being deleted" + ) return False # If there is not group sync function for the connector, we don't run the sync # This is fine because all sources dont necessarily have a concept of groups if not GROUP_PERMISSIONS_FUNC_MAP.get(cc_pair.connector.source): + task_logger.debug( + f"Skipping group sync for CC Pair {cc_pair.id} - " + f"no group sync function for {cc_pair.connector.source}" + ) return False # If the last sync is None, it has never been run so we run the sync @@ -125,6 +133,9 @@ def check_for_external_group_sync(self: Task, *, tenant_id: str | None) -> bool # these tasks should never overlap if not lock_beat.acquire(blocking=False): + task_logger.warning( + f"Failed to acquire beat lock for external group sync: {tenant_id}" + ) return None try: @@ -205,20 +216,12 @@ def try_creating_external_group_sync_task( redis_connector = RedisConnector(tenant_id, cc_pair_id) - LOCK_TIMEOUT = 30 - - lock: RedisLock = r.lock( - DANSWER_REDIS_FUNCTION_LOCK_PREFIX + "try_generate_external_group_sync_tasks", - timeout=LOCK_TIMEOUT, - ) - - acquired = lock.acquire(blocking_timeout=LOCK_TIMEOUT / 2) - if not acquired: - return None - try: # Dont kick off a new sync if the previous one is still running if redis_connector.external_group_sync.fenced: + logger.warning( + f"Skipping external group sync for CC Pair {cc_pair_id} - already running." 
+ ) return None redis_connector.external_group_sync.generator_clear() @@ -269,9 +272,6 @@ def try_creating_external_group_sync_task( f"Unexpected exception while trying to create external group sync task: cc_pair={cc_pair_id}" ) return None - finally: - if lock.owned(): - lock.release() return payload_id @@ -304,22 +304,26 @@ def connector_external_group_sync_generator_task( start = time.monotonic() while True: if time.monotonic() - start > CELERY_TASK_WAIT_FOR_FENCE_TIMEOUT: - raise ValueError( + msg = ( f"connector_external_group_sync_generator_task - timed out waiting for fence to be ready: " f"fence={redis_connector.external_group_sync.fence_key}" ) + emit_background_error(msg, cc_pair_id=cc_pair_id) + raise ValueError(msg) if not redis_connector.external_group_sync.fenced: # The fence must exist - raise ValueError( + msg = ( f"connector_external_group_sync_generator_task - fence not found: " f"fence={redis_connector.external_group_sync.fence_key}" ) + emit_background_error(msg, cc_pair_id=cc_pair_id) + raise ValueError(msg) payload = redis_connector.external_group_sync.payload # The payload must exist if not payload: - raise ValueError( - "connector_external_group_sync_generator_task: payload invalid or not found" - ) + msg = "connector_external_group_sync_generator_task: payload invalid or not found" + emit_background_error(msg, cc_pair_id=cc_pair_id) + raise ValueError(msg) if payload.celery_task_id is None: logger.info( @@ -344,9 +348,9 @@ def connector_external_group_sync_generator_task( acquired = lock.acquire(blocking=False) if not acquired: - task_logger.warning( - f"External group sync task already running, exiting...: cc_pair={cc_pair_id}" - ) + msg = f"External group sync task already running, exiting...: cc_pair={cc_pair_id}" + emit_background_error(msg, cc_pair_id=cc_pair_id) + task_logger.error(msg) return None try: @@ -367,9 +371,9 @@ def connector_external_group_sync_generator_task( ext_group_sync_func = 
GROUP_PERMISSIONS_FUNC_MAP.get(source_type) if ext_group_sync_func is None: - raise ValueError( - f"No external group sync func found for {source_type} for cc_pair: {cc_pair_id}" - ) + msg = f"No external group sync func found for {source_type} for cc_pair: {cc_pair_id}" + emit_background_error(msg, cc_pair_id=cc_pair_id) + raise ValueError(msg) logger.info( f"Syncing external groups for {source_type} for cc_pair: {cc_pair_id}" @@ -400,9 +404,9 @@ def connector_external_group_sync_generator_task( sync_status=SyncStatus.SUCCESS, ) except Exception as e: - task_logger.exception( - f"External group sync exceptioned: cc_pair={cc_pair_id} payload_id={payload.id}" - ) + msg = f"External group sync exceptioned: cc_pair={cc_pair_id} payload_id={payload.id}" + task_logger.exception(msg) + emit_background_error(msg + f"\n\n{e}", cc_pair_id=cc_pair_id) with get_session_with_tenant(tenant_id) as db_session: update_sync_record_status( @@ -492,9 +496,11 @@ def validate_external_group_sync_fence( fence_key = key_bytes.decode("utf-8") cc_pair_id_str = RedisConnector.get_id_from_fence_key(fence_key) if cc_pair_id_str is None: - task_logger.warning( + msg = ( f"validate_external_group_sync_fence - could not parse id from {fence_key}" ) + emit_background_error(msg) + task_logger.error(msg) return cc_pair_id = int(cc_pair_id_str) @@ -509,12 +515,14 @@ def validate_external_group_sync_fence( try: payload = redis_connector.external_group_sync.payload except ValidationError: - task_logger.exception( + msg = ( "validate_external_group_sync_fence - " "Resetting fence because fence schema is out of date: " f"cc_pair={cc_pair_id} " f"fence={fence_key}" ) + task_logger.exception(msg) + emit_background_error(msg, cc_pair_id=cc_pair_id) redis_connector.external_group_sync.reset() return @@ -551,12 +559,15 @@ def validate_external_group_sync_fence( # return # celery tasks don't exist and the active signal has expired, possibly due to a crash. Clean it up. 
- logger.warning( - "validate_external_group_sync_fence - " - "Resetting fence because no associated celery tasks were found: " - f"cc_pair={cc_pair_id} " - f"fence={fence_key} " - f"payload_id={payload.id}" + emit_background_error( + message=( + "validate_external_group_sync_fence - " + "Resetting fence because no associated celery tasks were found: " + f"cc_pair={cc_pair_id} " + f"fence={fence_key} " + f"payload_id={payload.id}" + ), + cc_pair_id=cc_pair_id, ) redis_connector.external_group_sync.reset() diff --git a/backend/onyx/background/error_logging.py b/backend/onyx/background/error_logging.py new file mode 100644 index 000000000..03eb6a655 --- /dev/null +++ b/backend/onyx/background/error_logging.py @@ -0,0 +1,13 @@ +from onyx.db.background_error import create_background_error +from onyx.db.engine import get_session_with_tenant + + +def emit_background_error( + message: str, + cc_pair_id: int | None = None, +) -> None: + """Currently just saves a row in the background_errors table. 
+ + In the future, could create notifications based on the severity.""" + with get_session_with_tenant() as db_session: + create_background_error(db_session, message, cc_pair_id) diff --git a/backend/onyx/db/background_error.py b/backend/onyx/db/background_error.py new file mode 100644 index 000000000..3e9689ea4 --- /dev/null +++ b/backend/onyx/db/background_error.py @@ -0,0 +1,10 @@ +from sqlalchemy.orm import Session + +from onyx.db.models import BackgroundError + + +def create_background_error( + db_session: Session, message: str, cc_pair_id: int | None +) -> None: + db_session.add(BackgroundError(message=message, cc_pair_id=cc_pair_id)) + db_session.commit() diff --git a/backend/onyx/db/models.py b/backend/onyx/db/models.py index 4931b37de..1187f7aeb 100644 --- a/backend/onyx/db/models.py +++ b/backend/onyx/db/models.py @@ -483,6 +483,10 @@ class ConnectorCredentialPair(Base): primaryjoin="foreign(ConnectorCredentialPair.creator_id) == remote(User.id)", ) + background_errors: Mapped[list["BackgroundError"]] = relationship( + "BackgroundError", back_populates="cc_pair", cascade="all, delete-orphan" + ) + class Document(Base): __tablename__ = "document" @@ -2115,6 +2119,31 @@ class StandardAnswer(Base): ) +class BackgroundError(Base): + """Important background errors. Serves to: + 1. Ensure that important logs are kept around and not lost on rotation/container restarts + 2. A trail for high-signal events so that the debugger doesn't need to remember/know every + possible relevant log line. 
+ """ + + __tablename__ = "background_error" + + id: Mapped[int] = mapped_column(primary_key=True) + message: Mapped[str] = mapped_column(String) + time_created: Mapped[datetime.datetime] = mapped_column( + DateTime(timezone=True), server_default=func.now() + ) + + # option to link the error to a specific CC Pair + cc_pair_id: Mapped[int | None] = mapped_column( + ForeignKey("connector_credential_pair.id", ondelete="CASCADE"), nullable=True + ) + + cc_pair: Mapped["ConnectorCredentialPair | None"] = relationship( + "ConnectorCredentialPair", back_populates="background_errors" + ) + + """Tables related to Permission Sync"""