out of process retry for multitenant test reset (#4566)

* tool to generate vespa schema variations for our cloud

* extraneous assign

* use a real templating system instead of search/replace

* fix float

* maybe this should be double

* remove redundant var

* template the other files

* try a spawned process

* move the wrapper

* fix args

* increase timeout

* run multitenant reset operations out of process as well

---------

Co-authored-by: Richard Kuo (Onyx) <rkuo@onyx.app>
Co-authored-by: Richard Kuo <rkuo@rkuo.com>
This commit is contained in:
rkuo-danswer
2025-04-21 16:30:18 -07:00
committed by GitHub
parent eb569bf79d
commit 0d4c600852
3 changed files with 106 additions and 43 deletions

View File

@@ -112,7 +112,8 @@ class DocumentManager:
)
response.raise_for_status()
print("Seeding completed successfully.")
api_key_id = api_key.api_key_id if api_key else ""
print(f"Seeding docs for api_key_id={api_key_id} completed successfully.")
return [
SimpleTestDocument(
id=document["document"]["id"],
@@ -140,7 +141,8 @@ class DocumentManager:
)
response.raise_for_status()
print("Seeding completed successfully.")
api_key_id = api_key.api_key_id if api_key else ""
print(f"Seeding doc for api_key_id={api_key_id} completed successfully.")
return SimpleTestDocument(
id=document["document"]["id"],

View File

@@ -24,7 +24,11 @@ class LLMProviderManager:
is_public: bool | None = None,
user_performing_action: DATestUser | None = None,
) -> DATestLLMProvider:
print("Seeding LLM Providers...")
email = "Unknown"
if user_performing_action:
email = user_performing_action.email
print(f"Seeding LLM Providers for {email}...")
llm_provider = LLMProviderUpsertRequest(
name=name or f"test-provider-{uuid4()}",

View File

@@ -83,6 +83,7 @@ def downgrade_postgres(
password=POSTGRES_PASSWORD,
host=POSTGRES_HOST,
port=POSTGRES_PORT,
application_name="downgrade_postgres",
)
conn.autocommit = True # Need autocommit for dropping schema
cur = conn.cursor()
@@ -139,6 +140,7 @@ def upgrade_postgres(
host=POSTGRES_HOST,
port=POSTGRES_PORT,
db_api=SYNC_DB_API,
app_name="upgrade_postgres",
)
_run_migrations(
conn_str,
@@ -148,6 +150,100 @@ def upgrade_postgres(
)
def drop_multitenant_postgres(
database: str = "postgres",
) -> None:
"""Reset the Postgres database."""
# this seems to hang due to locking issues, so run with a timeout with a few retries
NUM_TRIES = 10
TIMEOUT = 40
success = False
for _ in range(NUM_TRIES):
logger.info(f"drop_multitenant_postgres_task starting... ({_ + 1}/{NUM_TRIES})")
try:
run_with_timeout_multiproc(
drop_multitenant_postgres_task,
TIMEOUT,
kwargs={
"dbname": database,
},
)
success = True
break
except TimeoutError:
logger.warning(
f"drop_multitenant_postgres_task timed out, retrying... ({_ + 1}/{NUM_TRIES})"
)
except RuntimeError:
logger.warning(
f"drop_multitenant_postgres_task exceptioned, retrying... ({_ + 1}/{NUM_TRIES})"
)
if not success:
raise RuntimeError("drop_multitenant_postgres_task failed after 10 timeouts.")
def drop_multitenant_postgres_task(dbname: str) -> None:
conn = psycopg2.connect(
dbname=dbname,
user=POSTGRES_USER,
password=POSTGRES_PASSWORD,
host=POSTGRES_HOST,
port=POSTGRES_PORT,
connect_timeout=10,
application_name="drop_multitenant_postgres_task",
)
conn.autocommit = True
cur = conn.cursor()
logger.info("Selecting tenant schemas.")
# Get all tenant schemas
cur.execute(
"""
SELECT schema_name
FROM information_schema.schemata
WHERE schema_name LIKE 'tenant_%'
"""
)
tenant_schemas = cur.fetchall()
# Drop all tenant schemas
logger.info("Dropping all tenant schemas.")
for schema in tenant_schemas:
# Close any existing connections to the schema before dropping
cur.execute(
"""
SELECT pg_terminate_backend(pg_stat_activity.pid)
FROM pg_stat_activity
WHERE pg_stat_activity.datname = 'postgres'
AND pg_stat_activity.state = 'idle in transaction'
AND pid <> pg_backend_pid();
"""
)
schema_name = schema[0]
cur.execute(f'DROP SCHEMA "{schema_name}" CASCADE')
# Drop tables in the public schema
logger.info("Selecting public schema tables.")
cur.execute(
"""
SELECT tablename FROM pg_tables
WHERE schemaname = 'public'
"""
)
public_tables = cur.fetchall()
logger.info("Dropping public schema tables.")
for table in public_tables:
table_name = table[0]
cur.execute(f'DROP TABLE IF EXISTS public."{table_name}" CASCADE')
cur.close()
conn.close()
def reset_postgres(
database: str = "postgres",
config_name: str = "alembic",
@@ -240,46 +336,7 @@ def reset_vespa() -> None:
def reset_postgres_multitenant() -> None:
"""Reset the Postgres database for all tenants in a multitenant setup."""
conn = psycopg2.connect(
dbname="postgres",
user=POSTGRES_USER,
password=POSTGRES_PASSWORD,
host=POSTGRES_HOST,
port=POSTGRES_PORT,
)
conn.autocommit = True
cur = conn.cursor()
# Get all tenant schemas
cur.execute(
"""
SELECT schema_name
FROM information_schema.schemata
WHERE schema_name LIKE 'tenant_%'
"""
)
tenant_schemas = cur.fetchall()
# Drop all tenant schemas
for schema in tenant_schemas:
schema_name = schema[0]
cur.execute(f'DROP SCHEMA "{schema_name}" CASCADE')
# Drop tables in the public schema
cur.execute(
"""
SELECT tablename FROM pg_tables
WHERE schemaname = 'public'
"""
)
public_tables = cur.fetchall()
for table in public_tables:
table_name = table[0]
cur.execute(f'DROP TABLE IF EXISTS public."{table_name}" CASCADE')
cur.close()
conn.close()
drop_multitenant_postgres()
reset_postgres(config_name="schema_private", setup_onyx=False)