mirror of
https://github.com/danswer-ai/danswer.git
synced 2025-03-17 21:32:36 +01:00
Feature/redis prod tool (#3619)
* prototype tools for handling prod issues * add some commands * add batching and dry run options * custom redis tool * comment * default to app config settings for redis --------- Co-authored-by: Richard Kuo (Danswer) <rkuo@onyx.app>
This commit is contained in:
parent
d40fd82803
commit
2ae91f0f2b
87
backend/scripts/celery_purge_queue.py
Normal file
87
backend/scripts/celery_purge_queue.py
Normal file
@ -0,0 +1,87 @@
|
||||
# Tool to run operations on Celery/Redis in production
|
||||
# this is a work in progress and isn't completely put together yet
|
||||
# but can serve as a stub for future operations
|
||||
import argparse
|
||||
import logging
|
||||
from logging import getLogger
|
||||
|
||||
from redis import Redis
|
||||
|
||||
from onyx.background.celery.celery_redis import celery_get_queue_length
|
||||
from onyx.configs.app_configs import REDIS_DB_NUMBER_CELERY
|
||||
from onyx.redis.redis_pool import RedisPool
|
||||
|
||||
# Configure root logging: INFO level, timestamped messages, console output.
_console_handler = logging.StreamHandler()
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    handlers=[_console_handler],
)

logger = getLogger(__name__)

# Password for the target Redis instance; intentionally blank by default.
REDIS_PASSWORD = ""
||||
def celery_purge_queue(queue: str, tenant_id: str) -> None:
    """Report the length of a celery queue (actual purging not yet implemented).

    Purging a celery queue is extremely difficult because the queue is a list
    and the only way an item can be removed from a list is by VALUE, which is
    a linear scan. Therefore, to purge the list of many values is roughly
    n^2.

    The other alternative is to pop values and push them back, but that raises
    questions about behavior while operating on a live queue.

    Args:
        queue: name of the base celery queue to inspect.
        tenant_id: tenant whose tasks would be purged (not acted on yet).
    """
    # NOTE(review): host/port are hard-coded, presumably for a local tunnel to
    # the production instance — parameterize before general use.
    pool = RedisPool.create_pool(
        host="127.0.0.1",
        port=6380,
        db=REDIS_DB_NUMBER_CELERY,
        password=REDIS_PASSWORD,
        ssl=True,
        ssl_cert_reqs="optional",
        ssl_ca_certs=None,
    )

    r = Redis(connection_pool=pool)

    length = celery_get_queue_length(queue, r)

    # BUGFIX: tenant_id was accepted but completely unused; at minimum,
    # surface it in the log so the intent of the invocation is recorded.
    logger.info(f"queue={queue} tenant_id={tenant_id} length={length}")

    # Prototype purge logic, kept as a stub for future work. Bugs fixed
    # relative to the original draft: compare against the tenant_id argument
    # (not the literal string "tenant_id"), use a real f-string instead of a
    # plain string, and remove a dangling "if".
    # processed = 0
    # deleted = 0
    # for i in range(len(OnyxCeleryPriority)):
    #     queue_name = queue
    #     if i > 0:
    #         queue_name += CELERY_SEPARATOR
    #         queue_name += str(i)
    #
    #     length = r.llen(queue_name)
    #     for k in range(length):
    #         task_raw: bytes | None = r.lindex(queue_name, k)
    #         if not task_raw:
    #             break
    #
    #         processed += 1
    #         task_str = task_raw.decode("utf-8")
    #         task = json.loads(task_str)
    #         task_kwargs_str = task["headers"]["kwargsrepr"]
    #         task_kwargs = json.loads(task_kwargs_str)
    #         task_tenant_id = task_kwargs["tenant_id"]
    #         if task_tenant_id and task_tenant_id == tenant_id:
    #             logger.info(f"Deleting task for tenant_id={tenant_id}")
    #             deleted += 1
    #
    # logger.info(f"processed={processed} deleted={deleted}")
if __name__ == "__main__":
    # CLI entry point: both arguments are mandatory strings.
    parser = argparse.ArgumentParser(description="Purge celery queue by tenant id")
    for flag, help_text in (
        ("--queue", "Queue to purge"),
        ("--tenant", "Tenant ID to purge"),
    ):
        parser.add_argument(flag, type=str, help=help_text, required=True)

    args = parser.parse_args()
    celery_purge_queue(queue=args.queue, tenant_id=args.tenant)
198
backend/scripts/onyx_redis.py
Normal file
198
backend/scripts/onyx_redis.py
Normal file
@ -0,0 +1,198 @@
|
||||
# Tool to run helpful operations on Redis in production
|
||||
# This is targeted for internal usage and may not have all the necessary parameters
|
||||
# for general usage across custom deployments
|
||||
import argparse
|
||||
import logging
|
||||
import sys
|
||||
import time
|
||||
from logging import getLogger
|
||||
from typing import cast
|
||||
|
||||
from redis import Redis
|
||||
|
||||
from onyx.configs.app_configs import REDIS_DB_NUMBER
|
||||
from onyx.configs.app_configs import REDIS_HOST
|
||||
from onyx.configs.app_configs import REDIS_PASSWORD
|
||||
from onyx.configs.app_configs import REDIS_PORT
|
||||
from onyx.redis.redis_pool import RedisPool
|
||||
|
||||
# Configure root logging: INFO level, timestamped messages, console output.
_console_handler = logging.StreamHandler()
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    handlers=[_console_handler],
)

