Reindex All Backend (#1049)

This commit is contained in:
Yuhong Sun 2024-02-06 23:07:24 -08:00 committed by GitHub
parent 62000c1e46
commit 2362c2bdcc
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 51 additions and 23 deletions

View File

@ -0,0 +1,27 @@
"""Index From Beginning
Revision ID: ec3ec2eabf7b
Revises: dbaa756c2ccf
Create Date: 2024-02-06 22:03:28.098158
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
# `revision` is this migration's own ID and `down_revision` is the migration it
# revises; both values match the "Revision ID" / "Revises" lines in the module
# docstring at the top of this file.
revision = "ec3ec2eabf7b"
down_revision = "dbaa756c2ccf"
# No branch labels and no extra dependencies are declared for this migration.
branch_labels = None
depends_on = None
def upgrade() -> None:
    """Add a non-nullable boolean ``from_beginning`` column to ``index_attempt``.

    The column is introduced in three steps so that rows which already exist
    never violate the NOT NULL constraint:

    1. add the column as nullable,
    2. backfill every existing row with ``False``,
    3. tighten the column to ``nullable=False``.
    """
    table_name = "index_attempt"
    column_name = "from_beginning"

    # Step 1: the column starts out nullable so the ALTER succeeds on a
    # table that already contains rows.
    nullable_flag_column = sa.Column(column_name, sa.Boolean(), nullable=True)
    op.add_column(table_name, nullable_flag_column)

    # Step 2: backfill existing rows with False.
    op.execute("UPDATE index_attempt SET from_beginning = False")

    # Step 3: every row now has a value, so NOT NULL can be enforced.
    op.alter_column(table_name, column_name, nullable=False)
def downgrade() -> None:
    """Revert ``upgrade`` by dropping ``from_beginning`` from ``index_attempt``."""
    table_name, column_name = "index_attempt", "from_beginning"
    op.drop_column(table_name, column_name)

View File

@ -128,16 +128,21 @@ def _run_indexing(
indexing_pipeline = build_indexing_pipeline(
embedder=embedding_model,
document_index=document_index,
ignore_time_skip=(db_embedding_model.status == IndexModelStatus.FUTURE),
ignore_time_skip=index_attempt.from_beginning
or (db_embedding_model.status == IndexModelStatus.FUTURE),
)
db_connector = index_attempt.connector
db_credential = index_attempt.credential
last_successful_index_time = get_last_successful_attempt_time(
connector_id=db_connector.id,
credential_id=db_credential.id,
embedding_model=index_attempt.embedding_model,
db_session=db_session,
last_successful_index_time = (
0.0
if index_attempt.from_beginning
else get_last_successful_attempt_time(
connector_id=db_connector.id,
credential_id=db_credential.id,
embedding_model=index_attempt.embedding_model,
db_session=db_session,
)
)
net_doc_change = 0

View File

@ -35,11 +35,13 @@ def create_index_attempt(
credential_id: int,
embedding_model_id: int | None,
db_session: Session,
from_beginning: bool = False,
) -> int:
new_attempt = IndexAttempt(
connector_id=connector_id,
credential_id=credential_id,
embedding_model_id=embedding_model_id,
from_beginning=from_beginning,
status=IndexingStatus.NOT_STARTED,
)
db_session.add(new_attempt)

View File

@ -419,6 +419,10 @@ class IndexAttempt(Base):
ForeignKey("credential.id"),
nullable=True,
)
# Some index attempts that run from the beginning will still have this as False.
# This is only True for attempts that are explicitly marked as starting from the
# beginning via the run-once API.
from_beginning: Mapped[bool] = mapped_column(Boolean)
status: Mapped[IndexingStatus] = mapped_column(Enum(IndexingStatus))
# The two below may be slightly out of sync if user switches Embedding Model
new_docs_indexed: Mapped[int | None] = mapped_column(Integer, default=0)

View File

@ -55,7 +55,6 @@ from danswer.db.credentials import fetch_credential_by_id
from danswer.db.deletion_attempt import check_deletion_attempt_is_allowed
from danswer.db.document import get_document_cnts_for_cc_pairs
from danswer.db.embedding_model import get_current_db_embedding_model
from danswer.db.embedding_model import get_secondary_db_embedding_model
from danswer.db.engine import get_session
from danswer.db.index_attempt import cancel_indexing_attempts_for_connector
from danswer.db.index_attempt import create_index_attempt
@ -529,6 +528,7 @@ def connector_run_once(
connector_id=run_info.connector_id,
credential_id=credential_id,
),
only_current=True,
disinclude_finished=True,
db_session=db_session,
)
@ -536,29 +536,18 @@ def connector_run_once(
embedding_model = get_current_db_embedding_model(db_session)
secondary_embedding_model = get_secondary_db_embedding_model(db_session)
index_attempt_ids = [
create_index_attempt(
run_info.connector_id, credential_id, embedding_model.id, db_session
connector_id=run_info.connector_id,
credential_id=credential_id,
embedding_model_id=embedding_model.id,
from_beginning=run_info.from_beginning,
db_session=db_session,
)
for credential_id in credential_ids
if credential_id not in skipped_credentials
]
if secondary_embedding_model is not None:
# Secondary index doesn't have to be returned
[
create_index_attempt(
run_info.connector_id,
credential_id,
secondary_embedding_model.id,
db_session,
)
for credential_id in credential_ids
if credential_id not in skipped_credentials
]
if not index_attempt_ids:
raise HTTPException(
status_code=400,

View File

@ -189,6 +189,7 @@ class ConnectorCredentialPairDescriptor(BaseModel):
class RunConnectorRequest(BaseModel):
    # Request model describing a connector run.
    # NOTE(review): presumably the `run_info` payload of `connector_run_once`
    # -- confirm against the endpoint signature.

    # ID of the connector to run.
    connector_id: int
    # Credential IDs to run the connector with. None is accepted by the type;
    # NOTE(review): how None is interpreted is not visible here -- confirm in
    # the handler.
    credential_ids: list[int] | None
    # Whether the run is explicitly requested to index from the beginning.
    # Defaults to False, so existing callers that omit the field are unaffected.
    from_beginning: bool = False
"""Connectors Models"""