Mark indexing jobs as ee when running ee supervisord

This commit is contained in:
Weves 2023-11-29 22:47:29 -08:00 committed by Chris Weaver
parent 670de6c00d
commit f82ae158ea
4 changed files with 29 additions and 4 deletions

View File

@ -31,6 +31,7 @@ from danswer.indexing.embedder import DefaultIndexingEmbedder
from danswer.indexing.indexing_pipeline import build_indexing_pipeline
from danswer.utils.logger import IndexAttemptSingleton
from danswer.utils.logger import setup_logger
from danswer.utils.variable_functionality import global_version
logger = setup_logger()
@ -303,11 +304,16 @@ def _prepare_index_attempt(db_session: Session, index_attempt_id: int) -> IndexA
return attempt
def run_indexing_entrypoint(index_attempt_id: int) -> None:
def run_indexing_entrypoint(
index_attempt_id: int, is_ee: bool = False
) -> None:
"""Entrypoint for indexing run when using dask distributed.
Wraps the actual logic in a `try` block so that we can catch any exceptions
and mark the attempt as failed."""
try:
if is_ee:
global_version.set_ee()
# set the indexing attempt ID so that all log messages from this process
# will have it added as a prefix
IndexAttemptSingleton.set_index_attempt_id(index_attempt_id)

View File

@ -38,6 +38,7 @@ from danswer.utils.logger import setup_logger
from shared_configs.configs import INDEXING_MODEL_SERVER_HOST
from shared_configs.configs import LOG_LEVEL
from shared_configs.configs import MODEL_SERVER_PORT
from danswer.utils.variable_functionality import global_version
logger = setup_logger()
@ -307,10 +308,18 @@ def kickoff_indexing_jobs(
if use_secondary_index:
run = secondary_client.submit(
run_indexing_entrypoint, attempt.id, pure=False
run_indexing_entrypoint,
attempt.id,
global_version.get_is_ee_version(),
pure=False
)
else:
run = client.submit(run_indexing_entrypoint, attempt.id, pure=False)
run = client.submit(
run_indexing_entrypoint,
attempt.id,
global_version.get_is_ee_version(),
pure=False
)
if run:
secondary_str = "(secondary index) " if use_secondary_index else ""

View File

@ -5,7 +5,7 @@ logfile_maxbytes=0
[program:indexing]
environment=CURRENT_PROCESS_IS_AN_INDEXING_JOB=true
command=python danswer/background/update.py
command=python ee/danswer/background/update.py
stdout_logfile=/var/log/update.log
stdout_logfile_maxbytes=52428800
redirect_stderr=true

View File

@ -0,0 +1,10 @@
from danswer.background.update import update__main
from danswer.utils.variable_functionality import global_version
if __name__ == "__main__":
# mark this as the EE-enabled indexing job
global_version.set_ee()
# run the usual flow, any future branching will be done with a
# call to `global_version.get_is_ee_version()`
update__main()