logger = getLogger(__name__)

# Hint to SCAN for how many keys to fetch per server round trip.
SCAN_ITER_COUNT = 10000
# Default number of delete operations batched into one pipeline.
BATCH_DEFAULT = 1000
||||
def onyx_redis(
    command: str,
    batch: int,
    dry_run: bool,
    host: str,
    port: int,
    db: int,
    password: str | None,
) -> int:
    """Dispatch a maintenance command against a Redis instance.

    Args:
        command: one of the supported purge commands (see branches below).
        batch: number of deletes pipelined per round trip.
        dry_run: when True, log what would be deleted without modifying Redis.
        host, port, db, password: Redis connection parameters.

    Returns:
        0 on success, 255 for an unrecognized command.

    Raises:
        Exception: whatever the initial PING raises if the server is
            unreachable (logged with traceback, then re-raised).
    """
    pool = RedisPool.create_pool(
        host=host,
        port=port,
        db=db,
        password=password if password else "",
        ssl=True,
        ssl_cert_reqs="optional",
        ssl_ca_certs=None,
    )

    r = Redis(connection_pool=pool)

    # Fail fast with a logged traceback if the server is unreachable.
    # BUGFIX: was a bare `except:`, which also swallows SystemExit and
    # KeyboardInterrupt before re-raising.
    try:
        r.ping()
    except Exception:
        logger.exception("Redis ping exceptioned")
        raise

    if command == "purge_connectorsync_taskset":
        # Purge connector tasksets. Used when the tasks represented in the
        # tasksets have been purged.
        return purge_by_match_and_type(
            "*connectorsync_taskset*", "set", batch, dry_run, r
        )
    elif command == "purge_documentset_taskset":
        return purge_by_match_and_type(
            "*documentset_taskset*", "set", batch, dry_run, r
        )
    elif command == "purge_usergroup_taskset":
        return purge_by_match_and_type("*usergroup_taskset*", "set", batch, dry_run, r)
    elif command == "purge_vespa_syncing":
        return purge_by_match_and_type(
            "*connectorsync:vespa_syncing*", "string", batch, dry_run, r
        )

    # BUGFIX: previously fell through silently; surface the bad command.
    logger.error(f"Unknown command: {command}")
    return 255
||||
def flush_batch_delete(batch_keys: list[bytes], r: Redis) -> None:
    """Delete every key in batch_keys via a single Redis pipeline.

    Batching the DELETEs into one pipeline keeps this to a single round trip.
    """
    logger.info(f"Flushing {len(batch_keys)} operations to Redis.")
    with r.pipeline() as pipe:
        for key in batch_keys:
            pipe.delete(key)
        pipe.execute()
||||
def purge_by_match_and_type(
    match_pattern: str, match_type: str, batch_size: int, dry_run: bool, r: Redis
) -> int:
    """Delete all keys matching match_pattern whose type is match_type.

    match_pattern: glob style expression
    match_type: https://redis.io/docs/latest/commands/type/
    batch_size: deletes are flushed to Redis in pipelined batches of this size
    dry_run: when True, only log what would be deleted

    Returns:
        0 (always; reserved for future error codes).
    """
    start = time.monotonic()

    count = 0
    batch_keys: list[bytes] = []
    # SCAN with a server-side TYPE filter; count is only a per-iteration hint.
    for key in r.scan_iter(match_pattern, count=SCAN_ITER_COUNT, _type=match_type):
        key = cast(bytes, key)
        key_str = key.decode("utf-8")

        count += 1
        if dry_run:
            logger.info(f"(DRY-RUN) Deleting item {count}: {key_str}")
            continue

        logger.info(f"Deleting item {count}: {key_str}")

        batch_keys.append(key)
        if len(batch_keys) >= batch_size:
            flush_batch_delete(batch_keys, r)
            batch_keys.clear()

    # BUGFIX: flush the trailing partial batch. The original repeated the
    # `len(batch_keys) >= batch_size` check here, so up to batch_size - 1
    # keys were logged as deleted but never actually removed.
    if batch_keys:
        flush_batch_delete(batch_keys, r)
        batch_keys.clear()

    logger.info(f"Deleted {count} matches.")

    elapsed = time.monotonic() - start
    logger.info(f"Time elapsed: {elapsed:.2f}s")
    return 0
||||
if __name__ == "__main__":
    # CLI entry point. Connection parameters default to the app config values.
    parser = argparse.ArgumentParser(description="Onyx Redis Manager")
    parser.add_argument("--command", type=str, help="Operation to run", required=True)

    # Optional flags: (name, type, default, help).
    for flag, flag_type, default, help_text in (
        ("--host", str, REDIS_HOST, "The redis host"),
        ("--port", int, REDIS_PORT, "The redis port"),
        ("--db", int, REDIS_DB_NUMBER, "The redis db"),
        ("--password", str, REDIS_PASSWORD, "The redis password"),
        ("--batch", int, BATCH_DEFAULT, "Size of operation batches to send to Redis"),
    ):
        parser.add_argument(
            flag,
            type=flag_type,
            default=default,
            help=help_text,
            required=False,
        )

    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Perform a dry run without actually executing modifications",
        required=False,
    )

    args = parser.parse_args()
    exitcode = onyx_redis(
        command=args.command,
        batch=args.batch,
        dry_run=args.dry_run,
        host=args.host,
        port=args.port,
        db=args.db,
        password=args.password,
    )
    sys.exit(exitcode)
Loading…
x
Reference in New Issue
Block a user