mirror of
https://github.com/danswer-ai/danswer.git
synced 2025-09-05 17:08:36 +02:00
Bugfix/slack timeout (#4652)
* don't log all channels * print number of channels * sanitize indexing exception messages * harden vespa index swap * use constants and fix list generation --------- Co-authored-by: Richard Kuo (Onyx) <rkuo@onyx.app>
This commit is contained in:
@@ -896,7 +896,11 @@ def connector_indexing_task(
|
||||
f"cc_pair={cc_pair_id} "
|
||||
f"search_settings={search_settings_id}"
|
||||
)
|
||||
raise e
|
||||
|
||||
# special bulletproofing ... truncate long exception messages
|
||||
sanitized_e = type(e)(str(e)[:1024])
|
||||
sanitized_e.__traceback__ = e.__traceback__
|
||||
raise sanitized_e
|
||||
|
||||
finally:
|
||||
if lock.owned():
|
||||
|
@@ -1,5 +1,6 @@
|
||||
import contextvars
|
||||
import copy
|
||||
import itertools
|
||||
import re
|
||||
from collections.abc import Callable
|
||||
from collections.abc import Generator
|
||||
@@ -292,7 +293,9 @@ def filter_channels(
|
||||
if channel not in all_channel_names:
|
||||
raise ValueError(
|
||||
f"Channel '{channel}' not found in workspace. "
|
||||
f"Available channels: {all_channel_names}"
|
||||
f"Available channels (Showing {len(all_channel_names)} of "
|
||||
f"{min(len(all_channel_names), SlackConnector.MAX_CHANNELS_TO_LOG)}): "
|
||||
f"{list(itertools.islice(all_channel_names, SlackConnector.MAX_CHANNELS_TO_LOG))}"
|
||||
)
|
||||
|
||||
return [
|
||||
@@ -513,6 +516,8 @@ class SlackConnector(
|
||||
|
||||
MAX_RETRIES = 7 # arbitrarily selected
|
||||
|
||||
MAX_CHANNELS_TO_LOG = 50
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
channels: list[str] | None = None,
|
||||
|
@@ -178,6 +178,8 @@ def _convert_thread_to_document(
|
||||
|
||||
|
||||
class TeamsConnector(LoadConnector, PollConnector):
|
||||
MAX_CHANNELS_TO_LOG = 50
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
batch_size: int = INDEX_BATCH_SIZE,
|
||||
@@ -298,7 +300,11 @@ class TeamsConnector(LoadConnector, PollConnector):
|
||||
channels = _get_channels_from_teams(
|
||||
teams=teams,
|
||||
)
|
||||
logger.debug(f"Found available channels: {[c.id for c in channels]}")
|
||||
|
||||
logger.debug(
|
||||
f"Found available channels (max {TeamsConnector.MAX_CHANNELS_TO_LOG} shown): "
|
||||
f"{[c.id for c in channels[:TeamsConnector.MAX_CHANNELS_TO_LOG]]}"
|
||||
)
|
||||
if not channels:
|
||||
msg = "No channels found."
|
||||
logger.error(msg)
|
||||
|
@@ -1,5 +1,8 @@
|
||||
import time
|
||||
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from onyx.configs.app_configs import VESPA_NUM_ATTEMPTS_ON_STARTUP
|
||||
from onyx.configs.constants import KV_REINDEX_KEY
|
||||
from onyx.db.connector_credential_pair import get_connector_credential_pairs
|
||||
from onyx.db.connector_credential_pair import resync_cc_pair
|
||||
@@ -73,13 +76,37 @@ def _perform_index_swap(
|
||||
|
||||
# remove the old index from the vector db
|
||||
document_index = get_default_document_index(secondary_search_settings, None)
|
||||
document_index.ensure_indices_exist(
|
||||
primary_embedding_dim=secondary_search_settings.final_embedding_dim,
|
||||
primary_embedding_precision=secondary_search_settings.embedding_precision,
|
||||
# just finished swap, no more secondary index
|
||||
secondary_index_embedding_dim=None,
|
||||
secondary_index_embedding_precision=None,
|
||||
)
|
||||
|
||||
WAIT_SECONDS = 5
|
||||
|
||||
success = False
|
||||
for x in range(VESPA_NUM_ATTEMPTS_ON_STARTUP):
|
||||
try:
|
||||
logger.notice(
|
||||
f"Vespa index swap (attempt {x+1}/{VESPA_NUM_ATTEMPTS_ON_STARTUP})..."
|
||||
)
|
||||
document_index.ensure_indices_exist(
|
||||
primary_embedding_dim=secondary_search_settings.final_embedding_dim,
|
||||
primary_embedding_precision=secondary_search_settings.embedding_precision,
|
||||
# just finished swap, no more secondary index
|
||||
secondary_index_embedding_dim=None,
|
||||
secondary_index_embedding_precision=None,
|
||||
)
|
||||
|
||||
logger.notice("Vespa index swap complete.")
|
||||
success = True
|
||||
except Exception:
|
||||
logger.exception(
|
||||
f"Vespa index swap did not succeed. The Vespa service may not be ready yet. Retrying in {WAIT_SECONDS} seconds."
|
||||
)
|
||||
time.sleep(WAIT_SECONDS)
|
||||
|
||||
if not success:
|
||||
logger.error(
|
||||
f"Vespa index swap did not succeed. Attempt limit reached. ({VESPA_NUM_ATTEMPTS_ON_STARTUP})"
|
||||
)
|
||||
|
||||
return
|
||||
|
||||
|
||||
def check_and_perform_index_swap(db_session: Session) -> SearchSettings | None:
|
||||
|
Reference in New Issue
Block a user