diff --git a/backend/onyx/background/celery/tasks/indexing/tasks.py b/backend/onyx/background/celery/tasks/indexing/tasks.py
index bf0a827383..650b3da83f 100644
--- a/backend/onyx/background/celery/tasks/indexing/tasks.py
+++ b/backend/onyx/background/celery/tasks/indexing/tasks.py
@@ -674,6 +674,9 @@ def connector_indexing_proxy_task(
     while True:
         sleep(5)
 
+        # renew watchdog signal (this has a shorter timeout than set_active)
+        redis_connector_index.set_watchdog(True)
+
         # renew active signal
         redis_connector_index.set_active()
 
@@ -780,6 +783,7 @@ def connector_indexing_proxy_task(
             )
             continue
 
+    redis_connector_index.set_watchdog(False)
     task_logger.info(
         f"Indexing watchdog - finished: attempt={index_attempt_id} "
         f"cc_pair={cc_pair_id} "
diff --git a/backend/onyx/background/celery/tasks/vespa/tasks.py b/backend/onyx/background/celery/tasks/vespa/tasks.py
index 821f03189e..a350d2ce0c 100644
--- a/backend/onyx/background/celery/tasks/vespa/tasks.py
+++ b/backend/onyx/background/celery/tasks/vespa/tasks.py
@@ -735,7 +735,7 @@ def monitor_ccpair_indexing_taskset(
     composite_id = RedisConnector.get_id_from_fence_key(fence_key)
     if composite_id is None:
         task_logger.warning(
-            f"monitor_ccpair_indexing_taskset: could not parse composite_id from {fence_key}"
+            f"Connector indexing: could not parse composite_id from {fence_key}"
         )
         return
 
@@ -785,6 +785,7 @@ def monitor_ccpair_indexing_taskset(
 
     # inner/outer/inner double check pattern to avoid race conditions when checking for
     # bad state
+    # Verify: if the generator isn't complete, the task must not be in READY state
     # inner = get_completion / generator_complete not signaled
     # outer = result.state in READY state
     status_int = redis_connector_index.get_completion()
@@ -830,7 +831,7 @@ def monitor_ccpair_indexing_taskset(
             )
         except Exception:
             task_logger.exception(
-                "monitor_ccpair_indexing_taskset - transient exception marking index attempt as failed: "
+                "Connector indexing - Transient exception marking index attempt as failed: "
                 f"attempt={payload.index_attempt_id} "
                 f"tenant={tenant_id} "
                 f"cc_pair={cc_pair_id} "
@@ -840,6 +841,20 @@ def monitor_ccpair_indexing_taskset(
         redis_connector_index.reset()
         return
 
+    if redis_connector_index.watchdog_signaled():
+        # if the generator is complete, don't clean up until the watchdog has exited
+        task_logger.info(
+            f"Connector indexing - Delaying finalization until watchdog has exited: "
+            f"attempt={payload.index_attempt_id} "
+            f"cc_pair={cc_pair_id} "
+            f"search_settings={search_settings_id} "
+            f"progress={progress} "
+            f"elapsed_submitted={elapsed_submitted.total_seconds():.2f} "
+            f"elapsed_started={elapsed_started_str}"
+        )
+
+        return
+
     status_enum = HTTPStatus(status_int)
 
     task_logger.info(
@@ -858,9 +873,13 @@ def monitor_ccpair_indexing_taskset(
 
 @shared_task(name=OnyxCeleryTask.MONITOR_VESPA_SYNC, soft_time_limit=300, bind=True)
 def monitor_vespa_sync(self: Task, tenant_id: str | None) -> bool | None:
-    """This is a celery beat task that monitors and finalizes metadata sync tasksets.
+    """This is a celery beat task that monitors and finalizes various long running tasks.
+
+    The name monitor_vespa_sync is a bit of a misnomer since it checks many different tasks
+    now. Should change that at some point.
+
     It scans for fence values and then gets the counts of any associated tasksets.
-    If the count is 0, that means all tasks finished and we should clean up.
+    For many tasks, the count is 0, that means all tasks finished and we should clean up.
 
     This task lock timeout is CELERY_METADATA_SYNC_BEAT_LOCK_TIMEOUT seconds, so don't
     do anything too expensive in this function!
@@ -1045,6 +1064,8 @@ def monitor_vespa_sync(self: Task, tenant_id: str | None) -> bool | None:
 def vespa_metadata_sync_task(
     self: Task, document_id: str, tenant_id: str | None
 ) -> bool:
+    start = time.monotonic()
+
     try:
         with get_session_with_tenant(tenant_id) as db_session:
             curr_ind_name, sec_ind_name = get_both_index_names(db_session)
@@ -1095,7 +1116,13 @@ def vespa_metadata_sync_task(
         # r = get_redis_client(tenant_id=tenant_id)
         # r.delete(redis_syncing_key)
 
-        task_logger.info(f"doc={document_id} action=sync chunks={chunks_affected}")
+        elapsed = time.monotonic() - start
+        task_logger.info(
+            f"doc={document_id} "
+            f"action=sync "
+            f"chunks={chunks_affected} "
+            f"elapsed={elapsed:.2f}"
+        )
     except SoftTimeLimitExceeded:
         task_logger.info(f"SoftTimeLimitExceeded exception. doc={document_id}")
     except Exception as ex:
diff --git a/backend/onyx/redis/redis_connector_index.py b/backend/onyx/redis/redis_connector_index.py
index 5cf5d449d2..5b62da7b6b 100644
--- a/backend/onyx/redis/redis_connector_index.py
+++ b/backend/onyx/redis/redis_connector_index.py
@@ -30,10 +30,17 @@ class RedisConnectorIndex:
     GENERATOR_LOCK_PREFIX = "da_lock:indexing"
 
     TERMINATE_PREFIX = PREFIX + "_terminate"  # connectorindexing_terminate
+    TERMINATE_TTL = 600
 
     # used to signal the overall workflow is still active
-    # it's difficult to prevent
+    # there are gaps in time between states where we need some slack
+    # to correctly transition
     ACTIVE_PREFIX = PREFIX + "_active"
+    ACTIVE_TTL = 3600
+
+    # used to signal that the watchdog is running
+    WATCHDOG_PREFIX = PREFIX + "_watchdog"
+    WATCHDOG_TTL = 300
 
     def __init__(
         self,
@@ -59,6 +66,7 @@ class RedisConnectorIndex:
         )
         self.terminate_key = f"{self.TERMINATE_PREFIX}_{id}/{search_settings_id}"
         self.active_key = f"{self.ACTIVE_PREFIX}_{id}/{search_settings_id}"
+        self.watchdog_key = f"{self.WATCHDOG_PREFIX}_{id}/{search_settings_id}"
 
     @classmethod
     def fence_key_with_ids(cls, cc_pair_id: int, search_settings_id: int) -> str:
@@ -110,7 +118,24 @@ class RedisConnectorIndex:
         """This sets a signal. It does not block!"""
         # We shouldn't need very long to terminate the spawned task.
         # 10 minute TTL is good.
-        self.redis.set(f"{self.terminate_key}_{celery_task_id}", 0, ex=600)
+        self.redis.set(
+            f"{self.terminate_key}_{celery_task_id}", 0, ex=self.TERMINATE_TTL
+        )
+
+    def set_watchdog(self, value: bool) -> None:
+        """Signal the state of the watchdog."""
+        if not value:
+            self.redis.delete(self.watchdog_key)
+            return
+
+        self.redis.set(self.watchdog_key, 0, ex=self.WATCHDOG_TTL)
+
+    def watchdog_signaled(self) -> bool:
+        """Check the state of the watchdog."""
+        if self.redis.exists(self.watchdog_key):
+            return True
+
+        return False
 
     def set_active(self) -> None:
         """This sets a signal to keep the indexing flow from getting cleaned up within
@@ -118,7 +143,7 @@ class RedisConnectorIndex:
 
         The slack in timing is needed to avoid race conditions where simply checking
         the celery queue and task status could result in race conditions."""
-        self.redis.set(self.active_key, 0, ex=3600)
+        self.redis.set(self.active_key, 0, ex=self.ACTIVE_TTL)
 
     def active(self) -> bool:
         if self.redis.exists(self.active_key):
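The watchdog signal introduced above is a keepalive pattern: a short-TTL Redis key that the long-running indexing proxy keeps refreshing while it works (set_watchdog(True) each loop iteration), clears on clean exit (set_watchdog(False)), and that the beat-driven monitor consults (watchdog_signaled()) before finalizing, so a crashed watchdog simply ages out after WATCHDOG_TTL instead of blocking cleanup forever. Below is a minimal standalone sketch of that pattern, assuming a local Redis reachable via redis-py; the key name, TTLs, and function names are placeholders for illustration, not the repo's API.

# watchdog_sketch.py -- illustrative only; key name, TTLs, and functions are placeholders
import time

import redis

WATCHDOG_KEY = "example_watchdog"  # placeholder; the real key is scoped per cc_pair/search_settings
WATCHDOG_TTL = 300  # seconds; must comfortably exceed the renewal interval below

r = redis.Redis(host="localhost", port=6379, db=0)


def run_with_watchdog(units_of_work: int = 3) -> None:
    """Plays the role of the proxy task: renew the signal while working, clear it on exit."""
    for _ in range(units_of_work):
        # renew the signal; if this process dies, the key expires on its own after WATCHDOG_TTL
        r.set(WATCHDOG_KEY, 0, ex=WATCHDOG_TTL)
        time.sleep(5)  # stand-in for one iteration of real work

    r.delete(WATCHDOG_KEY)  # explicit clear on clean exit


def try_finalize() -> bool:
    """Plays the role of the monitor: defer cleanup while the watchdog is still signaled."""
    if r.exists(WATCHDOG_KEY):
        return False  # delay finalization; retry on the next beat tick
    return True  # watchdog gone (cleanly cleared or expired), safe to finalize

The short TTL is the crash-safety valve: the monitor waits at most one renewal interval plus WATCHDOG_TTL before cleaning up, which is why the watchdog signal gets a much shorter TTL (300s) than the active signal (3600s).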