Mirror of https://github.com/danswer-ai/danswer.git
Fix Indexing Frozen (#660)
Commit 0618b59de6 (parent 517a539d7e)
danswer/background/update.py

@@ -3,6 +3,7 @@ import time
 from datetime import datetime
 from datetime import timezone
 
+import dask
 import torch
 from dask.distributed import Client
 from dask.distributed import Future
@@ -44,6 +45,10 @@ from danswer.utils.logger import setup_logger
 
 logger = setup_logger()
 
+# If the indexing dies, it's most likely due to resource constraints,
+# restarting just delays the eventual failure, not useful to the user
+dask.config.set({"distributed.scheduler.allowed-failures": 0})
+
 _UNEXPECTED_STATE_FAILURE_REASON = (
     "Stopped mid run, likely due to the background process being killed"
 )
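
Context for the allowed-failures change: by default the Dask scheduler reschedules a task whose worker died up to three times before marking it failed, so an indexing task that repeatedly OOM-kills its worker looks frozen rather than failed. Setting the limit to 0 surfaces the first death immediately. A minimal sketch of the behavior (the crashing task is a stand-in, not code from this commit):

import os

import dask
from dask.distributed import Client, KilledWorker

# Must be set before the Client/LocalCluster is created so the scheduler
# picks it up: a task whose worker dies now fails on the first attempt.
dask.config.set({"distributed.scheduler.allowed-failures": 0})


def oom_stand_in() -> None:
    # Stand-in for an indexing task that exhausts memory; os._exit kills
    # the worker process abruptly, much as the OOM killer would.
    os._exit(1)


if __name__ == "__main__":
    client = Client(n_workers=1, threads_per_worker=1)
    future = client.submit(oom_stand_in)
    try:
        future.result()
    except KilledWorker:
        print("indexing task failed fast instead of silently retrying")
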
@@ -144,6 +149,9 @@ def cleanup_indexing_jobs(
         if not job.done():
             continue
 
+        if job.status == "error":
+            logger.error(job.exception())
+
         job.release()
         del existing_jobs_copy[attempt_id]
         index_attempt = get_index_attempt(
@@ -156,7 +164,7 @@ def cleanup_indexing_jobs(
             )
             continue
 
-        if index_attempt.status == IndexingStatus.IN_PROGRESS:
+        if index_attempt.status == IndexingStatus.IN_PROGRESS or job.status == "error":
             mark_run_failed(
                 db_session=db_session,
                 index_attempt=index_attempt,
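
These two hunks together make worker deaths visible: a Dask Future whose task errored (including a killed worker) reports status "error", and previously such attempts were neither logged nor marked failed in the DB. A minimal sketch of the reaping pattern they implement (function and names hypothetical, not the commit's code):

from dask.distributed import Future


def reap_finished_jobs(jobs: dict[int, Future]) -> dict[int, Future]:
    # Keep only futures still running; log errors and release finished ones.
    still_running: dict[int, Future] = {}
    for attempt_id, job in jobs.items():
        if not job.done():
            still_running[attempt_id] = job
            continue
        if job.status == "error":
            # e.g. KilledWorker when the indexing process died under memory pressure
            print(f"attempt {attempt_id} failed: {job.exception()}")
        job.release()  # drop the client's reference so Dask can forget the task
    return still_running
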
@@ -286,10 +294,10 @@ def _run_indexing(
         run_dt=run_dt,
     )
 
+    net_doc_change = 0
+    document_count = 0
+    chunk_count = 0
     try:
-        net_doc_change = 0
-        document_count = 0
-        chunk_count = 0
         for doc_batch in doc_batch_generator:
             logger.debug(
                 f"Indexing batch of documents: {[doc.to_short_descriptor() for doc in doc_batch]}"
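
Hoisting the counters above the try guarantees they are bound whenever the failure path runs, so an exception from the batch loop can be reported with partial progress instead of risking an unbound-name error in the handler. A minimal sketch of the pattern (hypothetical names, not the commit's code):

def process(batch: list[str]) -> None:
    # Hypothetical per-batch work that may raise (embedding, writing to the index, ...).
    if not batch:
        raise ValueError("empty batch")


def run_indexing(batches: list[list[str]]) -> int:
    document_count = 0  # bound before `try`, so the handler below can always read it
    try:
        for batch in batches:
            process(batch)
            document_count += len(batch)
    except Exception:
        print(f"indexing failed after {document_count} documents")
        raise
    return document_count


print(run_indexing([["a", "b"], ["c"]]))  # -> 3
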
@@ -418,7 +426,14 @@ def kickoff_indexing_jobs(
 ) -> dict[int, Future]:
     existing_jobs_copy = existing_jobs.copy()
 
-    new_indexing_attempts = get_not_started_index_attempts(db_session)
+    # Don't include jobs waiting in the Dask queue that just haven't started running
+    # Also (rarely) don't include jobs that started but haven't updated the indexing tables yet
+    new_indexing_attempts = [
+        attempt
+        for attempt in get_not_started_index_attempts(db_session)
+        if attempt.id not in existing_jobs
+    ]
+
     logger.info(f"Found {len(new_indexing_attempts)} new indexing tasks.")
 
     if not new_indexing_attempts:
@@ -440,9 +455,6 @@ def kickoff_indexing_jobs(
             )
             continue
 
-        if attempt.id in existing_jobs:
-            continue
-
         logger.info(
             f"Kicking off indexing attempt for connector: '{attempt.connector.name}', "
             f"with config: '{attempt.connector.connector_specific_config}', and "
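
With the filter moved into the comprehension above, this in-loop membership check became redundant and is dropped, and the "Found N new indexing tasks" log now counts only genuinely new work. A condensed sketch of the resulting dedup (stand-in types, not the ORM models):

from dataclasses import dataclass


@dataclass
class IndexAttempt:
    # Minimal stand-in for the index-attempt DB row.
    id: int


def find_new_attempts(
    not_started: list[IndexAttempt], existing_jobs: dict[int, object]
) -> list[IndexAttempt]:
    # Exclude attempts already submitted to Dask: queued-but-not-yet-running ones,
    # and (rarely) started ones that haven't updated the indexing tables yet.
    return [attempt for attempt in not_started if attempt.id not in existing_jobs]


attempts = [IndexAttempt(1), IndexAttempt(2)]
print([a.id for a in find_new_attempts(attempts, {1: None})])  # -> [2]
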
danswer/main.py

@@ -211,5 +211,5 @@ app = get_application()
 
 
 if __name__ == "__main__":
-    logger.info(f"Running QA Service on http://{APP_HOST}:{str(APP_PORT)}/")
+    logger.info(f"Starting Danswer Backend on http://{APP_HOST}:{str(APP_PORT)}/")
     uvicorn.run(app, host=APP_HOST, port=APP_PORT)
supervisord.conf

@@ -1,9 +1,11 @@
 [supervisord]
 nodaemon=true
-logfile=/dev/stdout
-logfile_maxbytes=0
+logfile=/var/log/supervisord.log
 
-[program:indexing]
+# Indexing is the heaviest job, also requires some CPU intensive steps
+# Cannot place this in Celery for now because Celery must run as a single process (see note below)
+# Indexing uses multi-processing to speed things up
+[program:document_indexing]
 command=python danswer/background/update.py
 stdout_logfile=/var/log/update.log
 stdout_logfile_maxbytes=52428800
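
The new comments record the placement rationale: the Celery worker here must run as a single process, while indexing relies on process-based parallelism, so it gets its own supervisord program. A minimal sketch of that kind of multi-process setup with Dask (worker counts illustrative, not the commit's values):

from dask.distributed import Client

if __name__ == "__main__":
    # Process-based workers let CPU-heavy indexing steps run in parallel,
    # something a single-process worker could not host.
    client = Client(n_workers=3, threads_per_worker=1)
    futures = [client.submit(pow, n, 2) for n in range(3)]  # stand-in tasks
    print(client.gather(futures))  # -> [0, 1, 4]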