mirror of
https://github.com/danswer-ai/danswer.git
synced 2025-06-20 13:01:34 +02:00
k
This commit is contained in:
parent
40953bd4fe
commit
687122911d
@ -6,7 +6,8 @@ Create Date: 2025-02-26 13:07:56.217791
|
|||||||
|
|
||||||
"""
|
"""
|
||||||
from alembic import op
|
from alembic import op
|
||||||
|
import time
|
||||||
|
from sqlalchemy import text
|
||||||
|
|
||||||
# revision identifiers, used by Alembic.
|
# revision identifiers, used by Alembic.
|
||||||
revision = "3bd4c84fe72f"
|
revision = "3bd4c84fe72f"
|
||||||
@ -27,46 +28,358 @@ depends_on = None
|
|||||||
# 4. Adds indexes to both chat_message and chat_session tables for comprehensive search
|
# 4. Adds indexes to both chat_message and chat_session tables for comprehensive search
|
||||||
|
|
||||||
|
|
||||||
def upgrade() -> None:
|
def upgrade():
|
||||||
# Create a GIN index for full-text search on chat_message.message
|
# --- PART 1: chat_message table ---
|
||||||
|
# Step 1: Add nullable column (quick, minimal locking)
|
||||||
|
# op.execute("ALTER TABLE chat_message DROP COLUMN IF EXISTS message_tsv")
|
||||||
|
# op.execute("DROP TRIGGER IF EXISTS chat_message_tsv_trigger ON chat_message")
|
||||||
|
# op.execute("DROP FUNCTION IF EXISTS update_chat_message_tsv()")
|
||||||
|
# op.execute("ALTER TABLE chat_message DROP COLUMN IF EXISTS message_tsv")
|
||||||
|
# # Drop chat_session tsv trigger if it exists
|
||||||
|
# op.execute("DROP TRIGGER IF EXISTS chat_session_tsv_trigger ON chat_session")
|
||||||
|
# op.execute("DROP FUNCTION IF EXISTS update_chat_session_tsv()")
|
||||||
|
# op.execute("ALTER TABLE chat_session DROP COLUMN IF EXISTS title_tsv")
|
||||||
|
# raise Exception("Stop here")
|
||||||
|
time.time()
|
||||||
|
op.execute("ALTER TABLE chat_message ADD COLUMN IF NOT EXISTS message_tsv tsvector")
|
||||||
|
|
||||||
|
# Step 2: Create function and trigger for new/updated rows
|
||||||
op.execute(
|
op.execute(
|
||||||
"""
|
"""
|
||||||
|
CREATE OR REPLACE FUNCTION update_chat_message_tsv()
|
||||||
|
RETURNS TRIGGER AS $$
|
||||||
|
BEGIN
|
||||||
|
NEW.message_tsv = to_tsvector('english', NEW.message);
|
||||||
|
RETURN NEW;
|
||||||
|
END;
|
||||||
|
$$ LANGUAGE plpgsql
|
||||||
|
"""
|
||||||
|
)
|
||||||
|
|
||||||
|
# Create trigger in a separate execute call
|
||||||
|
op.execute(
|
||||||
|
"""
|
||||||
|
CREATE TRIGGER chat_message_tsv_trigger
|
||||||
|
BEFORE INSERT OR UPDATE ON chat_message
|
||||||
|
FOR EACH ROW EXECUTE FUNCTION update_chat_message_tsv()
|
||||||
|
"""
|
||||||
|
)
|
||||||
|
|
||||||
|
# Step 3: Update existing rows in batches using Python
|
||||||
|
time.time()
|
||||||
|
|
||||||
|
# Get connection and count total rows
|
||||||
|
connection = op.get_bind()
|
||||||
|
total_count_result = connection.execute(
|
||||||
|
text("SELECT COUNT(*) FROM chat_message")
|
||||||
|
).scalar()
|
||||||
|
total_count = total_count_result if total_count_result is not None else 0
|
||||||
|
batch_size = 5000
|
||||||
|
batches = 0
|
||||||
|
|
||||||
|
# Calculate total batches needed
|
||||||
|
total_batches = (
|
||||||
|
(total_count + batch_size - 1) // batch_size if total_count > 0 else 0
|
||||||
|
)
|
||||||
|
|
||||||
|
# Process in batches - properly handling UUIDs by using OFFSET/LIMIT approach
|
||||||
|
for batch_num in range(total_batches):
|
||||||
|
offset = batch_num * batch_size
|
||||||
|
|
||||||
|
# Execute update for this batch using OFFSET/LIMIT which works with UUIDs
|
||||||
|
connection.execute(
|
||||||
|
text(
|
||||||
|
"""
|
||||||
|
UPDATE chat_message
|
||||||
|
SET message_tsv = to_tsvector('english', message)
|
||||||
|
WHERE id IN (
|
||||||
|
SELECT id FROM chat_message
|
||||||
|
WHERE message_tsv IS NULL
|
||||||
|
ORDER BY id
|
||||||
|
LIMIT :batch_size OFFSET :offset
|
||||||
|
)
|
||||||
|
"""
|
||||||
|
).bindparams(batch_size=batch_size, offset=offset)
|
||||||
|
)
|
||||||
|
|
||||||
|
# Commit each batch
|
||||||
|
connection.execute(text("COMMIT"))
|
||||||
|
# Start a new transaction
|
||||||
|
connection.execute(text("BEGIN"))
|
||||||
|
|
||||||
|
batches += 1
|
||||||
|
|
||||||
|
# Final check for any remaining NULL values
|
||||||
|
connection.execute(
|
||||||
|
text(
|
||||||
|
"""
|
||||||
|
UPDATE chat_message SET message_tsv = to_tsvector('english', message)
|
||||||
|
WHERE message_tsv IS NULL
|
||||||
|
"""
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
# Create GIN index concurrently
|
||||||
|
connection.execute(text("COMMIT"))
|
||||||
|
|
||||||
|
time.time()
|
||||||
|
|
||||||
|
connection.execute(
|
||||||
|
text(
|
||||||
|
"""
|
||||||
|
CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_chat_message_tsv
|
||||||
|
ON chat_message USING GIN (message_tsv)
|
||||||
|
"""
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
# First drop the trigger as it won't be needed anymore
|
||||||
|
connection.execute(
|
||||||
|
text(
|
||||||
|
"""
|
||||||
|
DROP TRIGGER IF EXISTS chat_message_tsv_trigger ON chat_message;
|
||||||
|
"""
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
connection.execute(
|
||||||
|
text(
|
||||||
|
"""
|
||||||
|
DROP FUNCTION IF EXISTS update_chat_message_tsv();
|
||||||
|
"""
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
# Add new generated column
|
||||||
|
time.time()
|
||||||
|
connection.execute(
|
||||||
|
text(
|
||||||
|
"""
|
||||||
ALTER TABLE chat_message
|
ALTER TABLE chat_message
|
||||||
ADD COLUMN message_tsv tsvector
|
ADD COLUMN message_tsv_gen tsvector
|
||||||
GENERATED ALWAYS AS (to_tsvector('english', message)) STORED;
|
GENERATED ALWAYS AS (to_tsvector('english', message)) STORED;
|
||||||
"""
|
"""
|
||||||
)
|
)
|
||||||
|
|
||||||
# Commit the current transaction before creating concurrent indexes
|
|
||||||
op.execute("COMMIT")
|
|
||||||
|
|
||||||
op.execute(
|
|
||||||
"""
|
|
||||||
CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_chat_message_tsv
|
|
||||||
ON chat_message
|
|
||||||
USING GIN (message_tsv)
|
|
||||||
"""
|
|
||||||
)
|
)
|
||||||
|
|
||||||
# Also add a stored tsvector column for chat_session.description
|
connection.execute(text("COMMIT"))
|
||||||
op.execute(
|
|
||||||
|
time.time()
|
||||||
|
|
||||||
|
connection.execute(
|
||||||
|
text(
|
||||||
"""
|
"""
|
||||||
ALTER TABLE chat_session
|
CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_chat_message_tsv_gen
|
||||||
ADD COLUMN description_tsv tsvector
|
ON chat_message USING GIN (message_tsv_gen)
|
||||||
GENERATED ALWAYS AS (to_tsvector('english', coalesce(description, ''))) STORED;
|
|
||||||
"""
|
"""
|
||||||
)
|
)
|
||||||
|
)
|
||||||
|
|
||||||
# Commit again before creating the second concurrent index
|
# Drop old index and column
|
||||||
op.execute("COMMIT")
|
connection.execute(text("COMMIT"))
|
||||||
|
|
||||||
op.execute(
|
connection.execute(
|
||||||
|
text(
|
||||||
|
"""
|
||||||
|
DROP INDEX CONCURRENTLY IF EXISTS idx_chat_message_tsv;
|
||||||
|
"""
|
||||||
|
)
|
||||||
|
)
|
||||||
|
connection.execute(text("COMMIT"))
|
||||||
|
connection.execute(
|
||||||
|
text(
|
||||||
|
"""
|
||||||
|
ALTER TABLE chat_message DROP COLUMN message_tsv;
|
||||||
|
"""
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
# Rename new column to old name
|
||||||
|
connection.execute(
|
||||||
|
text(
|
||||||
|
"""
|
||||||
|
ALTER TABLE chat_message RENAME COLUMN message_tsv_gen TO message_tsv;
|
||||||
|
"""
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
# --- PART 2: chat_session table ---
|
||||||
|
|
||||||
|
# Step 1: Add nullable column (quick, minimal locking)
|
||||||
|
time.time()
|
||||||
|
connection.execute(
|
||||||
|
text(
|
||||||
|
"ALTER TABLE chat_session ADD COLUMN IF NOT EXISTS description_tsv tsvector"
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
# Step 2: Create function and trigger for new/updated rows - SPLIT INTO SEPARATE CALLS
|
||||||
|
connection.execute(
|
||||||
|
text(
|
||||||
|
"""
|
||||||
|
CREATE OR REPLACE FUNCTION update_chat_session_tsv()
|
||||||
|
RETURNS TRIGGER AS $$
|
||||||
|
BEGIN
|
||||||
|
NEW.description_tsv = to_tsvector('english', COALESCE(NEW.description, ''));
|
||||||
|
RETURN NEW;
|
||||||
|
END;
|
||||||
|
$$ LANGUAGE plpgsql
|
||||||
|
"""
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
# Create trigger in a separate execute call
|
||||||
|
connection.execute(
|
||||||
|
text(
|
||||||
|
"""
|
||||||
|
CREATE TRIGGER chat_session_tsv_trigger
|
||||||
|
BEFORE INSERT OR UPDATE ON chat_session
|
||||||
|
FOR EACH ROW EXECUTE FUNCTION update_chat_session_tsv()
|
||||||
|
"""
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
# Step 3: Update existing rows in batches using Python
|
||||||
|
time.time()
|
||||||
|
|
||||||
|
# Get the maximum ID to determine batch count
|
||||||
|
# Cast id to text for MAX function since it's a UUID
|
||||||
|
max_id_result = connection.execute(
|
||||||
|
text("SELECT COALESCE(MAX(id::text), '0') FROM chat_session")
|
||||||
|
).scalar()
|
||||||
|
max_id_result if max_id_result is not None else "0"
|
||||||
|
batch_size = 5000
|
||||||
|
batches = 0
|
||||||
|
|
||||||
|
# Get all IDs ordered to process in batches
|
||||||
|
rows = connection.execute(
|
||||||
|
text("SELECT id FROM chat_session ORDER BY id")
|
||||||
|
).fetchall()
|
||||||
|
total_rows = len(rows)
|
||||||
|
|
||||||
|
# Process in batches
|
||||||
|
for batch_num, batch_start in enumerate(range(0, total_rows, batch_size)):
|
||||||
|
batch_end = min(batch_start + batch_size, total_rows)
|
||||||
|
batch_ids = [row[0] for row in rows[batch_start:batch_end]]
|
||||||
|
|
||||||
|
if not batch_ids:
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Use IN clause instead of BETWEEN for UUIDs
|
||||||
|
placeholders = ", ".join([f":id{i}" for i in range(len(batch_ids))])
|
||||||
|
params = {f"id{i}": id_val for i, id_val in enumerate(batch_ids)}
|
||||||
|
|
||||||
|
# Execute update for this batch
|
||||||
|
connection.execute(
|
||||||
|
text(
|
||||||
|
f"""
|
||||||
|
UPDATE chat_session
|
||||||
|
SET description_tsv = to_tsvector('english', COALESCE(description, ''))
|
||||||
|
WHERE id IN ({placeholders})
|
||||||
|
AND description_tsv IS NULL
|
||||||
|
"""
|
||||||
|
).bindparams(**params)
|
||||||
|
)
|
||||||
|
|
||||||
|
# Commit each batch
|
||||||
|
connection.execute(text("COMMIT"))
|
||||||
|
# Start a new transaction
|
||||||
|
connection.execute(text("BEGIN"))
|
||||||
|
|
||||||
|
batches += 1
|
||||||
|
|
||||||
|
# Final check for any remaining NULL values
|
||||||
|
connection.execute(
|
||||||
|
text(
|
||||||
|
"""
|
||||||
|
UPDATE chat_session SET description_tsv = to_tsvector('english', COALESCE(description, ''))
|
||||||
|
WHERE description_tsv IS NULL
|
||||||
|
"""
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
# Create GIN index concurrently
|
||||||
|
connection.execute(text("COMMIT"))
|
||||||
|
|
||||||
|
time.time()
|
||||||
|
connection.execute(
|
||||||
|
text(
|
||||||
"""
|
"""
|
||||||
CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_chat_session_desc_tsv
|
CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_chat_session_desc_tsv
|
||||||
ON chat_session
|
ON chat_session USING GIN (description_tsv)
|
||||||
USING GIN (description_tsv)
|
|
||||||
"""
|
"""
|
||||||
)
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
# After Final check for chat_session
|
||||||
|
# First drop the trigger as it won't be needed anymore
|
||||||
|
connection.execute(
|
||||||
|
text(
|
||||||
|
"""
|
||||||
|
DROP TRIGGER IF EXISTS chat_session_tsv_trigger ON chat_session;
|
||||||
|
"""
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
connection.execute(
|
||||||
|
text(
|
||||||
|
"""
|
||||||
|
DROP FUNCTION IF EXISTS update_chat_session_tsv();
|
||||||
|
"""
|
||||||
|
)
|
||||||
|
)
|
||||||
|
# Add new generated column
|
||||||
|
time.time()
|
||||||
|
connection.execute(
|
||||||
|
text(
|
||||||
|
"""
|
||||||
|
ALTER TABLE chat_session
|
||||||
|
ADD COLUMN description_tsv_gen tsvector
|
||||||
|
GENERATED ALWAYS AS (to_tsvector('english', COALESCE(description, ''))) STORED;
|
||||||
|
"""
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
# Create new index on generated column
|
||||||
|
connection.execute(text("COMMIT"))
|
||||||
|
|
||||||
|
time.time()
|
||||||
|
connection.execute(
|
||||||
|
text(
|
||||||
|
"""
|
||||||
|
CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_chat_session_desc_tsv_gen
|
||||||
|
ON chat_session USING GIN (description_tsv_gen)
|
||||||
|
"""
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
# Drop old index and column
|
||||||
|
connection.execute(text("COMMIT"))
|
||||||
|
|
||||||
|
connection.execute(
|
||||||
|
text(
|
||||||
|
"""
|
||||||
|
DROP INDEX CONCURRENTLY IF EXISTS idx_chat_session_desc_tsv;
|
||||||
|
"""
|
||||||
|
)
|
||||||
|
)
|
||||||
|
connection.execute(text("COMMIT"))
|
||||||
|
connection.execute(
|
||||||
|
text(
|
||||||
|
"""
|
||||||
|
ALTER TABLE chat_session DROP COLUMN description_tsv;
|
||||||
|
"""
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
# Rename new column to old name
|
||||||
|
connection.execute(
|
||||||
|
text(
|
||||||
|
"""
|
||||||
|
ALTER TABLE chat_session RENAME COLUMN description_tsv_gen TO description_tsv;
|
||||||
|
"""
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def downgrade() -> None:
|
def downgrade() -> None:
|
||||||
|
Loading…
x
Reference in New Issue
Block a user