From 687122911dd59347e15ce80dbaa07d673771b22e Mon Sep 17 00:00:00 2001 From: pablonyx Date: Wed, 5 Mar 2025 15:27:14 -0800 Subject: [PATCH] k --- .../versions/3bd4c84fe72f_improved_index.py | 371 ++++++++++++++++-- 1 file changed, 342 insertions(+), 29 deletions(-) diff --git a/backend/alembic/versions/3bd4c84fe72f_improved_index.py b/backend/alembic/versions/3bd4c84fe72f_improved_index.py index ab9497619c..3a08812819 100644 --- a/backend/alembic/versions/3bd4c84fe72f_improved_index.py +++ b/backend/alembic/versions/3bd4c84fe72f_improved_index.py @@ -6,7 +6,8 @@ Create Date: 2025-02-26 13:07:56.217791 """ from alembic import op - +import time +from sqlalchemy import text # revision identifiers, used by Alembic. revision = "3bd4c84fe72f" @@ -27,45 +28,357 @@ depends_on = None # 4. Adds indexes to both chat_message and chat_session tables for comprehensive search -def upgrade() -> None: - # Create a GIN index for full-text search on chat_message.message +def upgrade(): + # --- PART 1: chat_message table --- + # Step 1: Add nullable column (quick, minimal locking) + # op.execute("ALTER TABLE chat_message DROP COLUMN IF EXISTS message_tsv") + # op.execute("DROP TRIGGER IF EXISTS chat_message_tsv_trigger ON chat_message") + # op.execute("DROP FUNCTION IF EXISTS update_chat_message_tsv()") + # op.execute("ALTER TABLE chat_message DROP COLUMN IF EXISTS message_tsv") + # # Drop chat_session tsv trigger if it exists + # op.execute("DROP TRIGGER IF EXISTS chat_session_tsv_trigger ON chat_session") + # op.execute("DROP FUNCTION IF EXISTS update_chat_session_tsv()") + # op.execute("ALTER TABLE chat_session DROP COLUMN IF EXISTS title_tsv") + # raise Exception("Stop here") + time.time() + op.execute("ALTER TABLE chat_message ADD COLUMN IF NOT EXISTS message_tsv tsvector") + + # Step 2: Create function and trigger for new/updated rows op.execute( """ - ALTER TABLE chat_message - ADD COLUMN message_tsv tsvector - GENERATED ALWAYS AS (to_tsvector('english', message)) STORED; - """ + CREATE OR REPLACE FUNCTION update_chat_message_tsv() + RETURNS TRIGGER AS $$ + BEGIN + NEW.message_tsv = to_tsvector('english', NEW.message); + RETURN NEW; + END; + $$ LANGUAGE plpgsql + """ ) - # Commit the current transaction before creating concurrent indexes - op.execute("COMMIT") - + # Create trigger in a separate execute call op.execute( """ - CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_chat_message_tsv - ON chat_message - USING GIN (message_tsv) - """ + CREATE TRIGGER chat_message_tsv_trigger + BEFORE INSERT OR UPDATE ON chat_message + FOR EACH ROW EXECUTE FUNCTION update_chat_message_tsv() + """ ) - # Also add a stored tsvector column for chat_session.description - op.execute( - """ - ALTER TABLE chat_session - ADD COLUMN description_tsv tsvector - GENERATED ALWAYS AS (to_tsvector('english', coalesce(description, ''))) STORED; - """ + # Step 3: Update existing rows in batches using Python + time.time() + + # Get connection and count total rows + connection = op.get_bind() + total_count_result = connection.execute( + text("SELECT COUNT(*) FROM chat_message") + ).scalar() + total_count = total_count_result if total_count_result is not None else 0 + batch_size = 5000 + batches = 0 + + # Calculate total batches needed + total_batches = ( + (total_count + batch_size - 1) // batch_size if total_count > 0 else 0 ) - # Commit again before creating the second concurrent index - op.execute("COMMIT") + # Process in batches - properly handling UUIDs by using OFFSET/LIMIT approach + for batch_num in range(total_batches): + offset = batch_num * batch_size - op.execute( - """ - CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_chat_session_desc_tsv - ON chat_session - USING GIN (description_tsv) - """ + # Execute update for this batch using OFFSET/LIMIT which works with UUIDs + connection.execute( + text( + """ + UPDATE chat_message + SET message_tsv = to_tsvector('english', message) + WHERE id IN ( + SELECT id FROM chat_message + WHERE message_tsv IS NULL + ORDER BY id + LIMIT :batch_size OFFSET :offset + ) + """ + ).bindparams(batch_size=batch_size, offset=offset) + ) + + # Commit each batch + connection.execute(text("COMMIT")) + # Start a new transaction + connection.execute(text("BEGIN")) + + batches += 1 + + # Final check for any remaining NULL values + connection.execute( + text( + """ + UPDATE chat_message SET message_tsv = to_tsvector('english', message) + WHERE message_tsv IS NULL + """ + ) + ) + + # Create GIN index concurrently + connection.execute(text("COMMIT")) + + time.time() + + connection.execute( + text( + """ + CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_chat_message_tsv + ON chat_message USING GIN (message_tsv) + """ + ) + ) + + # First drop the trigger as it won't be needed anymore + connection.execute( + text( + """ + DROP TRIGGER IF EXISTS chat_message_tsv_trigger ON chat_message; + """ + ) + ) + + connection.execute( + text( + """ + DROP FUNCTION IF EXISTS update_chat_message_tsv(); + """ + ) + ) + + # Add new generated column + time.time() + connection.execute( + text( + """ + ALTER TABLE chat_message + ADD COLUMN message_tsv_gen tsvector + GENERATED ALWAYS AS (to_tsvector('english', message)) STORED; + """ + ) + ) + + connection.execute(text("COMMIT")) + + time.time() + + connection.execute( + text( + """ + CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_chat_message_tsv_gen + ON chat_message USING GIN (message_tsv_gen) + """ + ) + ) + + # Drop old index and column + connection.execute(text("COMMIT")) + + connection.execute( + text( + """ + DROP INDEX CONCURRENTLY IF EXISTS idx_chat_message_tsv; + """ + ) + ) + connection.execute(text("COMMIT")) + connection.execute( + text( + """ + ALTER TABLE chat_message DROP COLUMN message_tsv; + """ + ) + ) + + # Rename new column to old name + connection.execute( + text( + """ + ALTER TABLE chat_message RENAME COLUMN message_tsv_gen TO message_tsv; + """ + ) + ) + + # --- PART 2: chat_session table --- + + # Step 1: Add nullable column (quick, minimal locking) + time.time() + connection.execute( + text( + "ALTER TABLE chat_session ADD COLUMN IF NOT EXISTS description_tsv tsvector" + ) + ) + + # Step 2: Create function and trigger for new/updated rows - SPLIT INTO SEPARATE CALLS + connection.execute( + text( + """ + CREATE OR REPLACE FUNCTION update_chat_session_tsv() + RETURNS TRIGGER AS $$ + BEGIN + NEW.description_tsv = to_tsvector('english', COALESCE(NEW.description, '')); + RETURN NEW; + END; + $$ LANGUAGE plpgsql + """ + ) + ) + + # Create trigger in a separate execute call + connection.execute( + text( + """ + CREATE TRIGGER chat_session_tsv_trigger + BEFORE INSERT OR UPDATE ON chat_session + FOR EACH ROW EXECUTE FUNCTION update_chat_session_tsv() + """ + ) + ) + + # Step 3: Update existing rows in batches using Python + time.time() + + # Get the maximum ID to determine batch count + # Cast id to text for MAX function since it's a UUID + max_id_result = connection.execute( + text("SELECT COALESCE(MAX(id::text), '0') FROM chat_session") + ).scalar() + max_id_result if max_id_result is not None else "0" + batch_size = 5000 + batches = 0 + + # Get all IDs ordered to process in batches + rows = connection.execute( + text("SELECT id FROM chat_session ORDER BY id") + ).fetchall() + total_rows = len(rows) + + # Process in batches + for batch_num, batch_start in enumerate(range(0, total_rows, batch_size)): + batch_end = min(batch_start + batch_size, total_rows) + batch_ids = [row[0] for row in rows[batch_start:batch_end]] + + if not batch_ids: + continue + + # Use IN clause instead of BETWEEN for UUIDs + placeholders = ", ".join([f":id{i}" for i in range(len(batch_ids))]) + params = {f"id{i}": id_val for i, id_val in enumerate(batch_ids)} + + # Execute update for this batch + connection.execute( + text( + f""" + UPDATE chat_session + SET description_tsv = to_tsvector('english', COALESCE(description, '')) + WHERE id IN ({placeholders}) + AND description_tsv IS NULL + """ + ).bindparams(**params) + ) + + # Commit each batch + connection.execute(text("COMMIT")) + # Start a new transaction + connection.execute(text("BEGIN")) + + batches += 1 + + # Final check for any remaining NULL values + connection.execute( + text( + """ + UPDATE chat_session SET description_tsv = to_tsvector('english', COALESCE(description, '')) + WHERE description_tsv IS NULL + """ + ) + ) + + # Create GIN index concurrently + connection.execute(text("COMMIT")) + + time.time() + connection.execute( + text( + """ + CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_chat_session_desc_tsv + ON chat_session USING GIN (description_tsv) + """ + ) + ) + + # After Final check for chat_session + # First drop the trigger as it won't be needed anymore + connection.execute( + text( + """ + DROP TRIGGER IF EXISTS chat_session_tsv_trigger ON chat_session; + """ + ) + ) + + connection.execute( + text( + """ + DROP FUNCTION IF EXISTS update_chat_session_tsv(); + """ + ) + ) + # Add new generated column + time.time() + connection.execute( + text( + """ + ALTER TABLE chat_session + ADD COLUMN description_tsv_gen tsvector + GENERATED ALWAYS AS (to_tsvector('english', COALESCE(description, ''))) STORED; + """ + ) + ) + + # Create new index on generated column + connection.execute(text("COMMIT")) + + time.time() + connection.execute( + text( + """ + CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_chat_session_desc_tsv_gen + ON chat_session USING GIN (description_tsv_gen) + """ + ) + ) + + # Drop old index and column + connection.execute(text("COMMIT")) + + connection.execute( + text( + """ + DROP INDEX CONCURRENTLY IF EXISTS idx_chat_session_desc_tsv; + """ + ) + ) + connection.execute(text("COMMIT")) + connection.execute( + text( + """ + ALTER TABLE chat_session DROP COLUMN description_tsv; + """ + ) + ) + + # Rename new column to old name + connection.execute( + text( + """ + ALTER TABLE chat_session RENAME COLUMN description_tsv_gen TO description_tsv; + """ + ) )