diff --git a/backend/onyx/background/celery/apps/primary.py b/backend/onyx/background/celery/apps/primary.py index 5696d75bcac7..4f37bc48fafc 100644 --- a/backend/onyx/background/celery/apps/primary.py +++ b/backend/onyx/background/celery/apps/primary.py @@ -84,8 +84,10 @@ def on_celeryd_init(sender: str, conf: Any = None, **kwargs: Any) -> None: def on_worker_init(sender: Worker, **kwargs: Any) -> None: logger.info("worker_init signal received.") + EXTRA_CONCURRENCY = 4 # small extra fudge factor for connection limits + SqlEngine.set_app_name(POSTGRES_CELERY_WORKER_PRIMARY_APP_NAME) - SqlEngine.init_engine(pool_size=8, max_overflow=0) + SqlEngine.init_engine(pool_size=sender.concurrency, max_overflow=EXTRA_CONCURRENCY) # type: ignore app_base.wait_for_redis(sender, **kwargs) app_base.wait_for_db(sender, **kwargs) diff --git a/backend/onyx/background/celery/tasks/beat_schedule.py b/backend/onyx/background/celery/tasks/beat_schedule.py index 0309611acd7f..e87ea9d49f7f 100644 --- a/backend/onyx/background/celery/tasks/beat_schedule.py +++ b/backend/onyx/background/celery/tasks/beat_schedule.py @@ -18,7 +18,7 @@ BEAT_EXPIRES_DEFAULT = 15 * 60 # 15 minutes (in seconds) # hack to slow down task dispatch in the cloud until # we have a better implementation (backpressure, etc) -CLOUD_BEAT_SCHEDULE_MULTIPLIER = 4 +CLOUD_BEAT_SCHEDULE_MULTIPLIER = 8 # tasks that run in either self-hosted on cloud beat_task_templates: list[dict] = [] diff --git a/backend/onyx/background/celery/tasks/connector_deletion/tasks.py b/backend/onyx/background/celery/tasks/connector_deletion/tasks.py index 003742f6b110..17dfa4282b03 100644 --- a/backend/onyx/background/celery/tasks/connector_deletion/tasks.py +++ b/backend/onyx/background/celery/tasks/connector_deletion/tasks.py @@ -186,7 +186,7 @@ def try_generate_document_cc_pair_cleanup_tasks( sync_type=SyncType.CONNECTOR_DELETION, ) except Exception: - pass + task_logger.exception("insert_sync_record exceptioned.") except TaskDependencyError: redis_connector.delete.set_fence(None) diff --git a/backend/onyx/background/celery/tasks/doc_permission_syncing/tasks.py b/backend/onyx/background/celery/tasks/doc_permission_syncing/tasks.py index 748e62fc86e3..00e1f81787a4 100644 --- a/backend/onyx/background/celery/tasks/doc_permission_syncing/tasks.py +++ b/backend/onyx/background/celery/tasks/doc_permission_syncing/tasks.py @@ -228,12 +228,15 @@ def try_creating_permissions_sync_task( # create before setting fence to avoid race condition where the monitoring # task updates the sync record before it is created - with get_session_with_tenant(tenant_id) as db_session: - insert_sync_record( - db_session=db_session, - entity_id=cc_pair_id, - sync_type=SyncType.EXTERNAL_PERMISSIONS, - ) + try: + with get_session_with_tenant(tenant_id) as db_session: + insert_sync_record( + db_session=db_session, + entity_id=cc_pair_id, + sync_type=SyncType.EXTERNAL_PERMISSIONS, + ) + except Exception: + task_logger.exception("insert_sync_record exceptioned.") # set a basic fence to start redis_connector.permissions.set_active() @@ -257,11 +260,10 @@ def try_creating_permissions_sync_task( ) # fill in the celery task id - redis_connector.permissions.set_active() payload.celery_task_id = result.id redis_connector.permissions.set_fence(payload) - payload_id = payload.celery_task_id + payload_id = payload.id except Exception: task_logger.exception(f"Unexpected exception: cc_pair={cc_pair_id}") return None @@ -290,6 +292,8 @@ def connector_permission_sync_generator_task( This task assumes that the task 
has already been properly fenced """ + payload_id: str | None = None + LoggerContextVars.reset() doc_permission_sync_ctx_dict = doc_permission_sync_ctx.get() @@ -332,9 +336,12 @@ def connector_permission_sync_generator_task( sleep(1) continue + payload_id = payload.id + logger.info( f"connector_permission_sync_generator_task - Fence found, continuing...: " - f"fence={redis_connector.permissions.fence_key}" + f"fence={redis_connector.permissions.fence_key} " + f"payload_id={payload.id}" ) break @@ -413,7 +420,9 @@ def connector_permission_sync_generator_task( redis_connector.permissions.generator_complete = tasks_generated except Exception as e: - task_logger.exception(f"Failed to run permission sync: cc_pair={cc_pair_id}") + task_logger.exception( + f"Permission sync exceptioned: cc_pair={cc_pair_id} payload_id={payload_id}" + ) redis_connector.permissions.generator_clear() redis_connector.permissions.taskset_clear() @@ -423,6 +432,10 @@ def connector_permission_sync_generator_task( if lock.owned(): lock.release() + task_logger.info( + f"Permission sync finished: cc_pair={cc_pair_id} payload_id={payload.id}" + ) + @shared_task( name=OnyxCeleryTask.UPDATE_EXTERNAL_DOCUMENT_PERMISSIONS_TASK, @@ -659,7 +672,7 @@ def validate_permission_sync_fence( f"tasks_scanned={tasks_scanned} tasks_not_in_celery={tasks_not_in_celery}" ) - # we're only active if tasks_scanned > 0 and tasks_not_in_celery == 0 + # we're active if there are still tasks to run and those tasks all exist in celery if tasks_scanned > 0 and tasks_not_in_celery == 0: redis_connector.permissions.set_active() return @@ -680,7 +693,8 @@ def validate_permission_sync_fence( "validate_permission_sync_fence - " "Resetting fence because no associated celery tasks were found: " f"cc_pair={cc_pair_id} " - f"fence={fence_key}" + f"fence={fence_key} " + f"payload_id={payload.id}" ) redis_connector.permissions.reset() diff --git a/backend/onyx/background/celery/tasks/external_group_syncing/tasks.py b/backend/onyx/background/celery/tasks/external_group_syncing/tasks.py index 85465f05a7e7..3c5dd700a41e 100644 --- a/backend/onyx/background/celery/tasks/external_group_syncing/tasks.py +++ b/backend/onyx/background/celery/tasks/external_group_syncing/tasks.py @@ -2,15 +2,17 @@ import time from datetime import datetime from datetime import timedelta from datetime import timezone +from typing import Any +from typing import cast from uuid import uuid4 from celery import Celery from celery import shared_task from celery import Task from celery.exceptions import SoftTimeLimitExceeded +from pydantic import ValidationError from redis import Redis from redis.lock import Lock as RedisLock -from sqlalchemy.orm import Session from ee.onyx.db.connector_credential_pair import get_all_auto_sync_cc_pairs from ee.onyx.db.connector_credential_pair import get_cc_pairs_by_source @@ -32,7 +34,9 @@ from onyx.configs.constants import DANSWER_REDIS_FUNCTION_LOCK_PREFIX from onyx.configs.constants import OnyxCeleryPriority from onyx.configs.constants import OnyxCeleryQueues from onyx.configs.constants import OnyxCeleryTask +from onyx.configs.constants import OnyxRedisConstants from onyx.configs.constants import OnyxRedisLocks +from onyx.configs.constants import OnyxRedisSignals from onyx.db.connector import mark_cc_pair_as_external_group_synced from onyx.db.connector_credential_pair import get_connector_credential_pair_from_id from onyx.db.engine import get_session_with_tenant @@ -49,7 +53,8 @@ from onyx.redis.redis_connector_ext_group_sync import ( 
RedisConnectorExternalGroupSyncPayload, ) from onyx.redis.redis_pool import get_redis_client -from onyx.redis.redis_pool import SCAN_ITER_COUNT_DEFAULT +from onyx.redis.redis_pool import get_redis_replica_client +from onyx.server.utils import make_short_id from onyx.utils.logger import setup_logger logger = setup_logger() @@ -107,11 +112,11 @@ def _is_external_group_sync_due(cc_pair: ConnectorCredentialPair) -> bool: bind=True, ) def check_for_external_group_sync(self: Task, *, tenant_id: str | None) -> bool | None: - r = get_redis_client(tenant_id=tenant_id) - # we need to use celery's redis client to access its redis data # (which lives on a different db number) - # r_celery: Redis = self.app.broker_connection().channel().client # type: ignore + r = get_redis_client(tenant_id=tenant_id) + r_replica = get_redis_replica_client(tenant_id=tenant_id) + r_celery: Redis = self.app.broker_connection().channel().client # type: ignore lock_beat: RedisLock = r.lock( OnyxRedisLocks.CHECK_CONNECTOR_EXTERNAL_GROUP_SYNC_BEAT_LOCK, @@ -149,30 +154,32 @@ def check_for_external_group_sync(self: Task, *, tenant_id: str | None) -> bool lock_beat.reacquire() for cc_pair_id in cc_pair_ids_to_sync: - tasks_created = try_creating_external_group_sync_task( + payload_id = try_creating_external_group_sync_task( self.app, cc_pair_id, r, tenant_id ) - if not tasks_created: + if not payload_id: continue - task_logger.info(f"External group sync queued: cc_pair={cc_pair_id}") + task_logger.info( + f"External group sync queued: cc_pair={cc_pair_id} id={payload_id}" + ) # we want to run this less frequently than the overall task - # lock_beat.reacquire() - # if not r.exists(OnyxRedisSignals.VALIDATE_EXTERNAL_GROUP_SYNC_FENCES): - # # clear any indexing fences that don't have associated celery tasks in progress - # # tasks can be in the queue in redis, in reserved tasks (prefetched by the worker), - # # or be currently executing - # try: - # validate_external_group_sync_fences( - # tenant_id, self.app, r, r_celery, lock_beat - # ) - # except Exception: - # task_logger.exception( - # "Exception while validating external group sync fences" - # ) + lock_beat.reacquire() + if not r.exists(OnyxRedisSignals.BLOCK_VALIDATE_EXTERNAL_GROUP_SYNC_FENCES): + # clear fences that don't have associated celery tasks in progress + # tasks can be in the queue in redis, in reserved tasks (prefetched by the worker), + # or be currently executing + try: + validate_external_group_sync_fences( + tenant_id, self.app, r, r_replica, r_celery, lock_beat + ) + except Exception: + task_logger.exception( + "Exception while validating external group sync fences" + ) - # r.set(OnyxRedisSignals.VALIDATE_EXTERNAL_GROUP_SYNC_FENCES, 1, ex=60) + r.set(OnyxRedisSignals.BLOCK_VALIDATE_EXTERNAL_GROUP_SYNC_FENCES, 1, ex=300) except SoftTimeLimitExceeded: task_logger.info( "Soft time limit exceeded, task is being terminated gracefully." @@ -191,9 +198,11 @@ def try_creating_external_group_sync_task( cc_pair_id: int, r: Redis, tenant_id: str | None, -) -> int | None: +) -> str | None: - """Returns an int if syncing is needed. The int represents the number of sync tasks generated. + """Returns the payload id string if syncing was queued. 
Returns None if no syncing is required.""" + payload_id: str | None = None + redis_connector = RedisConnector(tenant_id, cc_pair_id) LOCK_TIMEOUT = 30 @@ -215,11 +224,28 @@ def try_creating_external_group_sync_task( redis_connector.external_group_sync.generator_clear() redis_connector.external_group_sync.taskset_clear() + # create before setting fence to avoid race condition where the monitoring + # task updates the sync record before it is created + try: + with get_session_with_tenant(tenant_id) as db_session: + insert_sync_record( + db_session=db_session, + entity_id=cc_pair_id, + sync_type=SyncType.EXTERNAL_GROUP, + ) + except Exception: + task_logger.exception("insert_sync_record exceptioned.") + + # Signal active before creating fence + redis_connector.external_group_sync.set_active() + payload = RedisConnectorExternalGroupSyncPayload( + id=make_short_id(), submitted=datetime.now(timezone.utc), started=None, celery_task_id=None, ) + redis_connector.external_group_sync.set_fence(payload) custom_task_id = f"{redis_connector.external_group_sync.taskset_key}_{uuid4()}" @@ -234,17 +260,10 @@ def try_creating_external_group_sync_task( priority=OnyxCeleryPriority.HIGH, ) - # create before setting fence to avoid race condition where the monitoring - # task updates the sync record before it is created - with get_session_with_tenant(tenant_id) as db_session: - insert_sync_record( - db_session=db_session, - entity_id=cc_pair_id, - sync_type=SyncType.EXTERNAL_GROUP, - ) - payload.celery_task_id = result.id redis_connector.external_group_sync.set_fence(payload) + + payload_id = payload.id except Exception: task_logger.exception( f"Unexpected exception while trying to create external group sync task: cc_pair={cc_pair_id}" @@ -254,7 +273,7 @@ def try_creating_external_group_sync_task( if lock.owned(): lock.release() - return 1 + return payload_id @shared_task( @@ -312,7 +331,8 @@ def connector_external_group_sync_generator_task( logger.info( f"connector_external_group_sync_generator_task - Fence found, continuing...: " - f"fence={redis_connector.external_group_sync.fence_key}" + f"fence={redis_connector.external_group_sync.fence_key} " + f"payload_id={payload.id}" ) break @@ -381,7 +401,7 @@ def connector_external_group_sync_generator_task( ) except Exception as e: task_logger.exception( - f"Failed to run external group sync: cc_pair={cc_pair_id}" + f"External group sync exceptioned: cc_pair={cc_pair_id} payload_id={payload.id}" ) with get_session_with_tenant(tenant_id) as db_session: @@ -401,32 +421,41 @@ def connector_external_group_sync_generator_task( if lock.owned(): lock.release() + task_logger.info( + f"External group sync finished: cc_pair={cc_pair_id} payload_id={payload.id}" + ) + def validate_external_group_sync_fences( tenant_id: str | None, celery_app: Celery, r: Redis, + r_replica: Redis, r_celery: Redis, lock_beat: RedisLock, ) -> None: - reserved_sync_tasks = celery_get_unacked_task_ids( + reserved_tasks = celery_get_unacked_task_ids( OnyxCeleryQueues.CONNECTOR_EXTERNAL_GROUP_SYNC, r_celery ) - # validate all existing indexing jobs - for key_bytes in r.scan_iter( - RedisConnectorExternalGroupSync.FENCE_PREFIX + "*", - count=SCAN_ITER_COUNT_DEFAULT, - ): + # validate all existing external group sync tasks + lock_beat.reacquire() + keys = cast(set[Any], r_replica.smembers(OnyxRedisConstants.ACTIVE_FENCES)) + for key in keys: + key_bytes = cast(bytes, key) + key_str = key_bytes.decode("utf-8") + if not key_str.startswith(RedisConnectorExternalGroupSync.FENCE_PREFIX): + continue + + 
validate_external_group_sync_fence( + tenant_id, + key_bytes, + reserved_tasks, + r_celery, + ) + lock_beat.reacquire() - with get_session_with_tenant(tenant_id) as db_session: - validate_external_group_sync_fence( - tenant_id, - key_bytes, - reserved_sync_tasks, - r_celery, - db_session, - ) + return @@ -435,7 +464,6 @@ def validate_external_group_sync_fence( key_bytes: bytes, reserved_tasks: set[str], r_celery: Redis, - db_session: Session, ) -> None: """Checks for the error condition where an indexing fence is set but the associated celery tasks don't exist. This can happen if the indexing worker hard crashes or is terminated. @@ -478,26 +506,26 @@ def validate_external_group_sync_fence( if not redis_connector.external_group_sync.fenced: return - payload = redis_connector.external_group_sync.payload - if not payload: - return - - # OK, there's actually something for us to validate - - if payload.celery_task_id is None: - # the fence is just barely set up. - # if redis_connector_index.active(): - # return - - # it would be odd to get here as there isn't that much that can go wrong during - # initial fence setup, but it's still worth making sure we can recover - logger.info( + try: + payload = redis_connector.external_group_sync.payload + except ValidationError: + task_logger.exception( "validate_external_group_sync_fence - " - f"Resetting fence in basic state without any activity: fence={fence_key}" + "Resetting fence because fence schema is out of date: " + f"cc_pair={cc_pair_id} " + f"fence={fence_key}" ) + redis_connector.external_group_sync.reset() return + if not payload: + return + + if not payload.celery_task_id: + return + + # OK, there's actually something for us to validate found = celery_find_task( payload.celery_task_id, OnyxCeleryQueues.CONNECTOR_EXTERNAL_GROUP_SYNC, r_celery ) @@ -527,7 +555,8 @@ def validate_external_group_sync_fence( "validate_external_group_sync_fence - " "Resetting fence because no associated celery tasks were found: " f"cc_pair={cc_pair_id} " - f"fence={fence_key}" + f"fence={fence_key} " + f"payload_id={payload.id}" ) redis_connector.external_group_sync.reset() diff --git a/backend/onyx/background/celery/tasks/indexing/tasks.py b/backend/onyx/background/celery/tasks/indexing/tasks.py index 0b823a09a77a..5a1ff851f167 100644 --- a/backend/onyx/background/celery/tasks/indexing/tasks.py +++ b/backend/onyx/background/celery/tasks/indexing/tasks.py @@ -423,8 +423,8 @@ def connector_indexing_task( # define a callback class callback = IndexingCallback( os.getppid(), - redis_connector.stop.fence_key, - redis_connector_index.generator_progress_key, + redis_connector, + redis_connector_index, lock, r, ) diff --git a/backend/onyx/background/celery/tasks/indexing/utils.py b/backend/onyx/background/celery/tasks/indexing/utils.py index 928a76121a03..b813da225d57 100644 --- a/backend/onyx/background/celery/tasks/indexing/utils.py +++ b/backend/onyx/background/celery/tasks/indexing/utils.py @@ -99,16 +99,16 @@ class IndexingCallback(IndexingHeartbeatInterface): def __init__( self, parent_pid: int, - stop_key: str, - generator_progress_key: str, + redis_connector: RedisConnector, + redis_connector_index: RedisConnectorIndex, redis_lock: RedisLock, redis_client: Redis, ): super().__init__() self.parent_pid = parent_pid + self.redis_connector: RedisConnector = redis_connector + self.redis_connector_index: RedisConnectorIndex = redis_connector_index self.redis_lock: RedisLock = redis_lock - self.stop_key: str = stop_key - self.generator_progress_key: str = 
generator_progress_key self.redis_client = redis_client self.started: datetime = datetime.now(timezone.utc) self.redis_lock.reacquire() @@ -120,7 +120,7 @@ class IndexingCallback(IndexingHeartbeatInterface): self.last_parent_check = time.monotonic() def should_stop(self) -> bool: - if self.redis_client.exists(self.stop_key): + if self.redis_connector.stop.fenced: return True return False @@ -143,6 +143,8 @@ class IndexingCallback(IndexingHeartbeatInterface): # self.last_parent_check = now try: + self.redis_connector.prune.set_active() + current_time = time.monotonic() if current_time - self.last_lock_monotonic >= ( CELERY_GENERIC_BEAT_LOCK_TIMEOUT / 4 @@ -165,7 +167,9 @@ class IndexingCallback(IndexingHeartbeatInterface): redis_lock_dump(self.redis_lock, self.redis_client) raise - self.redis_client.incrby(self.generator_progress_key, amount) + self.redis_client.incrby( + self.redis_connector_index.generator_progress_key, amount + ) def validate_indexing_fence( diff --git a/backend/onyx/background/celery/tasks/pruning/tasks.py b/backend/onyx/background/celery/tasks/pruning/tasks.py index 99a37ddd0172..651ff50da769 100644 --- a/backend/onyx/background/celery/tasks/pruning/tasks.py +++ b/backend/onyx/background/celery/tasks/pruning/tasks.py @@ -1,28 +1,39 @@ +import time from datetime import datetime from datetime import timedelta from datetime import timezone +from typing import Any +from typing import cast from uuid import uuid4 from celery import Celery from celery import shared_task from celery import Task from celery.exceptions import SoftTimeLimitExceeded +from pydantic import ValidationError from redis import Redis from redis.lock import Lock as RedisLock from sqlalchemy.orm import Session from onyx.background.celery.apps.app_base import task_logger +from onyx.background.celery.celery_redis import celery_find_task +from onyx.background.celery.celery_redis import celery_get_queue_length +from onyx.background.celery.celery_redis import celery_get_queued_task_ids +from onyx.background.celery.celery_redis import celery_get_unacked_task_ids from onyx.background.celery.celery_utils import extract_ids_from_runnable_connector from onyx.background.celery.tasks.indexing.utils import IndexingCallback from onyx.configs.app_configs import ALLOW_SIMULTANEOUS_PRUNING from onyx.configs.app_configs import JOB_TIMEOUT from onyx.configs.constants import CELERY_GENERIC_BEAT_LOCK_TIMEOUT from onyx.configs.constants import CELERY_PRUNING_LOCK_TIMEOUT +from onyx.configs.constants import CELERY_TASK_WAIT_FOR_FENCE_TIMEOUT from onyx.configs.constants import DANSWER_REDIS_FUNCTION_LOCK_PREFIX from onyx.configs.constants import OnyxCeleryPriority from onyx.configs.constants import OnyxCeleryQueues from onyx.configs.constants import OnyxCeleryTask +from onyx.configs.constants import OnyxRedisConstants from onyx.configs.constants import OnyxRedisLocks +from onyx.configs.constants import OnyxRedisSignals from onyx.connectors.factory import instantiate_connector from onyx.connectors.models import InputType from onyx.db.connector import mark_ccpair_as_pruned @@ -35,10 +46,15 @@ from onyx.db.enums import ConnectorCredentialPairStatus from onyx.db.enums import SyncStatus from onyx.db.enums import SyncType from onyx.db.models import ConnectorCredentialPair +from onyx.db.search_settings import get_current_search_settings from onyx.db.sync_record import insert_sync_record from onyx.db.sync_record import update_sync_record_status from onyx.redis.redis_connector import RedisConnector +from onyx.redis.redis_connector_prune 
import RedisConnectorPrune +from onyx.redis.redis_connector_prune import RedisConnectorPrunePayload from onyx.redis.redis_pool import get_redis_client +from onyx.redis.redis_pool import get_redis_replica_client +from onyx.server.utils import make_short_id from onyx.utils.logger import LoggerContextVars from onyx.utils.logger import pruning_ctx from onyx.utils.logger import setup_logger @@ -93,6 +109,8 @@ def _is_pruning_due(cc_pair: ConnectorCredentialPair) -> bool: ) def check_for_pruning(self: Task, *, tenant_id: str | None) -> bool | None: r = get_redis_client(tenant_id=tenant_id) + r_replica = get_redis_replica_client(tenant_id=tenant_id) + r_celery: Redis = self.app.broker_connection().channel().client # type: ignore lock_beat: RedisLock = r.lock( OnyxRedisLocks.CHECK_PRUNE_BEAT_LOCK, @@ -123,13 +141,28 @@ def check_for_pruning(self: Task, *, tenant_id: str | None) -> bool | None: if not _is_pruning_due(cc_pair): continue - tasks_created = try_creating_prune_generator_task( + payload_id = try_creating_prune_generator_task( self.app, cc_pair, db_session, r, tenant_id ) - if not tasks_created: + if not payload_id: continue - task_logger.info(f"Pruning queued: cc_pair={cc_pair.id}") + task_logger.info( + f"Pruning queued: cc_pair={cc_pair.id} id={payload_id}" + ) + + # we want to run this less frequently than the overall task + lock_beat.reacquire() + if not r.exists(OnyxRedisSignals.BLOCK_VALIDATE_PRUNING_FENCES): + # clear any pruning fences that don't have associated celery tasks in progress + # tasks can be in the queue in redis, in reserved tasks (prefetched by the worker), + # or be currently executing + try: + validate_pruning_fences(tenant_id, r, r_replica, r_celery, lock_beat) + except Exception: + task_logger.exception("Exception while validating pruning fences") + + r.set(OnyxRedisSignals.BLOCK_VALIDATE_PRUNING_FENCES, 1, ex=300) except SoftTimeLimitExceeded: task_logger.info( "Soft time limit exceeded, task is being terminated gracefully." @@ -149,7 +182,7 @@ def try_creating_prune_generator_task( db_session: Session, r: Redis, tenant_id: str | None, -) -> int | None: +) -> str | None: """Checks for any conditions that should block the pruning generator task from being created, then creates the task. 
@@ -168,7 +201,7 @@ def try_creating_prune_generator_task( # we need to serialize starting pruning since it can be triggered either via # celery beat or manually (API call) - lock = r.lock( + lock: RedisLock = r.lock( DANSWER_REDIS_FUNCTION_LOCK_PREFIX + "try_creating_prune_generator_task", timeout=LOCK_TIMEOUT, ) @@ -200,7 +233,30 @@ def try_creating_prune_generator_task( custom_task_id = f"{redis_connector.prune.generator_task_key}_{uuid4()}" - celery_app.send_task( + # create before setting fence to avoid race condition where the monitoring + # task updates the sync record before it is created + try: + insert_sync_record( + db_session=db_session, + entity_id=cc_pair.id, + sync_type=SyncType.PRUNING, + ) + except Exception: + task_logger.exception("insert_sync_record exceptioned.") + + # signal active before the fence is set + redis_connector.prune.set_active() + + # set a basic fence to start + payload = RedisConnectorPrunePayload( + id=make_short_id(), + submitted=datetime.now(timezone.utc), + started=None, + celery_task_id=None, + ) + redis_connector.prune.set_fence(payload) + + result = celery_app.send_task( OnyxCeleryTask.CONNECTOR_PRUNING_GENERATOR_TASK, kwargs=dict( cc_pair_id=cc_pair.id, @@ -213,16 +269,11 @@ def try_creating_prune_generator_task( priority=OnyxCeleryPriority.LOW, ) - # create before setting fence to avoid race condition where the monitoring - # task updates the sync record before it is created - insert_sync_record( - db_session=db_session, - entity_id=cc_pair.id, - sync_type=SyncType.PRUNING, - ) + # fill in the celery task id + payload.celery_task_id = result.id + redis_connector.prune.set_fence(payload) - # set this only after all tasks have been added - redis_connector.prune.set_fence(True) + payload_id = payload.id except Exception: task_logger.exception(f"Unexpected exception: cc_pair={cc_pair.id}") return None @@ -230,7 +281,7 @@ def try_creating_prune_generator_task( if lock.owned(): lock.release() - return 1 + return payload_id @shared_task( @@ -252,6 +303,8 @@ def connector_pruning_generator_task( and compares those IDs to locally stored documents and deletes all locally stored IDs missing from the most recently pulled document ID list""" + payload_id: str | None = None + LoggerContextVars.reset() pruning_ctx_dict = pruning_ctx.get() @@ -265,6 +318,46 @@ def connector_pruning_generator_task( r = get_redis_client(tenant_id=tenant_id) + # this wait is needed to avoid a race condition where + # the primary worker sends the task and it is immediately executed + # before the primary worker can finalize the fence + start = time.monotonic() + while True: + if time.monotonic() - start > CELERY_TASK_WAIT_FOR_FENCE_TIMEOUT: + raise ValueError( + f"connector_prune_generator_task - timed out waiting for fence to be ready: " + f"fence={redis_connector.prune.fence_key}" + ) + + if not redis_connector.prune.fenced: # The fence must exist + raise ValueError( + f"connector_prune_generator_task - fence not found: " + f"fence={redis_connector.prune.fence_key}" + ) + + payload = redis_connector.prune.payload # The payload must exist + if not payload: + raise ValueError( + "connector_prune_generator_task: payload invalid or not found" + ) + + if payload.celery_task_id is None: + logger.info( + f"connector_prune_generator_task - Waiting for fence: " + f"fence={redis_connector.prune.fence_key}" + ) + time.sleep(1) + continue + + payload_id = payload.id + + logger.info( + f"connector_prune_generator_task - Fence found, continuing...: " + 
f"fence={redis_connector.prune.fence_key} " + f"payload_id={payload.id}" + ) + break + # set thread_local=False since we don't control what thread the indexing/pruning # might run our callback with lock: RedisLock = r.lock( @@ -294,6 +387,18 @@ def connector_pruning_generator_task( ) return + payload = redis_connector.prune.payload + if not payload: + raise ValueError(f"No fence payload found: cc_pair={cc_pair_id}") + + new_payload = RedisConnectorPrunePayload( + id=payload.id, + submitted=payload.submitted, + started=datetime.now(timezone.utc), + celery_task_id=payload.celery_task_id, + ) + redis_connector.prune.set_fence(new_payload) + task_logger.info( f"Pruning generator running connector: " f"cc_pair={cc_pair_id} " @@ -307,10 +412,13 @@ def connector_pruning_generator_task( cc_pair.credential, ) + search_settings = get_current_search_settings(db_session) + redis_connector_index = redis_connector.new_index(search_settings.id) + callback = IndexingCallback( 0, - redis_connector.stop.fence_key, - redis_connector.prune.generator_progress_key, + redis_connector, + redis_connector_index, lock, r, ) @@ -357,7 +465,9 @@ def connector_pruning_generator_task( redis_connector.prune.generator_complete = tasks_generated except Exception as e: task_logger.exception( - f"Failed to run pruning: cc_pair={cc_pair_id} connector={connector_id}" + f"Pruning exceptioned: cc_pair={cc_pair_id} " + f"connector={connector_id} " + f"payload_id={payload_id}" ) redis_connector.prune.reset() @@ -366,7 +476,9 @@ def connector_pruning_generator_task( if lock.owned(): lock.release() - task_logger.info(f"Pruning generator finished: cc_pair={cc_pair_id}") + task_logger.info( + f"Pruning generator finished: cc_pair={cc_pair_id} payload_id={payload_id}" + ) """Monitoring pruning utils, called in monitor_vespa_sync""" @@ -415,4 +527,184 @@ def monitor_ccpair_pruning_taskset( redis_connector.prune.taskset_clear() redis_connector.prune.generator_clear() - redis_connector.prune.set_fence(False) + redis_connector.prune.set_fence(None) + + +def validate_pruning_fences( + tenant_id: str | None, + r: Redis, + r_replica: Redis, + r_celery: Redis, + lock_beat: RedisLock, +) -> None: + # building lookup table can be expensive, so we won't bother + # validating until the queue is small + PERMISSION_SYNC_VALIDATION_MAX_QUEUE_LEN = 1024 + + queue_len = celery_get_queue_length(OnyxCeleryQueues.CONNECTOR_DELETION, r_celery) + if queue_len > PERMISSION_SYNC_VALIDATION_MAX_QUEUE_LEN: + return + + # the queue for a single pruning generator task + reserved_generator_tasks = celery_get_unacked_task_ids( + OnyxCeleryQueues.CONNECTOR_PRUNING, r_celery + ) + + # the queue for a reasonably large set of lightweight deletion tasks + queued_upsert_tasks = celery_get_queued_task_ids( + OnyxCeleryQueues.CONNECTOR_DELETION, r_celery + ) + + # Use replica for this because the worst thing that happens + # is that we don't run the validation on this pass + keys = cast(set[Any], r_replica.smembers(OnyxRedisConstants.ACTIVE_FENCES)) + for key in keys: + key_bytes = cast(bytes, key) + key_str = key_bytes.decode("utf-8") + if not key_str.startswith(RedisConnectorPrune.FENCE_PREFIX): + continue + + validate_pruning_fence( + tenant_id, + key_bytes, + reserved_generator_tasks, + queued_upsert_tasks, + r, + r_celery, + ) + + lock_beat.reacquire() + + return + + +def validate_pruning_fence( + tenant_id: str | None, + key_bytes: bytes, + reserved_tasks: set[str], + queued_tasks: set[str], + r: Redis, + r_celery: Redis, +) -> None: + """See validate_indexing_fence 
for an overall idea of validation flows. + + queued_tasks: the celery queue of lightweight document deletion tasks + reserved_tasks: prefetched tasks for the pruning generator task + """ + # if the fence doesn't exist, there's nothing to do + fence_key = key_bytes.decode("utf-8") + cc_pair_id_str = RedisConnector.get_id_from_fence_key(fence_key) + if cc_pair_id_str is None: + task_logger.warning( + f"validate_pruning_fence - could not parse id from {fence_key}" + ) + return + + cc_pair_id = int(cc_pair_id_str) + # parse out metadata and initialize the helper class with it + redis_connector = RedisConnector(tenant_id, int(cc_pair_id)) + + # check to see if the fence/payload exists + if not redis_connector.prune.fenced: + return + + # in the cloud, the payload format may have changed ... + # it's a little sloppy, but just reset the fence for now if that happens + # TODO: add intentional cleanup/abort logic + try: + payload = redis_connector.prune.payload + except ValidationError: + task_logger.exception( + "validate_pruning_fence - " + "Resetting fence because fence schema is out of date: " + f"cc_pair={cc_pair_id} " + f"fence={fence_key}" + ) + + redis_connector.prune.reset() + return + + if not payload: + return + + if not payload.celery_task_id: + return + + # OK, there's actually something for us to validate + + # either the generator task must be in flight or its subtasks must be + found = celery_find_task( + payload.celery_task_id, + OnyxCeleryQueues.CONNECTOR_PRUNING, + r_celery, + ) + if found: + # the celery task exists in the redis queue + redis_connector.prune.set_active() + return + + if payload.celery_task_id in reserved_tasks: + # the celery task was prefetched and is reserved within a worker + redis_connector.prune.set_active() + return + + # look up every task in the current taskset in the celery queue + # every entry in the taskset should have an associated entry in the celery task queue + # because we get the celery tasks first, the entries in our own pruning taskset + # should be roughly a subset of the tasks in celery + + # this check isn't very exact, but should be sufficient over a period of time + # A single successful check over some number of attempts is sufficient. + + # TODO: if the number of tasks in celery is much lower than the taskset length + # we might be able to shortcut the lookup since by definition some of the tasks + # must not exist in celery. 
+ + tasks_scanned = 0 + tasks_not_in_celery = 0 # a non-zero number after completing our check is bad + + for member in r.sscan_iter(redis_connector.prune.taskset_key): + tasks_scanned += 1 + + member_bytes = cast(bytes, member) + member_str = member_bytes.decode("utf-8") + if member_str in queued_tasks: + continue + + if member_str in reserved_tasks: + continue + + tasks_not_in_celery += 1 + + task_logger.info( + "validate_pruning_fence task check: " + f"tasks_scanned={tasks_scanned} tasks_not_in_celery={tasks_not_in_celery}" + ) + + # we're active if there are still tasks to run and those tasks all exist in celery + if tasks_scanned > 0 and tasks_not_in_celery == 0: + redis_connector.prune.set_active() + return + + # we may want to enable this check if using the active task list somehow isn't good enough + # if redis_connector_index.generator_locked(): + # logger.info(f"{payload.celery_task_id} is currently executing.") + + # if we get here, we didn't find any direct indication that the associated celery tasks exist, + # but they still might be there due to gaps in our ability to check states during transitions + # Checking the active signal safeguards us against these transition periods + # (which has a duration that allows us to bridge those gaps) + if redis_connector.prune.active(): + return + + # celery tasks don't exist and the active signal has expired, possibly due to a crash. Clean it up. + task_logger.warning( + "validate_pruning_fence - " + "Resetting fence because no associated celery tasks were found: " + f"cc_pair={cc_pair_id} " + f"fence={fence_key} " + f"payload_id={payload.id}" + ) + + redis_connector.prune.reset() + return diff --git a/backend/onyx/background/celery/tasks/vespa/tasks.py b/backend/onyx/background/celery/tasks/vespa/tasks.py index b665ef5980a9..9c209d27f60d 100644 --- a/backend/onyx/background/celery/tasks/vespa/tasks.py +++ b/backend/onyx/background/celery/tasks/vespa/tasks.py @@ -339,11 +339,15 @@ def try_generate_document_set_sync_tasks( # create before setting fence to avoid race condition where the monitoring # task updates the sync record before it is created - insert_sync_record( - db_session=db_session, - entity_id=document_set_id, - sync_type=SyncType.DOCUMENT_SET, - ) + try: + insert_sync_record( + db_session=db_session, + entity_id=document_set_id, + sync_type=SyncType.DOCUMENT_SET, + ) + except Exception: + task_logger.exception("insert_sync_record exceptioned.") + # set this only after all tasks have been added rds.set_fence(tasks_generated) return tasks_generated @@ -411,11 +415,15 @@ def try_generate_user_group_sync_tasks( # create before setting fence to avoid race condition where the monitoring # task updates the sync record before it is created - insert_sync_record( - db_session=db_session, - entity_id=usergroup_id, - sync_type=SyncType.USER_GROUP, - ) + try: + insert_sync_record( + db_session=db_session, + entity_id=usergroup_id, + sync_type=SyncType.USER_GROUP, + ) + except Exception: + task_logger.exception("insert_sync_record exceptioned.") + # set this only after all tasks have been added rug.set_fence(tasks_generated) @@ -904,7 +912,7 @@ def monitor_vespa_sync(self: Task, tenant_id: str | None) -> bool | None: # use a lookup table to find active fences. We still have to verify the fence # exists since it is an optimization and not the source of truth. 
- keys = cast(set[Any], r.smembers(OnyxRedisConstants.ACTIVE_FENCES)) + keys = cast(set[Any], r_replica.smembers(OnyxRedisConstants.ACTIVE_FENCES)) for key in keys: key_bytes = cast(bytes, key) diff --git a/backend/onyx/configs/constants.py b/backend/onyx/configs/constants.py index 0bf7085e1c7a..a85c70c5cae2 100644 --- a/backend/onyx/configs/constants.py +++ b/backend/onyx/configs/constants.py @@ -324,6 +324,7 @@ class OnyxRedisSignals: BLOCK_VALIDATE_PERMISSION_SYNC_FENCES = ( "signal:block_validate_permission_sync_fences" ) + BLOCK_VALIDATE_PRUNING_FENCES = "signal:block_validate_pruning_fences" BLOCK_BUILD_FENCE_LOOKUP_TABLE = "signal:block_build_fence_lookup_table" diff --git a/backend/onyx/redis/redis_connector_doc_perm_sync.py b/backend/onyx/redis/redis_connector_doc_perm_sync.py index 385bb0b92613..70b9b4011344 100644 --- a/backend/onyx/redis/redis_connector_doc_perm_sync.py +++ b/backend/onyx/redis/redis_connector_doc_perm_sync.py @@ -80,7 +80,8 @@ class RedisConnectorPermissionSync: def get_active_task_count(self) -> int: """Count of active permission sync tasks""" count = 0 - for _ in self.redis.scan_iter( + for _ in self.redis.sscan_iter( + OnyxRedisConstants.ACTIVE_FENCES, RedisConnectorPermissionSync.FENCE_PREFIX + "*", count=SCAN_ITER_COUNT_DEFAULT, ): diff --git a/backend/onyx/redis/redis_connector_ext_group_sync.py b/backend/onyx/redis/redis_connector_ext_group_sync.py index 54654bb7138c..a63463df81c8 100644 --- a/backend/onyx/redis/redis_connector_ext_group_sync.py +++ b/backend/onyx/redis/redis_connector_ext_group_sync.py @@ -1,5 +1,4 @@ from datetime import datetime -from typing import Any from typing import cast import redis @@ -8,10 +7,12 @@ from pydantic import BaseModel from redis.lock import Lock as RedisLock from sqlalchemy.orm import Session +from onyx.configs.constants import OnyxRedisConstants from onyx.redis.redis_pool import SCAN_ITER_COUNT_DEFAULT class RedisConnectorExternalGroupSyncPayload(BaseModel): + id: str submitted: datetime started: datetime | None celery_task_id: str | None @@ -37,6 +38,12 @@ class RedisConnectorExternalGroupSync: TASKSET_PREFIX = f"{PREFIX}_taskset" # connectorexternalgroupsync_taskset SUBTASK_PREFIX = f"{PREFIX}+sub" # connectorexternalgroupsync+sub + # used to signal the overall workflow is still active + # it's impossible to get the exact state of the system at a single point in time + # so we need a signal with a TTL to bridge gaps in our checks + ACTIVE_PREFIX = PREFIX + "_active" + ACTIVE_TTL = 3600 + def __init__(self, tenant_id: str | None, id: int, redis: redis.Redis) -> None: self.tenant_id: str | None = tenant_id self.id = id @@ -50,6 +57,7 @@ class RedisConnectorExternalGroupSync: self.taskset_key = f"{self.TASKSET_PREFIX}_{id}" self.subtask_prefix: str = f"{self.SUBTASK_PREFIX}_{id}" + self.active_key = f"{self.ACTIVE_PREFIX}_{id}" def taskset_clear(self) -> None: self.redis.delete(self.taskset_key) @@ -66,7 +74,8 @@ class RedisConnectorExternalGroupSync: def get_active_task_count(self) -> int: """Count of active external group syncing tasks""" count = 0 - for _ in self.redis.scan_iter( + for _ in self.redis.sscan_iter( + OnyxRedisConstants.ACTIVE_FENCES, RedisConnectorExternalGroupSync.FENCE_PREFIX + "*", count=SCAN_ITER_COUNT_DEFAULT, ): @@ -83,10 +92,11 @@ class RedisConnectorExternalGroupSync: @property def payload(self) -> RedisConnectorExternalGroupSyncPayload | None: # read related data and evaluate/print task progress - fence_bytes = cast(Any, self.redis.get(self.fence_key)) - if fence_bytes is None: + 
fence_raw = self.redis.get(self.fence_key) + if fence_raw is None: return None + fence_bytes = cast(bytes, fence_raw) fence_str = fence_bytes.decode("utf-8") payload = RedisConnectorExternalGroupSyncPayload.model_validate_json( cast(str, fence_str) ) @@ -99,10 +109,26 @@ class RedisConnectorExternalGroupSync: payload: RedisConnectorExternalGroupSyncPayload | None, ) -> None: if not payload: + self.redis.srem(OnyxRedisConstants.ACTIVE_FENCES, self.fence_key) self.redis.delete(self.fence_key) return self.redis.set(self.fence_key, payload.model_dump_json()) + self.redis.sadd(OnyxRedisConstants.ACTIVE_FENCES, self.fence_key) + + def set_active(self) -> None: + """This sets a signal to keep the external group sync flow from getting cleaned up within + the expiration time. + + The slack in timing is needed to avoid race conditions where simply checking + the celery queue and task status could result in race conditions.""" + self.redis.set(self.active_key, 0, ex=self.ACTIVE_TTL) + + def active(self) -> bool: + if self.redis.exists(self.active_key): + return True + + return False @property def generator_complete(self) -> int | None: @@ -138,6 +164,8 @@ class RedisConnectorExternalGroupSync: pass def reset(self) -> None: + self.redis.srem(OnyxRedisConstants.ACTIVE_FENCES, self.fence_key) + self.redis.delete(self.active_key) self.redis.delete(self.generator_progress_key) self.redis.delete(self.generator_complete_key) self.redis.delete(self.taskset_key) @@ -152,6 +180,9 @@ class RedisConnectorExternalGroupSync: @staticmethod def reset_all(r: redis.Redis) -> None: """Deletes all redis values for all connectors""" + for key in r.scan_iter(RedisConnectorExternalGroupSync.ACTIVE_PREFIX + "*"): + r.delete(key) + for key in r.scan_iter(RedisConnectorExternalGroupSync.TASKSET_PREFIX + "*"): r.delete(key) diff --git a/backend/onyx/redis/redis_connector_prune.py b/backend/onyx/redis/redis_connector_prune.py index bc04cd5166b3..089dbbfdf684 100644 --- a/backend/onyx/redis/redis_connector_prune.py +++ b/backend/onyx/redis/redis_connector_prune.py @@ -1,9 +1,11 @@ import time +from datetime import datetime from typing import cast from uuid import uuid4 import redis from celery import Celery +from pydantic import BaseModel from redis.lock import Lock as RedisLock from sqlalchemy.orm import Session @@ -16,6 +18,13 @@ from onyx.db.connector_credential_pair import get_connector_credential_pair_from from onyx.redis.redis_pool import SCAN_ITER_COUNT_DEFAULT +class RedisConnectorPrunePayload(BaseModel): + id: str + submitted: datetime + started: datetime | None + celery_task_id: str | None + + class RedisConnectorPrune: """Manages interactions with redis for pruning tasks. 
Should only be accessed through RedisConnector.""" @@ -36,6 +45,12 @@ class RedisConnectorPrune: TASKSET_PREFIX = f"{PREFIX}_taskset" # connectorpruning_taskset SUBTASK_PREFIX = f"{PREFIX}+sub" # connectorpruning+sub + # used to signal the overall workflow is still active + # it's impossible to get the exact state of the system at a single point in time + # so we need a signal with a TTL to bridge gaps in our checks + ACTIVE_PREFIX = PREFIX + "_active" + ACTIVE_TTL = 3600 + def __init__(self, tenant_id: str | None, id: int, redis: redis.Redis) -> None: self.tenant_id: str | None = tenant_id self.id = id @@ -49,6 +64,7 @@ class RedisConnectorPrune: self.taskset_key = f"{self.TASKSET_PREFIX}_{id}" self.subtask_prefix: str = f"{self.SUBTASK_PREFIX}_{id}" + self.active_key = f"{self.ACTIVE_PREFIX}_{id}" def taskset_clear(self) -> None: self.redis.delete(self.taskset_key) @@ -65,8 +81,10 @@ class RedisConnectorPrune: def get_active_task_count(self) -> int: """Count of active pruning tasks""" count = 0 - for key in self.redis.scan_iter( - RedisConnectorPrune.FENCE_PREFIX + "*", count=SCAN_ITER_COUNT_DEFAULT + for _ in self.redis.sscan_iter( + OnyxRedisConstants.ACTIVE_FENCES, + RedisConnectorPrune.FENCE_PREFIX + "*", + count=SCAN_ITER_COUNT_DEFAULT, + ): count += 1 return count @@ -78,15 +96,44 @@ class RedisConnectorPrune: return False - def set_fence(self, value: bool) -> None: - if not value: + @property + def payload(self) -> RedisConnectorPrunePayload | None: + # read related data and evaluate/print task progress + fence_bytes = cast(bytes, self.redis.get(self.fence_key)) + if fence_bytes is None: + return None + + fence_str = fence_bytes.decode("utf-8") + payload = RedisConnectorPrunePayload.model_validate_json(cast(str, fence_str)) + + return payload + + def set_fence( + self, + payload: RedisConnectorPrunePayload | None, + ) -> None: + if not payload: self.redis.srem(OnyxRedisConstants.ACTIVE_FENCES, self.fence_key) self.redis.delete(self.fence_key) return - self.redis.set(self.fence_key, 0) + self.redis.set(self.fence_key, payload.model_dump_json()) self.redis.sadd(OnyxRedisConstants.ACTIVE_FENCES, self.fence_key) + def set_active(self) -> None: + """This sets a signal to keep the pruning flow from getting cleaned up within + the expiration time. 
+ + The slack in timing is needed to avoid race conditions where simply checking + the celery queue and task status could result in race conditions.""" + self.redis.set(self.active_key, 0, ex=self.ACTIVE_TTL) + + def active(self) -> bool: + if self.redis.exists(self.active_key): + return True + + return False + @property def generator_complete(self) -> int | None: """the fence payload is an int representing the starting number of @@ -162,6 +209,7 @@ class RedisConnectorPrune: def reset(self) -> None: self.redis.srem(OnyxRedisConstants.ACTIVE_FENCES, self.fence_key) + self.redis.delete(self.active_key) self.redis.delete(self.generator_progress_key) self.redis.delete(self.generator_complete_key) self.redis.delete(self.taskset_key) @@ -176,6 +224,9 @@ class RedisConnectorPrune: @staticmethod def reset_all(r: redis.Redis) -> None: """Deletes all redis values for all connectors""" + for key in r.scan_iter(RedisConnectorPrune.ACTIVE_PREFIX + "*"): + r.delete(key) + for key in r.scan_iter(RedisConnectorPrune.TASKSET_PREFIX + "*"): r.delete(key) diff --git a/backend/onyx/server/documents/cc_pair.py b/backend/onyx/server/documents/cc_pair.py index 7c2d7752960f..59e930dcd95c 100644 --- a/backend/onyx/server/documents/cc_pair.py +++ b/backend/onyx/server/documents/cc_pair.py @@ -368,15 +368,17 @@ def prune_cc_pair( f"credential={cc_pair.credential_id} " f"{cc_pair.connector.name} connector." ) - tasks_created = try_creating_prune_generator_task( + payload_id = try_creating_prune_generator_task( primary_app, cc_pair, db_session, r, CURRENT_TENANT_ID_CONTEXTVAR.get() ) - if not tasks_created: + if not payload_id: raise HTTPException( status_code=HTTPStatus.INTERNAL_SERVER_ERROR, detail="Pruning task creation failed.", ) + logger.info(f"Pruning queued: cc_pair={cc_pair.id} id={payload_id}") + return StatusResponse( success=True, message="Successfully created the pruning task.", @@ -514,15 +516,17 @@ def sync_cc_pair_groups( f"credential_id={cc_pair.credential_id} " f"{cc_pair.connector.name} connector." ) - tasks_created = try_creating_external_group_sync_task( + payload_id = try_creating_external_group_sync_task( primary_app, cc_pair_id, r, CURRENT_TENANT_ID_CONTEXTVAR.get() ) - if not tasks_created: + if not payload_id: raise HTTPException( status_code=HTTPStatus.INTERNAL_SERVER_ERROR, detail="External group sync task creation failed.", ) + logger.info(f"External group sync queued: cc_pair={cc_pair_id} id={payload_id}") + return StatusResponse( success=True, message="Successfully created the external group sync task.",
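Reviewer aid, not part of the diff: the hunks above repeat one pattern for pruning and external group sync. Mark the workflow active, write a fence payload, fill in the celery task id after dispatch, and clean everything up on reset. The sketch below walks that lifecycle against a bare redis-py client under stated assumptions: the key names, the ACTIVE_FENCES set name, the short-id helper, and the local Redis connection are stand-ins, and the real code drives this through RedisConnectorPrune / RedisConnectorPrunePayload rather than raw keys.

# Minimal illustrative sketch of the fence-payload lifecycle (assumed key names, local Redis).
from datetime import datetime, timezone
from uuid import uuid4

import redis
from pydantic import BaseModel

ACTIVE_FENCES = "active_fences"           # stand-in for OnyxRedisConstants.ACTIVE_FENCES
FENCE_KEY = "connectorpruning_fence_1"    # stand-in for redis_connector.prune.fence_key
ACTIVE_KEY = "connectorpruning_active_1"  # stand-in for redis_connector.prune.active_key
ACTIVE_TTL = 3600                         # seconds the "active" signal bridges validation gaps


class PrunePayload(BaseModel):
    # mirrors RedisConnectorPrunePayload from the diff
    id: str
    submitted: datetime
    started: datetime | None
    celery_task_id: str | None


r = redis.Redis()

# 1) signal "active" before the fence exists so fence validation doesn't
#    clean up a fence that is still being set up
r.set(ACTIVE_KEY, 0, ex=ACTIVE_TTL)

# 2) set a basic fence (no celery task id yet) and register it in the lookup set
payload = PrunePayload(
    id=uuid4().hex[:8],  # stand-in for make_short_id()
    submitted=datetime.now(timezone.utc),
    started=None,
    celery_task_id=None,
)
r.set(FENCE_KEY, payload.model_dump_json())
r.sadd(ACTIVE_FENCES, FENCE_KEY)

# 3) after send_task returns, fill in the celery task id and rewrite the fence;
#    the generator task waits until it sees a payload with this id filled in
payload.celery_task_id = "celery-task-id-from-send_task"
r.set(FENCE_KEY, payload.model_dump_json())

# 4) cleanup, roughly what reset() / set_fence(None) do: drop the fence from the
#    lookup set and delete the fence and active keys
r.srem(ACTIVE_FENCES, FENCE_KEY)
r.delete(FENCE_KEY, ACTIVE_KEY)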