diff --git a/backend/onyx/background/celery/apps/primary.py b/backend/onyx/background/celery/apps/primary.py index 37838e9b0..6c7e2d4e7 100644 --- a/backend/onyx/background/celery/apps/primary.py +++ b/backend/onyx/background/celery/apps/primary.py @@ -108,14 +108,19 @@ def on_worker_init(sender: Worker, **kwargs: Any) -> None: r = get_redis_client(tenant_id=POSTGRES_DEFAULT_SCHEMA) # Log the role and slave count - being connected to a slave or slave count > 0 could be problematic - info: dict[str, Any] = cast(dict, r.info("replication")) - role: str = cast(str, info.get("role")) - connected_slaves: int = info.get("connected_slaves", 0) + replication_info: dict[str, Any] = cast(dict, r.info("replication")) + role: str = cast(str, replication_info.get("role", "")) + connected_slaves: int = replication_info.get("connected_slaves", 0) logger.info( f"Redis INFO REPLICATION: role={role} connected_slaves={connected_slaves}" ) + memory_info: dict[str, Any] = cast(dict, r.info("memory")) + maxmemory_policy: str = cast(str, memory_info.get("maxmemory_policy", "")) + + logger.info(f"Redis INFO MEMORY: maxmemory_policy={maxmemory_policy}") + # For the moment, we're assuming that we are the only primary worker # that should be running. 
# TODO: maybe check for or clean up another zombie primary worker if we detect it diff --git a/backend/onyx/background/celery/tasks/indexing/tasks.py b/backend/onyx/background/celery/tasks/indexing/tasks.py index acce48f20..1f2dc482f 100644 --- a/backend/onyx/background/celery/tasks/indexing/tasks.py +++ b/backend/onyx/background/celery/tasks/indexing/tasks.py @@ -1061,6 +1061,10 @@ def connector_indexing_proxy_task( # Track the last time memory info was emitted last_memory_emit_time = 0.0 + # track the last ttl and the time it was observed + last_activity_ttl_observed: float = time.monotonic() + last_activity_ttl: int = 0 + try: with get_session_with_current_tenant() as db_session: index_attempt = get_index_attempt( @@ -1074,11 +1078,15 @@ def connector_indexing_proxy_task( ) redis_connector_index.set_active() # renew active signal - redis_connector_index.set_connector_active() # prime the connective active signal + + # prime the connector active signal (renewed inside the connector) + redis_connector_index.set_connector_active() while True: sleep(5) + now = time.monotonic() + # renew watchdog signal (this has a shorter timeout than set_active) redis_connector_index.set_watchdog(True) @@ -1128,18 +1136,37 @@ def connector_indexing_proxy_task( break # if activity timeout is detected, break (exit point will clean up) - if not redis_connector_index.connector_active(): - task_logger.warning( - log_builder.build( - "Indexing watchdog - activity timeout exceeded", - timeout=f"{CELERY_INDEXING_WATCHDOG_CONNECTOR_TIMEOUT}s", + ttl = redis_connector_index.connector_active_ttl() + if ttl < 0: + # elapsed seconds since the TTL was last observed (monotonic timestamps) + last_observed = now - last_activity_ttl_observed + if now > last_activity_ttl_observed + last_activity_ttl: + task_logger.warning( + log_builder.build( + "Indexing watchdog - activity timeout exceeded", + last_observed=f"{last_observed:.2f}s", + last_ttl=f"{last_activity_ttl}", + timeout=f"{CELERY_INDEXING_WATCHDOG_CONNECTOR_TIMEOUT}s", + ) ) - ) - 
result.status = ( - IndexingWatchdogTerminalStatus.TERMINATED_BY_ACTIVITY_TIMEOUT - ) - break + result.status = ( + IndexingWatchdogTerminalStatus.TERMINATED_BY_ACTIVITY_TIMEOUT + ) + break + else: + task_logger.warning( + log_builder.build( + "Indexing watchdog - activity timeout expired unexpectedly, " + "waiting for last observed TTL before exiting", + last_observed=f"{last_observed:.2f}s", + last_ttl=f"{last_activity_ttl}", + timeout=f"{CELERY_INDEXING_WATCHDOG_CONNECTOR_TIMEOUT}s", + ) + ) + else: + last_activity_ttl_observed = now + last_activity_ttl = ttl # if the spawned task is still running, restart the check once again # if the index attempt is not in a finished status diff --git a/backend/onyx/connectors/slack/connector.py b/backend/onyx/connectors/slack/connector.py index cdba0b078..bc6396579 100644 --- a/backend/onyx/connectors/slack/connector.py +++ b/backend/onyx/connectors/slack/connector.py @@ -255,7 +255,6 @@ def default_msg_filter(message: MessageType) -> bool: # Don't keep messages from bots if message.get("bot_id") or message.get("app_id"): bot_profile_name = message.get("bot_profile", {}).get("name") - print(f"bot_profile_name: {bot_profile_name}") if bot_profile_name == "DanswerBot Testing": return False return True diff --git a/backend/onyx/redis/redis_connector_index.py b/backend/onyx/redis/redis_connector_index.py index 5a67879e5..0c0687f8b 100644 --- a/backend/onyx/redis/redis_connector_index.py +++ b/backend/onyx/redis/redis_connector_index.py @@ -165,6 +165,16 @@ class RedisConnectorIndex: return False + def connector_active_ttl(self) -> int: + """Refer to https://redis.io/docs/latest/commands/ttl/ + + -2 means the key does not exist + -1 means the key exists but has no associated expire + Otherwise, returns the actual TTL of the key + """ + ttl = cast(int, self.redis.ttl(self.connector_active_key)) + return ttl + def generator_locked(self) -> bool: return bool(self.redis.exists(self.generator_lock_key))