diff --git a/backend/danswer/background/indexing/run_indexing.py b/backend/danswer/background/indexing/run_indexing.py index d65b4b0c9..86b428536 100644 --- a/backend/danswer/background/indexing/run_indexing.py +++ b/backend/danswer/background/indexing/run_indexing.py @@ -384,17 +384,22 @@ def _prepare_index_attempt(db_session: Session, index_attempt_id: int) -> IndexA return attempt -def run_indexing_entrypoint(index_attempt_id: int, is_ee: bool = False) -> None: +def run_indexing_entrypoint( + index_attempt_id: int, connector_credential_pair_id: int, is_ee: bool = False +) -> None: """Entrypoint for indexing run when using dask distributed. Wraps the actual logic in a `try` block so that we can catch any exceptions and mark the attempt as failed.""" + try: if is_ee: global_version.set_ee() # set the indexing attempt ID so that all log messages from this process # will have it added as a prefix - IndexAttemptSingleton.set_index_attempt_id(index_attempt_id) + IndexAttemptSingleton.set_cc_and_index_id( + index_attempt_id, connector_credential_pair_id + ) with Session(get_sqlalchemy_engine()) as db_session: # make sure that it is valid to run this indexing attempt + mark it diff --git a/backend/danswer/background/update.py b/backend/danswer/background/update.py index 10fa36a1d..5fde7cb3d 100755 --- a/backend/danswer/background/update.py +++ b/backend/danswer/background/update.py @@ -341,6 +341,7 @@ def kickoff_indexing_jobs( run = secondary_client.submit( run_indexing_entrypoint, attempt.id, + attempt.connector_credential_pair_id, global_version.get_is_ee_version(), pure=False, ) @@ -348,6 +349,7 @@ def kickoff_indexing_jobs( run = client.submit( run_indexing_entrypoint, attempt.id, + attempt.connector_credential_pair_id, global_version.get_is_ee_version(), pure=False, ) diff --git a/backend/danswer/utils/logger.py b/backend/danswer/utils/logger.py index a7751ca3d..9489a6244 100644 --- a/backend/danswer/utils/logger.py +++ b/backend/danswer/utils/logger.py @@ -19,14 +19,22 @@ class IndexAttemptSingleton: main background job (scheduler), etc. this will not be used.""" _INDEX_ATTEMPT_ID: None | int = None + _CONNECTOR_CREDENTIAL_PAIR_ID: None | int = None @classmethod def get_index_attempt_id(cls) -> None | int: return cls._INDEX_ATTEMPT_ID @classmethod - def set_index_attempt_id(cls, index_attempt_id: int) -> None: + def get_connector_credential_pair_id(cls) -> None | int: + return cls._CONNECTOR_CREDENTIAL_PAIR_ID + + @classmethod + def set_cc_and_index_id( + cls, index_attempt_id: int, connector_credential_pair_id: int + ) -> None: cls._INDEX_ATTEMPT_ID = index_attempt_id + cls._CONNECTOR_CREDENTIAL_PAIR_ID = connector_credential_pair_id def get_log_level_from_str(log_level_str: str = LOG_LEVEL) -> int: @@ -50,9 +58,14 @@ class DanswerLoggingAdapter(logging.LoggerAdapter): # If this is an indexing job, add the attempt ID to the log message # This helps filter the logs for this specific indexing attempt_id = IndexAttemptSingleton.get_index_attempt_id() + cc_pair_id = IndexAttemptSingleton.get_connector_credential_pair_id() + if attempt_id is not None: msg = f"[Attempt ID: {attempt_id}] {msg}" + if cc_pair_id is not None: + msg = f"[CC Pair ID: {cc_pair_id}] {msg}" + # For Slack Bot, logs the channel relevant to the request channel_id = self.extra.get(SLACK_CHANNEL_ID) if self.extra else None if channel_id: