From d8a17a723899096b2a7bb7b7ab31aceacb82ba8e Mon Sep 17 00:00:00 2001 From: rkuo-danswer Date: Sat, 25 Jan 2025 19:48:25 -0800 Subject: [PATCH] try using a redis replica in some areas (#3748) * try using a redis replica in some areas * harden up replica usage * comment * slow down cloud dispatch temporarily * add ignored syncing list back * raise multiplier to 8 * comment out per tenant code (no longer used by fanout) --------- Co-authored-by: Richard Kuo (Danswer) --- backend/onyx/background/celery/apps/beat.py | 32 +++++---- .../background/celery/tasks/beat_schedule.py | 26 ++++--- .../background/celery/tasks/indexing/tasks.py | 4 +- .../background/celery/tasks/indexing/utils.py | 7 +- .../background/celery/tasks/shared/tasks.py | 5 ++ .../background/celery/tasks/vespa/tasks.py | 72 ++++++++++++------- backend/onyx/configs/app_configs.py | 2 + backend/onyx/redis/redis_pool.py | 18 ++++- 8 files changed, 110 insertions(+), 56 deletions(-) diff --git a/backend/onyx/background/celery/apps/beat.py b/backend/onyx/background/celery/apps/beat.py index 4e866b3dbb..16a5332398 100644 --- a/backend/onyx/background/celery/apps/beat.py +++ b/backend/onyx/background/celery/apps/beat.py @@ -1,6 +1,5 @@ from datetime import timedelta from typing import Any -from typing import cast from celery import Celery from celery import signals @@ -8,7 +7,6 @@ from celery.beat import PersistentScheduler # type: ignore from celery.signals import beat_init import onyx.background.celery.apps.app_base as app_base -from onyx.configs.constants import ONYX_CLOUD_CELERY_TASK_PREFIX from onyx.configs.constants import POSTGRES_CELERY_BEAT_APP_NAME from onyx.db.engine import get_all_tenant_ids from onyx.db.engine import SqlEngine @@ -132,21 +130,25 @@ class DynamicTenantScheduler(PersistentScheduler): # get current schedule and extract current tenants current_schedule = self.schedule.items() - current_tenants = set() - for task_name, _ in current_schedule: - task_name = cast(str, task_name) - if 
task_name.startswith(ONYX_CLOUD_CELERY_TASK_PREFIX): - continue + # there are no more per tenant beat tasks, so comment this out + # NOTE: we may not actually need this scheduler any more and should + # test reverting to a regular beat schedule implementation - if "_" in task_name: - # example: "check-for-condition-tenant_12345678-abcd-efgh-ijkl-12345678" - # -> "12345678-abcd-efgh-ijkl-12345678" - current_tenants.add(task_name.split("_")[-1]) - logger.info(f"Found {len(current_tenants)} existing items in schedule") + # current_tenants = set() + # for task_name, _ in current_schedule: + # task_name = cast(str, task_name) + # if task_name.startswith(ONYX_CLOUD_CELERY_TASK_PREFIX): + # continue - for tenant_id in tenant_ids: - if tenant_id not in current_tenants: - logger.info(f"Processing new tenant: {tenant_id}") + # if "_" in task_name: + # # example: "check-for-condition-tenant_12345678-abcd-efgh-ijkl-12345678" + # # -> "12345678-abcd-efgh-ijkl-12345678" + # current_tenants.add(task_name.split("_")[-1]) + # logger.info(f"Found {len(current_tenants)} existing items in schedule") + + # for tenant_id in tenant_ids: + # if tenant_id not in current_tenants: + # logger.info(f"Processing new tenant: {tenant_id}") new_schedule = self._generate_schedule(tenant_ids) diff --git a/backend/onyx/background/celery/tasks/beat_schedule.py b/backend/onyx/background/celery/tasks/beat_schedule.py index d2c01e1591..2eb48cdf67 100644 --- a/backend/onyx/background/celery/tasks/beat_schedule.py +++ b/backend/onyx/background/celery/tasks/beat_schedule.py @@ -16,6 +16,10 @@ from shared_configs.configs import MULTI_TENANT # it's only important that they run relatively regularly BEAT_EXPIRES_DEFAULT = 15 * 60 # 15 minutes (in seconds) +# hack to slow down task dispatch in the cloud until +# we have a better implementation (backpressure, etc) +CLOUD_BEAT_SCHEDULE_MULTIPLIER = 8 + # tasks that only run in the cloud # the name attribute must start with ONYX_CLOUD_CELERY_TASK_PREFIX = "cloud" to
be filtered # by the DynamicTenantScheduler @@ -24,7 +28,7 @@ cloud_tasks_to_schedule = [ { "name": f"{ONYX_CLOUD_CELERY_TASK_PREFIX}_check-alembic", "task": OnyxCeleryTask.CLOUD_CHECK_ALEMBIC, - "schedule": timedelta(hours=1), + "schedule": timedelta(hours=1 * CLOUD_BEAT_SCHEDULE_MULTIPLIER), "options": { "queue": OnyxCeleryQueues.MONITORING, "priority": OnyxCeleryPriority.HIGH, @@ -35,7 +39,7 @@ cloud_tasks_to_schedule = [ { "name": f"{ONYX_CLOUD_CELERY_TASK_PREFIX}_check-for-indexing", "task": OnyxCeleryTask.CLOUD_BEAT_TASK_GENERATOR, - "schedule": timedelta(seconds=15), + "schedule": timedelta(seconds=15 * CLOUD_BEAT_SCHEDULE_MULTIPLIER), "options": { "priority": OnyxCeleryPriority.HIGHEST, "expires": BEAT_EXPIRES_DEFAULT, @@ -47,7 +51,7 @@ cloud_tasks_to_schedule = [ { "name": f"{ONYX_CLOUD_CELERY_TASK_PREFIX}_check-for-connector-deletion", "task": OnyxCeleryTask.CLOUD_BEAT_TASK_GENERATOR, - "schedule": timedelta(seconds=20), + "schedule": timedelta(seconds=20 * CLOUD_BEAT_SCHEDULE_MULTIPLIER), "options": { "priority": OnyxCeleryPriority.HIGHEST, "expires": BEAT_EXPIRES_DEFAULT, @@ -59,7 +63,7 @@ cloud_tasks_to_schedule = [ { "name": f"{ONYX_CLOUD_CELERY_TASK_PREFIX}_check-for-vespa-sync", "task": OnyxCeleryTask.CLOUD_BEAT_TASK_GENERATOR, - "schedule": timedelta(seconds=20), + "schedule": timedelta(seconds=20 * CLOUD_BEAT_SCHEDULE_MULTIPLIER), "options": { "priority": OnyxCeleryPriority.HIGHEST, "expires": BEAT_EXPIRES_DEFAULT, @@ -71,7 +75,7 @@ cloud_tasks_to_schedule = [ { "name": f"{ONYX_CLOUD_CELERY_TASK_PREFIX}_check-for-prune", "task": OnyxCeleryTask.CLOUD_BEAT_TASK_GENERATOR, - "schedule": timedelta(seconds=15), + "schedule": timedelta(seconds=15 * CLOUD_BEAT_SCHEDULE_MULTIPLIER), "options": { "priority": OnyxCeleryPriority.HIGHEST, "expires": BEAT_EXPIRES_DEFAULT, @@ -83,7 +87,7 @@ cloud_tasks_to_schedule = [ { "name": f"{ONYX_CLOUD_CELERY_TASK_PREFIX}_monitor-vespa-sync", "task": OnyxCeleryTask.CLOUD_BEAT_TASK_GENERATOR, - "schedule": 
timedelta(seconds=5), + "schedule": timedelta(seconds=15 * CLOUD_BEAT_SCHEDULE_MULTIPLIER), "options": { "priority": OnyxCeleryPriority.HIGHEST, "expires": BEAT_EXPIRES_DEFAULT, @@ -95,7 +99,7 @@ cloud_tasks_to_schedule = [ { "name": f"{ONYX_CLOUD_CELERY_TASK_PREFIX}_check-for-doc-permissions-sync", "task": OnyxCeleryTask.CLOUD_BEAT_TASK_GENERATOR, - "schedule": timedelta(seconds=30), + "schedule": timedelta(seconds=30 * CLOUD_BEAT_SCHEDULE_MULTIPLIER), "options": { "priority": OnyxCeleryPriority.HIGHEST, "expires": BEAT_EXPIRES_DEFAULT, @@ -107,7 +111,7 @@ cloud_tasks_to_schedule = [ { "name": f"{ONYX_CLOUD_CELERY_TASK_PREFIX}_check-for-external-group-sync", "task": OnyxCeleryTask.CLOUD_BEAT_TASK_GENERATOR, - "schedule": timedelta(seconds=20), + "schedule": timedelta(seconds=20 * CLOUD_BEAT_SCHEDULE_MULTIPLIER), "options": { "priority": OnyxCeleryPriority.HIGHEST, "expires": BEAT_EXPIRES_DEFAULT, @@ -119,7 +123,7 @@ cloud_tasks_to_schedule = [ { "name": f"{ONYX_CLOUD_CELERY_TASK_PREFIX}_monitor-background-processes", "task": OnyxCeleryTask.CLOUD_BEAT_TASK_GENERATOR, - "schedule": timedelta(minutes=5), + "schedule": timedelta(minutes=5 * CLOUD_BEAT_SCHEDULE_MULTIPLIER), "options": { "priority": OnyxCeleryPriority.HIGHEST, "expires": BEAT_EXPIRES_DEFAULT, @@ -137,7 +141,9 @@ if LLM_MODEL_UPDATE_API_URL: { "name": f"{ONYX_CLOUD_CELERY_TASK_PREFIX}_check-for-llm-model-update", "task": OnyxCeleryTask.CLOUD_BEAT_TASK_GENERATOR, - "schedule": timedelta(hours=1), # Check every hour + "schedule": timedelta( + hours=1 * CLOUD_BEAT_SCHEDULE_MULTIPLIER + ), # Base interval of one hour, scaled by the cloud multiplier "options": { "priority": OnyxCeleryPriority.HIGHEST, "expires": BEAT_EXPIRES_DEFAULT, diff --git a/backend/onyx/background/celery/tasks/indexing/tasks.py b/backend/onyx/background/celery/tasks/indexing/tasks.py index 309e038188..94ace6776c 100644 --- a/backend/onyx/background/celery/tasks/indexing/tasks.py +++ b/backend/onyx/background/celery/tasks/indexing/tasks.py @@ -45,6 +45,7 @@ from
onyx.natural_language_processing.search_nlp_models import EmbeddingModel from onyx.natural_language_processing.search_nlp_models import warm_up_bi_encoder from onyx.redis.redis_connector import RedisConnector from onyx.redis.redis_pool import get_redis_client +from onyx.redis.redis_pool import get_redis_replica_client from onyx.redis.redis_pool import redis_lock_dump from onyx.utils.logger import setup_logger from onyx.utils.variable_functionality import global_version @@ -69,6 +70,7 @@ def check_for_indexing(self: Task, *, tenant_id: str | None) -> int | None: tasks_created = 0 locked = False redis_client = get_redis_client(tenant_id=tenant_id) + redis_client_replica = get_redis_replica_client(tenant_id=tenant_id) # we need to use celery's redis client to access its redis data # (which lives on a different db number) @@ -227,7 +229,7 @@ def check_for_indexing(self: Task, *, tenant_id: str | None) -> int | None: # or be currently executing try: validate_indexing_fences( - tenant_id, self.app, redis_client, redis_client_celery, lock_beat + tenant_id, redis_client_replica, redis_client_celery, lock_beat ) except Exception: task_logger.exception("Exception while validating indexing fences") diff --git a/backend/onyx/background/celery/tasks/indexing/utils.py b/backend/onyx/background/celery/tasks/indexing/utils.py index 2ef1856466..8651139146 100644 --- a/backend/onyx/background/celery/tasks/indexing/utils.py +++ b/backend/onyx/background/celery/tasks/indexing/utils.py @@ -291,8 +291,7 @@ def validate_indexing_fence( def validate_indexing_fences( tenant_id: str | None, - celery_app: Celery, - r: Redis, + r_replica: Redis, r_celery: Redis, lock_beat: RedisLock, ) -> None: @@ -301,7 +300,9 @@ def validate_indexing_fences( ) # validate all existing indexing jobs - for key_bytes in r.scan_iter( + # Use replica for this because the worst thing that happens + # is that we don't run the validation on this pass + for key_bytes in r_replica.scan_iter( 
RedisConnectorIndex.FENCE_PREFIX + "*", count=SCAN_ITER_COUNT_DEFAULT ): lock_beat.reacquire() diff --git a/backend/onyx/background/celery/tasks/shared/tasks.py b/backend/onyx/background/celery/tasks/shared/tasks.py index 0dba3e6dc3..46082492b8 100644 --- a/backend/onyx/background/celery/tasks/shared/tasks.py +++ b/backend/onyx/background/celery/tasks/shared/tasks.py @@ -33,6 +33,7 @@ from onyx.document_index.interfaces import VespaDocumentFields from onyx.redis.redis_pool import get_redis_client from onyx.redis.redis_pool import redis_lock_dump from onyx.server.documents.models import ConnectorCredentialPairIdentifier +from shared_configs.configs import IGNORED_SYNCING_TENANT_LIST DOCUMENT_BY_CC_PAIR_CLEANUP_MAX_RETRIES = 3 @@ -247,6 +248,10 @@ def cloud_beat_task_generator( lock_beat.reacquire() last_lock_time = current_time + # needed in the cloud + if IGNORED_SYNCING_TENANT_LIST and tenant_id in IGNORED_SYNCING_TENANT_LIST: + continue + self.app.send_task( task_name, kwargs=dict( diff --git a/backend/onyx/background/celery/tasks/vespa/tasks.py b/backend/onyx/background/celery/tasks/vespa/tasks.py index a350d2ce0c..58df78ee43 100644 --- a/backend/onyx/background/celery/tasks/vespa/tasks.py +++ b/backend/onyx/background/celery/tasks/vespa/tasks.py @@ -78,6 +78,7 @@ from onyx.redis.redis_connector_index import RedisConnectorIndex from onyx.redis.redis_connector_prune import RedisConnectorPrune from onyx.redis.redis_document_set import RedisDocumentSet from onyx.redis.redis_pool import get_redis_client +from onyx.redis.redis_pool import get_redis_replica_client from onyx.redis.redis_pool import redis_lock_dump from onyx.redis.redis_pool import SCAN_ITER_COUNT_DEFAULT from onyx.redis.redis_usergroup import RedisUserGroup @@ -895,6 +896,17 @@ def monitor_vespa_sync(self: Task, tenant_id: str | None) -> bool | None: r = get_redis_client(tenant_id=tenant_id) + # Replica usage notes + # + # False negatives are OK. (aka fail to see a key that exists on the master).
+ # We simply skip the monitoring work and it will be caught on the next pass. + # + # False positives are not OK, and are possible if we clear a fence on the master and + # then read from the replica. In this case, monitoring work could be done on a fence + # that no longer exists. To avoid this, we scan from the replica, but double check + # the result on the master. + r_replica = get_redis_replica_client(tenant_id=tenant_id) + lock_beat: RedisLock = r.lock( OnyxRedisLocks.MONITOR_VESPA_SYNC_BEAT_LOCK, timeout=CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT, @@ -954,17 +966,19 @@ def monitor_vespa_sync(self: Task, tenant_id: str | None) -> bool | None: # scan and monitor activity to completion phase_start = time.monotonic() lock_beat.reacquire() - if r.exists(RedisConnectorCredentialPair.get_fence_key()): - monitor_connector_taskset(r) + if r_replica.exists(RedisConnectorCredentialPair.get_fence_key()): + if r.exists(RedisConnectorCredentialPair.get_fence_key()): + monitor_connector_taskset(r) timings["connector"] = time.monotonic() - phase_start timings["connector_ttl"] = r.ttl(OnyxRedisLocks.MONITOR_VESPA_SYNC_BEAT_LOCK) phase_start = time.monotonic() lock_beat.reacquire() - for key_bytes in r.scan_iter( + for key_bytes in r_replica.scan_iter( RedisConnectorDelete.FENCE_PREFIX + "*", count=SCAN_ITER_COUNT_DEFAULT ): - monitor_connector_deletion_taskset(tenant_id, key_bytes, r) + if r.exists(key_bytes): + monitor_connector_deletion_taskset(tenant_id, key_bytes, r) lock_beat.reacquire() timings["connector_deletion"] = time.monotonic() - phase_start @@ -974,66 +988,74 @@ def monitor_vespa_sync(self: Task, tenant_id: str | None) -> bool | None: phase_start = time.monotonic() lock_beat.reacquire() - for key_bytes in r.scan_iter( + for key_bytes in r_replica.scan_iter( RedisDocumentSet.FENCE_PREFIX + "*", count=SCAN_ITER_COUNT_DEFAULT ): - with get_session_with_tenant(tenant_id) as db_session: - monitor_document_set_taskset(tenant_id, key_bytes, r, db_session) + if 
r.exists(key_bytes): + with get_session_with_tenant(tenant_id) as db_session: + monitor_document_set_taskset(tenant_id, key_bytes, r, db_session) lock_beat.reacquire() timings["documentset"] = time.monotonic() - phase_start timings["documentset_ttl"] = r.ttl(OnyxRedisLocks.MONITOR_VESPA_SYNC_BEAT_LOCK) phase_start = time.monotonic() lock_beat.reacquire() - for key_bytes in r.scan_iter( + for key_bytes in r_replica.scan_iter( RedisUserGroup.FENCE_PREFIX + "*", count=SCAN_ITER_COUNT_DEFAULT ): - monitor_usergroup_taskset = fetch_versioned_implementation_with_fallback( - "onyx.background.celery.tasks.vespa.tasks", - "monitor_usergroup_taskset", - noop_fallback, - ) - with get_session_with_tenant(tenant_id) as db_session: - monitor_usergroup_taskset(tenant_id, key_bytes, r, db_session) + if r.exists(key_bytes): + monitor_usergroup_taskset = ( + fetch_versioned_implementation_with_fallback( + "onyx.background.celery.tasks.vespa.tasks", + "monitor_usergroup_taskset", + noop_fallback, + ) + ) + with get_session_with_tenant(tenant_id) as db_session: + monitor_usergroup_taskset(tenant_id, key_bytes, r, db_session) lock_beat.reacquire() timings["usergroup"] = time.monotonic() - phase_start timings["usergroup_ttl"] = r.ttl(OnyxRedisLocks.MONITOR_VESPA_SYNC_BEAT_LOCK) phase_start = time.monotonic() lock_beat.reacquire() - for key_bytes in r.scan_iter( + for key_bytes in r_replica.scan_iter( RedisConnectorPrune.FENCE_PREFIX + "*", count=SCAN_ITER_COUNT_DEFAULT ): - with get_session_with_tenant(tenant_id) as db_session: - monitor_ccpair_pruning_taskset(tenant_id, key_bytes, r, db_session) + if r.exists(key_bytes): + with get_session_with_tenant(tenant_id) as db_session: + monitor_ccpair_pruning_taskset(tenant_id, key_bytes, r, db_session) lock_beat.reacquire() timings["pruning"] = time.monotonic() - phase_start timings["pruning_ttl"] = r.ttl(OnyxRedisLocks.MONITOR_VESPA_SYNC_BEAT_LOCK) phase_start = time.monotonic() lock_beat.reacquire() - for key_bytes in r.scan_iter( + for 
key_bytes in r_replica.scan_iter( RedisConnectorIndex.FENCE_PREFIX + "*", count=SCAN_ITER_COUNT_DEFAULT ): - with get_session_with_tenant(tenant_id) as db_session: - monitor_ccpair_indexing_taskset(tenant_id, key_bytes, r, db_session) + if r.exists(key_bytes): + with get_session_with_tenant(tenant_id) as db_session: + monitor_ccpair_indexing_taskset(tenant_id, key_bytes, r, db_session) lock_beat.reacquire() timings["indexing"] = time.monotonic() - phase_start timings["indexing_ttl"] = r.ttl(OnyxRedisLocks.MONITOR_VESPA_SYNC_BEAT_LOCK) phase_start = time.monotonic() lock_beat.reacquire() - for key_bytes in r.scan_iter( + for key_bytes in r_replica.scan_iter( RedisConnectorPermissionSync.FENCE_PREFIX + "*", count=SCAN_ITER_COUNT_DEFAULT, ): - with get_session_with_tenant(tenant_id) as db_session: - monitor_ccpair_permissions_taskset(tenant_id, key_bytes, r, db_session) + if r.exists(key_bytes): + with get_session_with_tenant(tenant_id) as db_session: + monitor_ccpair_permissions_taskset( + tenant_id, key_bytes, r, db_session + ) lock_beat.reacquire() timings["permissions"] = time.monotonic() - phase_start timings["permissions_ttl"] = r.ttl(OnyxRedisLocks.MONITOR_VESPA_SYNC_BEAT_LOCK) - except SoftTimeLimitExceeded: task_logger.info( "Soft time limit exceeded, task is being terminated gracefully." 
diff --git a/backend/onyx/configs/app_configs.py b/backend/onyx/configs/app_configs.py index ad6b53c609..3235f6127b 100644 --- a/backend/onyx/configs/app_configs.py +++ b/backend/onyx/configs/app_configs.py @@ -200,6 +200,8 @@ REDIS_HOST = os.environ.get("REDIS_HOST") or "localhost" REDIS_PORT = int(os.environ.get("REDIS_PORT", 6379)) REDIS_PASSWORD = os.environ.get("REDIS_PASSWORD") or "" +# this assumes that other redis settings remain the same as the primary +REDIS_REPLICA_HOST = os.environ.get("REDIS_REPLICA_HOST") or REDIS_HOST REDIS_AUTH_KEY_PREFIX = "fastapi_users_token:" diff --git a/backend/onyx/redis/redis_pool.py b/backend/onyx/redis/redis_pool.py index e3617127a6..10a2c7655c 100644 --- a/backend/onyx/redis/redis_pool.py +++ b/backend/onyx/redis/redis_pool.py @@ -21,6 +21,7 @@ from onyx.configs.app_configs import REDIS_HOST from onyx.configs.app_configs import REDIS_PASSWORD from onyx.configs.app_configs import REDIS_POOL_MAX_CONNECTIONS from onyx.configs.app_configs import REDIS_PORT +from onyx.configs.app_configs import REDIS_REPLICA_HOST from onyx.configs.app_configs import REDIS_SSL from onyx.configs.app_configs import REDIS_SSL_CA_CERTS from onyx.configs.app_configs import REDIS_SSL_CERT_REQS @@ -132,23 +133,32 @@ class RedisPool: _instance: Optional["RedisPool"] = None _lock: threading.Lock = threading.Lock() _pool: redis.BlockingConnectionPool + _replica_pool: redis.BlockingConnectionPool def __new__(cls) -> "RedisPool": if not cls._instance: with cls._lock: if not cls._instance: cls._instance = super(RedisPool, cls).__new__(cls) - cls._instance._init_pool() + cls._instance._init_pools() return cls._instance - def _init_pool(self) -> None: + def _init_pools(self) -> None: self._pool = RedisPool.create_pool(ssl=REDIS_SSL) + self._replica_pool = RedisPool.create_pool( + host=REDIS_REPLICA_HOST, ssl=REDIS_SSL + ) def get_client(self, tenant_id: str | None) -> Redis: if tenant_id is None: tenant_id = "public" return TenantRedis(tenant_id, 
connection_pool=self._pool) + def get_replica_client(self, tenant_id: str | None) -> Redis: + if tenant_id is None: + tenant_id = "public" + return TenantRedis(tenant_id, connection_pool=self._replica_pool) + @staticmethod def create_pool( host: str = REDIS_HOST, @@ -212,6 +222,10 @@ def get_redis_client(*, tenant_id: str | None) -> Redis: return redis_pool.get_client(tenant_id) +def get_redis_replica_client(*, tenant_id: str | None) -> Redis: + return redis_pool.get_replica_client(tenant_id) + + SSL_CERT_REQS_MAP = { "none": ssl.CERT_NONE, "optional": ssl.CERT_OPTIONAL,