Bugfix/indexing redux (#3151)

* raise indexing lock timeout

* refactor unknown index attempts and redis lock
rkuo-danswer 2024-11-18 14:47:31 -08:00 committed by GitHub
parent a7d95661b3
commit aa37051be9
16 changed files with 213 additions and 110 deletions
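Both changes hinge on the same pattern: long-running work holds a timeout-bound redis-py lock and periodically calls reacquire() to push the TTL back out, treating a LockError as the signal that ownership was lost. A minimal standalone sketch of that pattern follows; the key name, the sleep loop standing in for indexing work, and the print call are illustrative and not taken from this diff.

import time
from datetime import datetime, timezone

import redis
from redis.exceptions import LockError
from redis.lock import Lock as RedisLock

LOCK_TIMEOUT = 3 * 60 * 60  # mirrors the raised CELERY_INDEXING_LOCK_TIMEOUT

r = redis.Redis()
lock: RedisLock = r.lock("example:indexing_lock", timeout=LOCK_TIMEOUT)

if lock.acquire(blocking=False):
    started = datetime.now(timezone.utc)
    try:
        for _ in range(5):  # stand-in for batches of indexing work
            time.sleep(1)
            try:
                # reset the TTL to the full timeout; raises LockError if the
                # lock expired or was taken over in the meantime
                lock.reacquire()
            except LockError:
                print(f"lost lock: started={started} now={datetime.now(timezone.utc)}")
                raise
    finally:
        if lock.owned():
            lock.release()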

View File

@ -14,10 +14,14 @@ from celery.signals import worker_shutdown
import danswer.background.celery.apps.app_base as app_base
from danswer.background.celery.apps.app_base import task_logger
from danswer.background.celery.celery_utils import celery_is_worker_primary
from danswer.background.celery.tasks.vespa.tasks import get_unfenced_index_attempt_ids
from danswer.configs.constants import CELERY_PRIMARY_WORKER_LOCK_TIMEOUT
from danswer.configs.constants import DanswerRedisLocks
from danswer.configs.constants import POSTGRES_CELERY_WORKER_PRIMARY_APP_NAME
from danswer.db.engine import get_session_with_default_tenant
from danswer.db.engine import SqlEngine
from danswer.db.index_attempt import get_index_attempt
from danswer.db.index_attempt import mark_attempt_failed
from danswer.redis.redis_connector_credential_pair import RedisConnectorCredentialPair
from danswer.redis.redis_connector_delete import RedisConnectorDelete
from danswer.redis.redis_connector_doc_perm_sync import RedisConnectorPermissionSync
@ -140,6 +144,23 @@ def on_worker_init(sender: Any, **kwargs: Any) -> None:
RedisConnectorExternalGroupSync.reset_all(r)
# mark orphaned index attempts as failed
with get_session_with_default_tenant() as db_session:
unfenced_attempt_ids = get_unfenced_index_attempt_ids(db_session, r)
for attempt_id in unfenced_attempt_ids:
attempt = get_index_attempt(db_session, attempt_id)
if not attempt:
continue
failure_reason = (
f"Orphaned index attempt found on startup: "
f"index_attempt={attempt.id} "
f"cc_pair={attempt.connector_credential_pair_id} "
f"search_settings={attempt.search_settings_id}"
)
logger.warning(failure_reason)
mark_attempt_failed(attempt.id, db_session, failure_reason)
@worker_ready.connect
def on_worker_ready(sender: Any, **kwargs: Any) -> None:

View File

@ -1,12 +1,12 @@
from datetime import datetime
from datetime import timezone
import redis
from celery import Celery
from celery import shared_task
from celery import Task
from celery.exceptions import SoftTimeLimitExceeded
from redis import Redis
from redis.lock import Lock as RedisLock
from sqlalchemy.orm import Session
from danswer.background.celery.apps.app_base import task_logger
@ -87,7 +87,7 @@ def try_generate_document_cc_pair_cleanup_tasks(
cc_pair_id: int,
db_session: Session,
r: Redis,
lock_beat: redis.lock.Lock,
lock_beat: RedisLock,
tenant_id: str | None,
) -> int | None:
"""Returns an int if syncing is needed. The int represents the number of sync tasks generated.

View File

@ -3,13 +3,14 @@ from datetime import timezone
from http import HTTPStatus
from time import sleep
import redis
import sentry_sdk
from celery import Celery
from celery import shared_task
from celery import Task
from celery.exceptions import SoftTimeLimitExceeded
from redis import Redis
from redis.exceptions import LockError
from redis.lock import Lock as RedisLock
from sqlalchemy.orm import Session
from danswer.background.celery.apps.app_base import task_logger
@ -44,7 +45,7 @@ from danswer.db.swap_index import check_index_swap
from danswer.natural_language_processing.search_nlp_models import EmbeddingModel
from danswer.natural_language_processing.search_nlp_models import warm_up_bi_encoder
from danswer.redis.redis_connector import RedisConnector
from danswer.redis.redis_connector_index import RedisConnectorIndexingFenceData
from danswer.redis.redis_connector_index import RedisConnectorIndexPayload
from danswer.redis.redis_pool import get_redis_client
from danswer.utils.logger import setup_logger
from danswer.utils.variable_functionality import global_version
@ -61,14 +62,18 @@ class RunIndexingCallback(RunIndexingCallbackInterface):
self,
stop_key: str,
generator_progress_key: str,
redis_lock: redis.lock.Lock,
redis_lock: RedisLock,
redis_client: Redis,
):
super().__init__()
self.redis_lock: redis.lock.Lock = redis_lock
self.redis_lock: RedisLock = redis_lock
self.stop_key: str = stop_key
self.generator_progress_key: str = generator_progress_key
self.redis_client = redis_client
self.started: datetime = datetime.now(timezone.utc)
self.redis_lock.reacquire()
self.last_lock_reacquire: datetime = datetime.now(timezone.utc)
def should_stop(self) -> bool:
if self.redis_client.exists(self.stop_key):
@ -76,7 +81,19 @@ class RunIndexingCallback(RunIndexingCallbackInterface):
return False
def progress(self, amount: int) -> None:
self.redis_lock.reacquire()
try:
self.redis_lock.reacquire()
self.last_lock_reacquire = datetime.now(timezone.utc)
except LockError:
logger.exception(
f"RunIndexingCallback - lock.reacquire exceptioned. "
f"lock_timeout={self.redis_lock.timeout} "
f"start={self.started} "
f"last_reacquired={self.last_lock_reacquire} "
f"now={datetime.now(timezone.utc)}"
)
raise
self.redis_client.incrby(self.generator_progress_key, amount)
@ -325,7 +342,7 @@ def try_creating_indexing_task(
redis_connector_index.generator_clear()
# set a basic fence to start
payload = RedisConnectorIndexingFenceData(
payload = RedisConnectorIndexPayload(
index_attempt_id=None,
started=None,
submitted=datetime.now(timezone.utc),
@ -368,7 +385,7 @@ def try_creating_indexing_task(
redis_connector_index.set_fence(payload)
except Exception:
redis_connector_index.set_fence(payload)
redis_connector_index.set_fence(None)
task_logger.exception(
f"Unexpected exception: "
f"tenant={tenant_id} "

View File

@ -13,6 +13,7 @@ from celery.exceptions import SoftTimeLimitExceeded
from celery.result import AsyncResult
from celery.states import READY_STATES
from redis import Redis
from redis.lock import Lock as RedisLock
from sqlalchemy.orm import Session
from tenacity import RetryError
@ -167,7 +168,7 @@ def try_generate_stale_document_sync_tasks(
celery_app: Celery,
db_session: Session,
r: Redis,
lock_beat: redis.lock.Lock,
lock_beat: RedisLock,
tenant_id: str | None,
) -> int | None:
# the fence is up, do nothing
@ -185,7 +186,12 @@ def try_generate_stale_document_sync_tasks(
f"Stale documents found (at least {stale_doc_count}). Generating sync tasks by cc pair."
)
task_logger.info("RedisConnector.generate_tasks starting by cc_pair.")
task_logger.info(
"RedisConnector.generate_tasks starting by cc_pair. "
"Documents spanning multiple cc_pairs will only be synced once."
)
docs_to_skip: set[str] = set()
# rkuo: we could technically sync all stale docs in one big pass.
# but I feel it's more understandable to group the docs by cc_pair
@ -193,22 +199,21 @@ def try_generate_stale_document_sync_tasks(
cc_pairs = get_connector_credential_pairs(db_session)
for cc_pair in cc_pairs:
rc = RedisConnectorCredentialPair(tenant_id, cc_pair.id)
tasks_generated = rc.generate_tasks(
celery_app, db_session, r, lock_beat, tenant_id
)
rc.set_skip_docs(docs_to_skip)
result = rc.generate_tasks(celery_app, db_session, r, lock_beat, tenant_id)
if tasks_generated is None:
if result is None:
continue
if tasks_generated == 0:
if result[1] == 0:
continue
task_logger.info(
f"RedisConnector.generate_tasks finished for single cc_pair. "
f"cc_pair_id={cc_pair.id} tasks_generated={tasks_generated}"
f"cc_pair={cc_pair.id} tasks_generated={result[0]} tasks_possible={result[1]}"
)
total_tasks_generated += tasks_generated
total_tasks_generated += result[0]
task_logger.info(
f"RedisConnector.generate_tasks finished for all cc_pairs. total_tasks_generated={total_tasks_generated}"
@ -223,7 +228,7 @@ def try_generate_document_set_sync_tasks(
document_set_id: int,
db_session: Session,
r: Redis,
lock_beat: redis.lock.Lock,
lock_beat: RedisLock,
tenant_id: str | None,
) -> int | None:
lock_beat.reacquire()
@ -251,12 +256,11 @@ def try_generate_document_set_sync_tasks(
)
# Add all documents that need to be updated into the queue
tasks_generated = rds.generate_tasks(
celery_app, db_session, r, lock_beat, tenant_id
)
if tasks_generated is None:
result = rds.generate_tasks(celery_app, db_session, r, lock_beat, tenant_id)
if result is None:
return None
tasks_generated = result[0]
# Currently we are allowing the sync to proceed with 0 tasks.
# It's possible for sets/groups to be generated initially with no entries
# and they still need to be marked as up to date.
@ -265,7 +269,7 @@ def try_generate_document_set_sync_tasks(
task_logger.info(
f"RedisDocumentSet.generate_tasks finished. "
f"document_set_id={document_set.id} tasks_generated={tasks_generated}"
f"document_set={document_set.id} tasks_generated={tasks_generated}"
)
# set this only after all tasks have been added
@ -278,7 +282,7 @@ def try_generate_user_group_sync_tasks(
usergroup_id: int,
db_session: Session,
r: Redis,
lock_beat: redis.lock.Lock,
lock_beat: RedisLock,
tenant_id: str | None,
) -> int | None:
lock_beat.reacquire()
@ -307,12 +311,11 @@ def try_generate_user_group_sync_tasks(
task_logger.info(
f"RedisUserGroup.generate_tasks starting. usergroup_id={usergroup.id}"
)
tasks_generated = rug.generate_tasks(
celery_app, db_session, r, lock_beat, tenant_id
)
if tasks_generated is None:
result = rug.generate_tasks(celery_app, db_session, r, lock_beat, tenant_id)
if result is None:
return None
tasks_generated = result[0]
# Currently we are allowing the sync to proceed with 0 tasks.
# It's possible for sets/groups to be generated initially with no entries
# and they still need to be marked as up to date.
@ -321,7 +324,7 @@ def try_generate_user_group_sync_tasks(
task_logger.info(
f"RedisUserGroup.generate_tasks finished. "
f"usergroup_id={usergroup.id} tasks_generated={tasks_generated}"
f"usergroup={usergroup.id} tasks_generated={tasks_generated}"
)
# set this only after all tasks have been added
@ -626,8 +629,8 @@ def monitor_ccpair_indexing_taskset(
progress = redis_connector_index.get_progress()
if progress is not None:
task_logger.info(
f"Connector indexing progress: cc_pair_id={cc_pair_id} "
f"search_settings_id={search_settings_id} "
f"Connector indexing progress: cc_pair={cc_pair_id} "
f"search_settings={search_settings_id} "
f"progress={progress} "
f"elapsed_submitted={elapsed_submitted.total_seconds():.2f}"
)
@ -648,8 +651,8 @@ def monitor_ccpair_indexing_taskset(
# if it isn't, then the worker crashed
task_logger.info(
f"Connector indexing aborted: "
f"cc_pair_id={cc_pair_id} "
f"search_settings_id={search_settings_id} "
f"cc_pair={cc_pair_id} "
f"search_settings={search_settings_id} "
f"elapsed_submitted={elapsed_submitted.total_seconds():.2f}"
)
@ -667,8 +670,8 @@ def monitor_ccpair_indexing_taskset(
status_enum = HTTPStatus(status_int)
task_logger.info(
f"Connector indexing finished: cc_pair_id={cc_pair_id} "
f"search_settings_id={search_settings_id} "
f"Connector indexing finished: cc_pair={cc_pair_id} "
f"search_settings={search_settings_id} "
f"status={status_enum.name} "
f"elapsed_submitted={elapsed_submitted.total_seconds():.2f}"
)
@ -676,6 +679,37 @@ def monitor_ccpair_indexing_taskset(
redis_connector_index.reset()
def get_unfenced_index_attempt_ids(db_session: Session, r: redis.Redis) -> list[int]:
"""Gets a list of unfenced index attempts. Their existence should not be possible,
so we typically want to clean them up.
Unfenced = attempt not in terminal state and fence does not exist.
"""
unfenced_attempts: list[int] = []
# do some cleanup before clearing fences
# check the db for any outstanding index attempts
attempts: list[IndexAttempt] = []
attempts.extend(
get_all_index_attempts_by_status(IndexingStatus.NOT_STARTED, db_session)
)
attempts.extend(
get_all_index_attempts_by_status(IndexingStatus.IN_PROGRESS, db_session)
)
for attempt in attempts:
# if attempts exist in the db but we don't detect them in redis, mark them as failed
fence_key = RedisConnectorIndex.fence_key_with_ids(
attempt.connector_credential_pair_id, attempt.search_settings_id
)
if r.exists(fence_key):
continue
unfenced_attempts.append(attempt.id)
return unfenced_attempts
@shared_task(name="monitor_vespa_sync", soft_time_limit=300, bind=True)
def monitor_vespa_sync(self: Task, tenant_id: str | None) -> bool:
"""This is a celery beat task that monitors and finalizes metadata sync tasksets.
@ -689,7 +723,7 @@ def monitor_vespa_sync(self: Task, tenant_id: str | None) -> bool:
"""
r = get_redis_client(tenant_id=tenant_id)
lock_beat: redis.lock.Lock = r.lock(
lock_beat: RedisLock = r.lock(
DanswerRedisLocks.MONITOR_VESPA_SYNC_BEAT_LOCK,
timeout=CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT,
)
@ -727,33 +761,24 @@ def monitor_vespa_sync(self: Task, tenant_id: str | None) -> bool:
f"permissions_sync={n_permissions_sync} "
)
# do some cleanup before clearing fences
# check the db for any outstanding index attempts
# Fail any index attempts in the DB that don't have fences
with get_session_with_tenant(tenant_id) as db_session:
attempts: list[IndexAttempt] = []
attempts.extend(
get_all_index_attempts_by_status(IndexingStatus.NOT_STARTED, db_session)
)
attempts.extend(
get_all_index_attempts_by_status(IndexingStatus.IN_PROGRESS, db_session)
)
unfenced_attempt_ids = get_unfenced_index_attempt_ids(db_session, r)
for attempt_id in unfenced_attempt_ids:
attempt = get_index_attempt(db_session, attempt_id)
if not attempt:
continue
for attempt in attempts:
# if attempts exist in the db but we don't detect them in redis, mark them as failed
fence_key = RedisConnectorIndex.fence_key_with_ids(
attempt.connector_credential_pair_id, attempt.search_settings_id
failure_reason = (
f"Unfenced index attempt found in DB: "
f"index_attempt={attempt.id} "
f"cc_pair={attempt.connector_credential_pair_id} "
f"search_settings={attempt.search_settings_id}"
)
task_logger.warning(failure_reason)
mark_attempt_failed(
attempt.id, db_session, failure_reason=failure_reason
)
if not r.exists(fence_key):
failure_reason = (
f"Unknown index attempt. Might be left over from a process restart: "
f"index_attempt={attempt.id} "
f"cc_pair={attempt.connector_credential_pair_id} "
f"search_settings={attempt.search_settings_id}"
)
task_logger.warning(failure_reason)
mark_attempt_failed(
attempt.id, db_session, failure_reason=failure_reason
)
lock_beat.reacquire()
if r.exists(RedisConnectorCredentialPair.get_fence_key()):

View File

@ -74,7 +74,7 @@ CELERY_PRIMARY_WORKER_LOCK_TIMEOUT = 120
# needs to be long enough to cover the maximum time it takes to download an object
# if we can get callbacks as object bytes download, we could lower this a lot.
CELERY_INDEXING_LOCK_TIMEOUT = 60 * 60 # 60 min
CELERY_INDEXING_LOCK_TIMEOUT = 3 * 60 * 60 # 3 hours
# needs to be long enough to cover the maximum time it takes to download an object
# if we can get callbacks as object bytes download, we could lower this a lot.
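As a rough illustration of what the larger constant buys (an assumed key name, not code from the repo): reacquire() resets the lock's TTL to the configured timeout, so the timeout is effectively the longest gap allowed between progress callbacks, e.g. one very large object download, before the lock expires out from under the worker.

import redis
from redis.lock import Lock as RedisLock

CELERY_INDEXING_LOCK_TIMEOUT = 3 * 60 * 60  # 3 hours

r = redis.Redis()
lock: RedisLock = r.lock("example:indexing_lock_ttl", timeout=CELERY_INDEXING_LOCK_TIMEOUT)

if lock.acquire(blocking=False):
    try:
        print(r.ttl(lock.name))  # ~10800 seconds right after acquiring
        # ... a slow step such as downloading a very large object runs here ...
        lock.reacquire()         # TTL jumps back to the full 3 hours
        print(r.ttl(lock.name))  # ~10800 again
    finally:
        lock.release()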

View File

@ -209,6 +209,7 @@ def get_document_connector_counts(
def get_document_counts_for_cc_pairs(
db_session: Session, cc_pair_identifiers: list[ConnectorCredentialPairIdentifier]
) -> Sequence[tuple[int, int, int]]:
"""Returns a sequence of tuples of (connector_id, credential_id, document count)"""
stmt = (
select(
DocumentByConnectorCredentialPair.connector_id,
@ -550,7 +551,7 @@ def prepare_to_modify_documents(
db_session.commit() # ensure that we're not in a transaction
lock_acquired = False
for _ in range(_NUM_LOCK_ATTEMPTS):
for i in range(_NUM_LOCK_ATTEMPTS):
try:
with db_session.begin() as transaction:
lock_acquired = acquire_document_locks(
@ -561,7 +562,7 @@ def prepare_to_modify_documents(
break
except OperationalError as e:
logger.warning(
f"Failed to acquire locks for documents, retrying. Error: {e}"
f"Failed to acquire locks for documents on attempt {i}, retrying. Error: {e}"
)
time.sleep(retry_delay)

View File

@ -93,14 +93,15 @@ def _upsert_documents_in_db(
document_id=doc.id,
db_session=db_session,
)
else:
create_or_add_document_tag(
tag_key=k,
tag_value=v,
source=doc.source,
document_id=doc.id,
db_session=db_session,
)
continue
create_or_add_document_tag(
tag_key=k,
tag_value=v,
source=doc.source,
document_id=doc.id,
db_session=db_session,
)
def get_doc_ids_to_update(
@ -196,7 +197,7 @@ def index_doc_batch_prepare(
db_session: Session,
ignore_time_skip: bool = False,
) -> DocumentBatchPrepareContext | None:
"""This sets up the documents in the relational DB (source of truth) for permissions, metadata, etc.
"""Sets up the documents in the relational DB (source of truth) for permissions, metadata, etc.
This precedes indexing it into the actual document index."""
documents: list[Document] = []
for document in document_batch:
@ -213,16 +214,17 @@ def index_doc_batch_prepare(
logger.warning(
f"Skipping document with ID {document.id} as it has neither title nor content."
)
elif (
document.title is not None and not document.title.strip() and empty_contents
):
continue
if document.title is not None and not document.title.strip() and empty_contents:
# The title is explicitly empty ("" and not None) and the document is empty,
# so when building the chunk text representation, it will be empty and unusable
logger.warning(
f"Skipping document with ID {document.id} as the chunks will be empty."
)
else:
documents.append(document)
continue
documents.append(document)
# Create a trimmed list of docs that don't have a newer updated at
# Shortcuts the time-consuming flow on connector index retries
@ -284,7 +286,10 @@ def index_doc_batch(
) -> tuple[int, int]:
"""Takes different pieces of the indexing pipeline and applies it to a batch of documents
Note that the documents should already be batched at this point so that it does not inflate the
memory requirements"""
memory requirements
Returns a tuple where the first element is the number of new docs and the
second element is the number of chunks."""
no_access = DocumentAccess.build(
user_emails=[],
@ -327,9 +332,9 @@ def index_doc_batch(
# we're concerned about race conditions where multiple simultaneous indexings might result
# in one set of metadata overwriting another one in vespa.
# we still write data here for immediate and most likely correct sync, but
# we still write data here for the immediate and most likely correct sync, but
# to resolve this, an update of the last modified field at the end of this loop
# always triggers a final metadata sync
# always triggers a final metadata sync via the celery queue
access_aware_chunks = [
DocMetadataAwareIndexChunk.from_index_chunk(
index_chunk=chunk,
@ -366,7 +371,8 @@ def index_doc_batch(
ids_to_new_updated_at = {}
for doc in successful_docs:
last_modified_ids.append(doc.id)
# doc_updated_at is the connector source's idea of when the doc was last modified
# doc_updated_at is the source's idea (on the other end of the connector)
# of when the doc was last modified
if doc.doc_updated_at is None:
continue
ids_to_new_updated_at[doc.id] = doc.doc_updated_at
@ -381,10 +387,13 @@ def index_doc_batch(
db_session.commit()
return len([r for r in insertion_records if r.already_existed is False]), len(
access_aware_chunks
result = (
len([r for r in insertion_records if r.already_existed is False]),
len(access_aware_chunks),
)
return result
def build_indexing_pipeline(
*,

View File

@ -1,9 +1,10 @@
import time
from typing import cast
from uuid import uuid4
import redis
from celery import Celery
from redis import Redis
from redis.lock import Lock as RedisLock
from sqlalchemy.orm import Session
from danswer.configs.constants import CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT
@ -13,6 +14,7 @@ from danswer.db.connector_credential_pair import get_connector_credential_pair_f
from danswer.db.document import (
construct_document_select_for_connector_credential_pair_by_needs_sync,
)
from danswer.db.models import Document
from danswer.redis.redis_object_helper import RedisObjectHelper
@ -30,6 +32,9 @@ class RedisConnectorCredentialPair(RedisObjectHelper):
def __init__(self, tenant_id: str | None, id: int) -> None:
super().__init__(tenant_id, str(id))
# documents that should be skipped
self.skip_docs: set[str] = set()
@classmethod
def get_fence_key(cls) -> str:
return RedisConnectorCredentialPair.FENCE_PREFIX
@ -45,14 +50,19 @@ class RedisConnectorCredentialPair(RedisObjectHelper):
# example: connector_taskset
return f"{self.TASKSET_PREFIX}"
def set_skip_docs(self, skip_docs: set[str]) -> None:
# documents that should be skipped. Note that this class updates
# the list on the fly
self.skip_docs = skip_docs
def generate_tasks(
self,
celery_app: Celery,
db_session: Session,
redis_client: Redis,
lock: redis.lock.Lock,
lock: RedisLock,
tenant_id: str | None,
) -> int | None:
) -> tuple[int, int] | None:
last_lock_time = time.monotonic()
async_results = []
@ -64,7 +74,10 @@ class RedisConnectorCredentialPair(RedisObjectHelper):
cc_pair.connector_id, cc_pair.credential_id
)
num_docs = 0
for doc in db_session.scalars(stmt).yield_per(1):
doc = cast(Document, doc)
current_time = time.monotonic()
if current_time - last_lock_time >= (
CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT / 4
@ -72,6 +85,12 @@ class RedisConnectorCredentialPair(RedisObjectHelper):
lock.reacquire()
last_lock_time = current_time
num_docs += 1
# check if we should skip the document (typically because it's already syncing)
if doc.id in self.skip_docs:
continue
# celery's default task id format is "dd32ded3-00aa-4884-8b21-42f8332e7fac"
# the key for the result is "celery-task-meta-dd32ded3-00aa-4884-8b21-42f8332e7fac"
# we prefix the task id so it's easier to keep track of who created the task
@ -94,5 +113,6 @@ class RedisConnectorCredentialPair(RedisObjectHelper):
)
async_results.append(result)
self.skip_docs.add(doc.id)
return len(async_results)
return len(async_results), num_docs

View File

@ -6,6 +6,7 @@ from uuid import uuid4
import redis
from celery import Celery
from pydantic import BaseModel
from redis.lock import Lock as RedisLock
from sqlalchemy.orm import Session
from danswer.configs.constants import CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT
@ -83,7 +84,7 @@ class RedisConnectorDelete:
self,
celery_app: Celery,
db_session: Session,
lock: redis.lock.Lock,
lock: RedisLock,
) -> int | None:
"""Returns None if the cc_pair doesn't exist.
Otherwise, returns an int with the number of generated tasks."""

View File

@ -6,6 +6,7 @@ from uuid import uuid4
import redis
from celery import Celery
from pydantic import BaseModel
from redis.lock import Lock as RedisLock
from danswer.access.models import DocExternalAccess
from danswer.configs.constants import CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT
@ -127,7 +128,7 @@ class RedisConnectorPermissionSync:
def generate_tasks(
self,
celery_app: Celery,
lock: redis.lock.Lock | None,
lock: RedisLock | None,
new_permissions: list[DocExternalAccess],
source_string: str,
) -> int | None:

View File

@ -2,6 +2,7 @@ from typing import cast
import redis
from celery import Celery
from redis.lock import Lock as RedisLock
from sqlalchemy.orm import Session
@ -103,7 +104,7 @@ class RedisConnectorExternalGroupSync:
self,
celery_app: Celery,
db_session: Session,
lock: redis.lock.Lock | None,
lock: RedisLock | None,
) -> int | None:
pass

View File

@ -6,7 +6,7 @@ import redis
from pydantic import BaseModel
class RedisConnectorIndexingFenceData(BaseModel):
class RedisConnectorIndexPayload(BaseModel):
index_attempt_id: int | None
started: datetime | None
submitted: datetime
@ -71,22 +71,20 @@ class RedisConnectorIndex:
return False
@property
def payload(self) -> RedisConnectorIndexingFenceData | None:
def payload(self) -> RedisConnectorIndexPayload | None:
# read related data and evaluate/print task progress
fence_bytes = cast(bytes, self.redis.get(self.fence_key))
if fence_bytes is None:
return None
fence_str = fence_bytes.decode("utf-8")
payload = RedisConnectorIndexingFenceData.model_validate_json(
cast(str, fence_str)
)
payload = RedisConnectorIndexPayload.model_validate_json(cast(str, fence_str))
return payload
def set_fence(
self,
payload: RedisConnectorIndexingFenceData | None,
payload: RedisConnectorIndexPayload | None,
) -> None:
if not payload:
self.redis.delete(self.fence_key)

View File

@ -4,6 +4,7 @@ from uuid import uuid4
import redis
from celery import Celery
from redis.lock import Lock as RedisLock
from sqlalchemy.orm import Session
from danswer.configs.constants import CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT
@ -105,7 +106,7 @@ class RedisConnectorPrune:
documents_to_prune: set[str],
celery_app: Celery,
db_session: Session,
lock: redis.lock.Lock | None,
lock: RedisLock | None,
) -> int | None:
last_lock_time = time.monotonic()

View File

@ -5,6 +5,7 @@ from uuid import uuid4
import redis
from celery import Celery
from redis import Redis
from redis.lock import Lock as RedisLock
from sqlalchemy.orm import Session
from danswer.configs.constants import CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT
@ -50,9 +51,9 @@ class RedisDocumentSet(RedisObjectHelper):
celery_app: Celery,
db_session: Session,
redis_client: Redis,
lock: redis.lock.Lock,
lock: RedisLock,
tenant_id: str | None,
) -> int | None:
) -> tuple[int, int] | None:
last_lock_time = time.monotonic()
async_results = []
@ -84,7 +85,7 @@ class RedisDocumentSet(RedisObjectHelper):
async_results.append(result)
return len(async_results)
return len(async_results), len(async_results)
def reset(self) -> None:
self.redis.delete(self.taskset_key)

View File

@ -1,9 +1,9 @@
from abc import ABC
from abc import abstractmethod
import redis
from celery import Celery
from redis import Redis
from redis.lock import Lock as RedisLock
from sqlalchemy.orm import Session
from danswer.redis.redis_pool import get_redis_client
@ -85,7 +85,13 @@ class RedisObjectHelper(ABC):
celery_app: Celery,
db_session: Session,
redis_client: Redis,
lock: redis.lock.Lock,
lock: RedisLock,
tenant_id: str | None,
) -> int | None:
pass
) -> tuple[int, int] | None:
"""First element should be the number of actual tasks generated, second should
be the number of docs that were candidates to be synced for the cc pair.
This is needed when we are syncing stale docs that are referenced by multiple
connectors. In a single pass across multiple cc pairs, we only want a task
to be created for a particular document id the first time we see it.
The rest can be skipped."""

View File

@ -5,6 +5,7 @@ from uuid import uuid4
import redis
from celery import Celery
from redis import Redis
from redis.lock import Lock as RedisLock
from sqlalchemy.orm import Session
from danswer.configs.constants import CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT
@ -51,15 +52,15 @@ class RedisUserGroup(RedisObjectHelper):
celery_app: Celery,
db_session: Session,
redis_client: Redis,
lock: redis.lock.Lock,
lock: RedisLock,
tenant_id: str | None,
) -> int | None:
) -> tuple[int, int] | None:
last_lock_time = time.monotonic()
async_results = []
if not global_version.is_ee_version():
return 0
return 0, 0
try:
construct_document_select_by_usergroup = fetch_versioned_implementation(
@ -67,7 +68,7 @@ class RedisUserGroup(RedisObjectHelper):
"construct_document_select_by_usergroup",
)
except ModuleNotFoundError:
return 0
return 0, 0
stmt = construct_document_select_by_usergroup(int(self._id))
for doc in db_session.scalars(stmt).yield_per(1):
@ -97,7 +98,7 @@ class RedisUserGroup(RedisObjectHelper):
async_results.append(result)
return len(async_results)
return len(async_results), len(async_results)
def reset(self) -> None:
self.redis.delete(self.taskset_key)