mirror of
https://github.com/danswer-ai/danswer.git
synced 2025-06-20 21:10:58 +02:00
use indexing flag in db for manually triggering indexing (#3264)
* use indexing flag in db for manually trigger indexing * add comment. * only try to release the lock if we actually succeeded with the lock * ensure we don't trigger manual indexing on anything but the primary search settings * comment usage of primary search settings * run check for indexing immediately after indexing triggers are set * reorder fix
This commit is contained in:
parent
fd84b7a768
commit
5be7d27285
@ -0,0 +1,30 @@
|
||||
"""add indexing trigger to cc_pair
|
||||
|
||||
Revision ID: abe7378b8217
|
||||
Revises: 6d562f86c78b
|
||||
Create Date: 2024-11-26 19:09:53.481171
|
||||
|
||||
"""
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = "abe7378b8217"
|
||||
down_revision = "93560ba1b118"
|
||||
branch_labels = None
|
||||
depends_on = None
|
||||
|
||||
|
||||
def upgrade() -> None:
|
||||
op.add_column(
|
||||
"connector_credential_pair",
|
||||
sa.Column(
|
||||
"indexing_trigger",
|
||||
sa.Enum("UPDATE", "REINDEX", name="indexingmode", native_enum=False),
|
||||
nullable=True,
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
def downgrade() -> None:
|
||||
op.drop_column("connector_credential_pair", "indexing_trigger")
|
@ -25,11 +25,13 @@ from danswer.configs.constants import DanswerCeleryPriority
|
||||
from danswer.configs.constants import DanswerCeleryQueues
|
||||
from danswer.configs.constants import DanswerRedisLocks
|
||||
from danswer.configs.constants import DocumentSource
|
||||
from danswer.db.connector import mark_ccpair_with_indexing_trigger
|
||||
from danswer.db.connector_credential_pair import fetch_connector_credential_pairs
|
||||
from danswer.db.connector_credential_pair import get_connector_credential_pair_from_id
|
||||
from danswer.db.engine import get_db_current_time
|
||||
from danswer.db.engine import get_session_with_tenant
|
||||
from danswer.db.enums import ConnectorCredentialPairStatus
|
||||
from danswer.db.enums import IndexingMode
|
||||
from danswer.db.enums import IndexingStatus
|
||||
from danswer.db.enums import IndexModelStatus
|
||||
from danswer.db.index_attempt import create_index_attempt
|
||||
@ -159,7 +161,7 @@ def get_unfenced_index_attempt_ids(db_session: Session, r: redis.Redis) -> list[
|
||||
)
|
||||
def check_for_indexing(self: Task, *, tenant_id: str | None) -> int | None:
|
||||
tasks_created = 0
|
||||
|
||||
locked = False
|
||||
r = get_redis_client(tenant_id=tenant_id)
|
||||
|
||||
lock_beat: RedisLock = r.lock(
|
||||
@ -172,6 +174,8 @@ def check_for_indexing(self: Task, *, tenant_id: str | None) -> int | None:
|
||||
if not lock_beat.acquire(blocking=False):
|
||||
return None
|
||||
|
||||
locked = True
|
||||
|
||||
# check for search settings swap
|
||||
with get_session_with_tenant(tenant_id=tenant_id) as db_session:
|
||||
old_search_settings = check_index_swap(db_session=db_session)
|
||||
@ -231,22 +235,46 @@ def check_for_indexing(self: Task, *, tenant_id: str | None) -> int | None:
|
||||
last_attempt = get_last_attempt_for_cc_pair(
|
||||
cc_pair.id, search_settings_instance.id, db_session
|
||||
)
|
||||
|
||||
search_settings_primary = False
|
||||
if search_settings_instance.id == primary_search_settings.id:
|
||||
search_settings_primary = True
|
||||
|
||||
if not _should_index(
|
||||
cc_pair=cc_pair,
|
||||
last_index=last_attempt,
|
||||
search_settings_instance=search_settings_instance,
|
||||
search_settings_primary=search_settings_primary,
|
||||
secondary_index_building=len(search_settings) > 1,
|
||||
db_session=db_session,
|
||||
):
|
||||
continue
|
||||
|
||||
reindex = False
|
||||
if search_settings_instance.id == primary_search_settings.id:
|
||||
# the indexing trigger is only checked and cleared with the primary search settings
|
||||
if cc_pair.indexing_trigger is not None:
|
||||
if cc_pair.indexing_trigger == IndexingMode.REINDEX:
|
||||
reindex = True
|
||||
|
||||
task_logger.info(
|
||||
f"Connector indexing manual trigger detected: "
|
||||
f"cc_pair={cc_pair.id} "
|
||||
f"search_settings={search_settings_instance.id} "
|
||||
f"indexing_mode={cc_pair.indexing_trigger}"
|
||||
)
|
||||
|
||||
mark_ccpair_with_indexing_trigger(
|
||||
cc_pair.id, None, db_session
|
||||
)
|
||||
|
||||
# using a task queue and only allowing one task per cc_pair/search_setting
|
||||
# prevents us from starving out certain attempts
|
||||
attempt_id = try_creating_indexing_task(
|
||||
self.app,
|
||||
cc_pair,
|
||||
search_settings_instance,
|
||||
False,
|
||||
reindex,
|
||||
db_session,
|
||||
r,
|
||||
tenant_id,
|
||||
@ -281,7 +309,6 @@ def check_for_indexing(self: Task, *, tenant_id: str | None) -> int | None:
|
||||
mark_attempt_failed(
|
||||
attempt.id, db_session, failure_reason=failure_reason
|
||||
)
|
||||
|
||||
except SoftTimeLimitExceeded:
|
||||
task_logger.info(
|
||||
"Soft time limit exceeded, task is being terminated gracefully."
|
||||
@ -289,6 +316,7 @@ def check_for_indexing(self: Task, *, tenant_id: str | None) -> int | None:
|
||||
except Exception:
|
||||
task_logger.exception(f"Unexpected exception: tenant={tenant_id}")
|
||||
finally:
|
||||
if locked:
|
||||
if lock_beat.owned():
|
||||
lock_beat.release()
|
||||
else:
|
||||
@ -304,6 +332,7 @@ def _should_index(
|
||||
cc_pair: ConnectorCredentialPair,
|
||||
last_index: IndexAttempt | None,
|
||||
search_settings_instance: SearchSettings,
|
||||
search_settings_primary: bool,
|
||||
secondary_index_building: bool,
|
||||
db_session: Session,
|
||||
) -> bool:
|
||||
@ -368,6 +397,11 @@ def _should_index(
|
||||
):
|
||||
return False
|
||||
|
||||
if search_settings_primary:
|
||||
if cc_pair.indexing_trigger is not None:
|
||||
# if a manual indexing trigger is on the cc pair, honor it for primary search settings
|
||||
return True
|
||||
|
||||
# if no attempt has ever occurred, we should index regardless of refresh_freq
|
||||
if not last_index:
|
||||
return True
|
||||
|
@ -1,6 +1,8 @@
|
||||
"""Factory stub for running celery worker / celery beat."""
|
||||
from celery import Celery
|
||||
|
||||
from danswer.background.celery.apps.beat import celery_app
|
||||
from danswer.utils.variable_functionality import set_is_ee_based_on_env_variable
|
||||
|
||||
set_is_ee_based_on_env_variable()
|
||||
app = celery_app
|
||||
app: Celery = celery_app
|
||||
|
@ -1,8 +1,10 @@
|
||||
"""Factory stub for running celery worker / celery beat."""
|
||||
from celery import Celery
|
||||
|
||||
from danswer.utils.variable_functionality import fetch_versioned_implementation
|
||||
from danswer.utils.variable_functionality import set_is_ee_based_on_env_variable
|
||||
|
||||
set_is_ee_based_on_env_variable()
|
||||
app = fetch_versioned_implementation(
|
||||
app: Celery = fetch_versioned_implementation(
|
||||
"danswer.background.celery.apps.primary", "celery_app"
|
||||
)
|
||||
|
@ -12,6 +12,7 @@ from sqlalchemy.orm import Session
|
||||
from danswer.configs.app_configs import DEFAULT_PRUNING_FREQ
|
||||
from danswer.configs.constants import DocumentSource
|
||||
from danswer.connectors.models import InputType
|
||||
from danswer.db.enums import IndexingMode
|
||||
from danswer.db.models import Connector
|
||||
from danswer.db.models import ConnectorCredentialPair
|
||||
from danswer.db.models import IndexAttempt
|
||||
@ -311,3 +312,25 @@ def mark_cc_pair_as_external_group_synced(db_session: Session, cc_pair_id: int)
|
||||
# If this changes, we need to update this function.
|
||||
cc_pair.last_time_external_group_sync = datetime.now(timezone.utc)
|
||||
db_session.commit()
|
||||
|
||||
|
||||
def mark_ccpair_with_indexing_trigger(
|
||||
cc_pair_id: int, indexing_mode: IndexingMode | None, db_session: Session
|
||||
) -> None:
|
||||
"""indexing_mode sets a field which will be picked up by a background task
|
||||
to trigger indexing. Set to None to disable the trigger."""
|
||||
try:
|
||||
cc_pair = db_session.execute(
|
||||
select(ConnectorCredentialPair)
|
||||
.where(ConnectorCredentialPair.id == cc_pair_id)
|
||||
.with_for_update()
|
||||
).scalar_one()
|
||||
|
||||
if cc_pair is None:
|
||||
raise ValueError(f"No cc_pair with ID: {cc_pair_id}")
|
||||
|
||||
cc_pair.indexing_trigger = indexing_mode
|
||||
db_session.commit()
|
||||
except Exception:
|
||||
db_session.rollback()
|
||||
raise
|
||||
|
@ -19,6 +19,11 @@ class IndexingStatus(str, PyEnum):
|
||||
return self in terminal_states
|
||||
|
||||
|
||||
class IndexingMode(str, PyEnum):
|
||||
UPDATE = "update"
|
||||
REINDEX = "reindex"
|
||||
|
||||
|
||||
# these may differ in the future, which is why we're okay with this duplication
|
||||
class DeletionStatus(str, PyEnum):
|
||||
NOT_STARTED = "not_started"
|
||||
|
@ -42,7 +42,7 @@ from danswer.configs.constants import DEFAULT_BOOST
|
||||
from danswer.configs.constants import DocumentSource
|
||||
from danswer.configs.constants import FileOrigin
|
||||
from danswer.configs.constants import MessageType
|
||||
from danswer.db.enums import AccessType
|
||||
from danswer.db.enums import AccessType, IndexingMode
|
||||
from danswer.configs.constants import NotificationType
|
||||
from danswer.configs.constants import SearchFeedbackType
|
||||
from danswer.configs.constants import TokenRateLimitScope
|
||||
@ -438,6 +438,10 @@ class ConnectorCredentialPair(Base):
|
||||
|
||||
total_docs_indexed: Mapped[int] = mapped_column(Integer, default=0)
|
||||
|
||||
indexing_trigger: Mapped[IndexingMode | None] = mapped_column(
|
||||
Enum(IndexingMode, native_enum=False), nullable=True
|
||||
)
|
||||
|
||||
connector: Mapped["Connector"] = relationship(
|
||||
"Connector", back_populates="credentials"
|
||||
)
|
||||
|
@ -45,6 +45,7 @@ from danswer.configs.constants import AuthType
|
||||
from danswer.configs.constants import POSTGRES_WEB_APP_NAME
|
||||
from danswer.db.engine import SqlEngine
|
||||
from danswer.db.engine import warm_up_connections
|
||||
from danswer.server.api_key.api import router as api_key_router
|
||||
from danswer.server.auth_check import check_router_auth
|
||||
from danswer.server.danswer_api.ingestion import router as danswer_api_router
|
||||
from danswer.server.documents.cc_pair import router as cc_pair_router
|
||||
@ -91,7 +92,6 @@ from danswer.server.settings.api import basic_router as settings_router
|
||||
from danswer.server.token_rate_limits.api import (
|
||||
router as token_rate_limit_settings_router,
|
||||
)
|
||||
from danswer.server.api_key.api import router as api_key_router
|
||||
from danswer.setup import setup_danswer
|
||||
from danswer.setup import setup_multitenant_danswer
|
||||
from danswer.utils.logger import setup_logger
|
||||
|
@ -17,9 +17,9 @@ from danswer.auth.users import current_admin_user
|
||||
from danswer.auth.users import current_curator_or_admin_user
|
||||
from danswer.auth.users import current_user
|
||||
from danswer.background.celery.celery_utils import get_deletion_attempt_snapshot
|
||||
from danswer.background.celery.tasks.indexing.tasks import try_creating_indexing_task
|
||||
from danswer.background.celery.versioned_apps.primary import app as primary_app
|
||||
from danswer.configs.app_configs import ENABLED_CONNECTOR_TYPES
|
||||
from danswer.configs.constants import DanswerCeleryPriority
|
||||
from danswer.configs.constants import DocumentSource
|
||||
from danswer.configs.constants import FileOrigin
|
||||
from danswer.connectors.google_utils.google_auth import (
|
||||
@ -59,6 +59,7 @@ from danswer.db.connector import delete_connector
|
||||
from danswer.db.connector import fetch_connector_by_id
|
||||
from danswer.db.connector import fetch_connectors
|
||||
from danswer.db.connector import get_connector_credential_ids
|
||||
from danswer.db.connector import mark_ccpair_with_indexing_trigger
|
||||
from danswer.db.connector import update_connector
|
||||
from danswer.db.connector_credential_pair import add_credential_to_connector
|
||||
from danswer.db.connector_credential_pair import get_cc_pair_groups_for_ids
|
||||
@ -74,6 +75,7 @@ from danswer.db.document import get_document_counts_for_cc_pairs
|
||||
from danswer.db.engine import get_current_tenant_id
|
||||
from danswer.db.engine import get_session
|
||||
from danswer.db.enums import AccessType
|
||||
from danswer.db.enums import IndexingMode
|
||||
from danswer.db.index_attempt import get_index_attempts_for_cc_pair
|
||||
from danswer.db.index_attempt import get_latest_index_attempt_for_cc_pair_id
|
||||
from danswer.db.index_attempt import get_latest_index_attempts
|
||||
@ -86,7 +88,6 @@ from danswer.db.search_settings import get_secondary_search_settings
|
||||
from danswer.file_store.file_store import get_default_file_store
|
||||
from danswer.key_value_store.interface import KvKeyNotFoundError
|
||||
from danswer.redis.redis_connector import RedisConnector
|
||||
from danswer.redis.redis_pool import get_redis_client
|
||||
from danswer.server.documents.models import AuthStatus
|
||||
from danswer.server.documents.models import AuthUrl
|
||||
from danswer.server.documents.models import ConnectorCredentialPairIdentifier
|
||||
@ -792,12 +793,10 @@ def connector_run_once(
|
||||
_: User = Depends(current_curator_or_admin_user),
|
||||
db_session: Session = Depends(get_session),
|
||||
tenant_id: str = Depends(get_current_tenant_id),
|
||||
) -> StatusResponse[list[int]]:
|
||||
) -> StatusResponse[int]:
|
||||
"""Used to trigger indexing on a set of cc_pairs associated with a
|
||||
single connector."""
|
||||
|
||||
r = get_redis_client(tenant_id=tenant_id)
|
||||
|
||||
connector_id = run_info.connector_id
|
||||
specified_credential_ids = run_info.credential_ids
|
||||
|
||||
@ -843,54 +842,41 @@ def connector_run_once(
|
||||
)
|
||||
]
|
||||
|
||||
search_settings = get_current_search_settings(db_session)
|
||||
|
||||
connector_credential_pairs = [
|
||||
get_connector_credential_pair(connector_id, credential_id, db_session)
|
||||
for credential_id in credential_ids
|
||||
if credential_id not in skipped_credentials
|
||||
]
|
||||
|
||||
index_attempt_ids = []
|
||||
num_triggers = 0
|
||||
for cc_pair in connector_credential_pairs:
|
||||
if cc_pair is not None:
|
||||
attempt_id = try_creating_indexing_task(
|
||||
primary_app,
|
||||
cc_pair,
|
||||
search_settings,
|
||||
run_info.from_beginning,
|
||||
db_session,
|
||||
r,
|
||||
tenant_id,
|
||||
)
|
||||
if attempt_id:
|
||||
indexing_mode = IndexingMode.UPDATE
|
||||
if run_info.from_beginning:
|
||||
indexing_mode = IndexingMode.REINDEX
|
||||
|
||||
mark_ccpair_with_indexing_trigger(cc_pair.id, indexing_mode, db_session)
|
||||
num_triggers += 1
|
||||
|
||||
logger.info(
|
||||
f"connector_run_once - try_creating_indexing_task succeeded: "
|
||||
f"connector_run_once - marking cc_pair with indexing trigger: "
|
||||
f"connector={run_info.connector_id} "
|
||||
f"cc_pair={cc_pair.id} "
|
||||
f"attempt={attempt_id} "
|
||||
)
|
||||
index_attempt_ids.append(attempt_id)
|
||||
else:
|
||||
logger.info(
|
||||
f"connector_run_once - try_creating_indexing_task failed: "
|
||||
f"connector={run_info.connector_id} "
|
||||
f"cc_pair={cc_pair.id}"
|
||||
f"indexing_trigger={indexing_mode}"
|
||||
)
|
||||
|
||||
if not index_attempt_ids:
|
||||
msg = "No new indexing attempts created, indexing jobs are queued or running."
|
||||
logger.info(msg)
|
||||
raise HTTPException(
|
||||
status_code=400,
|
||||
detail=msg,
|
||||
# run the beat task to pick up the triggers immediately
|
||||
primary_app.send_task(
|
||||
"check_for_indexing",
|
||||
priority=DanswerCeleryPriority.HIGH,
|
||||
kwargs={"tenant_id": tenant_id},
|
||||
)
|
||||
|
||||
msg = f"Successfully created {len(index_attempt_ids)} index attempts. {index_attempt_ids}"
|
||||
msg = f"Marked {num_triggers} index attempts with indexing triggers."
|
||||
return StatusResponse(
|
||||
success=True,
|
||||
message=msg,
|
||||
data=index_attempt_ids,
|
||||
data=num_triggers,
|
||||
)
|
||||
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user