diff --git a/backend/ee/onyx/background/celery/tasks/beat_schedule.py b/backend/ee/onyx/background/celery/tasks/beat_schedule.py index 670b2ebbf..2dbda2398 100644 --- a/backend/ee/onyx/background/celery/tasks/beat_schedule.py +++ b/backend/ee/onyx/background/celery/tasks/beat_schedule.py @@ -1,6 +1,9 @@ from datetime import timedelta from typing import Any +from onyx.background.celery.tasks.beat_schedule import ( + cloud_tasks_to_schedule as base_cloud_tasks_to_schedule, +) from onyx.background.celery.tasks.beat_schedule import ( tasks_to_schedule as base_tasks_to_schedule, ) @@ -8,7 +11,7 @@ from onyx.configs.constants import OnyxCeleryTask ee_tasks_to_schedule = [ { - "name": "autogenerate_usage_report", + "name": "autogenerate-usage-report", "task": OnyxCeleryTask.AUTOGENERATE_USAGE_REPORT_TASK, "schedule": timedelta(days=30), # TODO: change this to config flag }, @@ -20,5 +23,9 @@ ee_tasks_to_schedule = [ ] +def get_cloud_tasks_to_schedule() -> list[dict[str, Any]]: + return base_cloud_tasks_to_schedule + + def get_tasks_to_schedule() -> list[dict[str, Any]]: return ee_tasks_to_schedule + base_tasks_to_schedule diff --git a/backend/onyx/background/celery/apps/app_base.py b/backend/onyx/background/celery/apps/app_base.py index 40a98f38a..7435342c4 100644 --- a/backend/onyx/background/celery/apps/app_base.py +++ b/backend/onyx/background/celery/apps/app_base.py @@ -20,6 +20,7 @@ from sqlalchemy.orm import Session from onyx.background.celery.apps.task_formatters import CeleryTaskColoredFormatter from onyx.background.celery.apps.task_formatters import CeleryTaskPlainFormatter from onyx.background.celery.celery_utils import celery_is_worker_primary +from onyx.configs.constants import ONYX_CLOUD_CELERY_TASK_PREFIX from onyx.configs.constants import OnyxRedisLocks from onyx.db.engine import get_sqlalchemy_engine from onyx.document_index.vespa.shared_utils.utils import get_vespa_http_client @@ -100,6 +101,10 @@ def on_task_postrun( if not task_id: return + if task.name.startswith(ONYX_CLOUD_CELERY_TASK_PREFIX): + # this is a cloud / all tenant task ... 
no postrun is needed + return + # Get tenant_id directly from kwargs- each celery task has a tenant_id kwarg if not kwargs: logger.error(f"Task {task.name} (ID: {task_id}) is missing kwargs") diff --git a/backend/onyx/background/celery/apps/beat.py b/backend/onyx/background/celery/apps/beat.py index 8ebf6d90c..02b24147d 100644 --- a/backend/onyx/background/celery/apps/beat.py +++ b/backend/onyx/background/celery/apps/beat.py @@ -1,5 +1,6 @@ from datetime import timedelta from typing import Any +from typing import cast from celery import Celery from celery import signals @@ -7,12 +8,14 @@ from celery.beat import PersistentScheduler # type: ignore from celery.signals import beat_init import onyx.background.celery.apps.app_base as app_base +from onyx.configs.constants import ONYX_CLOUD_CELERY_TASK_PREFIX from onyx.configs.constants import POSTGRES_CELERY_BEAT_APP_NAME from onyx.db.engine import get_all_tenant_ids from onyx.db.engine import SqlEngine from onyx.utils.logger import setup_logger from onyx.utils.variable_functionality import fetch_versioned_implementation from shared_configs.configs import IGNORED_SYNCING_TENANT_LIST +from shared_configs.configs import MULTI_TENANT logger = setup_logger(__name__) @@ -28,7 +31,7 @@ class DynamicTenantScheduler(PersistentScheduler): self._last_reload = self.app.now() - self._reload_interval # Let the parent class handle store initialization self.setup_schedule() - self._update_tenant_tasks() + self._try_updating_schedule() logger.info(f"Set reload interval to {self._reload_interval}") def setup_schedule(self) -> None: @@ -44,105 +47,154 @@ class DynamicTenantScheduler(PersistentScheduler): or (now - self._last_reload) > self._reload_interval ): logger.info("Reload interval reached, initiating task update") - self._update_tenant_tasks() + try: + self._try_updating_schedule() + except (AttributeError, KeyError) as e: + logger.exception(f"Failed to process task configuration: {str(e)}") + except Exception as e: + logger.exception(f"Unexpected error updating tasks: {str(e)}") + self._last_reload = now logger.info("Task update completed, reset reload timer") return retval - def _update_tenant_tasks(self) -> None: - logger.info("Starting task update process") - try: - logger.info("Fetching all IDs") - tenant_ids = get_all_tenant_ids() - logger.info(f"Found {len(tenant_ids)} IDs") + def _generate_schedule( + self, tenant_ids: list[str] | list[None] + ) -> dict[str, dict[str, Any]]: + """Given a list of tenant id's, generates a new beat schedule for celery.""" + logger.info("Fetching tasks to schedule") - logger.info("Fetching tasks to schedule") - tasks_to_schedule = fetch_versioned_implementation( - "onyx.background.celery.tasks.beat_schedule", "get_tasks_to_schedule" + new_schedule: dict[str, dict[str, Any]] = {} + + if MULTI_TENANT: + # cloud tasks only need the single task beat across all tenants + get_cloud_tasks_to_schedule = fetch_versioned_implementation( + "onyx.background.celery.tasks.beat_schedule", + "get_cloud_tasks_to_schedule", ) - new_beat_schedule: dict[str, dict[str, Any]] = {} + cloud_tasks_to_schedule: list[ + dict[str, Any] + ] = get_cloud_tasks_to_schedule() + for task in cloud_tasks_to_schedule: + task_name = task["name"] + cloud_task = { + "task": task["task"], + "schedule": task["schedule"], + "kwargs": {}, + } + if options := task.get("options"): + logger.debug(f"Adding options to task {task_name}: {options}") + cloud_task["options"] = options + new_schedule[task_name] = cloud_task - current_schedule = self.schedule.items() + # 
regular task beats are multiplied across all tenants + get_tasks_to_schedule = fetch_versioned_implementation( + "onyx.background.celery.tasks.beat_schedule", "get_tasks_to_schedule" + ) - existing_tenants = set() - for task_name, _ in current_schedule: - if "-" in task_name: - existing_tenants.add(task_name.split("-")[-1]) - logger.info(f"Found {len(existing_tenants)} existing items in schedule") + tasks_to_schedule: list[dict[str, Any]] = get_tasks_to_schedule() - for tenant_id in tenant_ids: - if ( - IGNORED_SYNCING_TENANT_LIST - and tenant_id in IGNORED_SYNCING_TENANT_LIST - ): - logger.info( - f"Skipping tenant {tenant_id} as it is in the ignored syncing list" - ) - continue - - if tenant_id not in existing_tenants: - logger.info(f"Processing new item: {tenant_id}") - - for task in tasks_to_schedule(): - task_name = f"{task['name']}-{tenant_id}" - logger.debug(f"Creating task configuration for {task_name}") - new_task = { - "task": task["task"], - "schedule": task["schedule"], - "kwargs": {"tenant_id": tenant_id}, - } - if options := task.get("options"): - logger.debug(f"Adding options to task {task_name}: {options}") - new_task["options"] = options - new_beat_schedule[task_name] = new_task - - if self._should_update_schedule(current_schedule, new_beat_schedule): + for tenant_id in tenant_ids: + if IGNORED_SYNCING_TENANT_LIST and tenant_id in IGNORED_SYNCING_TENANT_LIST: logger.info( - "Schedule update required", - extra={ - "new_tasks": len(new_beat_schedule), - "current_tasks": len(current_schedule), - }, + f"Skipping tenant {tenant_id} as it is in the ignored syncing list" ) + continue - # Create schedule entries - entries = {} - for name, entry in new_beat_schedule.items(): - entries[name] = self.Entry( - name=name, - app=self.app, - task=entry["task"], - schedule=entry["schedule"], - options=entry.get("options", {}), - kwargs=entry.get("kwargs", {}), + for task in tasks_to_schedule: + task_name = task["name"] + tenant_task_name = f"{task['name']}-{tenant_id}" + + logger.debug(f"Creating task configuration for {tenant_task_name}") + tenant_task = { + "task": task["task"], + "schedule": task["schedule"], + "kwargs": {"tenant_id": tenant_id}, + } + if options := task.get("options"): + logger.debug( + f"Adding options to task {tenant_task_name}: {options}" ) + tenant_task["options"] = options + new_schedule[tenant_task_name] = tenant_task - # Update the schedule using the scheduler's methods - self.schedule.clear() - self.schedule.update(entries) + return new_schedule - # Ensure changes are persisted - self.sync() + def _try_updating_schedule(self) -> None: + """Only updates the actual beat schedule on the celery app when it changes""" - logger.info("Schedule update completed successfully") - else: - logger.info("Schedule is up to date, no changes needed") - except (AttributeError, KeyError) as e: - logger.exception(f"Failed to process task configuration: {str(e)}") - except Exception as e: - logger.exception(f"Unexpected error updating tasks: {str(e)}") + logger.info("_try_updating_schedule starting") - def _should_update_schedule( - self, current_schedule: dict, new_schedule: dict - ) -> bool: - """Compare schedules to determine if an update is needed.""" - logger.debug("Comparing current and new schedules") - current_tasks = set(name for name, _ in current_schedule) - new_tasks = set(new_schedule.keys()) - needs_update = current_tasks != new_tasks - logger.debug(f"Schedule update needed: {needs_update}") - return needs_update + tenant_ids = get_all_tenant_ids() + 
logger.info(f"Found {len(tenant_ids)} IDs") + + # get current schedule and extract current tenants + current_schedule = self.schedule.items() + + current_tenants = set() + for task_name, _ in current_schedule: + task_name = cast(str, task_name) + if task_name.startswith(ONYX_CLOUD_CELERY_TASK_PREFIX): + continue + + if "_" in task_name: + # example: "check-for-condition-tenant_12345678-abcd-efgh-ijkl-12345678" + # -> "12345678-abcd-efgh-ijkl-12345678" + current_tenants.add(task_name.split("_")[-1]) + logger.info(f"Found {len(current_tenants)} existing items in schedule") + + for tenant_id in tenant_ids: + if tenant_id not in current_tenants: + logger.info(f"Processing new tenant: {tenant_id}") + + new_schedule = self._generate_schedule(tenant_ids) + + if DynamicTenantScheduler._compare_schedules(current_schedule, new_schedule): + logger.info( + "_try_updating_schedule: Current schedule is up to date, no changes needed" + ) + return + + logger.info( + "Schedule update required", + extra={ + "new_tasks": len(new_schedule), + "current_tasks": len(current_schedule), + }, + ) + + # Create schedule entries + entries = {} + for name, entry in new_schedule.items(): + entries[name] = self.Entry( + name=name, + app=self.app, + task=entry["task"], + schedule=entry["schedule"], + options=entry.get("options", {}), + kwargs=entry.get("kwargs", {}), + ) + + # Update the schedule using the scheduler's methods + self.schedule.clear() + self.schedule.update(entries) + + # Ensure changes are persisted + self.sync() + + logger.info("_try_updating_schedule: Schedule updated successfully") + + @staticmethod + def _compare_schedules(schedule1: dict, schedule2: dict) -> bool: + """Compare schedules to determine if an update is needed. + True if equivalent, False if not.""" + current_tasks = set(name for name, _ in schedule1) + new_tasks = set(schedule2.keys()) + if current_tasks != new_tasks: + return False + + return True @beat_init.connect diff --git a/backend/onyx/background/celery/apps/primary.py b/backend/onyx/background/celery/apps/primary.py index 8056e3d5e..b2ccd0874 100644 --- a/backend/onyx/background/celery/apps/primary.py +++ b/backend/onyx/background/celery/apps/primary.py @@ -17,7 +17,7 @@ from redis.lock import Lock as RedisLock import onyx.background.celery.apps.app_base as app_base from onyx.background.celery.apps.app_base import task_logger from onyx.background.celery.celery_utils import celery_is_worker_primary -from onyx.background.celery.tasks.indexing.tasks import ( +from onyx.background.celery.tasks.indexing.utils import ( get_unfenced_index_attempt_ids, ) from onyx.configs.constants import CELERY_PRIMARY_WORKER_LOCK_TIMEOUT diff --git a/backend/onyx/background/celery/tasks/beat_schedule.py b/backend/onyx/background/celery/tasks/beat_schedule.py index 58e27b91e..3e7b1c45a 100644 --- a/backend/onyx/background/celery/tasks/beat_schedule.py +++ b/backend/onyx/background/celery/tasks/beat_schedule.py @@ -2,25 +2,43 @@ from datetime import timedelta from typing import Any from onyx.configs.app_configs import LLM_MODEL_UPDATE_API_URL +from onyx.configs.constants import ONYX_CLOUD_CELERY_TASK_PREFIX from onyx.configs.constants import OnyxCeleryPriority from onyx.configs.constants import OnyxCeleryQueues from onyx.configs.constants import OnyxCeleryTask +from shared_configs.configs import MULTI_TENANT # choosing 15 minutes because it roughly gives us enough time to process many tasks # we might be able to reduce this greatly if we can run a unified # loop across all tenants rather than tasks per 
tenant -BEAT_EXPIRES_DEFAULT = 15 * 60 # 15 minutes (in seconds) - # we set expires because it isn't necessary to queue up these tasks # it's only important that they run relatively regularly +BEAT_EXPIRES_DEFAULT = 15 * 60 # 15 minutes (in seconds) + +# tasks that only run in the cloud +# the name attribute must start with ONYX_CELERY_CLOUD_PREFIX = "cloud" to be filtered +# by the DynamicTenantScheduler +cloud_tasks_to_schedule = [ + { + "name": f"{ONYX_CLOUD_CELERY_TASK_PREFIX}_check-for-indexing", + "task": OnyxCeleryTask.CLOUD_CHECK_FOR_INDEXING, + "schedule": timedelta(seconds=15), + "options": { + "priority": OnyxCeleryPriority.HIGHEST, + "expires": BEAT_EXPIRES_DEFAULT, + }, + }, +] + +# tasks that run in either self-hosted on cloud tasks_to_schedule = [ { "name": "check-for-vespa-sync", "task": OnyxCeleryTask.CHECK_FOR_VESPA_SYNC_TASK, "schedule": timedelta(seconds=20), "options": { - "priority": OnyxCeleryPriority.HIGH, + "priority": OnyxCeleryPriority.MEDIUM, "expires": BEAT_EXPIRES_DEFAULT, }, }, @@ -29,16 +47,7 @@ tasks_to_schedule = [ "task": OnyxCeleryTask.CHECK_FOR_CONNECTOR_DELETION, "schedule": timedelta(seconds=20), "options": { - "priority": OnyxCeleryPriority.HIGH, - "expires": BEAT_EXPIRES_DEFAULT, - }, - }, - { - "name": "check-for-indexing", - "task": OnyxCeleryTask.CHECK_FOR_INDEXING, - "schedule": timedelta(seconds=15), - "options": { - "priority": OnyxCeleryPriority.HIGH, + "priority": OnyxCeleryPriority.MEDIUM, "expires": BEAT_EXPIRES_DEFAULT, }, }, @@ -47,7 +56,7 @@ tasks_to_schedule = [ "task": OnyxCeleryTask.CHECK_FOR_PRUNING, "schedule": timedelta(seconds=15), "options": { - "priority": OnyxCeleryPriority.HIGH, + "priority": OnyxCeleryPriority.MEDIUM, "expires": BEAT_EXPIRES_DEFAULT, }, }, @@ -65,7 +74,7 @@ tasks_to_schedule = [ "task": OnyxCeleryTask.MONITOR_VESPA_SYNC, "schedule": timedelta(seconds=5), "options": { - "priority": OnyxCeleryPriority.HIGH, + "priority": OnyxCeleryPriority.MEDIUM, "expires": BEAT_EXPIRES_DEFAULT, }, }, @@ -84,7 +93,7 @@ tasks_to_schedule = [ "task": OnyxCeleryTask.CHECK_FOR_DOC_PERMISSIONS_SYNC, "schedule": timedelta(seconds=30), "options": { - "priority": OnyxCeleryPriority.HIGH, + "priority": OnyxCeleryPriority.MEDIUM, "expires": BEAT_EXPIRES_DEFAULT, }, }, @@ -93,12 +102,25 @@ tasks_to_schedule = [ "task": OnyxCeleryTask.CHECK_FOR_EXTERNAL_GROUP_SYNC, "schedule": timedelta(seconds=20), "options": { - "priority": OnyxCeleryPriority.HIGH, + "priority": OnyxCeleryPriority.MEDIUM, "expires": BEAT_EXPIRES_DEFAULT, }, }, ] +if not MULTI_TENANT: + tasks_to_schedule.append( + { + "name": "check-for-indexing", + "task": OnyxCeleryTask.CHECK_FOR_INDEXING, + "schedule": timedelta(seconds=15), + "options": { + "priority": OnyxCeleryPriority.MEDIUM, + "expires": BEAT_EXPIRES_DEFAULT, + }, + } + ) + # Only add the LLM model update task if the API URL is configured if LLM_MODEL_UPDATE_API_URL: tasks_to_schedule.append( @@ -114,5 +136,9 @@ if LLM_MODEL_UPDATE_API_URL: ) +def get_cloud_tasks_to_schedule() -> list[dict[str, Any]]: + return cloud_tasks_to_schedule + + def get_tasks_to_schedule() -> list[dict[str, Any]]: return tasks_to_schedule diff --git a/backend/onyx/background/celery/tasks/connector_deletion/tasks.py b/backend/onyx/background/celery/tasks/connector_deletion/tasks.py index c64fb97a0..df0bda494 100644 --- a/backend/onyx/background/celery/tasks/connector_deletion/tasks.py +++ b/backend/onyx/background/celery/tasks/connector_deletion/tasks.py @@ -10,7 +10,7 @@ from sqlalchemy.orm import Session from 
onyx.background.celery.apps.app_base import task_logger from onyx.configs.app_configs import JOB_TIMEOUT -from onyx.configs.constants import CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT +from onyx.configs.constants import CELERY_GENERIC_BEAT_LOCK_TIMEOUT from onyx.configs.constants import OnyxCeleryTask from onyx.configs.constants import OnyxRedisLocks from onyx.db.connector_credential_pair import get_connector_credential_pair_from_id @@ -44,7 +44,7 @@ def check_for_connector_deletion_task( lock_beat: RedisLock = r.lock( OnyxRedisLocks.CHECK_CONNECTOR_DELETION_BEAT_LOCK, - timeout=CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT, + timeout=CELERY_GENERIC_BEAT_LOCK_TIMEOUT, ) # these tasks should never overlap diff --git a/backend/onyx/background/celery/tasks/doc_permission_syncing/tasks.py b/backend/onyx/background/celery/tasks/doc_permission_syncing/tasks.py index 69d425bdb..5db7a4191 100644 --- a/backend/onyx/background/celery/tasks/doc_permission_syncing/tasks.py +++ b/backend/onyx/background/celery/tasks/doc_permission_syncing/tasks.py @@ -22,9 +22,9 @@ from ee.onyx.external_permissions.sync_params import ( from onyx.access.models import DocExternalAccess from onyx.background.celery.apps.app_base import task_logger from onyx.configs.app_configs import JOB_TIMEOUT +from onyx.configs.constants import CELERY_GENERIC_BEAT_LOCK_TIMEOUT from onyx.configs.constants import CELERY_PERMISSIONS_SYNC_LOCK_TIMEOUT from onyx.configs.constants import CELERY_TASK_WAIT_FOR_FENCE_TIMEOUT -from onyx.configs.constants import CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT from onyx.configs.constants import DANSWER_REDIS_FUNCTION_LOCK_PREFIX from onyx.configs.constants import DocumentSource from onyx.configs.constants import OnyxCeleryPriority @@ -99,7 +99,7 @@ def check_for_doc_permissions_sync(self: Task, *, tenant_id: str | None) -> bool lock_beat: RedisLock = r.lock( OnyxRedisLocks.CHECK_CONNECTOR_DOC_PERMISSIONS_SYNC_BEAT_LOCK, - timeout=CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT, + timeout=CELERY_GENERIC_BEAT_LOCK_TIMEOUT, ) # these tasks should never overlap diff --git a/backend/onyx/background/celery/tasks/external_group_syncing/tasks.py b/backend/onyx/background/celery/tasks/external_group_syncing/tasks.py index 7ee701e07..5fed241c4 100644 --- a/backend/onyx/background/celery/tasks/external_group_syncing/tasks.py +++ b/backend/onyx/background/celery/tasks/external_group_syncing/tasks.py @@ -22,7 +22,7 @@ from ee.onyx.external_permissions.sync_params import ( from onyx.background.celery.apps.app_base import task_logger from onyx.configs.app_configs import JOB_TIMEOUT from onyx.configs.constants import CELERY_EXTERNAL_GROUP_SYNC_LOCK_TIMEOUT -from onyx.configs.constants import CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT +from onyx.configs.constants import CELERY_GENERIC_BEAT_LOCK_TIMEOUT from onyx.configs.constants import DANSWER_REDIS_FUNCTION_LOCK_PREFIX from onyx.configs.constants import OnyxCeleryPriority from onyx.configs.constants import OnyxCeleryQueues @@ -99,7 +99,7 @@ def check_for_external_group_sync(self: Task, *, tenant_id: str | None) -> bool lock_beat: RedisLock = r.lock( OnyxRedisLocks.CHECK_CONNECTOR_EXTERNAL_GROUP_SYNC_BEAT_LOCK, - timeout=CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT, + timeout=CELERY_GENERIC_BEAT_LOCK_TIMEOUT, ) # these tasks should never overlap diff --git a/backend/onyx/background/celery/tasks/indexing/tasks.py b/backend/onyx/background/celery/tasks/indexing/tasks.py index 42e63e529..bf0a82738 100644 --- a/backend/onyx/background/celery/tasks/indexing/tasks.py +++ 
b/backend/onyx/background/celery/tasks/indexing/tasks.py @@ -7,64 +7,49 @@ from datetime import timezone from http import HTTPStatus from time import sleep -import redis import sentry_sdk -from celery import Celery from celery import shared_task from celery import Task from celery.exceptions import SoftTimeLimitExceeded from redis import Redis -from redis.exceptions import LockError from redis.lock import Lock as RedisLock -from sqlalchemy.orm import Session from onyx.background.celery.apps.app_base import task_logger -from onyx.background.celery.celery_redis import celery_find_task -from onyx.background.celery.celery_redis import celery_get_unacked_task_ids +from onyx.background.celery.tasks.beat_schedule import BEAT_EXPIRES_DEFAULT +from onyx.background.celery.tasks.indexing.utils import _should_index +from onyx.background.celery.tasks.indexing.utils import get_unfenced_index_attempt_ids +from onyx.background.celery.tasks.indexing.utils import IndexingCallback +from onyx.background.celery.tasks.indexing.utils import try_creating_indexing_task +from onyx.background.celery.tasks.indexing.utils import validate_indexing_fences from onyx.background.indexing.job_client import SimpleJobClient from onyx.background.indexing.run_indexing import run_indexing_entrypoint -from onyx.configs.app_configs import DISABLE_INDEX_UPDATE_ON_SWAP +from onyx.configs.constants import CELERY_GENERIC_BEAT_LOCK_TIMEOUT from onyx.configs.constants import CELERY_INDEXING_LOCK_TIMEOUT from onyx.configs.constants import CELERY_TASK_WAIT_FOR_FENCE_TIMEOUT -from onyx.configs.constants import CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT -from onyx.configs.constants import DANSWER_REDIS_FUNCTION_LOCK_PREFIX -from onyx.configs.constants import DocumentSource +from onyx.configs.constants import ONYX_CLOUD_TENANT_ID from onyx.configs.constants import OnyxCeleryPriority -from onyx.configs.constants import OnyxCeleryQueues from onyx.configs.constants import OnyxCeleryTask from onyx.configs.constants import OnyxRedisLocks from onyx.configs.constants import OnyxRedisSignals from onyx.db.connector import mark_ccpair_with_indexing_trigger from onyx.db.connector_credential_pair import fetch_connector_credential_pairs from onyx.db.connector_credential_pair import get_connector_credential_pair_from_id -from onyx.db.engine import get_db_current_time +from onyx.db.engine import get_all_tenant_ids from onyx.db.engine import get_session_with_tenant -from onyx.db.enums import ConnectorCredentialPairStatus from onyx.db.enums import IndexingMode -from onyx.db.enums import IndexingStatus -from onyx.db.enums import IndexModelStatus -from onyx.db.index_attempt import create_index_attempt -from onyx.db.index_attempt import delete_index_attempt -from onyx.db.index_attempt import get_all_index_attempts_by_status from onyx.db.index_attempt import get_index_attempt from onyx.db.index_attempt import get_last_attempt_for_cc_pair from onyx.db.index_attempt import mark_attempt_canceled from onyx.db.index_attempt import mark_attempt_failed -from onyx.db.models import ConnectorCredentialPair -from onyx.db.models import IndexAttempt from onyx.db.models import SearchSettings from onyx.db.search_settings import get_active_search_settings from onyx.db.search_settings import get_current_search_settings from onyx.db.swap_index import check_index_swap -from onyx.indexing.indexing_heartbeat import IndexingHeartbeatInterface from onyx.natural_language_processing.search_nlp_models import EmbeddingModel from onyx.natural_language_processing.search_nlp_models import 
warm_up_bi_encoder from onyx.redis.redis_connector import RedisConnector -from onyx.redis.redis_connector_index import RedisConnectorIndex -from onyx.redis.redis_connector_index import RedisConnectorIndexPayload from onyx.redis.redis_pool import get_redis_client from onyx.redis.redis_pool import redis_lock_dump -from onyx.redis.redis_pool import SCAN_ITER_COUNT_DEFAULT from onyx.utils.logger import setup_logger from onyx.utils.variable_functionality import global_version from shared_configs.configs import INDEXING_MODEL_SERVER_HOST @@ -75,129 +60,6 @@ from shared_configs.configs import SENTRY_DSN logger = setup_logger() -class IndexingCallback(IndexingHeartbeatInterface): - PARENT_CHECK_INTERVAL = 60 - - def __init__( - self, - parent_pid: int, - stop_key: str, - generator_progress_key: str, - redis_lock: RedisLock, - redis_client: Redis, - ): - super().__init__() - self.parent_pid = parent_pid - self.redis_lock: RedisLock = redis_lock - self.stop_key: str = stop_key - self.generator_progress_key: str = generator_progress_key - self.redis_client = redis_client - self.started: datetime = datetime.now(timezone.utc) - self.redis_lock.reacquire() - - self.last_tag: str = "IndexingCallback.__init__" - self.last_lock_reacquire: datetime = datetime.now(timezone.utc) - self.last_lock_monotonic = time.monotonic() - - self.last_parent_check = time.monotonic() - - def should_stop(self) -> bool: - if self.redis_client.exists(self.stop_key): - return True - - return False - - def progress(self, tag: str, amount: int) -> None: - # rkuo: this shouldn't be necessary yet because we spawn the process this runs inside - # with daemon = True. It seems likely some indexing tasks will need to spawn other processes eventually - # so leave this code in until we're ready to test it. - - # if self.parent_pid: - # # check if the parent pid is alive so we aren't running as a zombie - # now = time.monotonic() - # if now - self.last_parent_check > IndexingCallback.PARENT_CHECK_INTERVAL: - # try: - # # this is unintuitive, but it checks if the parent pid is still running - # os.kill(self.parent_pid, 0) - # except Exception: - # logger.exception("IndexingCallback - parent pid check exceptioned") - # raise - # self.last_parent_check = now - - try: - current_time = time.monotonic() - if current_time - self.last_lock_monotonic >= ( - CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT / 4 - ): - self.redis_lock.reacquire() - self.last_lock_reacquire = datetime.now(timezone.utc) - self.last_lock_monotonic = time.monotonic() - - self.last_tag = tag - except LockError: - logger.exception( - f"IndexingCallback - lock.reacquire exceptioned: " - f"lock_timeout={self.redis_lock.timeout} " - f"start={self.started} " - f"last_tag={self.last_tag} " - f"last_reacquired={self.last_lock_reacquire} " - f"now={datetime.now(timezone.utc)}" - ) - - redis_lock_dump(self.redis_lock, self.redis_client) - raise - - self.redis_client.incrby(self.generator_progress_key, amount) - - -def get_unfenced_index_attempt_ids(db_session: Session, r: redis.Redis) -> list[int]: - """Gets a list of unfenced index attempts. Should not be possible, so we'd typically - want to clean them up. - - Unfenced = attempt not in terminal state and fence does not exist. 
- """ - unfenced_attempts: list[int] = [] - - # inner/outer/inner double check pattern to avoid race conditions when checking for - # bad state - # inner = index_attempt in non terminal state - # outer = r.fence_key down - - # check the db for index attempts in a non terminal state - attempts: list[IndexAttempt] = [] - attempts.extend( - get_all_index_attempts_by_status(IndexingStatus.NOT_STARTED, db_session) - ) - attempts.extend( - get_all_index_attempts_by_status(IndexingStatus.IN_PROGRESS, db_session) - ) - - for attempt in attempts: - fence_key = RedisConnectorIndex.fence_key_with_ids( - attempt.connector_credential_pair_id, attempt.search_settings_id - ) - - # if the fence is down / doesn't exist, possible error but not confirmed - if r.exists(fence_key): - continue - - # Between the time the attempts are first looked up and the time we see the fence down, - # the attempt may have completed and taken down the fence normally. - - # We need to double check that the index attempt is still in a non terminal state - # and matches the original state, which confirms we are really in a bad state. - attempt_2 = get_index_attempt(db_session, attempt.id) - if not attempt_2: - continue - - if attempt.status != attempt_2.status: - continue - - unfenced_attempts.append(attempt.id) - - return unfenced_attempts - - @shared_task( name=OnyxCeleryTask.CHECK_FOR_INDEXING, soft_time_limit=300, @@ -222,7 +84,7 @@ def check_for_indexing(self: Task, *, tenant_id: str | None) -> int | None: lock_beat: RedisLock = redis_client.lock( OnyxRedisLocks.CHECK_INDEXING_BEAT_LOCK, - timeout=CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT, + timeout=CELERY_GENERIC_BEAT_LOCK_TIMEOUT, ) # these tasks should never overlap @@ -496,573 +358,10 @@ def check_for_indexing(self: Task, *, tenant_id: str | None) -> int | None: redis_lock_dump(lock_beat, redis_client) time_elapsed = time.monotonic() - time_start - task_logger.debug(f"check_for_indexing finished: elapsed={time_elapsed:.2f}") + task_logger.info(f"check_for_indexing finished: elapsed={time_elapsed:.2f}") return tasks_created -def validate_indexing_fences( - tenant_id: str | None, - celery_app: Celery, - r: Redis, - r_celery: Redis, - lock_beat: RedisLock, -) -> None: - reserved_indexing_tasks = celery_get_unacked_task_ids( - OnyxCeleryQueues.CONNECTOR_INDEXING, r_celery - ) - - # validate all existing indexing jobs - for key_bytes in r.scan_iter( - RedisConnectorIndex.FENCE_PREFIX + "*", count=SCAN_ITER_COUNT_DEFAULT - ): - lock_beat.reacquire() - with get_session_with_tenant(tenant_id) as db_session: - validate_indexing_fence( - tenant_id, - key_bytes, - reserved_indexing_tasks, - r_celery, - db_session, - ) - return - - -def validate_indexing_fence( - tenant_id: str | None, - key_bytes: bytes, - reserved_tasks: set[str], - r_celery: Redis, - db_session: Session, -) -> None: - """Checks for the error condition where an indexing fence is set but the associated celery tasks don't exist. - This can happen if the indexing worker hard crashes or is terminated. - Being in this bad state means the fence will never clear without help, so this function - gives the help. - - How this works: - 1. This function renews the active signal with a 5 minute TTL under the following conditions - 1.2. When the task is seen in the redis queue - 1.3. When the task is seen in the reserved / prefetched list - - 2. Externally, the active signal is renewed when: - 2.1. The fence is created - 2.2. The indexing watchdog checks the spawned task. - - 3. 
The TTL allows us to get through the transitions on fence startup - and when the task starts executing. - - More TTL clarification: it is seemingly impossible to exactly query Celery for - whether a task is in the queue or currently executing. - 1. An unknown task id is always returned as state PENDING. - 2. Redis can be inspected for the task id, but the task id is gone between the time a worker receives the task - and the time it actually starts on the worker. - """ - # if the fence doesn't exist, there's nothing to do - fence_key = key_bytes.decode("utf-8") - composite_id = RedisConnector.get_id_from_fence_key(fence_key) - if composite_id is None: - task_logger.warning( - f"validate_indexing_fence - could not parse composite_id from {fence_key}" - ) - return - - # parse out metadata and initialize the helper class with it - parts = composite_id.split("/") - if len(parts) != 2: - return - - cc_pair_id = int(parts[0]) - search_settings_id = int(parts[1]) - - redis_connector = RedisConnector(tenant_id, cc_pair_id) - redis_connector_index = redis_connector.new_index(search_settings_id) - - # check to see if the fence/payload exists - if not redis_connector_index.fenced: - return - - payload = redis_connector_index.payload - if not payload: - return - - # OK, there's actually something for us to validate - - if payload.celery_task_id is None: - # the fence is just barely set up. - if redis_connector_index.active(): - return - - # it would be odd to get here as there isn't that much that can go wrong during - # initial fence setup, but it's still worth making sure we can recover - logger.info( - f"validate_indexing_fence - Resetting fence in basic state without any activity: fence={fence_key}" - ) - redis_connector_index.reset() - return - - found = celery_find_task( - payload.celery_task_id, OnyxCeleryQueues.CONNECTOR_INDEXING, r_celery - ) - if found: - # the celery task exists in the redis queue - redis_connector_index.set_active() - return - - if payload.celery_task_id in reserved_tasks: - # the celery task was prefetched and is reserved within the indexing worker - redis_connector_index.set_active() - return - - # we may want to enable this check if using the active task list somehow isn't good enough - # if redis_connector_index.generator_locked(): - # logger.info(f"{payload.celery_task_id} is currently executing.") - - # if we get here, we didn't find any direct indication that the associated celery tasks exist, - # but they still might be there due to gaps in our ability to check states during transitions - # Checking the active signal safeguards us against these transition periods - # (which has a duration that allows us to bridge those gaps) - if redis_connector_index.active(): - return - - # celery tasks don't exist and the active signal has expired, possibly due to a crash. Clean it up. - logger.warning( - f"validate_indexing_fence - Resetting fence because no associated celery tasks were found: " - f"index_attempt={payload.index_attempt_id} " - f"cc_pair={cc_pair_id} " - f"search_settings={search_settings_id} " - f"fence={fence_key}" - ) - if payload.index_attempt_id: - try: - mark_attempt_failed( - payload.index_attempt_id, - db_session, - f"validate_indexing_fence - Canceling index attempt due to missing celery tasks: " - f"index_attempt={payload.index_attempt_id}", - ) - except Exception: - logger.exception( - "validate_indexing_fence - Exception while marking index attempt as failed." 
- ) - - redis_connector_index.reset() - return - - -def _should_index( - cc_pair: ConnectorCredentialPair, - last_index: IndexAttempt | None, - search_settings_instance: SearchSettings, - search_settings_primary: bool, - secondary_index_building: bool, - db_session: Session, -) -> bool: - """Checks various global settings and past indexing attempts to determine if - we should try to start indexing the cc pair / search setting combination. - - Note that tactical checks such as preventing overlap with a currently running task - are not handled here. - - Return True if we should try to index, False if not. - """ - connector = cc_pair.connector - - # uncomment for debugging - # task_logger.info(f"_should_index: " - # f"cc_pair={cc_pair.id} " - # f"connector={cc_pair.connector_id} " - # f"refresh_freq={connector.refresh_freq}") - - # don't kick off indexing for `NOT_APPLICABLE` sources - if connector.source == DocumentSource.NOT_APPLICABLE: - return False - - # User can still manually create single indexing attempts via the UI for the - # currently in use index - if DISABLE_INDEX_UPDATE_ON_SWAP: - if ( - search_settings_instance.status == IndexModelStatus.PRESENT - and secondary_index_building - ): - return False - - # When switching over models, always index at least once - if search_settings_instance.status == IndexModelStatus.FUTURE: - if last_index: - # No new index if the last index attempt succeeded - # Once is enough. The model will never be able to swap otherwise. - if last_index.status == IndexingStatus.SUCCESS: - return False - - # No new index if the last index attempt is waiting to start - if last_index.status == IndexingStatus.NOT_STARTED: - return False - - # No new index if the last index attempt is running - if last_index.status == IndexingStatus.IN_PROGRESS: - return False - else: - if ( - connector.id == 0 or connector.source == DocumentSource.INGESTION_API - ): # Ingestion API - return False - return True - - # If the connector is paused or is the ingestion API, don't index - # NOTE: during an embedding model switch over, the following logic - # is bypassed by the above check for a future model - if ( - not cc_pair.status.is_active() - or connector.id == 0 - or connector.source == DocumentSource.INGESTION_API - ): - return False - - if search_settings_primary: - if cc_pair.indexing_trigger is not None: - # if a manual indexing trigger is on the cc pair, honor it for primary search settings - return True - - # if no attempt has ever occurred, we should index regardless of refresh_freq - if not last_index: - return True - - if connector.refresh_freq is None: - return False - - current_db_time = get_db_current_time(db_session) - time_since_index = current_db_time - last_index.time_updated - if time_since_index.total_seconds() < connector.refresh_freq: - return False - - return True - - -def try_creating_indexing_task( - celery_app: Celery, - cc_pair: ConnectorCredentialPair, - search_settings: SearchSettings, - reindex: bool, - db_session: Session, - r: Redis, - tenant_id: str | None, -) -> int | None: - """Checks for any conditions that should block the indexing task from being - created, then creates the task. - - Does not check for scheduling related conditions as this function - is used to trigger indexing immediately. 
- """ - - LOCK_TIMEOUT = 30 - index_attempt_id: int | None = None - - # we need to serialize any attempt to trigger indexing since it can be triggered - # either via celery beat or manually (API call) - lock: RedisLock = r.lock( - DANSWER_REDIS_FUNCTION_LOCK_PREFIX + "try_creating_indexing_task", - timeout=LOCK_TIMEOUT, - ) - - acquired = lock.acquire(blocking_timeout=LOCK_TIMEOUT / 2) - if not acquired: - return None - - try: - redis_connector = RedisConnector(tenant_id, cc_pair.id) - redis_connector_index = redis_connector.new_index(search_settings.id) - - # skip if already indexing - if redis_connector_index.fenced: - return None - - # skip indexing if the cc_pair is deleting - if redis_connector.delete.fenced: - return None - - db_session.refresh(cc_pair) - if cc_pair.status == ConnectorCredentialPairStatus.DELETING: - return None - - # add a long running generator task to the queue - redis_connector_index.generator_clear() - - # set a basic fence to start - payload = RedisConnectorIndexPayload( - index_attempt_id=None, - started=None, - submitted=datetime.now(timezone.utc), - celery_task_id=None, - ) - - redis_connector_index.set_active() - redis_connector_index.set_fence(payload) - - # create the index attempt for tracking purposes - # code elsewhere checks for index attempts without an associated redis key - # and cleans them up - # therefore we must create the attempt and the task after the fence goes up - index_attempt_id = create_index_attempt( - cc_pair.id, - search_settings.id, - from_beginning=reindex, - db_session=db_session, - ) - - custom_task_id = redis_connector_index.generate_generator_task_id() - - # when the task is sent, we have yet to finish setting up the fence - # therefore, the task must contain code that blocks until the fence is ready - result = celery_app.send_task( - OnyxCeleryTask.CONNECTOR_INDEXING_PROXY_TASK, - kwargs=dict( - index_attempt_id=index_attempt_id, - cc_pair_id=cc_pair.id, - search_settings_id=search_settings.id, - tenant_id=tenant_id, - ), - queue=OnyxCeleryQueues.CONNECTOR_INDEXING, - task_id=custom_task_id, - priority=OnyxCeleryPriority.MEDIUM, - ) - if not result: - raise RuntimeError("send_task for connector_indexing_proxy_task failed.") - - # now fill out the fence with the rest of the data - redis_connector_index.set_active() - - payload.index_attempt_id = index_attempt_id - payload.celery_task_id = result.id - redis_connector_index.set_fence(payload) - except Exception: - task_logger.exception( - f"try_creating_indexing_task - Unexpected exception: " - f"cc_pair={cc_pair.id} " - f"search_settings={search_settings.id}" - ) - - if index_attempt_id is not None: - delete_index_attempt(db_session, index_attempt_id) - redis_connector_index.set_fence(None) - return None - finally: - if lock.owned(): - lock.release() - - return index_attempt_id - - -@shared_task( - name=OnyxCeleryTask.CONNECTOR_INDEXING_PROXY_TASK, - bind=True, - acks_late=False, - track_started=True, -) -def connector_indexing_proxy_task( - self: Task, - index_attempt_id: int, - cc_pair_id: int, - search_settings_id: int, - tenant_id: str | None, -) -> None: - """celery tasks are forked, but forking is unstable. 
- This is a thread that proxies work to a spawned task.""" - - task_logger.info( - f"Indexing watchdog - starting: attempt={index_attempt_id} " - f"cc_pair={cc_pair_id} " - f"search_settings={search_settings_id} " - f"mp_start_method={multiprocessing.get_start_method()}" - ) - - if not self.request.id: - task_logger.error("self.request.id is None!") - - client = SimpleJobClient() - - job = client.submit( - connector_indexing_task_wrapper, - index_attempt_id, - cc_pair_id, - search_settings_id, - tenant_id, - global_version.is_ee_version(), - pure=False, - ) - - if not job: - task_logger.info( - f"Indexing watchdog - spawn failed: attempt={index_attempt_id} " - f"cc_pair={cc_pair_id} " - f"search_settings={search_settings_id}" - ) - return - - task_logger.info( - f"Indexing watchdog - spawn succeeded: attempt={index_attempt_id} " - f"cc_pair={cc_pair_id} " - f"search_settings={search_settings_id}" - ) - - redis_connector = RedisConnector(tenant_id, cc_pair_id) - redis_connector_index = redis_connector.new_index(search_settings_id) - - while True: - sleep(5) - - # renew active signal - redis_connector_index.set_active() - - # if the job is done, clean up and break - if job.done(): - try: - if job.status == "error": - ignore_exitcode = False - - exit_code: int | None = None - if job.process: - exit_code = job.process.exitcode - - # seeing odd behavior where spawned tasks usually return exit code 1 in the cloud, - # even though logging clearly indicates successful completion - # to work around this, we ignore the job error state if the completion signal is OK - status_int = redis_connector_index.get_completion() - if status_int: - status_enum = HTTPStatus(status_int) - if status_enum == HTTPStatus.OK: - ignore_exitcode = True - - if not ignore_exitcode: - raise RuntimeError("Spawned task exceptioned.") - - task_logger.warning( - "Indexing watchdog - spawned task has non-zero exit code " - "but completion signal is OK. 
Continuing...: " - f"attempt={index_attempt_id} " - f"tenant={tenant_id} " - f"cc_pair={cc_pair_id} " - f"search_settings={search_settings_id} " - f"exit_code={exit_code}" - ) - except Exception: - task_logger.error( - "Indexing watchdog - spawned task exceptioned: " - f"attempt={index_attempt_id} " - f"tenant={tenant_id} " - f"cc_pair={cc_pair_id} " - f"search_settings={search_settings_id} " - f"exit_code={exit_code} " - f"error={job.exception()}" - ) - - raise - finally: - job.release() - - break - - # if a termination signal is detected, clean up and break - if self.request.id and redis_connector_index.terminating(self.request.id): - task_logger.warning( - "Indexing watchdog - termination signal detected: " - f"attempt={index_attempt_id} " - f"cc_pair={cc_pair_id} " - f"search_settings={search_settings_id}" - ) - - try: - with get_session_with_tenant(tenant_id) as db_session: - mark_attempt_canceled( - index_attempt_id, - db_session, - "Connector termination signal detected", - ) - except Exception: - # if the DB exceptions, we'll just get an unfriendly failure message - # in the UI instead of the cancellation message - logger.exception( - "Indexing watchdog - transient exception marking index attempt as canceled: " - f"attempt={index_attempt_id} " - f"tenant={tenant_id} " - f"cc_pair={cc_pair_id} " - f"search_settings={search_settings_id}" - ) - - job.cancel() - break - - # if the spawned task is still running, restart the check once again - # if the index attempt is not in a finished status - try: - with get_session_with_tenant(tenant_id) as db_session: - index_attempt = get_index_attempt( - db_session=db_session, index_attempt_id=index_attempt_id - ) - - if not index_attempt: - continue - - if not index_attempt.is_finished(): - continue - except Exception: - # if the DB exceptioned, just restart the check. - # polling the index attempt status doesn't need to be strongly consistent - logger.exception( - "Indexing watchdog - transient exception looking up index attempt: " - f"attempt={index_attempt_id} " - f"tenant={tenant_id} " - f"cc_pair={cc_pair_id} " - f"search_settings={search_settings_id}" - ) - continue - - task_logger.info( - f"Indexing watchdog - finished: attempt={index_attempt_id} " - f"cc_pair={cc_pair_id} " - f"search_settings={search_settings_id}" - ) - return - - -def connector_indexing_task_wrapper( - index_attempt_id: int, - cc_pair_id: int, - search_settings_id: int, - tenant_id: str | None, - is_ee: bool, -) -> int | None: - """Just wraps connector_indexing_task so we can log any exceptions before - re-raising it.""" - result: int | None = None - - try: - result = connector_indexing_task( - index_attempt_id, - cc_pair_id, - search_settings_id, - tenant_id, - is_ee, - ) - except Exception: - logger.exception( - f"connector_indexing_task exceptioned: " - f"tenant={tenant_id} " - f"index_attempt={index_attempt_id} " - f"cc_pair={cc_pair_id} " - f"search_settings={search_settings_id}" - ) - - # There is a cloud related bug outside of our code - # where spawned tasks return with an exit code of 1. - # Unfortunately, exceptions also return with an exit code of 1, - # so just raising an exception isn't informative - # Exiting with 255 makes it possible to distinguish between normal exits - # and exceptions. 
- sys.exit(255) - - return result - - def connector_indexing_task( index_attempt_id: int, cc_pair_id: int, @@ -1278,3 +577,270 @@ def connector_indexing_task( f"search_settings={search_settings_id}" ) return n_final_progress + + +def connector_indexing_task_wrapper( + index_attempt_id: int, + cc_pair_id: int, + search_settings_id: int, + tenant_id: str | None, + is_ee: bool, +) -> int | None: + """Just wraps connector_indexing_task so we can log any exceptions before + re-raising it.""" + result: int | None = None + + try: + result = connector_indexing_task( + index_attempt_id, + cc_pair_id, + search_settings_id, + tenant_id, + is_ee, + ) + except Exception: + logger.exception( + f"connector_indexing_task exceptioned: " + f"tenant={tenant_id} " + f"index_attempt={index_attempt_id} " + f"cc_pair={cc_pair_id} " + f"search_settings={search_settings_id}" + ) + + # There is a cloud related bug outside of our code + # where spawned tasks return with an exit code of 1. + # Unfortunately, exceptions also return with an exit code of 1, + # so just raising an exception isn't informative + # Exiting with 255 makes it possible to distinguish between normal exits + # and exceptions. + sys.exit(255) + + return result + + +@shared_task( + name=OnyxCeleryTask.CONNECTOR_INDEXING_PROXY_TASK, + bind=True, + acks_late=False, + track_started=True, +) +def connector_indexing_proxy_task( + self: Task, + index_attempt_id: int, + cc_pair_id: int, + search_settings_id: int, + tenant_id: str | None, +) -> None: + """celery tasks are forked, but forking is unstable. This proxies work to a spawned task.""" + task_logger.info( + f"Indexing watchdog - starting: attempt={index_attempt_id} " + f"cc_pair={cc_pair_id} " + f"search_settings={search_settings_id} " + f"mp_start_method={multiprocessing.get_start_method()}" + ) + + if not self.request.id: + task_logger.error("self.request.id is None!") + + client = SimpleJobClient() + + job = client.submit( + connector_indexing_task_wrapper, + index_attempt_id, + cc_pair_id, + search_settings_id, + tenant_id, + global_version.is_ee_version(), + pure=False, + ) + + if not job: + task_logger.info( + f"Indexing watchdog - spawn failed: attempt={index_attempt_id} " + f"cc_pair={cc_pair_id} " + f"search_settings={search_settings_id}" + ) + return + + task_logger.info( + f"Indexing watchdog - spawn succeeded: attempt={index_attempt_id} " + f"cc_pair={cc_pair_id} " + f"search_settings={search_settings_id}" + ) + + redis_connector = RedisConnector(tenant_id, cc_pair_id) + redis_connector_index = redis_connector.new_index(search_settings_id) + + while True: + sleep(5) + + # renew active signal + redis_connector_index.set_active() + + # if the job is done, clean up and break + if job.done(): + try: + if job.status == "error": + ignore_exitcode = False + + exit_code: int | None = None + if job.process: + exit_code = job.process.exitcode + + # seeing odd behavior where spawned tasks usually return exit code 1 in the cloud, + # even though logging clearly indicates successful completion + # to work around this, we ignore the job error state if the completion signal is OK + status_int = redis_connector_index.get_completion() + if status_int: + status_enum = HTTPStatus(status_int) + if status_enum == HTTPStatus.OK: + ignore_exitcode = True + + if not ignore_exitcode: + raise RuntimeError("Spawned task exceptioned.") + + task_logger.warning( + "Indexing watchdog - spawned task has non-zero exit code " + "but completion signal is OK. 
Continuing...: " + f"attempt={index_attempt_id} " + f"tenant={tenant_id} " + f"cc_pair={cc_pair_id} " + f"search_settings={search_settings_id} " + f"exit_code={exit_code}" + ) + except Exception: + task_logger.error( + "Indexing watchdog - spawned task exceptioned: " + f"attempt={index_attempt_id} " + f"tenant={tenant_id} " + f"cc_pair={cc_pair_id} " + f"search_settings={search_settings_id} " + f"exit_code={exit_code} " + f"error={job.exception()}" + ) + + raise + finally: + job.release() + + break + + # if a termination signal is detected, clean up and break + if self.request.id and redis_connector_index.terminating(self.request.id): + task_logger.warning( + "Indexing watchdog - termination signal detected: " + f"attempt={index_attempt_id} " + f"cc_pair={cc_pair_id} " + f"search_settings={search_settings_id}" + ) + + try: + with get_session_with_tenant(tenant_id) as db_session: + mark_attempt_canceled( + index_attempt_id, + db_session, + "Connector termination signal detected", + ) + except Exception: + # if the DB exceptions, we'll just get an unfriendly failure message + # in the UI instead of the cancellation message + logger.exception( + "Indexing watchdog - transient exception marking index attempt as canceled: " + f"attempt={index_attempt_id} " + f"tenant={tenant_id} " + f"cc_pair={cc_pair_id} " + f"search_settings={search_settings_id}" + ) + + job.cancel() + break + + # if the spawned task is still running, restart the check once again + # if the index attempt is not in a finished status + try: + with get_session_with_tenant(tenant_id) as db_session: + index_attempt = get_index_attempt( + db_session=db_session, index_attempt_id=index_attempt_id + ) + + if not index_attempt: + continue + + if not index_attempt.is_finished(): + continue + except Exception: + # if the DB exceptioned, just restart the check. + # polling the index attempt status doesn't need to be strongly consistent + logger.exception( + "Indexing watchdog - transient exception looking up index attempt: " + f"attempt={index_attempt_id} " + f"tenant={tenant_id} " + f"cc_pair={cc_pair_id} " + f"search_settings={search_settings_id}" + ) + continue + + task_logger.info( + f"Indexing watchdog - finished: attempt={index_attempt_id} " + f"cc_pair={cc_pair_id} " + f"search_settings={search_settings_id}" + ) + return + + +@shared_task( + name=OnyxCeleryTask.CLOUD_CHECK_FOR_INDEXING, + trail=False, + bind=True, +) +def cloud_check_for_indexing(self: Task) -> bool | None: + """a lightweight task used to kick off individual check tasks for each tenant.""" + time_start = time.monotonic() + + redis_client = get_redis_client(tenant_id=ONYX_CLOUD_TENANT_ID) + + lock_beat: RedisLock = redis_client.lock( + OnyxRedisLocks.CLOUD_CHECK_INDEXING_BEAT_LOCK, + timeout=CELERY_GENERIC_BEAT_LOCK_TIMEOUT, + ) + + # these tasks should never overlap + if not lock_beat.acquire(blocking=False): + return None + + last_lock_time = time.monotonic() + + try: + tenant_ids = get_all_tenant_ids() + for tenant_id in tenant_ids: + current_time = time.monotonic() + if current_time - last_lock_time >= (CELERY_GENERIC_BEAT_LOCK_TIMEOUT / 4): + lock_beat.reacquire() + last_lock_time = current_time + + self.app.send_task( + OnyxCeleryTask.CHECK_FOR_INDEXING, + kwargs=dict( + tenant_id=tenant_id, + ), + priority=OnyxCeleryPriority.HIGH, + expires=BEAT_EXPIRES_DEFAULT, + ) + except SoftTimeLimitExceeded: + task_logger.info( + "Soft time limit exceeded, task is being terminated gracefully." 
+ ) + except Exception: + task_logger.exception("Unexpected exception during cloud indexing check") + finally: + if lock_beat.owned(): + lock_beat.release() + else: + task_logger.error("cloud_check_for_indexing - Lock not owned on completion") + redis_lock_dump(lock_beat, redis_client) + + time_elapsed = time.monotonic() - time_start + task_logger.info( + f"cloud_check_for_indexing finished: num_tenants={len(tenant_ids)} elapsed={time_elapsed:.2f}" + ) + return True diff --git a/backend/onyx/background/celery/tasks/indexing/utils.py b/backend/onyx/background/celery/tasks/indexing/utils.py new file mode 100644 index 000000000..2ef185646 --- /dev/null +++ b/backend/onyx/background/celery/tasks/indexing/utils.py @@ -0,0 +1,519 @@ +import time +from datetime import datetime +from datetime import timezone + +import redis +from celery import Celery +from redis import Redis +from redis.exceptions import LockError +from redis.lock import Lock as RedisLock +from sqlalchemy.orm import Session + +from onyx.background.celery.apps.app_base import task_logger +from onyx.background.celery.celery_redis import celery_find_task +from onyx.background.celery.celery_redis import celery_get_unacked_task_ids +from onyx.configs.app_configs import DISABLE_INDEX_UPDATE_ON_SWAP +from onyx.configs.constants import CELERY_GENERIC_BEAT_LOCK_TIMEOUT +from onyx.configs.constants import DANSWER_REDIS_FUNCTION_LOCK_PREFIX +from onyx.configs.constants import DocumentSource +from onyx.configs.constants import OnyxCeleryPriority +from onyx.configs.constants import OnyxCeleryQueues +from onyx.configs.constants import OnyxCeleryTask +from onyx.db.engine import get_db_current_time +from onyx.db.engine import get_session_with_tenant +from onyx.db.enums import ConnectorCredentialPairStatus +from onyx.db.enums import IndexingStatus +from onyx.db.enums import IndexModelStatus +from onyx.db.index_attempt import create_index_attempt +from onyx.db.index_attempt import delete_index_attempt +from onyx.db.index_attempt import get_all_index_attempts_by_status +from onyx.db.index_attempt import get_index_attempt +from onyx.db.index_attempt import mark_attempt_failed +from onyx.db.models import ConnectorCredentialPair +from onyx.db.models import IndexAttempt +from onyx.db.models import SearchSettings +from onyx.indexing.indexing_heartbeat import IndexingHeartbeatInterface +from onyx.redis.redis_connector import RedisConnector +from onyx.redis.redis_connector_index import RedisConnectorIndex +from onyx.redis.redis_connector_index import RedisConnectorIndexPayload +from onyx.redis.redis_pool import redis_lock_dump +from onyx.redis.redis_pool import SCAN_ITER_COUNT_DEFAULT +from onyx.utils.logger import setup_logger + +logger = setup_logger() + + +def get_unfenced_index_attempt_ids(db_session: Session, r: redis.Redis) -> list[int]: + """Gets a list of unfenced index attempts. Should not be possible, so we'd typically + want to clean them up. + + Unfenced = attempt not in terminal state and fence does not exist. 
+ """ + unfenced_attempts: list[int] = [] + + # inner/outer/inner double check pattern to avoid race conditions when checking for + # bad state + # inner = index_attempt in non terminal state + # outer = r.fence_key down + + # check the db for index attempts in a non terminal state + attempts: list[IndexAttempt] = [] + attempts.extend( + get_all_index_attempts_by_status(IndexingStatus.NOT_STARTED, db_session) + ) + attempts.extend( + get_all_index_attempts_by_status(IndexingStatus.IN_PROGRESS, db_session) + ) + + for attempt in attempts: + fence_key = RedisConnectorIndex.fence_key_with_ids( + attempt.connector_credential_pair_id, attempt.search_settings_id + ) + + # if the fence is down / doesn't exist, possible error but not confirmed + if r.exists(fence_key): + continue + + # Between the time the attempts are first looked up and the time we see the fence down, + # the attempt may have completed and taken down the fence normally. + + # We need to double check that the index attempt is still in a non terminal state + # and matches the original state, which confirms we are really in a bad state. + attempt_2 = get_index_attempt(db_session, attempt.id) + if not attempt_2: + continue + + if attempt.status != attempt_2.status: + continue + + unfenced_attempts.append(attempt.id) + + return unfenced_attempts + + +class IndexingCallback(IndexingHeartbeatInterface): + PARENT_CHECK_INTERVAL = 60 + + def __init__( + self, + parent_pid: int, + stop_key: str, + generator_progress_key: str, + redis_lock: RedisLock, + redis_client: Redis, + ): + super().__init__() + self.parent_pid = parent_pid + self.redis_lock: RedisLock = redis_lock + self.stop_key: str = stop_key + self.generator_progress_key: str = generator_progress_key + self.redis_client = redis_client + self.started: datetime = datetime.now(timezone.utc) + self.redis_lock.reacquire() + + self.last_tag: str = "IndexingCallback.__init__" + self.last_lock_reacquire: datetime = datetime.now(timezone.utc) + self.last_lock_monotonic = time.monotonic() + + self.last_parent_check = time.monotonic() + + def should_stop(self) -> bool: + if self.redis_client.exists(self.stop_key): + return True + + return False + + def progress(self, tag: str, amount: int) -> None: + # rkuo: this shouldn't be necessary yet because we spawn the process this runs inside + # with daemon = True. It seems likely some indexing tasks will need to spawn other processes eventually + # so leave this code in until we're ready to test it. 
+
+
+class IndexingCallback(IndexingHeartbeatInterface):
+    PARENT_CHECK_INTERVAL = 60
+
+    def __init__(
+        self,
+        parent_pid: int,
+        stop_key: str,
+        generator_progress_key: str,
+        redis_lock: RedisLock,
+        redis_client: Redis,
+    ):
+        super().__init__()
+        self.parent_pid = parent_pid
+        self.redis_lock: RedisLock = redis_lock
+        self.stop_key: str = stop_key
+        self.generator_progress_key: str = generator_progress_key
+        self.redis_client = redis_client
+        self.started: datetime = datetime.now(timezone.utc)
+        self.redis_lock.reacquire()
+
+        self.last_tag: str = "IndexingCallback.__init__"
+        self.last_lock_reacquire: datetime = datetime.now(timezone.utc)
+        self.last_lock_monotonic = time.monotonic()
+
+        self.last_parent_check = time.monotonic()
+
+    def should_stop(self) -> bool:
+        if self.redis_client.exists(self.stop_key):
+            return True
+
+        return False
+
+    def progress(self, tag: str, amount: int) -> None:
+        # rkuo: this shouldn't be necessary yet because we spawn the process this runs inside
+        # with daemon=True. It seems likely that some indexing tasks will need to spawn other
+        # processes eventually, so leave this code in until we're ready to test it.
+
+        # if self.parent_pid:
+        #     # check if the parent pid is alive so we aren't running as a zombie
+        #     now = time.monotonic()
+        #     if now - self.last_parent_check > IndexingCallback.PARENT_CHECK_INTERVAL:
+        #         try:
+        #             # this is unintuitive, but it checks if the parent pid is still running
+        #             os.kill(self.parent_pid, 0)
+        #         except Exception:
+        #             logger.exception("IndexingCallback - parent pid check exceptioned")
+        #             raise
+        #         self.last_parent_check = now
+
+        try:
+            current_time = time.monotonic()
+            if current_time - self.last_lock_monotonic >= (
+                CELERY_GENERIC_BEAT_LOCK_TIMEOUT / 4
+            ):
+                self.redis_lock.reacquire()
+                self.last_lock_reacquire = datetime.now(timezone.utc)
+                self.last_lock_monotonic = time.monotonic()
+
+            self.last_tag = tag
+        except LockError:
+            logger.exception(
+                f"IndexingCallback - lock.reacquire exceptioned: "
+                f"lock_timeout={self.redis_lock.timeout} "
+                f"start={self.started} "
+                f"last_tag={self.last_tag} "
+                f"last_reacquired={self.last_lock_reacquire} "
+                f"now={datetime.now(timezone.utc)}"
+            )
+
+            redis_lock_dump(self.redis_lock, self.redis_client)
+            raise
+
+        self.redis_client.incrby(self.generator_progress_key, amount)
+
+
+def validate_indexing_fence(
+    tenant_id: str | None,
+    key_bytes: bytes,
+    reserved_tasks: set[str],
+    r_celery: Redis,
+    db_session: Session,
+) -> None:
+    """Checks for the error condition where an indexing fence is set but the associated
+    celery tasks don't exist. This can happen if the indexing worker hard crashes or is
+    terminated. Being in this bad state means the fence will never clear without help,
+    so this function provides that help.
+
+    How this works:
+    1. This function renews the active signal with a 5 minute TTL under the following conditions
+    1.1. When the task is seen in the redis queue
+    1.2. When the task is seen in the reserved / prefetched list
+
+    2. Externally, the active signal is renewed when:
+    2.1. The fence is created
+    2.2. The indexing watchdog checks the spawned task.
+
+    3. The TTL allows us to get through the transitions on fence startup
+    and when the task starts executing.
+
+    More TTL clarification: it is seemingly impossible to exactly query Celery for
+    whether a task is in the queue or currently executing.
+    1. An unknown task id is always returned as state PENDING.
+    2. Redis can be inspected for the task id, but the task id is gone between the time
+    a worker receives the task and the time it actually starts on the worker.
+    """
+    # if the fence doesn't exist, there's nothing to do
+    fence_key = key_bytes.decode("utf-8")
+    composite_id = RedisConnector.get_id_from_fence_key(fence_key)
+    if composite_id is None:
+        task_logger.warning(
+            f"validate_indexing_fence - could not parse composite_id from {fence_key}"
+        )
+        return
+
+    # parse out metadata and initialize the helper class with it
+    parts = composite_id.split("/")
+    if len(parts) != 2:
+        return
+
+    cc_pair_id = int(parts[0])
+    search_settings_id = int(parts[1])
+
+    redis_connector = RedisConnector(tenant_id, cc_pair_id)
+    redis_connector_index = redis_connector.new_index(search_settings_id)
+
+    # check to see if the fence/payload exists
+    if not redis_connector_index.fenced:
+        return
+
+    payload = redis_connector_index.payload
+    if not payload:
+        return
+
+    # OK, there's actually something for us to validate
+
+    if payload.celery_task_id is None:
+        # the fence is just barely set up.
+        if redis_connector_index.active():
+            return
+
+        # it would be odd to get here as there isn't that much that can go wrong during
+        # initial fence setup, but it's still worth making sure we can recover
+        logger.info(
+            f"validate_indexing_fence - Resetting fence in basic state without any activity: fence={fence_key}"
+        )
+        redis_connector_index.reset()
+        return
+
+    found = celery_find_task(
+        payload.celery_task_id, OnyxCeleryQueues.CONNECTOR_INDEXING, r_celery
+    )
+    if found:
+        # the celery task exists in the redis queue
+        redis_connector_index.set_active()
+        return
+
+    if payload.celery_task_id in reserved_tasks:
+        # the celery task was prefetched and is reserved within the indexing worker
+        redis_connector_index.set_active()
+        return
+
+    # we may want to enable this check if using the active task list somehow isn't good enough
+    # if redis_connector_index.generator_locked():
+    #     logger.info(f"{payload.celery_task_id} is currently executing.")
+
+    # if we get here, we didn't find any direct indication that the associated celery tasks exist,
+    # but they still might be there due to gaps in our ability to check states during transitions.
+    # Checking the active signal safeguards us against these transition periods
+    # (which have a duration that allows us to bridge those gaps).
+    if redis_connector_index.active():
+        return
+
+    # celery tasks don't exist and the active signal has expired, possibly due to a crash. Clean it up.
+    logger.warning(
+        f"validate_indexing_fence - Resetting fence because no associated celery tasks were found: "
+        f"index_attempt={payload.index_attempt_id} "
+        f"cc_pair={cc_pair_id} "
+        f"search_settings={search_settings_id} "
+        f"fence={fence_key}"
+    )
+    if payload.index_attempt_id:
+        try:
+            mark_attempt_failed(
+                payload.index_attempt_id,
+                db_session,
+                "validate_indexing_fence - Canceling index attempt due to missing celery tasks: "
+                f"index_attempt={payload.index_attempt_id}",
+            )
+        except Exception:
+            logger.exception(
+                "validate_indexing_fence - Exception while marking index attempt as failed: "
+                f"index_attempt={payload.index_attempt_id}",
+            )
+
+    redis_connector_index.reset()
+    return
+
+
+def validate_indexing_fences(
+    tenant_id: str | None,
+    celery_app: Celery,
+    r: Redis,
+    r_celery: Redis,
+    lock_beat: RedisLock,
+) -> None:
+    reserved_indexing_tasks = celery_get_unacked_task_ids(
+        OnyxCeleryQueues.CONNECTOR_INDEXING, r_celery
+    )
+
+    # validate all existing indexing jobs
+    for key_bytes in r.scan_iter(
+        RedisConnectorIndex.FENCE_PREFIX + "*", count=SCAN_ITER_COUNT_DEFAULT
+    ):
+        lock_beat.reacquire()
+        with get_session_with_tenant(tenant_id) as db_session:
+            validate_indexing_fence(
+                tenant_id,
+                key_bytes,
+                reserved_indexing_tasks,
+                r_celery,
+                db_session,
+            )
+    return
+
+
+def _should_index(
+    cc_pair: ConnectorCredentialPair,
+    last_index: IndexAttempt | None,
+    search_settings_instance: SearchSettings,
+    search_settings_primary: bool,
+    secondary_index_building: bool,
+    db_session: Session,
+) -> bool:
+    """Checks various global settings and past indexing attempts to determine if
+    we should try to start indexing the cc pair / search setting combination.
+
+    Note that tactical checks such as preventing overlap with a currently running task
+    are not handled here.
+
+    Return True if we should try to index, False if not.
+    """
+    connector = cc_pair.connector
+
+    # uncomment for debugging
+    # task_logger.info(f"_should_index: "
+    #                  f"cc_pair={cc_pair.id} "
+    #                  f"connector={cc_pair.connector_id} "
+    #                  f"refresh_freq={connector.refresh_freq}")
+
+    # don't kick off indexing for `NOT_APPLICABLE` sources
+    if connector.source == DocumentSource.NOT_APPLICABLE:
+        return False
+
+    # User can still manually create single indexing attempts via the UI for the
+    # currently in use index
+    if DISABLE_INDEX_UPDATE_ON_SWAP:
+        if (
+            search_settings_instance.status == IndexModelStatus.PRESENT
+            and secondary_index_building
+        ):
+            return False
+
+    # When switching over models, always index at least once
+    if search_settings_instance.status == IndexModelStatus.FUTURE:
+        if last_index:
+            # No new index if the last index attempt succeeded
+            # Once is enough. The model will never be able to swap otherwise.
+            if last_index.status == IndexingStatus.SUCCESS:
+                return False
+
+            # No new index if the last index attempt is waiting to start
+            if last_index.status == IndexingStatus.NOT_STARTED:
+                return False
+
+            # No new index if the last index attempt is running
+            if last_index.status == IndexingStatus.IN_PROGRESS:
+                return False
+        else:
+            if (
+                connector.id == 0 or connector.source == DocumentSource.INGESTION_API
+            ):  # Ingestion API
+                return False
+        return True
+
+    # If the connector is paused or is the ingestion API, don't index
+    # NOTE: during an embedding model switch over, the following logic
+    # is bypassed by the above check for a future model
+    if (
+        not cc_pair.status.is_active()
+        or connector.id == 0
+        or connector.source == DocumentSource.INGESTION_API
+    ):
+        return False
+
+    if search_settings_primary:
+        if cc_pair.indexing_trigger is not None:
+            # if a manual indexing trigger is on the cc pair, honor it for primary search settings
+            return True
+
+    # if no attempt has ever occurred, we should index regardless of refresh_freq
+    if not last_index:
+        return True
+
+    if connector.refresh_freq is None:
+        return False
+
+    current_db_time = get_db_current_time(db_session)
+    time_since_index = current_db_time - last_index.time_updated
+    if time_since_index.total_seconds() < connector.refresh_freq:
+        return False
+
+    return True
+
+
+def try_creating_indexing_task(
+    celery_app: Celery,
+    cc_pair: ConnectorCredentialPair,
+    search_settings: SearchSettings,
+    reindex: bool,
+    db_session: Session,
+    r: Redis,
+    tenant_id: str | None,
+) -> int | None:
+    """Checks for any conditions that should block the indexing task from being
+    created, then creates the task.
+
+    Does not check for scheduling related conditions as this function
+    is used to trigger indexing immediately.
+    """
+
+    LOCK_TIMEOUT = 30
+    index_attempt_id: int | None = None
+
+    # we need to serialize any attempt to trigger indexing since it can be triggered
+    # either via celery beat or manually (API call)
+    lock: RedisLock = r.lock(
+        DANSWER_REDIS_FUNCTION_LOCK_PREFIX + "try_creating_indexing_task",
+        timeout=LOCK_TIMEOUT,
+    )
+
+    acquired = lock.acquire(blocking_timeout=LOCK_TIMEOUT / 2)
+    if not acquired:
+        return None
+
+    try:
+        redis_connector = RedisConnector(tenant_id, cc_pair.id)
+        redis_connector_index = redis_connector.new_index(search_settings.id)
+
+        # skip if already indexing
+        if redis_connector_index.fenced:
+            return None
+
+        # skip indexing if the cc_pair is deleting
+        if redis_connector.delete.fenced:
+            return None
+
+        db_session.refresh(cc_pair)
+        if cc_pair.status == ConnectorCredentialPairStatus.DELETING:
+            return None
+
+        # add a long running generator task to the queue
+        redis_connector_index.generator_clear()
+
+        # set a basic fence to start
+        payload = RedisConnectorIndexPayload(
+            index_attempt_id=None,
+            started=None,
+            submitted=datetime.now(timezone.utc),
+            celery_task_id=None,
+        )
+
+        redis_connector_index.set_active()
+        redis_connector_index.set_fence(payload)
+
+        # create the index attempt for tracking purposes
+        # code elsewhere checks for index attempts without an associated redis key
+        # and cleans them up
+        # therefore we must create the attempt and the task after the fence goes up
+        index_attempt_id = create_index_attempt(
+            cc_pair.id,
+            search_settings.id,
+            from_beginning=reindex,
+            db_session=db_session,
+        )
+
+        custom_task_id = redis_connector_index.generate_generator_task_id()
+
+        # when the task is sent, we have yet to finish setting up the fence
+        # therefore, the task must contain code that blocks until the fence is ready
+        result = celery_app.send_task(
+            OnyxCeleryTask.CONNECTOR_INDEXING_PROXY_TASK,
+            kwargs=dict(
+                index_attempt_id=index_attempt_id,
+                cc_pair_id=cc_pair.id,
+                search_settings_id=search_settings.id,
+                tenant_id=tenant_id,
+            ),
+            queue=OnyxCeleryQueues.CONNECTOR_INDEXING,
+            task_id=custom_task_id,
+            priority=OnyxCeleryPriority.MEDIUM,
+        )
+        if not result:
+            raise RuntimeError("send_task for connector_indexing_proxy_task failed.")
+
+        # now fill out the fence with the rest of the data
+        redis_connector_index.set_active()
+
+        payload.index_attempt_id = index_attempt_id
+        payload.celery_task_id = result.id
+        redis_connector_index.set_fence(payload)
+    except Exception:
+        task_logger.exception(
+            f"try_creating_indexing_task - Unexpected exception: "
+            f"cc_pair={cc_pair.id} "
+            f"search_settings={search_settings.id}"
+        )
+
+        if index_attempt_id is not None:
+            delete_index_attempt(db_session, index_attempt_id)
+        redis_connector_index.set_fence(None)
+        return None
+    finally:
+        if lock.owned():
+            lock.release()
+
+    return index_attempt_id
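A rough sketch of how a caller might combine `_should_index` and `try_creating_indexing_task` for a single cc_pair, assuming it lives alongside the helpers above. `maybe_kick_off_indexing` is a hypothetical wrapper; the caller is expected to have loaded the cc_pair, search settings and last attempt itself, and the `search_settings_primary=True` / `secondary_index_building=False` arguments are illustrative choices.

def maybe_kick_off_indexing(
    celery_app: Celery,
    r: Redis,
    tenant_id: str | None,
    cc_pair: ConnectorCredentialPair,
    search_settings: SearchSettings,
    last_attempt: IndexAttempt | None,
    db_session: Session,
) -> int | None:
    # Skip anything the scheduling rules exclude (paused connector, satisfied
    # refresh_freq, in-flight attempt for a future model, etc.)
    if not _should_index(
        cc_pair=cc_pair,
        last_index=last_attempt,
        search_settings_instance=search_settings,
        search_settings_primary=True,
        secondary_index_building=False,
        db_session=db_session,
    ):
        return None

    # try_creating_indexing_task does its own Redis locking, fence setup and
    # enqueueing; it returns the new index attempt id, or None if it backed off
    return try_creating_indexing_task(
        celery_app, cc_pair, search_settings, False, db_session, r, tenant_id
    )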
diff --git a/backend/onyx/background/celery/tasks/pruning/tasks.py b/backend/onyx/background/celery/tasks/pruning/tasks.py
index c9483e97f..122dfd580 100644
--- a/backend/onyx/background/celery/tasks/pruning/tasks.py
+++ b/backend/onyx/background/celery/tasks/pruning/tasks.py
@@ -13,11 +13,11 @@ from sqlalchemy.orm import Session
 from onyx.background.celery.apps.app_base import task_logger
 from onyx.background.celery.celery_utils import extract_ids_from_runnable_connector
-from onyx.background.celery.tasks.indexing.tasks import IndexingCallback
+from onyx.background.celery.tasks.indexing.utils import IndexingCallback
 from onyx.configs.app_configs import ALLOW_SIMULTANEOUS_PRUNING
 from onyx.configs.app_configs import JOB_TIMEOUT
+from onyx.configs.constants import CELERY_GENERIC_BEAT_LOCK_TIMEOUT
 from onyx.configs.constants import CELERY_PRUNING_LOCK_TIMEOUT
-from onyx.configs.constants import CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT
 from onyx.configs.constants import DANSWER_REDIS_FUNCTION_LOCK_PREFIX
 from onyx.configs.constants import OnyxCeleryPriority
 from onyx.configs.constants import OnyxCeleryQueues
@@ -86,7 +86,7 @@ def check_for_pruning(self: Task, *, tenant_id: str | None) -> bool | None:
 
     lock_beat: RedisLock = r.lock(
         OnyxRedisLocks.CHECK_PRUNE_BEAT_LOCK,
-        timeout=CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT,
+        timeout=CELERY_GENERIC_BEAT_LOCK_TIMEOUT,
     )
 
     # these tasks should never overlap
diff --git a/backend/onyx/configs/constants.py b/backend/onyx/configs/constants.py
index 9f8cb9717..129019a4c 100644
--- a/backend/onyx/configs/constants.py
+++ b/backend/onyx/configs/constants.py
@@ -79,6 +79,8 @@ KV_DOCUMENTS_SEEDED_KEY = "documents_seeded"
 
 # NOTE: we use this timeout / 4 in various places to refresh a lock
 # might be worth separating this timeout into separate timeouts for each situation
+CELERY_GENERIC_BEAT_LOCK_TIMEOUT = 120
+
 CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT = 120
 
 CELERY_PRIMARY_WORKER_LOCK_TIMEOUT = 120
@@ -291,6 +293,8 @@ class OnyxRedisLocks:
     SLACK_BOT_HEARTBEAT_PREFIX = "da_heartbeat:slack_bot"
     ANONYMOUS_USER_ENABLED = "anonymous_user_enabled"
 
+    CLOUD_CHECK_INDEXING_BEAT_LOCK = "da_lock:cloud_check_indexing_beat"
+
 
 class OnyxRedisSignals:
     VALIDATE_INDEXING_FENCES = "signal:validate_indexing_fences"
@@ -304,6 +308,13 @@ class OnyxCeleryPriority(int, Enum):
     LOWEST = auto()
 
 
+# a prefix used to distinguish system wide tasks in the cloud
+ONYX_CLOUD_CELERY_TASK_PREFIX = "cloud"
+
+# the tenant id we use for system level redis operations
+ONYX_CLOUD_TENANT_ID = "cloud"
+
+
 class OnyxCeleryTask:
     CHECK_FOR_CONNECTOR_DELETION = "check_for_connector_deletion_task"
     CHECK_FOR_VESPA_SYNC_TASK = "check_for_vespa_sync_task"
@@ -331,6 +342,8 @@ class OnyxCeleryTask:
     CHECK_TTL_MANAGEMENT_TASK = "check_ttl_management_task"
    AUTOGENERATE_USAGE_REPORT_TASK = "autogenerate_usage_report_task"
 
+    CLOUD_CHECK_FOR_INDEXING = f"{ONYX_CLOUD_CELERY_TASK_PREFIX}_check_for_indexing"
+
 
 REDIS_SOCKET_KEEPALIVE_OPTIONS = {}
 REDIS_SOCKET_KEEPALIVE_OPTIONS[socket.TCP_KEEPINTVL] = 15
diff --git a/backend/onyx/db/engine.py b/backend/onyx/db/engine.py
index 30d8d3ab4..b62530b65 100644
--- a/backend/onyx/db/engine.py
+++ b/backend/onyx/db/engine.py
@@ -240,8 +240,11 @@ class SqlEngine:
 
 
 def get_all_tenant_ids() -> list[str] | list[None]:
+    """Returning [None] means the only tenant is the 'public' or self hosted tenant."""
+
     if not MULTI_TENANT:
         return [None]
+
     with get_session_with_tenant(tenant_id=POSTGRES_DEFAULT_SCHEMA) as session:
         result = session.execute(
             text(
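A minimal sketch of consuming `get_all_tenant_ids` in a loop, assuming the documented behavior above; `run_for_all_tenants` is an illustrative name, not part of this diff.

from onyx.db.engine import get_all_tenant_ids
from onyx.db.engine import get_session_with_tenant


def run_for_all_tenants() -> None:
    # get_all_tenant_ids() returns [None] for self hosted / single-tenant deployments,
    # so the loop body still runs exactly once against the public schema in that case.
    for tenant_id in get_all_tenant_ids():
        with get_session_with_tenant(tenant_id) as db_session:
            ...  # per-tenant work (placeholder)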
diff --git a/backend/onyx/redis/redis_connector_doc_perm_sync.py b/backend/onyx/redis/redis_connector_doc_perm_sync.py
index 62c98a66f..99f891e14 100644
--- a/backend/onyx/redis/redis_connector_doc_perm_sync.py
+++ b/backend/onyx/redis/redis_connector_doc_perm_sync.py
@@ -9,7 +9,7 @@ from pydantic import BaseModel
 from redis.lock import Lock as RedisLock
 
 from onyx.access.models import DocExternalAccess
-from onyx.configs.constants import CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT
+from onyx.configs.constants import CELERY_GENERIC_BEAT_LOCK_TIMEOUT
 from onyx.configs.constants import OnyxCeleryPriority
 from onyx.configs.constants import OnyxCeleryQueues
 from onyx.configs.constants import OnyxCeleryTask
@@ -147,7 +147,7 @@ class RedisConnectorPermissionSync:
         for doc_perm in new_permissions:
             current_time = time.monotonic()
             if lock and current_time - last_lock_time >= (
-                CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT / 4
+                CELERY_GENERIC_BEAT_LOCK_TIMEOUT / 4
             ):
                 lock.reacquire()
                 last_lock_time = current_time
diff --git a/backend/onyx/redis/redis_connector_prune.py b/backend/onyx/redis/redis_connector_prune.py
index 371c7ad18..75279028f 100644
--- a/backend/onyx/redis/redis_connector_prune.py
+++ b/backend/onyx/redis/redis_connector_prune.py
@@ -7,7 +7,7 @@ from celery import Celery
 from redis.lock import Lock as RedisLock
 from sqlalchemy.orm import Session
 
-from onyx.configs.constants import CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT
+from onyx.configs.constants import CELERY_GENERIC_BEAT_LOCK_TIMEOUT
 from onyx.configs.constants import OnyxCeleryPriority
 from onyx.configs.constants import OnyxCeleryQueues
 from onyx.configs.constants import OnyxCeleryTask
@@ -125,7 +125,7 @@ class RedisConnectorPrune:
         for doc_id in documents_to_prune:
             current_time = time.monotonic()
             if lock and current_time - last_lock_time >= (
-                CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT / 4
+                CELERY_GENERIC_BEAT_LOCK_TIMEOUT / 4
             ):
                 lock.reacquire()
                 last_lock_time = current_time
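The two hunks above change only the constant, not the shape of the loop. Distilled into a standalone sketch (names here are illustrative, not from the codebase), the refresh cadence is: reacquire the lock once a quarter of its timeout has elapsed, so it never expires mid-loop.

import time

from redis.lock import Lock as RedisLock

from onyx.configs.constants import CELERY_GENERIC_BEAT_LOCK_TIMEOUT


def process_with_lock_refresh(items: list, lock: RedisLock | None) -> None:
    last_lock_time = time.monotonic()
    for item in items:
        current_time = time.monotonic()
        if lock and current_time - last_lock_time >= (
            CELERY_GENERIC_BEAT_LOCK_TIMEOUT / 4
        ):
            # extend the lock well before its timeout can expire mid-loop
            lock.reacquire()
            last_lock_time = current_time
        ...  # handle one item (placeholder)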