mirror of
https://github.com/danswer-ai/danswer.git
synced 2025-03-26 17:51:54 +01:00
use replica, remove some commented code
This commit is contained in:
parent
fd947aadea
commit
6f018d75ee
@ -59,6 +59,7 @@ from onyx.redis.redis_connector import RedisConnector
|
||||
from onyx.redis.redis_connector_doc_perm_sync import RedisConnectorPermissionSync
|
||||
from onyx.redis.redis_connector_doc_perm_sync import RedisConnectorPermissionSyncPayload
|
||||
from onyx.redis.redis_pool import get_redis_client
|
||||
from onyx.redis.redis_pool import get_redis_replica_client
|
||||
from onyx.redis.redis_pool import redis_lock_dump
|
||||
from onyx.server.utils import make_short_id
|
||||
from onyx.utils.logger import doc_permission_sync_ctx
|
||||
@ -124,6 +125,7 @@ def check_for_doc_permissions_sync(self: Task, *, tenant_id: str | None) -> bool
|
||||
# we need to use celery's redis client to access its redis data
|
||||
# (which lives on a different db number)
|
||||
r = get_redis_client(tenant_id=tenant_id)
|
||||
r_replica = get_redis_replica_client(tenant_id=tenant_id)
|
||||
r_celery: Redis = self.app.broker_connection().channel().client # type: ignore
|
||||
|
||||
lock_beat: RedisLock = r.lock(
|
||||
@ -164,7 +166,9 @@ def check_for_doc_permissions_sync(self: Task, *, tenant_id: str | None) -> bool
|
||||
# tasks can be in the queue in redis, in reserved tasks (prefetched by the worker),
|
||||
# or be currently executing
|
||||
try:
|
||||
validate_permission_sync_fences(tenant_id, r, r_celery, lock_beat)
|
||||
validate_permission_sync_fences(
|
||||
tenant_id, r, r_replica, r_celery, lock_beat
|
||||
)
|
||||
except Exception:
|
||||
task_logger.exception(
|
||||
"Exception while validating permission sync fences"
|
||||
@ -487,6 +491,7 @@ def update_external_document_permissions_task(
|
||||
def validate_permission_sync_fences(
|
||||
tenant_id: str | None,
|
||||
r: Redis,
|
||||
r_replica: Redis,
|
||||
r_celery: Redis,
|
||||
lock_beat: RedisLock,
|
||||
) -> None:
|
||||
@ -509,7 +514,7 @@ def validate_permission_sync_fences(
|
||||
|
||||
# validate all existing permission sync jobs
|
||||
lock_beat.reacquire()
|
||||
keys = cast(set[Any], r.smembers(OnyxRedisConstants.ACTIVE_FENCES))
|
||||
keys = cast(set[Any], r_replica.smembers(OnyxRedisConstants.ACTIVE_FENCES))
|
||||
for key in keys:
|
||||
key_bytes = cast(bytes, key)
|
||||
key_str = key_bytes.decode("utf-8")
|
||||
|
@ -1025,15 +1025,6 @@ def vespa_metadata_sync_task(
|
||||
# the sync might repeat again later
|
||||
mark_document_as_synced(document_id, db_session)
|
||||
|
||||
# this code checks for and removes a per document sync key that is
|
||||
# used to block out the same doc from continualy resyncing
|
||||
# a quick hack that is only needed for production issues
|
||||
# redis_syncing_key = RedisConnectorCredentialPair.make_redis_syncing_key(
|
||||
# document_id
|
||||
# )
|
||||
# r = get_redis_client(tenant_id=tenant_id)
|
||||
# r.delete(redis_syncing_key)
|
||||
|
||||
elapsed = time.monotonic() - start
|
||||
task_logger.info(
|
||||
f"doc={document_id} "
|
||||
|
@ -32,8 +32,6 @@ class RedisConnectorCredentialPair(RedisObjectHelper):
|
||||
PREFIX = "connectorsync"
|
||||
TASKSET_PREFIX = PREFIX + "_taskset"
|
||||
|
||||
# SYNCING_PREFIX = PREFIX + ":vespa_syncing"
|
||||
|
||||
def __init__(self, tenant_id: str | None, id: int) -> None:
|
||||
super().__init__(tenant_id, str(id))
|
||||
|
||||
@ -56,11 +54,6 @@ class RedisConnectorCredentialPair(RedisObjectHelper):
|
||||
# the list on the fly
|
||||
self.skip_docs = skip_docs
|
||||
|
||||
# @staticmethod
|
||||
# def make_redis_syncing_key(doc_id: str) -> str:
|
||||
# """used to create a key in redis to block a doc from syncing"""
|
||||
# return f"{RedisConnectorCredentialPair.SYNCING_PREFIX}:{doc_id}"
|
||||
|
||||
def generate_tasks(
|
||||
self,
|
||||
max_tasks: int,
|
||||
@ -108,15 +101,6 @@ class RedisConnectorCredentialPair(RedisObjectHelper):
|
||||
if doc.id in self.skip_docs:
|
||||
continue
|
||||
|
||||
# an arbitrary number in seconds to prevent the same doc from syncing repeatedly
|
||||
# SYNC_EXPIRATION = 24 * 60 * 60
|
||||
|
||||
# a quick hack that can be uncommented to prevent a doc from resyncing over and over
|
||||
# redis_syncing_key = self.make_redis_syncing_key(doc.id)
|
||||
# if redis_client.exists(redis_syncing_key):
|
||||
# continue
|
||||
# redis_client.set(redis_syncing_key, custom_task_id, ex=SYNC_EXPIRATION)
|
||||
|
||||
# celery's default task id format is "dd32ded3-00aa-4884-8b21-42f8332e7fac"
|
||||
# the key for the result is "celery-task-meta-dd32ded3-00aa-4884-8b21-42f8332e7fac"
|
||||
# we prefix the task id so it's easier to keep track of who created the task
|
||||
|
Loading…
x
Reference in New Issue
Block a user