tool fixes (#4075)

This commit is contained in:
rkuo-danswer 2025-02-21 12:30:33 -08:00 committed by GitHub
parent e1ff9086a4
commit 61d536c782
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -3,6 +3,7 @@ import json
import logging import logging
import sys import sys
import time import time
from enum import Enum
from logging import getLogger from logging import getLogger
from typing import cast from typing import cast
from uuid import UUID from uuid import UUID
@ -20,10 +21,13 @@ from onyx.configs.app_configs import REDIS_PORT
from onyx.configs.app_configs import REDIS_SSL from onyx.configs.app_configs import REDIS_SSL
from onyx.db.engine import get_session_with_tenant from onyx.db.engine import get_session_with_tenant
from onyx.db.users import get_user_by_email from onyx.db.users import get_user_by_email
from onyx.redis.redis_connector import RedisConnector
from onyx.redis.redis_connector_index import RedisConnectorIndex
from onyx.redis.redis_pool import RedisPool from onyx.redis.redis_pool import RedisPool
from shared_configs.configs import MULTI_TENANT from shared_configs.configs import MULTI_TENANT
from shared_configs.configs import POSTGRES_DEFAULT_SCHEMA from shared_configs.configs import POSTGRES_DEFAULT_SCHEMA
from shared_configs.contextvars import CURRENT_TENANT_ID_CONTEXTVAR from shared_configs.contextvars import CURRENT_TENANT_ID_CONTEXTVAR
from shared_configs.contextvars import get_current_tenant_id
# Tool to run helpful operations on Redis in production # Tool to run helpful operations on Redis in production
# This is targeted for internal usage and may not have all the necessary parameters # This is targeted for internal usage and may not have all the necessary parameters
@ -42,6 +46,19 @@ SCAN_ITER_COUNT = 10000
BATCH_DEFAULT = 1000 BATCH_DEFAULT = 1000
class OnyxRedisCommand(Enum):
purge_connectorsync_taskset = "purge_connectorsync_taskset"
purge_documentset_taskset = "purge_documentset_taskset"
purge_usergroup_taskset = "purge_usergroup_taskset"
purge_locks_blocking_deletion = "purge_locks_blocking_deletion"
purge_vespa_syncing = "purge_vespa_syncing"
get_user_token = "get_user_token"
delete_user_token = "delete_user_token"
def __str__(self) -> str:
return self.value
def get_user_id(user_email: str) -> tuple[UUID, str]: def get_user_id(user_email: str) -> tuple[UUID, str]:
tenant_id = ( tenant_id = (
get_tenant_id_for_email(user_email) if MULTI_TENANT else POSTGRES_DEFAULT_SCHEMA get_tenant_id_for_email(user_email) if MULTI_TENANT else POSTGRES_DEFAULT_SCHEMA
@ -55,50 +72,79 @@ def get_user_id(user_email: str) -> tuple[UUID, str]:
def onyx_redis( def onyx_redis(
command: str, command: OnyxRedisCommand,
batch: int, batch: int,
dry_run: bool, dry_run: bool,
ssl: bool,
host: str, host: str,
port: int, port: int,
db: int, db: int,
password: str | None, password: str | None,
user_email: str | None = None, user_email: str | None = None,
cc_pair_id: int | None = None,
) -> int: ) -> int:
# this is global and not tenant aware
pool = RedisPool.create_pool( pool = RedisPool.create_pool(
host=host, host=host,
port=port, port=port,
db=db, db=db,
password=password if password else "", password=password if password else "",
ssl=REDIS_SSL, ssl=ssl,
ssl_cert_reqs="optional", ssl_cert_reqs="optional",
ssl_ca_certs=None, ssl_ca_certs=None,
) )
r = Redis(connection_pool=pool) r = Redis(connection_pool=pool)
logger.info("Redis ping starting. This may hang if your settings are incorrect.")
try: try:
r.ping() r.ping()
except: except:
logger.exception("Redis ping exceptioned") logger.exception("Redis ping exceptioned")
raise raise
if command == "purge_connectorsync_taskset": logger.info("Redis ping succeeded.")
if command == OnyxRedisCommand.purge_connectorsync_taskset:
"""Purge connector tasksets. Used when the tasks represented in the tasksets """Purge connector tasksets. Used when the tasks represented in the tasksets
have been purged.""" have been purged."""
return purge_by_match_and_type( return purge_by_match_and_type(
"*connectorsync_taskset*", "set", batch, dry_run, r "*connectorsync_taskset*", "set", batch, dry_run, r
) )
elif command == "purge_documentset_taskset": elif command == OnyxRedisCommand.purge_documentset_taskset:
return purge_by_match_and_type( return purge_by_match_and_type(
"*documentset_taskset*", "set", batch, dry_run, r "*documentset_taskset*", "set", batch, dry_run, r
) )
elif command == "purge_usergroup_taskset": elif command == OnyxRedisCommand.purge_usergroup_taskset:
return purge_by_match_and_type("*usergroup_taskset*", "set", batch, dry_run, r) return purge_by_match_and_type("*usergroup_taskset*", "set", batch, dry_run, r)
elif command == "purge_vespa_syncing": elif command == OnyxRedisCommand.purge_locks_blocking_deletion:
if cc_pair_id is None:
logger.error("You must specify --cc-pair with purge_deletion_locks")
return 1
tenant_id = get_current_tenant_id()
logger.info(f"Purging locks associated with deleting cc_pair={cc_pair_id}.")
redis_connector = RedisConnector(tenant_id, cc_pair_id)
match_pattern = f"{tenant_id}:{RedisConnectorIndex.FENCE_PREFIX}_{cc_pair_id}/*"
purge_by_match_and_type(match_pattern, "string", batch, dry_run, r)
redis_delete_if_exists_helper(
f"{tenant_id}:{redis_connector.prune.fence_key}", dry_run, r
)
redis_delete_if_exists_helper(
f"{tenant_id}:{redis_connector.permissions.fence_key}", dry_run, r
)
redis_delete_if_exists_helper(
f"{tenant_id}:{redis_connector.external_group_sync.fence_key}", dry_run, r
)
return 0
elif command == OnyxRedisCommand.purge_vespa_syncing:
return purge_by_match_and_type( return purge_by_match_and_type(
"*connectorsync:vespa_syncing*", "string", batch, dry_run, r "*connectorsync:vespa_syncing*", "string", batch, dry_run, r
) )
elif command == "get_user_token": elif command == OnyxRedisCommand.get_user_token:
if not user_email: if not user_email:
logger.error("You must specify --user-email with get_user_token") logger.error("You must specify --user-email with get_user_token")
return 1 return 1
@ -109,7 +155,7 @@ def onyx_redis(
else: else:
print(f"No token found for user {user_email}") print(f"No token found for user {user_email}")
return 2 return 2
elif command == "delete_user_token": elif command == OnyxRedisCommand.delete_user_token:
if not user_email: if not user_email:
logger.error("You must specify --user-email with delete_user_token") logger.error("You must specify --user-email with delete_user_token")
return 1 return 1
@ -131,6 +177,25 @@ def flush_batch_delete(batch_keys: list[bytes], r: Redis) -> None:
pipe.execute() pipe.execute()
def redis_delete_if_exists_helper(key: str, dry_run: bool, r: Redis) -> bool:
"""Returns True if the key was found, False if not.
This function exists for logging purposes as the delete operation itself
doesn't really need to check the existence of the key.
"""
if not r.exists(key):
logger.info(f"Did not find {key}.")
return False
if dry_run:
logger.info(f"(DRY-RUN) Deleting {key}.")
else:
logger.info(f"Deleting {key}.")
r.delete(key)
return True
def purge_by_match_and_type( def purge_by_match_and_type(
match_pattern: str, match_type: str, batch_size: int, dry_run: bool, r: Redis match_pattern: str, match_type: str, batch_size: int, dry_run: bool, r: Redis
) -> int: ) -> int:
@ -138,6 +203,12 @@ def purge_by_match_and_type(
match_type: https://redis.io/docs/latest/commands/type/ match_type: https://redis.io/docs/latest/commands/type/
""" """
logger.info(
f"purge_by_match_and_type start: "
f"match_pattern={match_pattern} "
f"match_type={match_type}"
)
# cursor = "0" # cursor = "0"
# while cursor != 0: # while cursor != 0:
# cursor, data = self.scan( # cursor, data = self.scan(
@ -164,11 +235,13 @@ def purge_by_match_and_type(
logger.info(f"Deleting item {count}: {key_str}") logger.info(f"Deleting item {count}: {key_str}")
batch_keys.append(key) batch_keys.append(key)
# flush if batch size has been reached
if len(batch_keys) >= batch_size: if len(batch_keys) >= batch_size:
flush_batch_delete(batch_keys, r) flush_batch_delete(batch_keys, r)
batch_keys.clear() batch_keys.clear()
if len(batch_keys) >= batch_size: # final flush
flush_batch_delete(batch_keys, r) flush_batch_delete(batch_keys, r)
batch_keys.clear() batch_keys.clear()
@ -279,7 +352,21 @@ def delete_user_token_from_redis(
if __name__ == "__main__": if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Onyx Redis Manager") parser = argparse.ArgumentParser(description="Onyx Redis Manager")
parser.add_argument("--command", type=str, help="Operation to run", required=True) parser.add_argument(
"--command",
type=OnyxRedisCommand,
help="The command to run",
choices=list(OnyxRedisCommand),
required=True,
)
parser.add_argument(
"--ssl",
type=bool,
default=REDIS_SSL,
help="Use SSL when connecting to Redis. Usually True for prod and False for local testing",
required=False,
)
parser.add_argument( parser.add_argument(
"--host", "--host",
@ -342,6 +429,13 @@ if __name__ == "__main__":
required=False, required=False,
) )
parser.add_argument(
"--cc-pair",
type=int,
help="A connector credential pair id. Used with the purge_deletion_locks command.",
required=False,
)
args = parser.parse_args() args = parser.parse_args()
if args.tenant_id: if args.tenant_id:
@ -368,10 +462,12 @@ if __name__ == "__main__":
command=args.command, command=args.command,
batch=args.batch, batch=args.batch,
dry_run=args.dry_run, dry_run=args.dry_run,
ssl=args.ssl,
host=args.host, host=args.host,
port=args.port, port=args.port,
db=args.db, db=args.db,
password=args.password, password=args.password,
user_email=args.user_email, user_email=args.user_email,
cc_pair_id=args.cc_pair,
) )
sys.exit(exitcode) sys.exit(exitcode)