mirror of
https://github.com/danswer-ai/danswer.git
synced 2025-08-08 22:12:30 +02:00
redis -> pg advisory lock (https://www.postgresql.org/docs/current/explicit-locking.html#ADVISORY-LOCKS)
This commit is contained in:
@@ -7,10 +7,12 @@ Create Date: 2025-02-26 13:07:56.217791
|
|||||||
"""
|
"""
|
||||||
from alembic import op
|
from alembic import op
|
||||||
import time
|
import time
|
||||||
|
import hashlib
|
||||||
from sqlalchemy import text
|
from sqlalchemy import text
|
||||||
|
|
||||||
from onyx.redis.redis_pool import get_redis_client
|
# Remove Redis import as we're not using it anymore
|
||||||
from onyx.configs.app_configs import ALEMBIC_MIGRATION_LOCK_KEY
|
# from onyx.redis.redis_pool import get_redis_client
|
||||||
|
# from onyx.configs.app_configs import ALEMBIC_MIGRATION_LOCK_KEY
|
||||||
|
|
||||||
# revision identifiers, used by Alembic.
|
# revision identifiers, used by Alembic.
|
||||||
revision = "3bd4c84fe72f"
|
revision = "3bd4c84fe72f"
|
||||||
@@ -18,6 +20,12 @@ down_revision = "8f43500ee275"
|
|||||||
branch_labels = None
|
branch_labels = None
|
||||||
depends_on = None
|
depends_on = None
|
||||||
|
|
||||||
|
# Define a constant for our advisory lock
|
||||||
|
# Converting a string to a bigint for advisory lock
|
||||||
|
ALEMBIC_MIGRATION_LOCK_KEY = int(
|
||||||
|
hashlib.md5("alembic_migration_lock".encode()).hexdigest()[:15], 16
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
# NOTE:
|
# NOTE:
|
||||||
# This migration addresses issues with the previous migration (8f43500ee275) which caused
|
# This migration addresses issues with the previous migration (8f43500ee275) which caused
|
||||||
@@ -32,13 +40,22 @@ depends_on = None
|
|||||||
|
|
||||||
|
|
||||||
def upgrade():
|
def upgrade():
|
||||||
# # Use Redis to ensure only one migration runs at a time
|
# Use PostgreSQL advisory locks to ensure only one migration runs at a time
|
||||||
redis_client = get_redis_client()
|
connection = op.get_bind()
|
||||||
|
|
||||||
# Try to acquire lock (without expiration)
|
# Try to acquire an advisory lock (exclusive, session level)
|
||||||
if not redis_client.set(ALEMBIC_MIGRATION_LOCK_KEY, "1", nx=True):
|
lock_acquired = connection.execute(
|
||||||
raise Exception("Migration already in progress. Try again later.")
|
text("SELECT pg_try_advisory_lock(:lock_key)").bindparams(
|
||||||
|
lock_key=ALEMBIC_MIGRATION_LOCK_KEY
|
||||||
|
)
|
||||||
|
).scalar()
|
||||||
|
|
||||||
|
if not lock_acquired:
|
||||||
|
raise Exception(
|
||||||
|
"Migration already in progress by another process. Try again later."
|
||||||
|
)
|
||||||
|
|
||||||
|
try:
|
||||||
# --- PART 1: chat_message table ---
|
# --- PART 1: chat_message table ---
|
||||||
# Step 1: Add nullable column (quick, minimal locking)
|
# Step 1: Add nullable column (quick, minimal locking)
|
||||||
op.execute("ALTER TABLE chat_message DROP COLUMN IF EXISTS message_tsv_gen")
|
op.execute("ALTER TABLE chat_message DROP COLUMN IF EXISTS message_tsv_gen")
|
||||||
@@ -72,7 +89,9 @@ def upgrade():
|
|||||||
op.execute("BEGIN")
|
op.execute("BEGIN")
|
||||||
|
|
||||||
time.time()
|
time.time()
|
||||||
op.execute("ALTER TABLE chat_message ADD COLUMN IF NOT EXISTS message_tsv tsvector")
|
op.execute(
|
||||||
|
"ALTER TABLE chat_message ADD COLUMN IF NOT EXISTS message_tsv tsvector"
|
||||||
|
)
|
||||||
|
|
||||||
# Step 2: Create function and trigger for new/updated rows
|
# Step 2: Create function and trigger for new/updated rows
|
||||||
op.execute(
|
op.execute(
|
||||||
@@ -412,6 +431,22 @@ def upgrade():
|
|||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
# Make sure to release the lock in case of error
|
||||||
|
connection.execute(
|
||||||
|
text("SELECT pg_advisory_unlock(:lock_key)").bindparams(
|
||||||
|
lock_key=ALEMBIC_MIGRATION_LOCK_KEY
|
||||||
|
)
|
||||||
|
)
|
||||||
|
raise e
|
||||||
|
finally:
|
||||||
|
# Release the advisory lock when done
|
||||||
|
connection.execute(
|
||||||
|
text("SELECT pg_advisory_unlock(:lock_key)").bindparams(
|
||||||
|
lock_key=ALEMBIC_MIGRATION_LOCK_KEY
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def downgrade() -> None:
|
def downgrade() -> None:
|
||||||
# Drop the indexes first (use CONCURRENTLY for dropping too)
|
# Drop the indexes first (use CONCURRENTLY for dropping too)
|
||||||
|
45
backend/asdf.py
Normal file
45
backend/asdf.py
Normal file
@@ -0,0 +1,45 @@
|
|||||||
|
#!/usr/bin/env python
|
||||||
|
"""
|
||||||
|
Simple script that keeps trying to run 'alembic upgrade head' until it succeeds.
|
||||||
|
"""
|
||||||
|
import subprocess
|
||||||
|
import sys
|
||||||
|
import time
|
||||||
|
|
||||||
|
# Path to alembic.ini (change this if needed)
|
||||||
|
ALEMBIC_CONFIG = "alembic.ini"
|
||||||
|
|
||||||
|
# Time to wait between attempts (in seconds)
|
||||||
|
WAIT_TIME = 10
|
||||||
|
|
||||||
|
print("Starting continuous alembic upgrade attempts")
|
||||||
|
print(f"Using config: {ALEMBIC_CONFIG}")
|
||||||
|
print(f"Will retry every {WAIT_TIME} seconds until successful")
|
||||||
|
|
||||||
|
attempt = 1
|
||||||
|
|
||||||
|
while True:
|
||||||
|
print(f"\nAttempt #{attempt} to run alembic upgrade head")
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Run the alembic upgrade head command
|
||||||
|
result = subprocess.run(
|
||||||
|
["alembic", "-c", ALEMBIC_CONFIG, "upgrade", "head"],
|
||||||
|
check=True,
|
||||||
|
capture_output=True,
|
||||||
|
text=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
# If we get here, the command was successful
|
||||||
|
print("SUCCESS! Alembic upgrade completed successfully.")
|
||||||
|
print(f"Output: {result.stdout}")
|
||||||
|
sys.exit(0)
|
||||||
|
|
||||||
|
except subprocess.CalledProcessError as e:
|
||||||
|
# Command failed, print error and try again
|
||||||
|
print(f"FAILED with return code {e.returncode}")
|
||||||
|
print(f"Error output: {e.stderr}")
|
||||||
|
|
||||||
|
print(f"Waiting {WAIT_TIME} seconds before next attempt...")
|
||||||
|
time.sleep(WAIT_TIME)
|
||||||
|
attempt += 1
|
Reference in New Issue
Block a user