mirror of
https://github.com/danswer-ai/danswer.git
synced 2025-06-05 04:31:03 +02:00
no thread local locks in callbacks and raise permission sync timeout … (#3977)
* no thread local locks in callbacks and raise permission sync timeout by a lot based on empirical log observations * more fixes --------- Co-authored-by: Richard Kuo (Danswer) <rkuo@onyx.app>
This commit is contained in:
parent
9db7b67a6c
commit
5ed83f1148
@ -367,6 +367,7 @@ def connector_permission_sync_generator_task(
|
|||||||
OnyxRedisLocks.CONNECTOR_DOC_PERMISSIONS_SYNC_LOCK_PREFIX
|
OnyxRedisLocks.CONNECTOR_DOC_PERMISSIONS_SYNC_LOCK_PREFIX
|
||||||
+ f"_{redis_connector.id}",
|
+ f"_{redis_connector.id}",
|
||||||
timeout=CELERY_PERMISSIONS_SYNC_LOCK_TIMEOUT,
|
timeout=CELERY_PERMISSIONS_SYNC_LOCK_TIMEOUT,
|
||||||
|
thread_local=False,
|
||||||
)
|
)
|
||||||
|
|
||||||
acquired = lock.acquire(blocking=False)
|
acquired = lock.acquire(blocking=False)
|
||||||
|
@ -120,9 +120,8 @@ def check_for_vespa_sync_task(self: Task, *, tenant_id: str | None) -> bool | No
|
|||||||
# endregion
|
# endregion
|
||||||
|
|
||||||
# check if any user groups are not synced
|
# check if any user groups are not synced
|
||||||
|
lock_beat.reacquire()
|
||||||
if global_version.is_ee_version():
|
if global_version.is_ee_version():
|
||||||
lock_beat.reacquire()
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
fetch_user_groups = fetch_versioned_implementation(
|
fetch_user_groups = fetch_versioned_implementation(
|
||||||
"onyx.db.user_group", "fetch_user_groups"
|
"onyx.db.user_group", "fetch_user_groups"
|
||||||
|
@ -107,9 +107,9 @@ CELERY_TASK_WAIT_FOR_FENCE_TIMEOUT = 5 * 60 # 5 min
|
|||||||
|
|
||||||
# needs to be long enough to cover the maximum time it takes to download an object
|
# needs to be long enough to cover the maximum time it takes to download an object
|
||||||
# if we can get callbacks as object bytes download, we could lower this a lot.
|
# if we can get callbacks as object bytes download, we could lower this a lot.
|
||||||
CELERY_PRUNING_LOCK_TIMEOUT = 300 # 5 min
|
CELERY_PRUNING_LOCK_TIMEOUT = 3600 # 1 hour (in seconds)
|
||||||
|
|
||||||
CELERY_PERMISSIONS_SYNC_LOCK_TIMEOUT = 300 # 5 min
|
CELERY_PERMISSIONS_SYNC_LOCK_TIMEOUT = 3600 # 1 hour (in seconds)
|
||||||
|
|
||||||
CELERY_EXTERNAL_GROUP_SYNC_LOCK_TIMEOUT = 300 # 5 min
|
CELERY_EXTERNAL_GROUP_SYNC_LOCK_TIMEOUT = 300 # 5 min
|
||||||
|
|
||||||
|
@ -11,6 +11,7 @@ from redis.lock import Lock as RedisLock
|
|||||||
|
|
||||||
from onyx.access.models import DocExternalAccess
|
from onyx.access.models import DocExternalAccess
|
||||||
from onyx.configs.constants import CELERY_GENERIC_BEAT_LOCK_TIMEOUT
|
from onyx.configs.constants import CELERY_GENERIC_BEAT_LOCK_TIMEOUT
|
||||||
|
from onyx.configs.constants import CELERY_PERMISSIONS_SYNC_LOCK_TIMEOUT
|
||||||
from onyx.configs.constants import OnyxCeleryPriority
|
from onyx.configs.constants import OnyxCeleryPriority
|
||||||
from onyx.configs.constants import OnyxCeleryQueues
|
from onyx.configs.constants import OnyxCeleryQueues
|
||||||
from onyx.configs.constants import OnyxCeleryTask
|
from onyx.configs.constants import OnyxCeleryTask
|
||||||
@ -49,7 +50,7 @@ class RedisConnectorPermissionSync:
|
|||||||
# it's impossible to get the exact state of the system at a single point in time
|
# it's impossible to get the exact state of the system at a single point in time
|
||||||
# so we need a signal with a TTL to bridge gaps in our checks
|
# so we need a signal with a TTL to bridge gaps in our checks
|
||||||
ACTIVE_PREFIX = PREFIX + "_active"
|
ACTIVE_PREFIX = PREFIX + "_active"
|
||||||
ACTIVE_TTL = 3600
|
ACTIVE_TTL = CELERY_PERMISSIONS_SYNC_LOCK_TIMEOUT * 2
|
||||||
|
|
||||||
def __init__(self, tenant_id: str | None, id: int, redis: redis.Redis) -> None:
|
def __init__(self, tenant_id: str | None, id: int, redis: redis.Redis) -> None:
|
||||||
self.tenant_id: str | None = tenant_id
|
self.tenant_id: str | None = tenant_id
|
||||||
|
@ -10,6 +10,7 @@ from redis.lock import Lock as RedisLock
|
|||||||
from sqlalchemy.orm import Session
|
from sqlalchemy.orm import Session
|
||||||
|
|
||||||
from onyx.configs.constants import CELERY_GENERIC_BEAT_LOCK_TIMEOUT
|
from onyx.configs.constants import CELERY_GENERIC_BEAT_LOCK_TIMEOUT
|
||||||
|
from onyx.configs.constants import CELERY_PRUNING_LOCK_TIMEOUT
|
||||||
from onyx.configs.constants import OnyxCeleryPriority
|
from onyx.configs.constants import OnyxCeleryPriority
|
||||||
from onyx.configs.constants import OnyxCeleryQueues
|
from onyx.configs.constants import OnyxCeleryQueues
|
||||||
from onyx.configs.constants import OnyxCeleryTask
|
from onyx.configs.constants import OnyxCeleryTask
|
||||||
@ -49,7 +50,7 @@ class RedisConnectorPrune:
|
|||||||
# it's impossible to get the exact state of the system at a single point in time
|
# it's impossible to get the exact state of the system at a single point in time
|
||||||
# so we need a signal with a TTL to bridge gaps in our checks
|
# so we need a signal with a TTL to bridge gaps in our checks
|
||||||
ACTIVE_PREFIX = PREFIX + "_active"
|
ACTIVE_PREFIX = PREFIX + "_active"
|
||||||
ACTIVE_TTL = 3600
|
ACTIVE_TTL = CELERY_PRUNING_LOCK_TIMEOUT * 2
|
||||||
|
|
||||||
def __init__(self, tenant_id: str | None, id: int, redis: redis.Redis) -> None:
|
def __init__(self, tenant_id: str | None, id: int, redis: redis.Redis) -> None:
|
||||||
self.tenant_id: str | None = tenant_id
|
self.tenant_id: str | None = tenant_id
|
||||||
|
Loading…
x
Reference in New Issue
Block a user