Add better logging for connectors (#2219)

* Add better logging for connectors

* fix
This commit is contained in:
Chris Weaver 2024-08-22 20:58:29 -07:00 committed by GitHub
parent b1302303b2
commit 5cb9c17ddf
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 78 additions and 33 deletions

View File

@ -11,12 +11,9 @@ from danswer.background.indexing.tracer import DanswerTracer
from danswer.configs.app_configs import INDEXING_SIZE_WARNING_THRESHOLD
from danswer.configs.app_configs import INDEXING_TRACER_INTERVAL
from danswer.configs.app_configs import POLL_CONNECTOR_OFFSET
from danswer.connectors.connector_runner import ConnectorRunner
from danswer.connectors.factory import instantiate_connector
from danswer.connectors.interfaces import GenerateDocumentsOutput
from danswer.connectors.interfaces import LoadConnector
from danswer.connectors.interfaces import PollConnector
from danswer.connectors.models import IndexAttemptMetadata
from danswer.connectors.models import InputType
from danswer.db.connector_credential_pair import get_last_successful_attempt_time
from danswer.db.connector_credential_pair import update_connector_credential_pair
from danswer.db.engine import get_sqlalchemy_engine
@ -42,12 +39,12 @@ logger = setup_logger()
INDEXING_TRACER_NUM_PRINT_ENTRIES = 5
def _get_document_generator(
def _get_connector_runner(
db_session: Session,
attempt: IndexAttempt,
start_time: datetime,
end_time: datetime,
) -> GenerateDocumentsOutput:
) -> ConnectorRunner:
"""
NOTE: `start_time` and `end_time` are only used for poll connectors
@ -77,31 +74,9 @@ def _get_document_generator(
)
raise e
if task == InputType.LOAD_STATE:
assert isinstance(runnable_connector, LoadConnector)
doc_batch_generator = runnable_connector.load_from_state()
elif task == InputType.POLL:
assert isinstance(runnable_connector, PollConnector)
if (
attempt.connector_credential_pair.connector_id is None
or attempt.connector_credential_pair.connector_id is None
):
raise ValueError(
f"Polling attempt {attempt.id} is missing connector_id or credential_id, "
f"can't fetch time range."
)
logger.info(f"Polling for updates between {start_time} and {end_time}")
doc_batch_generator = runnable_connector.poll_source(
start=start_time.timestamp(), end=end_time.timestamp()
)
else:
# Event types cannot be handled by a background type
raise RuntimeError(f"Invalid task type: {task}")
return doc_batch_generator
return ConnectorRunner(
connector=runnable_connector, time_range=(start_time, end_time)
)
def _run_indexing(
@ -189,7 +164,7 @@ def _run_indexing(
datetime(1970, 1, 1, tzinfo=timezone.utc),
)
doc_batch_generator = _get_document_generator(
connector_runner = _get_connector_runner(
db_session=db_session,
attempt=index_attempt,
start_time=window_start,
@ -201,7 +176,7 @@ def _run_indexing(
tracer_counter = 0
if INDEXING_TRACER_INTERVAL > 0:
tracer.snap()
for doc_batch in doc_batch_generator:
for doc_batch in connector_runner.run():
# Check if connector is disabled mid run and stop if so unless it's the secondary
# index being built. We want to populate it even for paused connectors
# Often paused connectors are sources that aren't updated frequently but the

View File

@ -0,0 +1,70 @@
import sys
from datetime import datetime
from danswer.connectors.interfaces import BaseConnector
from danswer.connectors.interfaces import GenerateDocumentsOutput
from danswer.connectors.interfaces import LoadConnector
from danswer.connectors.interfaces import PollConnector
from danswer.utils.logger import setup_logger
logger = setup_logger()
TimeRange = tuple[datetime, datetime]
class ConnectorRunner:
def __init__(
self,
connector: BaseConnector,
time_range: TimeRange | None = None,
fail_loudly: bool = False,
):
self.connector = connector
if isinstance(self.connector, PollConnector):
if time_range is None:
raise ValueError("time_range is required for PollConnector")
self.doc_batch_generator = self.connector.poll_source(
time_range[0].timestamp(), time_range[1].timestamp()
)
elif isinstance(self.connector, LoadConnector):
if time_range and fail_loudly:
raise ValueError(
"time_range specified, but passed in connector is not a PollConnector"
)
self.doc_batch_generator = self.connector.load_from_state()
else:
raise ValueError(f"Invalid connector. type: {type(self.connector)}")
def run(self) -> GenerateDocumentsOutput:
"""Adds additional exception logging to the connector."""
try:
yield from self.doc_batch_generator
except Exception:
exc_type, _, exc_traceback = sys.exc_info()
# Traverse the traceback to find the last frame where the exception was raised
tb = exc_traceback
if tb is None:
logger.error("No traceback found for exception")
raise
while tb.tb_next:
tb = tb.tb_next # Move to the next frame in the traceback
# Get the local variables from the frame where the exception occurred
local_vars = tb.tb_frame.f_locals
local_vars_str = "\n".join(
f"{key}: {value}" for key, value in local_vars.items()
)
logger.error(
f"Error in connector. type: {exc_type};\n"
f"local_vars below -> \n{local_vars_str}"
)
raise