mirror of https://github.com/danswer-ai/danswer.git

various lock diagnostics and timing adjustments

parent 1db778baa8
commit b027a08698
@@ -5,7 +5,6 @@ from datetime import datetime
 from datetime import timezone
 from http import HTTPStatus
 from time import sleep
-from typing import cast

 import redis
 import sentry_sdk
@@ -63,6 +62,7 @@ from onyx.redis.redis_connector import RedisConnector
 from onyx.redis.redis_connector_index import RedisConnectorIndex
 from onyx.redis.redis_connector_index import RedisConnectorIndexPayload
 from onyx.redis.redis_pool import get_redis_client
+from onyx.redis.redis_pool import redis_lock_dump
 from onyx.utils.logger import setup_logger
 from onyx.utils.variable_functionality import global_version
 from shared_configs.configs import INDEXING_MODEL_SERVER_HOST
@@ -95,6 +95,7 @@ class IndexingCallback(IndexingHeartbeatInterface):

         self.last_tag: str = "IndexingCallback.__init__"
         self.last_lock_reacquire: datetime = datetime.now(timezone.utc)
+        self.last_lock_monotonic = time.monotonic()

         self.last_parent_check = time.monotonic()

@@ -122,9 +123,15 @@ class IndexingCallback(IndexingHeartbeatInterface):
         # self.last_parent_check = now

         try:
-            self.redis_lock.reacquire()
+            current_time = time.monotonic()
+            if current_time - self.last_lock_monotonic >= (
+                CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT / 4
+            ):
+                self.redis_lock.reacquire()
+                self.last_lock_reacquire = datetime.now(timezone.utc)
+                self.last_lock_monotonic = time.monotonic()
+
             self.last_tag = tag
-            self.last_lock_reacquire = datetime.now(timezone.utc)
         except LockError:
             logger.exception(
                 f"IndexingCallback - lock.reacquire exceptioned: "
@@ -135,29 +142,7 @@ class IndexingCallback(IndexingHeartbeatInterface):
                 f"now={datetime.now(timezone.utc)}"
             )

-            # diagnostic logging for lock errors
-            name = self.redis_lock.name
-            ttl = self.redis_client.ttl(name)
-            locked = self.redis_lock.locked()
-            owned = self.redis_lock.owned()
-            local_token: str | None = self.redis_lock.local.token  # type: ignore
-
-            remote_token_raw = self.redis_client.get(self.redis_lock.name)
-            if remote_token_raw:
-                remote_token_bytes = cast(bytes, remote_token_raw)
-                remote_token = remote_token_bytes.decode("utf-8")
-            else:
-                remote_token = None
-
-            logger.warning(
-                f"IndexingCallback - lock diagnostics: "
-                f"name={name} "
-                f"locked={locked} "
-                f"owned={owned} "
-                f"local_token={local_token} "
-                f"remote_token={remote_token} "
-                f"ttl={ttl}"
-            )
+            redis_lock_dump(self.redis_lock, self.redis_client)
             raise

         self.redis_client.incrby(self.generator_progress_key, amount)
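
The hunk above changes IndexingCallback to renew its Redis lock at most once per quarter of the lock timeout instead of on every progress callback, and routes the LockError diagnostics through the new redis_lock_dump helper. A minimal sketch of the throttled-renewal pattern, assuming a Redis server on localhost:6379; the "demo:beat_lock" key and the simulated work loop are illustrative stand-ins, not Onyx code:

    import time

    import redis

    CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT = 120  # seconds, as set in this commit

    r = redis.Redis(host="localhost", port=6379)
    lock = r.lock("demo:beat_lock", timeout=CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT)

    if lock.acquire(blocking=False):
        last_lock_monotonic = time.monotonic()
        try:
            for _ in range(100):  # stand-in for per-batch progress callbacks
                now = time.monotonic()
                # Renew at most every timeout/4 seconds rather than on every
                # callback; reacquire() raises LockError if the lock was lost.
                if now - last_lock_monotonic >= CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT / 4:
                    lock.reacquire()
                    last_lock_monotonic = time.monotonic()
                time.sleep(0.05)  # simulated unit of work
        finally:
            if lock.owned():
                lock.release()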
@@ -349,6 +334,7 @@ def check_for_indexing(self: Task, *, tenant_id: str | None) -> int | None:
         # Fail any index attempts in the DB that don't have fences
         # This shouldn't ever happen!
         with get_session_with_tenant(tenant_id) as db_session:
+            lock_beat.reacquire()
             unfenced_attempt_ids = get_unfenced_index_attempt_ids(
                 db_session, redis_client
             )
@@ -372,6 +358,7 @@ def check_for_indexing(self: Task, *, tenant_id: str | None) -> int | None:

         # we want to run this less frequently than the overall task
         if not redis_client.exists(OnyxRedisSignals.VALIDATE_INDEXING_FENCES):
+            lock_beat.reacquire()
             # clear any indexing fences that don't have associated celery tasks in progress
             # tasks can be in the queue in redis, in reserved tasks (prefetched by the worker),
             # or be currently executing
@@ -399,6 +386,7 @@ def check_for_indexing(self: Task, *, tenant_id: str | None) -> int | None:
                 "check_for_indexing - Lock not owned on completion: "
                 f"tenant={tenant_id}"
             )
+            redis_lock_dump(lock_beat, redis_client)

     time_elapsed = time.monotonic() - time_start
     task_logger.debug(f"check_for_indexing finished: elapsed={time_elapsed:.2f}")

@@ -68,6 +68,7 @@ from onyx.redis.redis_connector_index import RedisConnectorIndex
 from onyx.redis.redis_connector_prune import RedisConnectorPrune
 from onyx.redis.redis_document_set import RedisDocumentSet
 from onyx.redis.redis_pool import get_redis_client
+from onyx.redis.redis_pool import redis_lock_dump
 from onyx.redis.redis_usergroup import RedisUserGroup
 from onyx.utils.logger import setup_logger
 from onyx.utils.variable_functionality import fetch_versioned_implementation
@@ -111,6 +112,7 @@ def check_for_vespa_sync_task(self: Task, *, tenant_id: str | None) -> bool | None:
         )

         # region document set scan
+        lock_beat.reacquire()
         document_set_ids: list[int] = []
         with get_session_with_tenant(tenant_id) as db_session:
             # check if any document sets are not synced
@@ -122,6 +124,7 @@ def check_for_vespa_sync_task(self: Task, *, tenant_id: str | None) -> bool | None:
                 document_set_ids.append(document_set.id)

         for document_set_id in document_set_ids:
+            lock_beat.reacquire()
             with get_session_with_tenant(tenant_id) as db_session:
                 try_generate_document_set_sync_tasks(
                     self.app, document_set_id, db_session, r, lock_beat, tenant_id
@@ -130,6 +133,8 @@ def check_for_vespa_sync_task(self: Task, *, tenant_id: str | None) -> bool | None:

         # check if any user groups are not synced
         if global_version.is_ee_version():
+            lock_beat.reacquire()
+
             try:
                 fetch_user_groups = fetch_versioned_implementation(
                     "onyx.db.user_group", "fetch_user_groups"
@@ -149,6 +154,7 @@ def check_for_vespa_sync_task(self: Task, *, tenant_id: str | None) -> bool | None:
                     usergroup_ids.append(usergroup.id)

             for usergroup_id in usergroup_ids:
+                lock_beat.reacquire()
                 with get_session_with_tenant(tenant_id) as db_session:
                     try_generate_user_group_sync_tasks(
                         self.app, usergroup_id, db_session, r, lock_beat, tenant_id
@@ -163,6 +169,12 @@ def check_for_vespa_sync_task(self: Task, *, tenant_id: str | None) -> bool | None:
     finally:
         if lock_beat.owned():
             lock_beat.release()
+        else:
+            task_logger.error(
+                "check_for_vespa_sync_task - Lock not owned on completion: "
+                f"tenant={tenant_id}"
+            )
+            redis_lock_dump(lock_beat, r)

     time_elapsed = time.monotonic() - time_start
     task_logger.debug(f"check_for_vespa_sync_task finished: elapsed={time_elapsed:.2f}")

@@ -76,7 +76,7 @@ KV_ENTERPRISE_SETTINGS_KEY = "onyx_enterprise_settings"
 KV_CUSTOM_ANALYTICS_SCRIPT_KEY = "__custom_analytics_script__"
 KV_DOCUMENTS_SEEDED_KEY = "documents_seeded"

-CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT = 60
+CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT = 120
 CELERY_PRIMARY_WORKER_LOCK_TIMEOUT = 120

 # needs to be long enough to cover the maximum time it takes to download an object
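
Doubling CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT from 60 to 120 seconds works together with the timeout/4 renewal checks above: lock holders now renew roughly every 30 seconds (up from 15), and a briefly stalled worker has a full two minutes before its beat lock expires out from under it.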

@@ -13,6 +13,7 @@ from onyx.configs.constants import OnyxCeleryPriority
 from onyx.configs.constants import OnyxCeleryQueues
 from onyx.configs.constants import OnyxCeleryTask
 from onyx.db.document_set import construct_document_select_by_docset
+from onyx.db.models import Document
 from onyx.redis.redis_object_helper import RedisObjectHelper


@@ -60,6 +61,7 @@ class RedisDocumentSet(RedisObjectHelper):
         async_results = []
         stmt = construct_document_select_by_docset(int(self._id), current_only=False)
         for doc in db_session.scalars(stmt).yield_per(1):
+            doc = cast(Document, doc)
             current_time = time.monotonic()
             if current_time - last_lock_time >= (
                 CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT / 4
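
The added cast(Document, doc) is a typing no-op at runtime; it only pins the row type for mypy while rows stream one at a time through yield_per(1). The surrounding loop applies the same timeout/4 renewal cadence sketched after the IndexingCallback hunks above.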

@@ -4,12 +4,14 @@ import json
 import threading
 from collections.abc import Callable
 from typing import Any
+from typing import cast
 from typing import Optional

 import redis
 from fastapi import Request
 from redis import asyncio as aioredis
 from redis.client import Redis
+from redis.lock import Lock as RedisLock

 from onyx.configs.app_configs import REDIS_AUTH_KEY_PREFIX
 from onyx.configs.app_configs import REDIS_DB_NUMBER
@@ -262,3 +264,29 @@ async def retrieve_auth_token_data_from_redis(request: Request) -> dict | None:
         raise ValueError(
             f"Unexpected error in retrieve_auth_token_data_from_redis: {str(e)}"
         )
+
+
+def redis_lock_dump(lock: RedisLock, r: Redis) -> None:
+    # diagnostic logging for lock errors
+    name = lock.name
+    ttl = r.ttl(name)
+    locked = lock.locked()
+    owned = lock.owned()
+    local_token: str | None = lock.local.token  # type: ignore
+
+    remote_token_raw = r.get(lock.name)
+    if remote_token_raw:
+        remote_token_bytes = cast(bytes, remote_token_raw)
+        remote_token = remote_token_bytes.decode("utf-8")
+    else:
+        remote_token = None
+
+    logger.warning(
+        f"RedisLock diagnostic logging: "
+        f"name={name} "
+        f"locked={locked} "
+        f"owned={owned} "
+        f"local_token={local_token} "
+        f"remote_token={remote_token} "
+        f"ttl={ttl}"
+    )
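
With the helper extracted here, it can be exercised directly. A small demonstration on a deliberately expired lock, assuming the onyx package is importable and Redis is listening on localhost:6379 (the demo key name and 1-second timeout are illustrative):

    import time

    import redis

    from onyx.redis.redis_pool import redis_lock_dump

    r = redis.Redis(host="localhost", port=6379)
    lock = r.lock("demo:diag_lock", timeout=1)  # deliberately short TTL
    assert lock.acquire(blocking=False)

    time.sleep(2)  # let the TTL lapse; another worker could now claim the lock

    # Should log locked=False owned=False, a stale local token,
    # remote_token=None, and ttl=-2 (the key no longer exists in Redis).
    redis_lock_dump(lock, r)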