From 2ae91f0f2ba25f4f5a86ed848a7c99e404480361 Mon Sep 17 00:00:00 2001 From: rkuo-danswer Date: Thu, 9 Jan 2025 13:34:07 -0800 Subject: [PATCH] Feature/redis prod tool (#3619) * prototype tools for handling prod issues * add some commands * add batching and dry run options * custom redis tool * comment * default to app config settings for redis --------- Co-authored-by: Richard Kuo (Danswer) --- backend/scripts/celery_purge_queue.py | 87 +++++++++++ backend/scripts/onyx_redis.py | 198 ++++++++++++++++++++++++++ 2 files changed, 285 insertions(+) create mode 100644 backend/scripts/celery_purge_queue.py create mode 100644 backend/scripts/onyx_redis.py diff --git a/backend/scripts/celery_purge_queue.py b/backend/scripts/celery_purge_queue.py new file mode 100644 index 000000000..cbaed2de4 --- /dev/null +++ b/backend/scripts/celery_purge_queue.py @@ -0,0 +1,87 @@ +# Tool to run operations on Celery/Redis in production +# this is a work in progress and isn't completely put together yet +# but can serve as a stub for future operations +import argparse +import logging +from logging import getLogger + +from redis import Redis + +from onyx.background.celery.celery_redis import celery_get_queue_length +from onyx.configs.app_configs import REDIS_DB_NUMBER_CELERY +from onyx.redis.redis_pool import RedisPool + +# Configure the logger +logging.basicConfig( + level=logging.INFO, # Set the log level (DEBUG, INFO, WARNING, ERROR, CRITICAL) + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", # Log format + handlers=[logging.StreamHandler()], # Output logs to console +) + +logger = getLogger(__name__) + +REDIS_PASSWORD = "" + + +def celery_purge_queue(queue: str, tenant_id: str) -> None: + """Purging a celery queue is extremely difficult because the queue is a list + and the only way an item can be removed from a list is by VALUE, which is + a linear scan. Therefore, to purge the list of many values is roughly + n^2. + + The other alternative is to pop values and push them back, but that raises + questions about behavior while operating on a live queue. + """ + + pool = RedisPool.create_pool( + host="127.0.0.1", + port=6380, + db=REDIS_DB_NUMBER_CELERY, + password=REDIS_PASSWORD, + ssl=True, + ssl_cert_reqs="optional", + ssl_ca_certs=None, + ) + + r = Redis(connection_pool=pool) + + length = celery_get_queue_length(queue, r) + + logger.info(f"queue={queue} length={length}") + + # processed = 0 + # deleted = 0 + # for i in range(len(OnyxCeleryPriority)): + # queue_name = queue + # if i > 0: + # queue_name += CELERY_SEPARATOR + # queue_name += str(i) + + # length = r.llen(queue_name) + # for i in range(length): + # task_raw: bytes | None = r.lindex(queue_name, i) + # if not task_raw: + # break + + # processed += 1 + # task_str = task_raw.decode("utf-8") + # task = json.loads(task_str) + # task_kwargs_str = task["headers"]["kwargsrepr"] + # task_kwargs = json.loads(task_kwargs_str) + # task_tenant_id = task_kwargs["tenant_id"] + # if task_tenant_id and task_tenant_id == "tenant_id": + # print("Delete tenant_id={tenant_id}") + # if + # deleted += 1 + + # logger.info(f"processed={processed} deleted={deleted}") + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Purge celery queue by tenant id") + parser.add_argument("--queue", type=str, help="Queue to purge", required=True) + + parser.add_argument("--tenant", type=str, help="Tenant ID to purge", required=True) + + args = parser.parse_args() + celery_purge_queue(queue=args.queue, tenant_id=args.tenant) diff --git a/backend/scripts/onyx_redis.py b/backend/scripts/onyx_redis.py new file mode 100644 index 000000000..c7eb7fbef --- /dev/null +++ b/backend/scripts/onyx_redis.py @@ -0,0 +1,198 @@ +# Tool to run helpful operations on Redis in production +# This is targeted for internal usage and may not have all the necessary parameters +# for general usage across custom deployments +import argparse +import logging +import sys +import time +from logging import getLogger +from typing import cast + +from redis import Redis + +from onyx.configs.app_configs import REDIS_DB_NUMBER +from onyx.configs.app_configs import REDIS_HOST +from onyx.configs.app_configs import REDIS_PASSWORD +from onyx.configs.app_configs import REDIS_PORT +from onyx.redis.redis_pool import RedisPool + +# Configure the logger +logging.basicConfig( + level=logging.INFO, # Set the log level (DEBUG, INFO, WARNING, ERROR, CRITICAL) + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", # Log format + handlers=[logging.StreamHandler()], # Output logs to console +) + +logger = getLogger(__name__) + +SCAN_ITER_COUNT = 10000 +BATCH_DEFAULT = 1000 + + +def onyx_redis( + command: str, + batch: int, + dry_run: bool, + host: str, + port: int, + db: int, + password: str | None, +) -> int: + pool = RedisPool.create_pool( + host=host, + port=port, + db=db, + password=password if password else "", + ssl=True, + ssl_cert_reqs="optional", + ssl_ca_certs=None, + ) + + r = Redis(connection_pool=pool) + + try: + r.ping() + except: + logger.exception("Redis ping exceptioned") + raise + + if command == "purge_connectorsync_taskset": + """Purge connector tasksets. Used when the tasks represented in the tasksets + have been purged.""" + return purge_by_match_and_type( + "*connectorsync_taskset*", "set", batch, dry_run, r + ) + elif command == "purge_documentset_taskset": + return purge_by_match_and_type( + "*documentset_taskset*", "set", batch, dry_run, r + ) + elif command == "purge_usergroup_taskset": + return purge_by_match_and_type("*usergroup_taskset*", "set", batch, dry_run, r) + elif command == "purge_vespa_syncing": + return purge_by_match_and_type( + "*connectorsync:vespa_syncing*", "string", batch, dry_run, r + ) + else: + pass + + return 255 + + +def flush_batch_delete(batch_keys: list[bytes], r: Redis) -> None: + logger.info(f"Flushing {len(batch_keys)} operations to Redis.") + with r.pipeline() as pipe: + for batch_key in batch_keys: + pipe.delete(batch_key) + pipe.execute() + + +def purge_by_match_and_type( + match_pattern: str, match_type: str, batch_size: int, dry_run: bool, r: Redis +) -> int: + """match_pattern: glob style expression + match_type: https://redis.io/docs/latest/commands/type/ + """ + + # cursor = "0" + # while cursor != 0: + # cursor, data = self.scan( + # cursor=cursor, match=match, count=count, _type=_type, **kwargs + # ) + + start = time.monotonic() + + count = 0 + batch_keys: list[bytes] = [] + for key in r.scan_iter(match_pattern, count=SCAN_ITER_COUNT, _type=match_type): + # key_type = r.type(key) + # if key_type != match_type.encode("utf-8"): + # continue + + key = cast(bytes, key) + key_str = key.decode("utf-8") + + count += 1 + if dry_run: + logger.info(f"(DRY-RUN) Deleting item {count}: {key_str}") + continue + + logger.info(f"Deleting item {count}: {key_str}") + + batch_keys.append(key) + if len(batch_keys) >= batch_size: + flush_batch_delete(batch_keys, r) + batch_keys.clear() + + if len(batch_keys) >= batch_size: + flush_batch_delete(batch_keys, r) + batch_keys.clear() + + logger.info(f"Deleted {count} matches.") + + elapsed = time.monotonic() - start + logger.info(f"Time elapsed: {elapsed:.2f}s") + return 0 + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Onyx Redis Manager") + parser.add_argument("--command", type=str, help="Operation to run", required=True) + + parser.add_argument( + "--host", + type=str, + default=REDIS_HOST, + help="The redis host", + required=False, + ) + + parser.add_argument( + "--port", + type=int, + default=REDIS_PORT, + help="The redis port", + required=False, + ) + + parser.add_argument( + "--db", + type=int, + default=REDIS_DB_NUMBER, + help="The redis db", + required=False, + ) + + parser.add_argument( + "--password", + type=str, + default=REDIS_PASSWORD, + help="The redis password", + required=False, + ) + + parser.add_argument( + "--batch", + type=int, + default=BATCH_DEFAULT, + help="Size of operation batches to send to Redis", + required=False, + ) + + parser.add_argument( + "--dry-run", + action="store_true", + help="Perform a dry run without actually executing modifications", + required=False, + ) + + args = parser.parse_args() + exitcode = onyx_redis( + command=args.command, + batch=args.batch, + dry_run=args.dry_run, + host=args.host, + port=args.port, + db=args.db, + password=args.password, + ) + sys.exit(exitcode)