saving and deleting chunk stats in new table

joachim-danswer 2025-03-10 15:42:32 -07:00
parent 2d81d6082a
commit 4fe5561f44
6 changed files with 143 additions and 1 deletion


@@ -0,0 +1,52 @@
"""add chunk stats table

Revision ID: 3781a5eb12cb
Revises: 3934b1bc7b62
Create Date: 2025-03-10 10:02:30.586666

"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql

# revision identifiers, used by Alembic.
revision = "3781a5eb12cb"
down_revision = "3934b1bc7b62"
branch_labels = None
depends_on = None


def upgrade() -> None:
    op.create_table(
        "chunk_stats",
        sa.Column("id", sa.String(), primary_key=True, index=True),
        sa.Column(
            "document_id",
            sa.String(),
            sa.ForeignKey("document.id"),
            nullable=False,
            index=True,
        ),
        sa.Column("chunk_in_doc_id", sa.Integer(), nullable=False),
        sa.Column("chunk_boost_components", postgresql.JSONB(), nullable=True),
        sa.Column(
            "last_modified",
            sa.DateTime(timezone=True),
            nullable=False,
            index=True,
            server_default=sa.func.now(),
        ),
        sa.Column("last_synced", sa.DateTime(timezone=True), nullable=True, index=True),
        sa.UniqueConstraint(
            "document_id", "chunk_in_doc_id", name="uq_chunk_stats_doc_chunk"
        ),
    )

    op.create_index(
        "ix_chunk_sync_status", "chunk_stats", ["last_modified", "last_synced"]
    )


def downgrade() -> None:
    op.drop_index("ix_chunk_sync_status", table_name="chunk_stats")
    op.drop_table("chunk_stats")
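
The composite ix_chunk_sync_status index pairs last_modified with last_synced, which points at a sync-status scan. Below is a minimal sketch of the kind of query such an index would serve, assuming the ChunkStats model shown further down; fetch_unsynced_chunk_stats is a hypothetical helper, not part of this commit:

from sqlalchemy import or_, select
from sqlalchemy.orm import Session

from onyx.db.models import ChunkStats


def fetch_unsynced_chunk_stats(db_session: Session) -> list[ChunkStats]:
    # Rows that were never synced, or that changed since their last sync
    stmt = select(ChunkStats).where(
        or_(
            ChunkStats.last_synced.is_(None),
            ChunkStats.last_modified > ChunkStats.last_synced,
        )
    )
    return list(db_session.scalars(stmt))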


@@ -19,6 +19,7 @@ from onyx.configs.constants import ONYX_CLOUD_TENANT_ID
from onyx.configs.constants import OnyxCeleryPriority
from onyx.configs.constants import OnyxCeleryTask
from onyx.configs.constants import OnyxRedisLocks
from onyx.db.chunk import delete_chunk_stats_by_connector_credential_pair__no_commit
from onyx.db.document import delete_document_by_connector_credential_pair__no_commit
from onyx.db.document import delete_documents_complete__no_commit
from onyx.db.document import fetch_chunk_count_for_document
@@ -127,6 +128,11 @@ def document_by_cc_pair_cleanup_task(
                chunk_count=chunk_count,
            )

            delete_chunk_stats_by_connector_credential_pair__no_commit(
                db_session=db_session,
                document_ids=[document_id],
            )

            delete_documents_complete__no_commit(
                db_session=db_session,
                document_ids=[document_id],

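Since chunk_stats.document_id carries a foreign key to document.id, the chunk stats rows have to go before the document rows they reference. A minimal standalone sketch of the same sequence the task runs; the session handling and the "doc-123" id are illustrative only:

from onyx.db.chunk import delete_chunk_stats_by_connector_credential_pair__no_commit
from onyx.db.document import delete_documents_complete__no_commit
from onyx.db.engine import get_session_context_manager

with get_session_context_manager() as db_session:
    # FK order: chunk stats first, then the document rows they reference
    delete_chunk_stats_by_connector_credential_pair__no_commit(
        db_session=db_session,
        document_ids=["doc-123"],
    )
    delete_documents_complete__no_commit(
        db_session=db_session,
        document_ids=["doc-123"],
    )
    db_session.commit()
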
backend/onyx/db/chunk.py (new file, 62 lines)

@@ -0,0 +1,62 @@
from datetime import datetime
from datetime import timezone

from sqlalchemy import delete
from sqlalchemy.orm import Session

from onyx.db.models import ChunkStats


def update_chunk_boost_components__no_commit(
    chunk_data: list[dict],
    db_session: Session,
) -> None:
    """Updates the chunk_boost_components for chunks in the database.

    Args:
        chunk_data: List of dicts containing chunk_id, document_id, and boost_score
        db_session: SQLAlchemy database session
    """
    if not chunk_data:
        return

    for data in chunk_data:
        chunk_in_doc_id = str(data.get("chunk_id", ""))
        if len(chunk_in_doc_id) == 0:
            raise ValueError(f"Chunk ID is empty for chunk {data}")

        chunk_stats = (
            db_session.query(ChunkStats)
            .filter(
                ChunkStats.document_id == data["document_id"],
                ChunkStats.chunk_in_doc_id == chunk_in_doc_id,
            )
            .first()
        )

        boost_components = {"information_content_boost": data["boost_score"]}

        if chunk_stats:
            # Update the existing record. Reassign the dict instead of mutating
            # it in place so SQLAlchemy detects the change on the JSONB column.
            if chunk_stats.chunk_boost_components:
                chunk_stats.chunk_boost_components = {
                    **chunk_stats.chunk_boost_components,
                    **boost_components,
                }
            else:
                chunk_stats.chunk_boost_components = boost_components
            chunk_stats.last_modified = datetime.now(timezone.utc)
        else:
            # Create new record
            chunk_stats = ChunkStats(
                # id=data["chunk_id"],
                document_id=data["document_id"],
                chunk_in_doc_id=chunk_in_doc_id,
                chunk_boost_components=boost_components,
            )
            db_session.add(chunk_stats)


def delete_chunk_stats_by_connector_credential_pair__no_commit(
    db_session: Session, document_ids: list[str]
) -> None:
    """Deletes only the chunk stats rows in Postgres for the given documents."""
    stmt = delete(ChunkStats).where(ChunkStats.document_id.in_(document_ids))
    db_session.execute(stmt)

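A short usage sketch for the new upsert helper; the ids and scores are made up, and the caller owns the commit, as the __no_commit suffix signals:

from onyx.db.chunk import update_chunk_boost_components__no_commit
from onyx.db.engine import get_session_context_manager

with get_session_context_manager() as db_session:
    # Upserts one ChunkStats row per (document_id, chunk_in_doc_id) pair
    update_chunk_boost_components__no_commit(
        chunk_data=[
            {"chunk_id": 0, "document_id": "doc-123", "boost_score": 0.8},
            {"chunk_id": 1, "document_id": "doc-123", "boost_score": 1.2},
        ],
        db_session=db_session,
    )
    db_session.commit()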

@@ -23,6 +23,7 @@ from sqlalchemy.sql.expression import null
from onyx.configs.constants import DEFAULT_BOOST
from onyx.configs.constants import DocumentSource
from onyx.db.chunk import delete_chunk_stats_by_connector_credential_pair__no_commit
from onyx.db.connector_credential_pair import get_connector_credential_pair_from_id
from onyx.db.engine import get_session_context_manager
from onyx.db.enums import AccessType
@@ -562,6 +563,13 @@ def delete_documents_complete__no_commit(
    db_session: Session, document_ids: list[str]
) -> None:
    """This completely deletes the documents from the db, including all foreign key relationships"""
    # Start by deleting the chunk stats for the documents
    delete_chunk_stats_by_connector_credential_pair__no_commit(
        db_session=db_session,
        document_ids=document_ids,
    )

    delete_documents_by_connector_credential_pair__no_commit(db_session, document_ids)
    delete_document_feedback_for_documents__no_commit(
        document_ids=document_ids, db_session=db_session


@ -618,7 +618,7 @@ class ChunkStats(Base):
) )
# 0 for neutral, positive for mostly endorse, negative for mostly reject # 0 for neutral, positive for mostly endorse, negative for mostly reject
boost_components: Mapped[dict[str, Any]] = mapped_column(postgresql.JSONB()) chunk_boost_components: Mapped[dict[str, Any]] = mapped_column(postgresql.JSONB())
last_modified: Mapped[datetime.datetime | None] = mapped_column( last_modified: Mapped[datetime.datetime | None] = mapped_column(
DateTime(timezone=True), nullable=False, index=True, default=func.now() DateTime(timezone=True), nullable=False, index=True, default=func.now()

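Only the renamed column is visible in this hunk. Pieced together from the migration above, the full model presumably looks roughly like the following; this is a reconstruction, not the literal contents of models.py:

import datetime
from typing import Any

from sqlalchemy import DateTime, ForeignKey, Integer, String, UniqueConstraint, func
from sqlalchemy.dialects import postgresql
from sqlalchemy.orm import Mapped, mapped_column

# Base is the project's declarative base from onyx.db.models


class ChunkStats(Base):
    __tablename__ = "chunk_stats"

    id: Mapped[str] = mapped_column(String, primary_key=True, index=True)
    document_id: Mapped[str] = mapped_column(
        ForeignKey("document.id"), nullable=False, index=True
    )
    chunk_in_doc_id: Mapped[int] = mapped_column(Integer, nullable=False)

    # 0 for neutral, positive for mostly endorse, negative for mostly reject
    chunk_boost_components: Mapped[dict[str, Any]] = mapped_column(postgresql.JSONB())

    last_modified: Mapped[datetime.datetime | None] = mapped_column(
        DateTime(timezone=True), nullable=False, index=True, default=func.now()
    )
    last_synced: Mapped[datetime.datetime | None] = mapped_column(
        DateTime(timezone=True), nullable=True, index=True
    )

    __table_args__ = (
        UniqueConstraint(
            "document_id", "chunk_in_doc_id", name="uq_chunk_stats_doc_chunk"
        ),
    )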

@@ -20,6 +20,7 @@ from onyx.connectors.models import Document
from onyx.connectors.models import DocumentFailure
from onyx.connectors.models import ImageSection
from onyx.connectors.models import IndexAttemptMetadata
from onyx.db.chunk import update_chunk_boost_components__no_commit
from onyx.connectors.models import IndexingDocument
from onyx.connectors.models import Section
from onyx.connectors.models import TextSection
@@ -612,6 +613,15 @@ def index_doc_batch(
    )

    updatable_ids = [doc.id for doc in ctx.updatable_docs]

    updatable_chunk_data = [
        {
            "chunk_id": chunk.chunk_id,
            "document_id": chunk.source_document.id,
            "boost_score": score,
        }
        for chunk, score in zip(chunks_with_embeddings, chunk_content_scores)
        if score != 1.0
    ]

    # Acquires a lock on the documents so that no other process can modify them
    # NOTE: don't need to acquire till here, since this is when the actual race condition
@@ -751,6 +761,10 @@ def index_doc_batch(
            db_session=db_session,
        )

        update_chunk_boost_components__no_commit(
            chunk_data=updatable_chunk_data, db_session=db_session
        )

        db_session.commit()

    result = IndexingPipelineResult(
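
The score != 1.0 filter means only chunks with a non-neutral information-content score get a stats row written. A small worked example with made-up inputs:

chunk_ids = [0, 1, 2]
chunk_content_scores = [1.0, 0.85, 1.3]  # 1.0 is the neutral score

updatable_chunk_data = [
    {"chunk_id": chunk_id, "document_id": "doc-123", "boost_score": score}
    for chunk_id, score in zip(chunk_ids, chunk_content_scores)
    if score != 1.0
]
# Only chunks 1 and 2 survive the filter; the neutral chunk 0 is skipped,
# so update_chunk_boost_components__no_commit never writes a row for it.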