better logging and reduce long sessions (#3673)

* testing some tweaks based on issues seen with okteto

* shorten session usage in indexing. still a couple of long running sessions to clean up

* merge sessions

* fixing detached session issues

---------

Co-authored-by: Richard Kuo (Danswer) <rkuo@onyx.app>
Authored by rkuo-danswer on 2025-01-15 17:27:12 -08:00; committed by GitHub
parent beccca5fa2
commit c9a420ec49
5 changed files with 260 additions and 172 deletions
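
The through-line of these changes: instead of holding one DB session (and the ORM objects bound to it) open for an entire indexing run, the code now opens a short-lived session whenever it needs the database and passes plain values between steps. A minimal, self-contained sketch of that pattern follows; the model and helper names are illustrative stand-ins, not the actual Onyx code.

from pydantic import BaseModel
from sqlalchemy import create_engine
from sqlalchemy.orm import DeclarativeBase, Mapped, Session, mapped_column


class Base(DeclarativeBase):
    pass


class IndexAttemptRow(Base):  # stand-in for the real IndexAttempt model
    __tablename__ = "index_attempt"
    id: Mapped[int] = mapped_column(primary_key=True)
    connector_id: Mapped[int] = mapped_column()
    total_docs_indexed: Mapped[int] = mapped_column(default=0)


class AttemptContext(BaseModel):
    """Plain values copied out of the ORM row. These stay valid after the
    session closes, unlike a detached ORM instance, which can raise
    DetachedInstanceError on attribute access."""

    attempt_id: int
    connector_id: int


engine = create_engine("sqlite://")  # in-memory DB so the sketch runs anywhere
Base.metadata.create_all(engine)

with Session(engine) as db:
    db.add(IndexAttemptRow(id=1, connector_id=7))
    db.commit()


def do_long_running_work(ctx: AttemptContext) -> None:
    print(f"indexing for connector {ctx.connector_id} ...")


def run_job(attempt_id: int) -> None:
    # Short session #1: read what we need, then release the connection.
    with Session(engine) as db:
        row = db.get(IndexAttemptRow, attempt_id)
        if row is None:
            raise RuntimeError(f"attempt {attempt_id} not found")
        ctx = AttemptContext(attempt_id=row.id, connector_id=row.connector_id)

    do_long_running_work(ctx)  # no DB session held open during the long phase

    # Short session #2: re-fetch the row by id before writing results.
    with Session(engine) as db:
        row = db.get(IndexAttemptRow, attempt_id)
        if row is None:
            raise RuntimeError(f"attempt {attempt_id} not found")
        row.total_docs_indexed += 100
        db.commit()


run_job(1)

Because the context object holds only plain Python values, nothing outside the session blocks can trip over a detached instance or pin a pooled connection while the long-running work executes. That is the shape of the RunIndexingContext change in the diffs below.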


@@ -251,8 +251,8 @@ def connector_external_group_sync_generator_task(
     with get_session_with_tenant(tenant_id) as db_session:
         cc_pair = get_connector_credential_pair_from_id(
-            cc_pair_id=cc_pair_id,
             db_session=db_session,
+            cc_pair_id=cc_pair_id,
         )
         if cc_pair is None:
             raise ValueError(

@@ -637,7 +637,8 @@ def validate_indexing_fence(
             mark_attempt_failed(
                 payload.index_attempt_id,
                 db_session,
-                "validate_indexing_fence - Canceling index attempt due to missing celery tasks",
+                f"validate_indexing_fence - Canceling index attempt due to missing celery tasks: "
+                f"index_attempt={payload.index_attempt_id}",
             )
     except Exception:
         logger.exception(

@@ -47,7 +47,9 @@ def _initializer(
     SqlEngine.set_app_name(POSTGRES_CELERY_WORKER_INDEXING_CHILD_APP_NAME)

     # Initialize a new engine with desired parameters
-    SqlEngine.init_engine(pool_size=4, max_overflow=12, pool_recycle=60)
+    SqlEngine.init_engine(
+        pool_size=4, max_overflow=12, pool_recycle=60, pool_pre_ping=True
+    )

     # Proceed with executing the target function
     return func(*args, **kwargs)
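
pool_pre_ping is a standard SQLAlchemy engine option: before the pool hands out a pooled connection, it issues a lightweight liveness check and transparently replaces connections the server has silently dropped, which is plausibly relevant to the okteto issues mentioned in the commit message. A minimal sketch of the equivalent plain create_engine call follows; SqlEngine is Onyx's own wrapper, and the SQLite URL is used only so the snippet runs anywhere (production would point at Postgres).

from sqlalchemy import create_engine, text

# In SQLAlchemy 2.0, file-based SQLite uses QueuePool, so the pool args apply.
engine = create_engine(
    "sqlite:///example.db",
    pool_size=4,         # steady-state pooled connections
    max_overflow=12,     # extra connections allowed under burst load
    pool_recycle=60,     # proactively recycle connections older than 60 seconds
    pool_pre_ping=True,  # test each connection before reuse; rebuild if stale
)

with engine.connect() as conn:
    print(conn.execute(text("SELECT 1")).scalar())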


@@ -4,6 +4,7 @@ from datetime import datetime
 from datetime import timedelta
 from datetime import timezone

+from pydantic import BaseModel
 from sqlalchemy.orm import Session

 from onyx.background.indexing.checkpointing import get_time_windows_for_index_attempt
@@ -11,6 +12,7 @@ from onyx.background.indexing.tracer import OnyxTracer
 from onyx.configs.app_configs import INDEXING_SIZE_WARNING_THRESHOLD
 from onyx.configs.app_configs import INDEXING_TRACER_INTERVAL
 from onyx.configs.app_configs import POLL_CONNECTOR_OFFSET
+from onyx.configs.constants import DocumentSource
 from onyx.configs.constants import MilestoneRecordType
 from onyx.connectors.connector_runner import ConnectorRunner
 from onyx.connectors.factory import instantiate_connector
@@ -21,12 +23,14 @@ from onyx.db.connector_credential_pair import get_last_successful_attempt_time
 from onyx.db.connector_credential_pair import update_connector_credential_pair
 from onyx.db.engine import get_session_with_tenant
 from onyx.db.enums import ConnectorCredentialPairStatus
+from onyx.db.index_attempt import get_index_attempt
 from onyx.db.index_attempt import mark_attempt_canceled
 from onyx.db.index_attempt import mark_attempt_failed
 from onyx.db.index_attempt import mark_attempt_partially_succeeded
 from onyx.db.index_attempt import mark_attempt_succeeded
 from onyx.db.index_attempt import transition_attempt_to_in_progress
 from onyx.db.index_attempt import update_docs_indexed
+from onyx.db.models import ConnectorCredentialPair
 from onyx.db.models import IndexAttempt
 from onyx.db.models import IndexingStatus
 from onyx.db.models import IndexModelStatus
@@ -125,9 +129,21 @@ class ConnectorStopSignal(Exception):
     """A custom exception used to signal a stop in processing."""


+class RunIndexingContext(BaseModel):
+    index_name: str
+    cc_pair_id: int
+    connector_id: int
+    credential_id: int
+    source: DocumentSource
+    earliest_index_time: float
+    from_beginning: bool
+    is_primary: bool
+    search_settings_status: IndexModelStatus
+
+
 def _run_indexing(
     db_session: Session,
-    index_attempt: IndexAttempt,
+    index_attempt_id: int,
     tenant_id: str | None,
     callback: IndexingHeartbeatInterface | None = None,
 ) -> None:
@@ -141,61 +157,76 @@ def _run_indexing(
     """
     start_time = time.time()

-    if index_attempt.search_settings is None:
-        raise ValueError(
-            "Search settings must be set for indexing. This should not be possible."
-        )
+    with get_session_with_tenant(tenant_id) as db_session_temp:
+        index_attempt_start = get_index_attempt(db_session_temp, index_attempt_id)
+        if not index_attempt_start:
+            raise ValueError(
+                f"Index attempt {index_attempt_id} does not exist in DB. This should not be possible."
+            )

-    search_settings = index_attempt.search_settings
-
-    index_name = search_settings.index_name
-
-    # Only update cc-pair status for primary index jobs
-    # Secondary index syncs at the end when swapping
-    is_primary = search_settings.status == IndexModelStatus.PRESENT
-
-    # Indexing is only done into one index at a time
-    document_index = get_default_document_index(
-        primary_index_name=index_name, secondary_index_name=None
-    )
+        if index_attempt_start.search_settings is None:
+            raise ValueError(
+                "Search settings must be set for indexing. This should not be possible."
+            )

-    embedding_model = DefaultIndexingEmbedder.from_db_search_settings(
-        search_settings=search_settings,
-        callback=callback,
-    )
+        # search_settings = index_attempt_start.search_settings
+        db_connector = index_attempt_start.connector_credential_pair.connector
+        db_credential = index_attempt_start.connector_credential_pair.credential
+        ctx = RunIndexingContext(
+            index_name=index_attempt_start.search_settings.index_name,
+            cc_pair_id=index_attempt_start.connector_credential_pair.id,
+            connector_id=db_connector.id,
+            credential_id=db_credential.id,
+            source=db_connector.source,
+            earliest_index_time=(
+                db_connector.indexing_start.timestamp()
+                if db_connector.indexing_start
+                else 0
+            ),
+            from_beginning=index_attempt_start.from_beginning,
+            # Only update cc-pair status for primary index jobs
+            # Secondary index syncs at the end when swapping
+            is_primary=(
+                index_attempt_start.search_settings.status == IndexModelStatus.PRESENT
+            ),
+            search_settings_status=index_attempt_start.search_settings.status,
+        )
+
+        last_successful_index_time = (
+            ctx.earliest_index_time
+            if ctx.from_beginning
+            else get_last_successful_attempt_time(
+                connector_id=ctx.connector_id,
+                credential_id=ctx.credential_id,
+                earliest_index=ctx.earliest_index_time,
+                search_settings=index_attempt_start.search_settings,
+                db_session=db_session_temp,
+            )
+        )
+
+        embedding_model = DefaultIndexingEmbedder.from_db_search_settings(
+            search_settings=index_attempt_start.search_settings,
+            callback=callback,
+        )
+
+    # Indexing is only done into one index at a time
+    document_index = get_default_document_index(
+        primary_index_name=ctx.index_name, secondary_index_name=None
+    )

     indexing_pipeline = build_indexing_pipeline(
-        attempt_id=index_attempt.id,
+        attempt_id=index_attempt_id,
         embedder=embedding_model,
         document_index=document_index,
         ignore_time_skip=(
-            index_attempt.from_beginning
-            or (search_settings.status == IndexModelStatus.FUTURE)
+            ctx.from_beginning
+            or (ctx.search_settings_status == IndexModelStatus.FUTURE)
         ),
         db_session=db_session,
         tenant_id=tenant_id,
         callback=callback,
     )

-    db_cc_pair = index_attempt.connector_credential_pair
-    db_connector = index_attempt.connector_credential_pair.connector
-    db_credential = index_attempt.connector_credential_pair.credential
-    earliest_index_time = (
-        db_connector.indexing_start.timestamp() if db_connector.indexing_start else 0
-    )
-
-    last_successful_index_time = (
-        earliest_index_time
-        if index_attempt.from_beginning
-        else get_last_successful_attempt_time(
-            connector_id=db_connector.id,
-            credential_id=db_credential.id,
-            earliest_index=earliest_index_time,
-            search_settings=index_attempt.search_settings,
-            db_session=db_session,
-        )
-    )
-
     if INDEXING_TRACER_INTERVAL > 0:
         logger.debug(f"Memory tracer starting: interval={INDEXING_TRACER_INTERVAL}")
         tracer = OnyxTracer()
@@ -203,8 +234,8 @@ def _run_indexing(
         tracer.snap()

     index_attempt_md = IndexAttemptMetadata(
-        connector_id=db_connector.id,
-        credential_id=db_credential.id,
+        connector_id=ctx.connector_id,
+        credential_id=ctx.credential_id,
     )

     batch_num = 0
@@ -220,15 +251,27 @@ def _run_indexing(
-            source_type=db_connector.source,
+            source_type=ctx.source,
         )
     ):
+        cc_pair_loop: ConnectorCredentialPair | None = None
+        index_attempt_loop: IndexAttempt | None = None
+
         try:
             window_start = max(
                 window_start - timedelta(minutes=POLL_CONNECTOR_OFFSET),
                 datetime(1970, 1, 1, tzinfo=timezone.utc),
             )

-            connector_runner = _get_connector_runner(
-                db_session=db_session,
-                attempt=index_attempt,
-                start_time=window_start,
-                end_time=window_end,
-                tenant_id=tenant_id,
+            with get_session_with_tenant(tenant_id) as db_session_temp:
+                index_attempt_loop_start = get_index_attempt(
+                    db_session_temp, index_attempt_id
+                )
+                if not index_attempt_loop_start:
+                    raise RuntimeError(
+                        f"Index attempt {index_attempt_id} not found in DB."
+                    )
+
+                connector_runner = _get_connector_runner(
+                    db_session=db_session_temp,
+                    attempt=index_attempt_loop_start,
+                    start_time=window_start,
+                    end_time=window_end,
+                    tenant_id=tenant_id,
@@ -249,23 +292,37 @@ def _run_indexing(
                 raise ConnectorStopSignal("Connector stop signal detected")

             # TODO: should we move this into the above callback instead?
-            db_session.refresh(db_cc_pair)
-            if (
-                (
-                    db_cc_pair.status == ConnectorCredentialPairStatus.PAUSED
-                    and search_settings.status != IndexModelStatus.FUTURE
-                )
-                # if it's deleting, we don't care if this is a secondary index
-                or db_cc_pair.status == ConnectorCredentialPairStatus.DELETING
-            ):
-                # let the `except` block handle this
-                raise RuntimeError("Connector was disabled mid run")
+            with get_session_with_tenant(tenant_id) as db_session_temp:
+                cc_pair_loop = get_connector_credential_pair_from_id(
+                    db_session_temp,
+                    ctx.cc_pair_id,
+                )
+                if not cc_pair_loop:
+                    raise RuntimeError(f"CC pair {ctx.cc_pair_id} not found in DB.")

-            db_session.refresh(index_attempt)
-            if index_attempt.status != IndexingStatus.IN_PROGRESS:
-                # Likely due to user manually disabling it or model swap
-                raise RuntimeError(
-                    f"Index Attempt was canceled, status is {index_attempt.status}"
-                )
+                if (
+                    (
+                        cc_pair_loop.status == ConnectorCredentialPairStatus.PAUSED
+                        and ctx.search_settings_status != IndexModelStatus.FUTURE
+                    )
+                    # if it's deleting, we don't care if this is a secondary index
+                    or cc_pair_loop.status == ConnectorCredentialPairStatus.DELETING
+                ):
+                    # let the `except` block handle this
+                    raise RuntimeError("Connector was disabled mid run")
+
+                index_attempt_loop = get_index_attempt(
+                    db_session_temp, index_attempt_id
+                )
+                if not index_attempt_loop:
+                    raise RuntimeError(
+                        f"Index attempt {index_attempt_id} not found in DB."
+                    )
+
+                if index_attempt_loop.status != IndexingStatus.IN_PROGRESS:
+                    # Likely due to user manually disabling it or model swap
+                    raise RuntimeError(
+                        f"Index Attempt was canceled, status is {index_attempt_loop.status}"
+                    )

             batch_description = []
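
Note the pattern in the hunk above: each loop iteration re-fetches the cc-pair and the index attempt by primary key inside a fresh session, instead of calling db_session.refresh(...) on objects pinned to one long-lived session. Status changes made by other workers (pause, delete, cancel) are observed promptly, and no session stays open between batches. A standalone sketch of that re-validation step, with an illustrative schema rather than the Onyx models:

import enum

from sqlalchemy import Engine, create_engine
from sqlalchemy.orm import DeclarativeBase, Mapped, Session, mapped_column


class Status(enum.Enum):
    IN_PROGRESS = "in_progress"
    CANCELED = "canceled"


class Base(DeclarativeBase):
    pass


class Attempt(Base):
    __tablename__ = "attempt"
    id: Mapped[int] = mapped_column(primary_key=True)
    status: Mapped[Status] = mapped_column(default=Status.IN_PROGRESS)


def ensure_still_running(engine: Engine, attempt_id: int) -> None:
    # Fresh, short-lived session per check: nothing is held open between batches.
    with Session(engine) as db:
        attempt = db.get(Attempt, attempt_id)
        if attempt is None:
            raise RuntimeError(f"Attempt {attempt_id} not found in DB.")
        if attempt.status != Status.IN_PROGRESS:
            raise RuntimeError(f"Attempt was canceled, status is {attempt.status}")


engine = create_engine("sqlite://")  # in-memory DB so the sketch runs anywhere
Base.metadata.create_all(engine)
with Session(engine) as db:
    db.add(Attempt(id=1))
    db.commit()

ensure_still_running(engine, 1)  # passes while status is IN_PROGRESS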
@@ -312,9 +369,10 @@ def _run_indexing(
                     callback.progress("_run_indexing", len(doc_batch_cleaned))

                 # This new value is updated every batch, so UI can refresh per batch update
+                with get_session_with_tenant(tenant_id) as db_session_temp:
                     update_docs_indexed(
-                        db_session=db_session,
-                        index_attempt=index_attempt,
+                        db_session=db_session_temp,
+                        index_attempt_id=index_attempt_id,
                         total_docs_indexed=document_count,
                         new_docs_indexed=net_doc_change,
                         docs_removed_from_index=0,
@@ -332,11 +390,12 @@ def _run_indexing(
             tracer.log_previous_diff(INDEXING_TRACER_NUM_PRINT_ENTRIES)

         run_end_dt = window_end
-        if is_primary:
+        if ctx.is_primary:
+            with get_session_with_tenant(tenant_id) as db_session_temp:
                 update_connector_credential_pair(
-                    db_session=db_session,
-                    connector_id=db_connector.id,
-                    credential_id=db_credential.id,
+                    db_session=db_session_temp,
+                    connector_id=ctx.connector_id,
+                    credential_id=ctx.credential_id,
                     net_docs=net_doc_change,
                     run_dt=run_end_dt,
                 )
@@ -346,17 +405,18 @@ def _run_indexing(
         )

         if isinstance(e, ConnectorStopSignal):
+            with get_session_with_tenant(tenant_id) as db_session_temp:
                 mark_attempt_canceled(
-                    index_attempt.id,
-                    db_session,
+                    index_attempt_id,
+                    db_session_temp,
                     reason=str(e),
                 )

-                if is_primary:
+                if ctx.is_primary:
                     update_connector_credential_pair(
-                        db_session=db_session,
-                        connector_id=db_connector.id,
-                        credential_id=db_credential.id,
+                        db_session=db_session_temp,
+                        connector_id=ctx.connector_id,
+                        credential_id=ctx.credential_id,
                         net_docs=net_doc_change,
                     )
@@ -373,21 +433,27 @@ def _run_indexing(
             # to give better clarity in the UI, as the next run will never happen.
             if (
                 ind == 0
-                or not db_cc_pair.status.is_active()
-                or index_attempt.status != IndexingStatus.IN_PROGRESS
+                or (
+                    cc_pair_loop is not None and not cc_pair_loop.status.is_active()
+                )
+                or (
+                    index_attempt_loop is not None
+                    and index_attempt_loop.status != IndexingStatus.IN_PROGRESS
+                )
             ):
+                with get_session_with_tenant(tenant_id) as db_session_temp:
                     mark_attempt_failed(
-                        index_attempt.id,
-                        db_session,
+                        index_attempt_id,
+                        db_session_temp,
                         failure_reason=str(e),
                         full_exception_trace=traceback.format_exc(),
                     )

-                    if is_primary:
+                    if ctx.is_primary:
                         update_connector_credential_pair(
-                            db_session=db_session,
-                            connector_id=db_connector.id,
-                            credential_id=db_credential.id,
+                            db_session=db_session_temp,
+                            connector_id=ctx.connector_id,
+                            credential_id=ctx.credential_id,
                             net_docs=net_doc_change,
                         )
@@ -412,16 +478,17 @@ def _run_indexing(
         index_attempt_md.num_exceptions > 0
         and index_attempt_md.num_exceptions >= batch_num
     ):
+        with get_session_with_tenant(tenant_id) as db_session_temp:
             mark_attempt_failed(
-                index_attempt.id,
-                db_session,
+                index_attempt_id,
+                db_session_temp,
                 failure_reason="All batches exceptioned.",
             )
-            if is_primary:
+            if ctx.is_primary:
                 update_connector_credential_pair(
-                    db_session=db_session,
-                    connector_id=index_attempt.connector_credential_pair.connector.id,
-                    credential_id=index_attempt.connector_credential_pair.credential.id,
+                    db_session=db_session_temp,
+                    connector_id=ctx.connector_id,
+                    credential_id=ctx.credential_id,
                 )
         raise Exception(
             f"Connector failed - All batches exceptioned: batches={batch_num}"
@@ -429,15 +496,16 @@ def _run_indexing(
     elapsed_time = time.time() - start_time

+    with get_session_with_tenant(tenant_id) as db_session_temp:
         if index_attempt_md.num_exceptions == 0:
-            mark_attempt_succeeded(index_attempt, db_session)
+            mark_attempt_succeeded(index_attempt_id, db_session_temp)

             create_milestone_and_report(
                 user=None,
                 distinct_id=tenant_id or "N/A",
                 event_type=MilestoneRecordType.CONNECTOR_SUCCEEDED,
                 properties=None,
-                db_session=db_session,
+                db_session=db_session_temp,
             )

             logger.info(
@@ -445,7 +513,7 @@ def _run_indexing(
                 f"docs={document_count} chunks={chunk_count} elapsed={elapsed_time:.2f}s"
             )
         else:
-            mark_attempt_partially_succeeded(index_attempt, db_session)
+            mark_attempt_partially_succeeded(index_attempt_id, db_session_temp)
             logger.info(
                 f"Connector completed with some errors: "
                 f"exceptions={index_attempt_md.num_exceptions} "
@@ -455,11 +523,11 @@ def _run_indexing(
                 f"elapsed={elapsed_time:.2f}s"
             )

-        if is_primary:
+        if ctx.is_primary:
             update_connector_credential_pair(
-                db_session=db_session,
-                connector_id=db_connector.id,
-                credential_id=db_credential.id,
+                db_session=db_session_temp,
+                connector_id=ctx.connector_id,
+                credential_id=ctx.credential_id,
                 run_dt=run_end_dt,
             )
@@ -481,26 +549,34 @@ def run_indexing_entrypoint(
             index_attempt_id, connector_credential_pair_id
         )

         with get_session_with_tenant(tenant_id) as db_session:
+            # TODO: remove long running session entirely
             attempt = transition_attempt_to_in_progress(index_attempt_id, db_session)

             tenant_str = ""
             if tenant_id is not None:
                 tenant_str = f" for tenant {tenant_id}"

+            connector_name = attempt.connector_credential_pair.connector.name
+            connector_config = (
+                attempt.connector_credential_pair.connector.connector_specific_config
+            )
+            credential_id = attempt.connector_credential_pair.credential_id
+
         logger.info(
             f"Indexing starting{tenant_str}: "
-            f"connector='{attempt.connector_credential_pair.connector.name}' "
-            f"config='{attempt.connector_credential_pair.connector.connector_specific_config}' "
-            f"credentials='{attempt.connector_credential_pair.connector_id}'"
+            f"connector='{connector_name}' "
+            f"config='{connector_config}' "
+            f"credentials='{credential_id}'"
         )

-        _run_indexing(db_session, attempt, tenant_id, callback)
+        with get_session_with_tenant(tenant_id) as db_session:
+            _run_indexing(db_session, index_attempt_id, tenant_id, callback)

         logger.info(
             f"Indexing finished{tenant_str}: "
-            f"connector='{attempt.connector_credential_pair.connector.name}' "
-            f"config='{attempt.connector_credential_pair.connector.connector_specific_config}' "
-            f"credentials='{attempt.connector_credential_pair.connector_id}'"
+            f"connector='{connector_name}' "
+            f"config='{connector_config}' "
+            f"credentials='{credential_id}'"
         )
     except Exception as e:
         logger.exception(


@@ -182,13 +182,13 @@ def mark_attempt_in_progress(
 def mark_attempt_succeeded(
-    index_attempt: IndexAttempt,
+    index_attempt_id: int,
     db_session: Session,
 ) -> None:
     try:
         attempt = db_session.execute(
             select(IndexAttempt)
-            .where(IndexAttempt.id == index_attempt.id)
+            .where(IndexAttempt.id == index_attempt_id)
             .with_for_update()
         ).scalar_one()
@@ -200,13 +200,13 @@ def mark_attempt_succeeded(
 def mark_attempt_partially_succeeded(
-    index_attempt: IndexAttempt,
+    index_attempt_id: int,
     db_session: Session,
 ) -> None:
     try:
         attempt = db_session.execute(
             select(IndexAttempt)
-            .where(IndexAttempt.id == index_attempt.id)
+            .where(IndexAttempt.id == index_attempt_id)
             .with_for_update()
         ).scalar_one()
@@ -265,17 +265,26 @@ def mark_attempt_failed(
 def update_docs_indexed(
     db_session: Session,
-    index_attempt: IndexAttempt,
+    index_attempt_id: int,
     total_docs_indexed: int,
     new_docs_indexed: int,
     docs_removed_from_index: int,
 ) -> None:
-    index_attempt.total_docs_indexed = total_docs_indexed
-    index_attempt.new_docs_indexed = new_docs_indexed
-    index_attempt.docs_removed_from_index = docs_removed_from_index
+    try:
+        attempt = db_session.execute(
+            select(IndexAttempt)
+            .where(IndexAttempt.id == index_attempt_id)
+            .with_for_update()
+        ).scalar_one()

-    db_session.add(index_attempt)
-    db_session.commit()
+        attempt.total_docs_indexed = total_docs_indexed
+        attempt.new_docs_indexed = new_docs_indexed
+        attempt.docs_removed_from_index = docs_removed_from_index
+
+        db_session.commit()
+    except Exception:
+        db_session.rollback()
+        logger.exception("update_docs_indexed exceptioned.")
+        raise


 def get_last_attempt(
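
The update_docs_indexed rewrite above combines the two recurring moves in this commit: look the row up by id (so callers pass plain ints across session boundaries) and take a row lock with SELECT ... FOR UPDATE so concurrent workers serialize instead of clobbering each other's counters. A standalone sketch of the same pattern, with an illustrative model rather than the actual Onyx schema:

from sqlalchemy import create_engine, select
from sqlalchemy.orm import DeclarativeBase, Mapped, Session, mapped_column


class Base(DeclarativeBase):
    pass


class AttemptRow(Base):
    __tablename__ = "attempt"
    id: Mapped[int] = mapped_column(primary_key=True)
    total_docs_indexed: Mapped[int] = mapped_column(default=0)


def update_docs_indexed(db_session: Session, attempt_id: int, total: int) -> None:
    try:
        # The row stays locked until commit/rollback, serializing writers.
        # (Postgres enforces FOR UPDATE; the SQLite dialect omits it.)
        attempt = db_session.execute(
            select(AttemptRow).where(AttemptRow.id == attempt_id).with_for_update()
        ).scalar_one()  # raises NoResultFound if the id is missing
        attempt.total_docs_indexed = total
        db_session.commit()
    except Exception:
        db_session.rollback()  # leave the session clean before re-raising
        raise


engine = create_engine("sqlite://")  # in-memory DB so the sketch runs anywhere
Base.metadata.create_all(engine)
with Session(engine) as db:
    db.add(AttemptRow(id=1))
    db.commit()
    update_docs_indexed(db, 1, total=42)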