mirror of
https://github.com/danswer-ai/danswer.git
synced 2025-09-29 13:25:50 +02:00
better signal names
This commit is contained in:
@@ -158,7 +158,7 @@ def check_for_doc_permissions_sync(self: Task, *, tenant_id: str | None) -> bool
|
|||||||
|
|
||||||
# we want to run this less frequently than the overall task
|
# we want to run this less frequently than the overall task
|
||||||
lock_beat.reacquire()
|
lock_beat.reacquire()
|
||||||
if not r.exists(OnyxRedisSignals.VALIDATE_PERMISSION_SYNC_FENCES):
|
if not r.exists(OnyxRedisSignals.BLOCK_VALIDATE_PERMISSION_SYNC_FENCES):
|
||||||
# clear any permission fences that don't have associated celery tasks in progress
|
# clear any permission fences that don't have associated celery tasks in progress
|
||||||
# tasks can be in the queue in redis, in reserved tasks (prefetched by the worker),
|
# tasks can be in the queue in redis, in reserved tasks (prefetched by the worker),
|
||||||
# or be currently executing
|
# or be currently executing
|
||||||
@@ -169,7 +169,7 @@ def check_for_doc_permissions_sync(self: Task, *, tenant_id: str | None) -> bool
|
|||||||
"Exception while validating permission sync fences"
|
"Exception while validating permission sync fences"
|
||||||
)
|
)
|
||||||
|
|
||||||
r.set(OnyxRedisSignals.VALIDATE_PERMISSION_SYNC_FENCES, 1, ex=60)
|
r.set(OnyxRedisSignals.BLOCK_VALIDATE_PERMISSION_SYNC_FENCES, 1, ex=60)
|
||||||
except SoftTimeLimitExceeded:
|
except SoftTimeLimitExceeded:
|
||||||
task_logger.info(
|
task_logger.info(
|
||||||
"Soft time limit exceeded, task is being terminated gracefully."
|
"Soft time limit exceeded, task is being terminated gracefully."
|
||||||
|
@@ -224,7 +224,7 @@ def check_for_indexing(self: Task, *, tenant_id: str | None) -> int | None:
|
|||||||
|
|
||||||
lock_beat.reacquire()
|
lock_beat.reacquire()
|
||||||
# we want to run this less frequently than the overall task
|
# we want to run this less frequently than the overall task
|
||||||
if not redis_client.exists(OnyxRedisSignals.VALIDATE_INDEXING_FENCES):
|
if not redis_client.exists(OnyxRedisSignals.BLOCK_VALIDATE_INDEXING_FENCES):
|
||||||
# clear any indexing fences that don't have associated celery tasks in progress
|
# clear any indexing fences that don't have associated celery tasks in progress
|
||||||
# tasks can be in the queue in redis, in reserved tasks (prefetched by the worker),
|
# tasks can be in the queue in redis, in reserved tasks (prefetched by the worker),
|
||||||
# or be currently executing
|
# or be currently executing
|
||||||
@@ -235,7 +235,7 @@ def check_for_indexing(self: Task, *, tenant_id: str | None) -> int | None:
|
|||||||
except Exception:
|
except Exception:
|
||||||
task_logger.exception("Exception while validating indexing fences")
|
task_logger.exception("Exception while validating indexing fences")
|
||||||
|
|
||||||
redis_client.set(OnyxRedisSignals.VALIDATE_INDEXING_FENCES, 1, ex=60)
|
redis_client.set(OnyxRedisSignals.BLOCK_VALIDATE_INDEXING_FENCES, 1, ex=60)
|
||||||
except SoftTimeLimitExceeded:
|
except SoftTimeLimitExceeded:
|
||||||
task_logger.info(
|
task_logger.info(
|
||||||
"Soft time limit exceeded, task is being terminated gracefully."
|
"Soft time limit exceeded, task is being terminated gracefully."
|
||||||
|
@@ -892,7 +892,7 @@ def monitor_vespa_sync(self: Task, tenant_id: str | None) -> bool | None:
|
|||||||
)
|
)
|
||||||
|
|
||||||
# we want to run this less frequently than the overall task
|
# we want to run this less frequently than the overall task
|
||||||
if not r.exists(OnyxRedisSignals.BUILD_FENCE_LOOKUP_TABLE):
|
if not r.exists(OnyxRedisSignals.BLOCK_BUILD_FENCE_LOOKUP_TABLE):
|
||||||
# build a lookup table of existing fences
|
# build a lookup table of existing fences
|
||||||
# this is just a migration concern and should be unnecessary once
|
# this is just a migration concern and should be unnecessary once
|
||||||
# lookup tables are rolled out
|
# lookup tables are rolled out
|
||||||
@@ -903,7 +903,7 @@ def monitor_vespa_sync(self: Task, tenant_id: str | None) -> bool | None:
|
|||||||
logger.warning(f"Adding {key_bytes} to the lookup table.")
|
logger.warning(f"Adding {key_bytes} to the lookup table.")
|
||||||
r.sadd(OnyxRedisConstants.ACTIVE_FENCES, key_bytes)
|
r.sadd(OnyxRedisConstants.ACTIVE_FENCES, key_bytes)
|
||||||
|
|
||||||
r.set(OnyxRedisSignals.BUILD_FENCE_LOOKUP_TABLE, 1, ex=120)
|
r.set(OnyxRedisSignals.BLOCK_BUILD_FENCE_LOOKUP_TABLE, 1, ex=120)
|
||||||
|
|
||||||
# use a lookup table to find active fences. We still have to verify the fence
|
# use a lookup table to find active fences. We still have to verify the fence
|
||||||
# exists since it is an optimization and not the source of truth.
|
# exists since it is an optimization and not the source of truth.
|
||||||
|
@@ -299,10 +299,14 @@ class OnyxRedisLocks:
|
|||||||
|
|
||||||
|
|
||||||
class OnyxRedisSignals:
|
class OnyxRedisSignals:
|
||||||
VALIDATE_INDEXING_FENCES = "signal:validate_indexing_fences"
|
BLOCK_VALIDATE_INDEXING_FENCES = "signal:block_validate_indexing_fences"
|
||||||
VALIDATE_EXTERNAL_GROUP_SYNC_FENCES = "signal:validate_external_group_sync_fences"
|
BLOCK_VALIDATE_EXTERNAL_GROUP_SYNC_FENCES = (
|
||||||
VALIDATE_PERMISSION_SYNC_FENCES = "signal:validate_permission_sync_fences"
|
"signal:block_validate_external_group_sync_fences"
|
||||||
BUILD_FENCE_LOOKUP_TABLE = "signal:build_fence_lookup_table"
|
)
|
||||||
|
BLOCK_VALIDATE_PERMISSION_SYNC_FENCES = (
|
||||||
|
"signal:block_validate_permission_sync_fences"
|
||||||
|
)
|
||||||
|
BLOCK_BUILD_FENCE_LOOKUP_TABLE = "signal:block_build_fence_lookup_table"
|
||||||
|
|
||||||
|
|
||||||
class OnyxRedisConstants:
|
class OnyxRedisConstants:
|
||||||
|
Reference in New Issue
Block a user