mirror of
https://github.com/danswer-ai/danswer.git
synced 2025-06-06 04:59:24 +02:00
add ccpair id to logging (#2391)
This commit is contained in:
parent
9f6e8bd124
commit
0d749ebd46
@ -384,17 +384,22 @@ def _prepare_index_attempt(db_session: Session, index_attempt_id: int) -> IndexA
|
|||||||
return attempt
|
return attempt
|
||||||
|
|
||||||
|
|
||||||
def run_indexing_entrypoint(index_attempt_id: int, is_ee: bool = False) -> None:
|
def run_indexing_entrypoint(
|
||||||
|
index_attempt_id: int, connector_credential_pair_id: int, is_ee: bool = False
|
||||||
|
) -> None:
|
||||||
"""Entrypoint for indexing run when using dask distributed.
|
"""Entrypoint for indexing run when using dask distributed.
|
||||||
Wraps the actual logic in a `try` block so that we can catch any exceptions
|
Wraps the actual logic in a `try` block so that we can catch any exceptions
|
||||||
and mark the attempt as failed."""
|
and mark the attempt as failed."""
|
||||||
|
|
||||||
try:
|
try:
|
||||||
if is_ee:
|
if is_ee:
|
||||||
global_version.set_ee()
|
global_version.set_ee()
|
||||||
|
|
||||||
# set the indexing attempt ID so that all log messages from this process
|
# set the indexing attempt ID so that all log messages from this process
|
||||||
# will have it added as a prefix
|
# will have it added as a prefix
|
||||||
IndexAttemptSingleton.set_index_attempt_id(index_attempt_id)
|
IndexAttemptSingleton.set_cc_and_index_id(
|
||||||
|
index_attempt_id, connector_credential_pair_id
|
||||||
|
)
|
||||||
|
|
||||||
with Session(get_sqlalchemy_engine()) as db_session:
|
with Session(get_sqlalchemy_engine()) as db_session:
|
||||||
# make sure that it is valid to run this indexing attempt + mark it
|
# make sure that it is valid to run this indexing attempt + mark it
|
||||||
|
@ -341,6 +341,7 @@ def kickoff_indexing_jobs(
|
|||||||
run = secondary_client.submit(
|
run = secondary_client.submit(
|
||||||
run_indexing_entrypoint,
|
run_indexing_entrypoint,
|
||||||
attempt.id,
|
attempt.id,
|
||||||
|
attempt.connector_credential_pair_id,
|
||||||
global_version.get_is_ee_version(),
|
global_version.get_is_ee_version(),
|
||||||
pure=False,
|
pure=False,
|
||||||
)
|
)
|
||||||
@ -348,6 +349,7 @@ def kickoff_indexing_jobs(
|
|||||||
run = client.submit(
|
run = client.submit(
|
||||||
run_indexing_entrypoint,
|
run_indexing_entrypoint,
|
||||||
attempt.id,
|
attempt.id,
|
||||||
|
attempt.connector_credential_pair_id,
|
||||||
global_version.get_is_ee_version(),
|
global_version.get_is_ee_version(),
|
||||||
pure=False,
|
pure=False,
|
||||||
)
|
)
|
||||||
|
@ -19,14 +19,22 @@ class IndexAttemptSingleton:
|
|||||||
main background job (scheduler), etc. this will not be used."""
|
main background job (scheduler), etc. this will not be used."""
|
||||||
|
|
||||||
_INDEX_ATTEMPT_ID: None | int = None
|
_INDEX_ATTEMPT_ID: None | int = None
|
||||||
|
_CONNECTOR_CREDENTIAL_PAIR_ID: None | int = None
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def get_index_attempt_id(cls) -> None | int:
|
def get_index_attempt_id(cls) -> None | int:
|
||||||
return cls._INDEX_ATTEMPT_ID
|
return cls._INDEX_ATTEMPT_ID
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def set_index_attempt_id(cls, index_attempt_id: int) -> None:
|
def get_connector_credential_pair_id(cls) -> None | int:
|
||||||
|
return cls._CONNECTOR_CREDENTIAL_PAIR_ID
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def set_cc_and_index_id(
|
||||||
|
cls, index_attempt_id: int, connector_credential_pair_id: int
|
||||||
|
) -> None:
|
||||||
cls._INDEX_ATTEMPT_ID = index_attempt_id
|
cls._INDEX_ATTEMPT_ID = index_attempt_id
|
||||||
|
cls._CONNECTOR_CREDENTIAL_PAIR_ID = connector_credential_pair_id
|
||||||
|
|
||||||
|
|
||||||
def get_log_level_from_str(log_level_str: str = LOG_LEVEL) -> int:
|
def get_log_level_from_str(log_level_str: str = LOG_LEVEL) -> int:
|
||||||
@ -50,9 +58,14 @@ class DanswerLoggingAdapter(logging.LoggerAdapter):
|
|||||||
# If this is an indexing job, add the attempt ID to the log message
|
# If this is an indexing job, add the attempt ID to the log message
|
||||||
# This helps filter the logs for this specific indexing
|
# This helps filter the logs for this specific indexing
|
||||||
attempt_id = IndexAttemptSingleton.get_index_attempt_id()
|
attempt_id = IndexAttemptSingleton.get_index_attempt_id()
|
||||||
|
cc_pair_id = IndexAttemptSingleton.get_connector_credential_pair_id()
|
||||||
|
|
||||||
if attempt_id is not None:
|
if attempt_id is not None:
|
||||||
msg = f"[Attempt ID: {attempt_id}] {msg}"
|
msg = f"[Attempt ID: {attempt_id}] {msg}"
|
||||||
|
|
||||||
|
if cc_pair_id is not None:
|
||||||
|
msg = f"[CC Pair ID: {cc_pair_id}] {msg}"
|
||||||
|
|
||||||
# For Slack Bot, logs the channel relevant to the request
|
# For Slack Bot, logs the channel relevant to the request
|
||||||
channel_id = self.extra.get(SLACK_CHANNEL_ID) if self.extra else None
|
channel_id = self.extra.get(SLACK_CHANNEL_ID) if self.extra else None
|
||||||
if channel_id:
|
if channel_id:
|
||||||
|
Loading…
x
Reference in New Issue
Block a user