Heavy task improvements, logging, and validation (#4058)

This commit is contained in:
pablonyx 2025-02-24 13:48:53 -08:00 committed by GitHub
parent b9e79e5db3
commit 7d40676398
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
41 changed files with 667 additions and 152 deletions

View File

@ -4,6 +4,7 @@ from sqlalchemy.orm import Session
from onyx.configs.constants import DocumentSource
from onyx.db.connector_credential_pair import get_connector_credential_pair
from onyx.db.enums import AccessType
from onyx.db.enums import ConnectorCredentialPairStatus
from onyx.db.models import Connector
from onyx.db.models import ConnectorCredentialPair
from onyx.db.models import UserGroup__ConnectorCredentialPair
@ -35,10 +36,11 @@ def _delete_connector_credential_pair_user_groups_relationship__no_commit(
def get_cc_pairs_by_source(
db_session: Session,
source_type: DocumentSource,
only_sync: bool,
access_type: AccessType | None = None,
status: ConnectorCredentialPairStatus | None = None,
) -> list[ConnectorCredentialPair]:
"""
Get all cc_pairs for a given source type (and optionally only sync)
Get all cc_pairs for a given source type with optional filtering by access_type and status
result is sorted by cc_pair id
"""
query = (
@ -48,8 +50,11 @@ def get_cc_pairs_by_source(
.order_by(ConnectorCredentialPair.id)
)
if only_sync:
query = query.filter(ConnectorCredentialPair.access_type == AccessType.SYNC)
if access_type is not None:
query = query.filter(ConnectorCredentialPair.access_type == access_type)
if status is not None:
query = query.filter(ConnectorCredentialPair.status == status)
cc_pairs = query.all()
return cc_pairs

View File

@ -62,12 +62,14 @@ def _fetch_permissions_for_permission_ids(
user_email=(owner_email or google_drive_connector.primary_admin_email),
)
# We continue on 404 or 403 because the document may not exist or the user may not have access to it
fetched_permissions = execute_paginated_retrieval(
retrieval_function=drive_service.permissions().list,
list_key="permissions",
fileId=doc_id,
fields="permissions(id, emailAddress, type, domain)",
supportsAllDrives=True,
continue_on_404_or_403=True,
)
permissions_for_doc_id = []
@ -104,7 +106,13 @@ def _get_permissions_from_slim_doc(
user_emails: set[str] = set()
group_emails: set[str] = set()
public = False
skipped_permissions = 0
for permission in permissions_list:
if not permission:
skipped_permissions += 1
continue
permission_type = permission["type"]
if permission_type == "user":
user_emails.add(permission["emailAddress"])
@ -121,6 +129,11 @@ def _get_permissions_from_slim_doc(
elif permission_type == "anyone":
public = True
if skipped_permissions > 0:
logger.warning(
f"Skipped {skipped_permissions} permissions of {len(permissions_list)} for document {slim_doc.id}"
)
drive_id = permission_info.get("drive_id")
group_ids = group_emails | ({drive_id} if drive_id is not None else set())

View File

@ -43,8 +43,10 @@ from onyx.configs.constants import OnyxCeleryTask
from onyx.configs.constants import OnyxRedisConstants
from onyx.configs.constants import OnyxRedisLocks
from onyx.configs.constants import OnyxRedisSignals
from onyx.connectors.factory import validate_ccpair_for_user
from onyx.db.connector import mark_cc_pair_as_permissions_synced
from onyx.db.connector_credential_pair import get_connector_credential_pair_from_id
from onyx.db.connector_credential_pair import update_connector_credential_pair
from onyx.db.document import upsert_document_by_connector_credential_pair
from onyx.db.engine import get_session_with_current_tenant
from onyx.db.enums import AccessType
@ -64,6 +66,7 @@ from onyx.redis.redis_pool import get_redis_replica_client
from onyx.redis.redis_pool import redis_lock_dump
from onyx.server.utils import make_short_id
from onyx.utils.logger import doc_permission_sync_ctx
from onyx.utils.logger import format_error_for_logging
from onyx.utils.logger import LoggerContextVars
from onyx.utils.logger import setup_logger
@ -194,12 +197,19 @@ def check_for_doc_permissions_sync(self: Task, *, tenant_id: str) -> bool | None
monitor_ccpair_permissions_taskset(
tenant_id, key_bytes, r, db_session
)
task_logger.info(f"check_for_doc_permissions_sync finished: tenant={tenant_id}")
except SoftTimeLimitExceeded:
task_logger.info(
"Soft time limit exceeded, task is being terminated gracefully."
)
except Exception:
task_logger.exception(f"Unexpected exception: tenant={tenant_id}")
except Exception as e:
error_msg = format_error_for_logging(e)
task_logger.warning(
f"Unexpected check_for_doc_permissions_sync exception: tenant={tenant_id} {error_msg}"
)
task_logger.exception(
f"Unexpected check_for_doc_permissions_sync exception: tenant={tenant_id}"
)
finally:
if lock_beat.owned():
lock_beat.release()
@ -283,13 +293,19 @@ def try_creating_permissions_sync_task(
redis_connector.permissions.set_fence(payload)
payload_id = payload.id
except Exception:
task_logger.exception(f"Unexpected exception: cc_pair={cc_pair_id}")
except Exception as e:
error_msg = format_error_for_logging(e)
task_logger.warning(
f"Unexpected try_creating_permissions_sync_task exception: cc_pair={cc_pair_id} {error_msg}"
)
return None
finally:
if lock.owned():
lock.release()
task_logger.info(
f"try_creating_permissions_sync_task finished: cc_pair={cc_pair_id} payload_id={payload_id}"
)
return payload_id
@ -389,6 +405,30 @@ def connector_permission_sync_generator_task(
f"No connector credential pair found for id: {cc_pair_id}"
)
try:
created = validate_ccpair_for_user(
cc_pair.connector.id,
cc_pair.credential.id,
db_session,
tenant_id,
enforce_creation=False,
)
if not created:
task_logger.warning(
f"Unable to create connector credential pair for id: {cc_pair_id}"
)
except Exception:
task_logger.exception(
f"validate_ccpair_permissions_sync exceptioned: cc_pair={cc_pair_id}"
)
update_connector_credential_pair(
db_session=db_session,
connector_id=cc_pair.connector.id,
credential_id=cc_pair.credential.id,
status=ConnectorCredentialPairStatus.INVALID,
)
raise
source_type = cc_pair.connector.source
doc_sync_func = DOC_PERMISSIONS_FUNC_MAP.get(source_type)
@ -440,6 +480,10 @@ def connector_permission_sync_generator_task(
redis_connector.permissions.generator_complete = tasks_generated
except Exception as e:
error_msg = format_error_for_logging(e)
task_logger.warning(
f"Permission sync exceptioned: cc_pair={cc_pair_id} payload_id={payload_id} {error_msg}"
)
task_logger.exception(
f"Permission sync exceptioned: cc_pair={cc_pair_id} payload_id={payload_id}"
)
@ -516,7 +560,11 @@ def update_external_document_permissions_task(
)
completion_status = OnyxCeleryTaskCompletionStatus.SUCCEEDED
except Exception:
except Exception as e:
error_msg = format_error_for_logging(e)
task_logger.warning(
f"Exception in update_external_document_permissions_task: connector_id={connector_id} doc_id={doc_id} {error_msg}"
)
task_logger.exception(
f"update_external_document_permissions_task exceptioned: "
f"connector_id={connector_id} doc_id={doc_id}"
@ -530,6 +578,9 @@ def update_external_document_permissions_task(
if completion_status != OnyxCeleryTaskCompletionStatus.SUCCEEDED:
return False
task_logger.info(
f"update_external_document_permissions_task finished: connector_id={connector_id} doc_id={doc_id}"
)
return True

View File

@ -37,8 +37,11 @@ from onyx.configs.constants import OnyxCeleryTask
from onyx.configs.constants import OnyxRedisConstants
from onyx.configs.constants import OnyxRedisLocks
from onyx.configs.constants import OnyxRedisSignals
from onyx.connectors.exceptions import ConnectorValidationError
from onyx.connectors.factory import validate_ccpair_for_user
from onyx.db.connector import mark_cc_pair_as_external_group_synced
from onyx.db.connector_credential_pair import get_connector_credential_pair_from_id
from onyx.db.connector_credential_pair import update_connector_credential_pair
from onyx.db.engine import get_session_with_current_tenant
from onyx.db.enums import AccessType
from onyx.db.enums import ConnectorCredentialPairStatus
@ -55,6 +58,7 @@ from onyx.redis.redis_connector_ext_group_sync import (
from onyx.redis.redis_pool import get_redis_client
from onyx.redis.redis_pool import get_redis_replica_client
from onyx.server.utils import make_short_id
from onyx.utils.logger import format_error_for_logging
from onyx.utils.logger import setup_logger
logger = setup_logger()
@ -148,7 +152,10 @@ def check_for_external_group_sync(self: Task, *, tenant_id: str | None) -> bool
for source in GROUP_PERMISSIONS_IS_CC_PAIR_AGNOSTIC:
# These are ordered by cc_pair id so the first one is the one we want
cc_pairs_to_dedupe = get_cc_pairs_by_source(
db_session, source, only_sync=True
db_session,
source,
access_type=AccessType.SYNC,
status=ConnectorCredentialPairStatus.ACTIVE,
)
# We only want to sync one cc_pair per source type
# in GROUP_PERMISSIONS_IS_CC_PAIR_AGNOSTIC so we dedupe here
@ -195,12 +202,17 @@ def check_for_external_group_sync(self: Task, *, tenant_id: str | None) -> bool
task_logger.info(
"Soft time limit exceeded, task is being terminated gracefully."
)
except Exception:
except Exception as e:
error_msg = format_error_for_logging(e)
task_logger.warning(
f"Unexpected check_for_external_group_sync exception: tenant={tenant_id} {error_msg}"
)
task_logger.exception(f"Unexpected exception: tenant={tenant_id}")
finally:
if lock_beat.owned():
lock_beat.release()
task_logger.info(f"check_for_external_group_sync finished: tenant={tenant_id}")
return True
@ -267,12 +279,19 @@ def try_creating_external_group_sync_task(
redis_connector.external_group_sync.set_fence(payload)
payload_id = payload.id
except Exception:
except Exception as e:
error_msg = format_error_for_logging(e)
task_logger.warning(
f"Unexpected try_creating_external_group_sync_task exception: cc_pair={cc_pair_id} {error_msg}"
)
task_logger.exception(
f"Unexpected exception while trying to create external group sync task: cc_pair={cc_pair_id}"
)
return None
task_logger.info(
f"try_creating_external_group_sync_task finished: cc_pair={cc_pair_id} payload_id={payload_id}"
)
return payload_id
@ -368,6 +387,30 @@ def connector_external_group_sync_generator_task(
f"No connector credential pair found for id: {cc_pair_id}"
)
try:
created = validate_ccpair_for_user(
cc_pair.connector.id,
cc_pair.credential.id,
db_session,
tenant_id,
enforce_creation=False,
)
if not created:
task_logger.warning(
f"Unable to create connector credential pair for id: {cc_pair_id}"
)
except Exception:
task_logger.exception(
f"validate_ccpair_permissions_sync exceptioned: cc_pair={cc_pair_id}"
)
update_connector_credential_pair(
db_session=db_session,
connector_id=cc_pair.connector.id,
credential_id=cc_pair.credential.id,
status=ConnectorCredentialPairStatus.INVALID,
)
raise
source_type = cc_pair.connector.source
ext_group_sync_func = GROUP_PERMISSIONS_FUNC_MAP.get(source_type)
@ -379,8 +422,18 @@ def connector_external_group_sync_generator_task(
logger.info(
f"Syncing external groups for {source_type} for cc_pair: {cc_pair_id}"
)
external_user_groups: list[ExternalUserGroup] = ext_group_sync_func(cc_pair)
external_user_groups: list[ExternalUserGroup] = []
try:
external_user_groups = ext_group_sync_func(cc_pair)
except ConnectorValidationError as e:
msg = f"Error syncing external groups for {source_type} for cc_pair: {cc_pair_id} {e}"
update_connector_credential_pair(
db_session=db_session,
connector_id=cc_pair.connector.id,
credential_id=cc_pair.credential.id,
status=ConnectorCredentialPairStatus.INVALID,
)
raise e
logger.info(
f"Syncing {len(external_user_groups)} external user groups for {source_type}"
@ -406,6 +459,14 @@ def connector_external_group_sync_generator_task(
sync_status=SyncStatus.SUCCESS,
)
except Exception as e:
error_msg = format_error_for_logging(e)
task_logger.warning(
f"External group sync exceptioned: cc_pair={cc_pair_id} payload_id={payload.id} {error_msg}"
)
task_logger.exception(
f"External group sync exceptioned: cc_pair={cc_pair_id} payload_id={payload.id}"
)
msg = f"External group sync exceptioned: cc_pair={cc_pair_id} payload_id={payload.id}"
task_logger.exception(msg)
emit_background_error(msg + f"\n\n{e}", cc_pair_id=cc_pair_id)

View File

@ -48,7 +48,7 @@ from onyx.configs.constants import OnyxCeleryTask
from onyx.configs.constants import OnyxRedisConstants
from onyx.configs.constants import OnyxRedisLocks
from onyx.configs.constants import OnyxRedisSignals
from onyx.connectors.interfaces import ConnectorValidationError
from onyx.connectors.exceptions import ConnectorValidationError
from onyx.db.connector import mark_ccpair_with_indexing_trigger
from onyx.db.connector_credential_pair import fetch_connector_credential_pairs
from onyx.db.connector_credential_pair import get_connector_credential_pair_from_id
@ -927,6 +927,7 @@ def connector_indexing_proxy_task(
task_logger.error("self.request.id is None!")
client = SimpleJobClient()
task_logger.info(f"submitting connector_indexing_task with tenant_id={tenant_id}")
job = client.submit(
connector_indexing_task,
@ -1055,6 +1056,7 @@ def connector_indexing_proxy_task(
if not index_attempt.is_finished():
continue
except Exception:
task_logger.exception(
log_builder.build(
@ -1062,6 +1064,7 @@ def connector_indexing_proxy_task(
)
)
continue
except Exception as e:
result.status = IndexingWatchdogTerminalStatus.WATCHDOG_EXCEPTIONED
if isinstance(e, ConnectorValidationError):

View File

@ -55,6 +55,7 @@ from onyx.redis.redis_connector_prune import RedisConnectorPrunePayload
from onyx.redis.redis_pool import get_redis_client
from onyx.redis.redis_pool import get_redis_replica_client
from onyx.server.utils import make_short_id
from onyx.utils.logger import format_error_for_logging
from onyx.utils.logger import LoggerContextVars
from onyx.utils.logger import pruning_ctx
from onyx.utils.logger import setup_logger
@ -194,12 +195,14 @@ def check_for_pruning(self: Task, *, tenant_id: str | None) -> bool | None:
task_logger.info(
"Soft time limit exceeded, task is being terminated gracefully."
)
except Exception:
except Exception as e:
error_msg = format_error_for_logging(e)
task_logger.warning(f"Unexpected pruning check exception: {error_msg}")
task_logger.exception("Unexpected exception during pruning check")
finally:
if lock_beat.owned():
lock_beat.release()
task_logger.info(f"check_for_pruning finished: tenant={tenant_id}")
return True
@ -301,13 +304,19 @@ def try_creating_prune_generator_task(
redis_connector.prune.set_fence(payload)
payload_id = payload.id
except Exception:
except Exception as e:
error_msg = format_error_for_logging(e)
task_logger.warning(
f"Unexpected try_creating_prune_generator_task exception: cc_pair={cc_pair.id} {error_msg}"
)
task_logger.exception(f"Unexpected exception: cc_pair={cc_pair.id}")
return None
finally:
if lock.owned():
lock.release()
task_logger.info(
f"try_creating_prune_generator_task finished: cc_pair={cc_pair.id} payload_id={payload_id}"
)
return payload_id

View File

@ -265,6 +265,7 @@ def document_by_cc_pair_cleanup_task(
if completion_status != OnyxCeleryTaskCompletionStatus.SUCCEEDED:
return False
task_logger.info(f"document_by_cc_pair_cleanup_task finished: doc={document_id}")
return True

View File

@ -1,3 +1,5 @@
from sqlalchemy.exc import IntegrityError
from onyx.db.background_error import create_background_error
from onyx.db.engine import get_session_with_current_tenant
@ -10,4 +12,9 @@ def emit_background_error(
In the future, could create notifications based on the severity."""
with get_session_with_current_tenant() as db_session:
create_background_error(db_session, message, cc_pair_id)
try:
create_background_error(db_session, message, cc_pair_id)
except IntegrityError as e:
# Log an error if the cc_pair_id was deleted or any other exception occurs
error_message = f"Failed to create background error: {str(e)}. Original message: {message}"
create_background_error(db_session, error_message, None)

View File

@ -17,6 +17,9 @@ from typing import Optional
from onyx.configs.constants import POSTGRES_CELERY_WORKER_INDEXING_CHILD_APP_NAME
from onyx.db.engine import SqlEngine
from onyx.utils.logger import setup_logger
from shared_configs.configs import POSTGRES_DEFAULT_SCHEMA
from shared_configs.configs import TENANT_ID_PREFIX
from shared_configs.contextvars import CURRENT_TENANT_ID_CONTEXTVAR
logger = setup_logger()
@ -54,6 +57,15 @@ def _initializer(
kwargs = {}
logger.info("Initializing spawned worker child process.")
# 1. Get tenant_id from args or fallback to default
tenant_id = POSTGRES_DEFAULT_SCHEMA
for arg in reversed(args):
if isinstance(arg, str) and arg.startswith(TENANT_ID_PREFIX):
tenant_id = arg
break
# 2. Set the tenant context before running anything
token = CURRENT_TENANT_ID_CONTEXTVAR.set(tenant_id)
# Reset the engine in the child process
SqlEngine.reset_engine()
@ -81,6 +93,8 @@ def _initializer(
queue.put(error_msg) # Send the exception to the parent process
sys.exit(255) # use 255 to indicate a generic exception
finally:
CURRENT_TENANT_ID_CONTEXTVAR.reset(token)
def _run_in_process(

View File

@ -21,8 +21,8 @@ from onyx.configs.app_configs import POLL_CONNECTOR_OFFSET
from onyx.configs.constants import DocumentSource
from onyx.configs.constants import MilestoneRecordType
from onyx.connectors.connector_runner import ConnectorRunner
from onyx.connectors.exceptions import ConnectorValidationError
from onyx.connectors.factory import instantiate_connector
from onyx.connectors.interfaces import ConnectorValidationError
from onyx.connectors.models import ConnectorCheckpoint
from onyx.connectors.models import ConnectorFailure
from onyx.connectors.models import Document

View File

@ -158,7 +158,7 @@ POSTGRES_USER = os.environ.get("POSTGRES_USER") or "postgres"
POSTGRES_PASSWORD = urllib.parse.quote_plus(
os.environ.get("POSTGRES_PASSWORD") or "password"
)
POSTGRES_HOST = os.environ.get("POSTGRES_HOST") or "127.0.0.1"
POSTGRES_HOST = os.environ.get("POSTGRES_HOST") or "localhost"
POSTGRES_PORT = os.environ.get("POSTGRES_PORT") or "5432"
POSTGRES_DB = os.environ.get("POSTGRES_DB") or "postgres"
AWS_REGION_NAME = os.environ.get("AWS_REGION_NAME") or "us-east-2"

View File

@ -7,11 +7,18 @@ from typing import Optional
import boto3 # type: ignore
from botocore.client import Config # type: ignore
from botocore.exceptions import ClientError
from botocore.exceptions import NoCredentialsError
from botocore.exceptions import PartialCredentialsError
from mypy_boto3_s3 import S3Client # type: ignore
from onyx.configs.app_configs import INDEX_BATCH_SIZE
from onyx.configs.constants import BlobType
from onyx.configs.constants import DocumentSource
from onyx.connectors.exceptions import ConnectorValidationError
from onyx.connectors.exceptions import CredentialExpiredError
from onyx.connectors.exceptions import InsufficientPermissionsError
from onyx.connectors.exceptions import UnexpectedError
from onyx.connectors.interfaces import GenerateDocumentsOutput
from onyx.connectors.interfaces import LoadConnector
from onyx.connectors.interfaces import PollConnector
@ -240,6 +247,73 @@ class BlobStorageConnector(LoadConnector, PollConnector):
return None
def validate_connector_settings(self) -> None:
if self.s3_client is None:
raise ConnectorMissingCredentialError(
"Blob storage credentials not loaded."
)
if not self.bucket_name:
raise ConnectorValidationError(
"No bucket name was provided in connector settings."
)
try:
# We only fetch one object/page as a light-weight validation step.
# This ensures we trigger typical S3 permission checks (ListObjectsV2, etc.).
self.s3_client.list_objects_v2(
Bucket=self.bucket_name, Prefix=self.prefix, MaxKeys=1
)
except NoCredentialsError:
raise ConnectorMissingCredentialError(
"No valid blob storage credentials found or provided to boto3."
)
except PartialCredentialsError:
raise ConnectorMissingCredentialError(
"Partial or incomplete blob storage credentials provided to boto3."
)
except ClientError as e:
error_code = e.response["Error"].get("Code", "")
status_code = e.response["ResponseMetadata"].get("HTTPStatusCode")
# Most common S3 error cases
if error_code in [
"AccessDenied",
"InvalidAccessKeyId",
"SignatureDoesNotMatch",
]:
if status_code == 403 or error_code == "AccessDenied":
raise InsufficientPermissionsError(
f"Insufficient permissions to list objects in bucket '{self.bucket_name}'. "
"Please check your bucket policy and/or IAM policy."
)
if status_code == 401 or error_code == "SignatureDoesNotMatch":
raise CredentialExpiredError(
"Provided blob storage credentials appear invalid or expired."
)
raise CredentialExpiredError(
f"Credential issue encountered ({error_code})."
)
if error_code == "NoSuchBucket" or status_code == 404:
raise ConnectorValidationError(
f"Bucket '{self.bucket_name}' does not exist or cannot be found."
)
raise ConnectorValidationError(
f"Unexpected S3 client error (code={error_code}, status={status_code}): {e}"
)
except Exception as e:
# Catch-all for anything not captured by the above
# Since we are unsure of the error and it may not disable the connector,
# raise an unexpected error (does not disable connector)
raise UnexpectedError(
f"Unexpected error during blob storage settings validation: {e}"
)
if __name__ == "__main__":
credentials_dict = {

View File

@ -9,10 +9,10 @@ from onyx.configs.constants import DocumentSource
from onyx.connectors.bookstack.client import BookStackApiClient
from onyx.connectors.bookstack.client import BookStackClientRequestFailedError
from onyx.connectors.cross_connector_utils.miscellaneous_utils import time_str_to_utc
from onyx.connectors.interfaces import ConnectorValidationError
from onyx.connectors.interfaces import CredentialExpiredError
from onyx.connectors.exceptions import ConnectorValidationError
from onyx.connectors.exceptions import CredentialExpiredError
from onyx.connectors.exceptions import InsufficientPermissionsError
from onyx.connectors.interfaces import GenerateDocumentsOutput
from onyx.connectors.interfaces import InsufficientPermissionsError
from onyx.connectors.interfaces import LoadConnector
from onyx.connectors.interfaces import PollConnector
from onyx.connectors.interfaces import SecondsSinceUnixEpoch

View File

@ -4,6 +4,8 @@ from datetime import timezone
from typing import Any
from urllib.parse import quote
from requests.exceptions import HTTPError
from onyx.configs.app_configs import CONFLUENCE_CONNECTOR_LABELS_TO_SKIP
from onyx.configs.app_configs import CONFLUENCE_TIMEZONE_OFFSET
from onyx.configs.app_configs import CONTINUE_ON_CONNECTOR_FAILURE
@ -16,6 +18,10 @@ from onyx.connectors.confluence.utils import build_confluence_document_id
from onyx.connectors.confluence.utils import datetime_from_string
from onyx.connectors.confluence.utils import extract_text_from_confluence_html
from onyx.connectors.confluence.utils import validate_attachment_filetype
from onyx.connectors.exceptions import ConnectorValidationError
from onyx.connectors.exceptions import CredentialExpiredError
from onyx.connectors.exceptions import InsufficientPermissionsError
from onyx.connectors.exceptions import UnexpectedError
from onyx.connectors.interfaces import GenerateDocumentsOutput
from onyx.connectors.interfaces import GenerateSlimDocumentOutput
from onyx.connectors.interfaces import LoadConnector
@ -397,3 +403,33 @@ class ConfluenceConnector(LoadConnector, PollConnector, SlimConnector):
callback.progress("retrieve_all_slim_documents", 1)
yield doc_metadata_list
def validate_connector_settings(self) -> None:
if self._confluence_client is None:
raise ConnectorMissingCredentialError("Confluence credentials not loaded.")
try:
spaces = self._confluence_client.get_all_spaces(limit=1)
except HTTPError as e:
status_code = e.response.status_code if e.response else None
if status_code == 401:
raise CredentialExpiredError(
"Invalid or expired Confluence credentials (HTTP 401)."
)
elif status_code == 403:
raise InsufficientPermissionsError(
"Insufficient permissions to access Confluence resources (HTTP 403)."
)
raise UnexpectedError(
f"Unexpected Confluence error (status={status_code}): {e}"
)
except Exception as e:
raise UnexpectedError(
f"Unexpected error while validating Confluence settings: {e}"
)
if not spaces or not spaces.get("results"):
raise ConnectorValidationError(
"No Confluence spaces found. Either your credentials lack permissions, or "
"there truly are no spaces in this Confluence instance."
)

View File

@ -11,6 +11,7 @@ from atlassian import Confluence # type:ignore
from pydantic import BaseModel
from requests import HTTPError
from onyx.connectors.exceptions import ConnectorValidationError
from onyx.utils.logger import setup_logger
logger = setup_logger()
@ -508,11 +509,15 @@ def build_confluence_client(
is_cloud: bool,
wiki_base: str,
) -> OnyxConfluence:
_validate_connector_configuration(
credentials=credentials,
is_cloud=is_cloud,
wiki_base=wiki_base,
)
try:
_validate_connector_configuration(
credentials=credentials,
is_cloud=is_cloud,
wiki_base=wiki_base,
)
except Exception as e:
raise ConnectorValidationError(str(e))
return OnyxConfluence(
api_version="cloud" if is_cloud else "latest",
# Remove trailing slash from wiki_base if present

View File

@ -10,10 +10,10 @@ from dropbox.files import FolderMetadata # type:ignore
from onyx.configs.app_configs import INDEX_BATCH_SIZE
from onyx.configs.constants import DocumentSource
from onyx.connectors.interfaces import ConnectorValidationError
from onyx.connectors.interfaces import CredentialInvalidError
from onyx.connectors.exceptions import ConnectorValidationError
from onyx.connectors.exceptions import CredentialInvalidError
from onyx.connectors.exceptions import InsufficientPermissionsError
from onyx.connectors.interfaces import GenerateDocumentsOutput
from onyx.connectors.interfaces import InsufficientPermissionsError
from onyx.connectors.interfaces import LoadConnector
from onyx.connectors.interfaces import PollConnector
from onyx.connectors.interfaces import SecondsSinceUnixEpoch

View File

@ -0,0 +1,49 @@
class ValidationError(Exception):
"""General exception for validation errors."""
def __init__(self, message: str):
self.message = message
super().__init__(self.message)
class ConnectorValidationError(ValidationError):
"""General exception for connector validation errors."""
def __init__(self, message: str):
self.message = message
super().__init__(self.message)
class UnexpectedError(ValidationError):
"""Raised when an unexpected error occurs during connector validation.
Unexpected errors don't necessarily mean the credential is invalid,
but rather that there was an error during the validation process
or we encountered a currently unhandled error case.
"""
def __init__(self, message: str = "Unexpected error during connector validation"):
super().__init__(message)
class CredentialInvalidError(ConnectorValidationError):
"""Raised when a connector's credential is invalid."""
def __init__(self, message: str = "Credential is invalid"):
super().__init__(message)
class CredentialExpiredError(ConnectorValidationError):
"""Raised when a connector's credential is expired."""
def __init__(self, message: str = "Credential has expired"):
super().__init__(message)
class InsufficientPermissionsError(ConnectorValidationError):
"""Raised when the credential does not have sufficient API permissions."""
def __init__(
self, message: str = "Insufficient permissions for the requested operation"
):
super().__init__(message)

View File

@ -18,6 +18,7 @@ from onyx.connectors.discourse.connector import DiscourseConnector
from onyx.connectors.document360.connector import Document360Connector
from onyx.connectors.dropbox.connector import DropboxConnector
from onyx.connectors.egnyte.connector import EgnyteConnector
from onyx.connectors.exceptions import ConnectorValidationError
from onyx.connectors.file.connector import LocalFileConnector
from onyx.connectors.fireflies.connector import FirefliesConnector
from onyx.connectors.freshdesk.connector import FreshdeskConnector
@ -32,7 +33,6 @@ from onyx.connectors.guru.connector import GuruConnector
from onyx.connectors.hubspot.connector import HubSpotConnector
from onyx.connectors.interfaces import BaseConnector
from onyx.connectors.interfaces import CheckpointConnector
from onyx.connectors.interfaces import ConnectorValidationError
from onyx.connectors.interfaces import EventConnector
from onyx.connectors.interfaces import LoadConnector
from onyx.connectors.interfaces import PollConnector
@ -56,9 +56,8 @@ from onyx.connectors.zendesk.connector import ZendeskConnector
from onyx.connectors.zulip.connector import ZulipConnector
from onyx.db.connector import fetch_connector_by_id
from onyx.db.credentials import backend_update_credential_json
from onyx.db.credentials import fetch_credential_by_id_for_user
from onyx.db.credentials import fetch_credential_by_id
from onyx.db.models import Credential
from onyx.db.models import User
class ConnectorMissingException(Exception):
@ -185,19 +184,17 @@ def validate_ccpair_for_user(
connector_id: int,
credential_id: int,
db_session: Session,
user: User | None,
tenant_id: str | None,
) -> None:
enforce_creation: bool = True,
) -> bool:
if INTEGRATION_TESTS_MODE:
return
return True
# Validate the connector settings
connector = fetch_connector_by_id(connector_id, db_session)
credential = fetch_credential_by_id_for_user(
credential = fetch_credential_by_id(
credential_id,
user,
db_session,
get_editable=False,
)
if not connector:
@ -207,7 +204,7 @@ def validate_ccpair_for_user(
connector.source == DocumentSource.INGESTION_API
or connector.source == DocumentSource.MOCK_CONNECTOR
):
return
return True
if not credential:
raise ValueError("Credential not found")
@ -221,7 +218,13 @@ def validate_ccpair_for_user(
credential=credential,
tenant_id=tenant_id,
)
except ConnectorValidationError as e:
raise e
except Exception as e:
raise ConnectorValidationError(str(e))
if enforce_creation:
raise ConnectorValidationError(str(e))
else:
return False
runnable_connector.validate_connector_settings()
return True

View File

@ -17,14 +17,14 @@ from github.PullRequest import PullRequest
from onyx.configs.app_configs import GITHUB_CONNECTOR_BASE_URL
from onyx.configs.app_configs import INDEX_BATCH_SIZE
from onyx.configs.constants import DocumentSource
from onyx.connectors.interfaces import ConnectorValidationError
from onyx.connectors.interfaces import CredentialExpiredError
from onyx.connectors.exceptions import ConnectorValidationError
from onyx.connectors.exceptions import CredentialExpiredError
from onyx.connectors.exceptions import InsufficientPermissionsError
from onyx.connectors.exceptions import UnexpectedError
from onyx.connectors.interfaces import GenerateDocumentsOutput
from onyx.connectors.interfaces import InsufficientPermissionsError
from onyx.connectors.interfaces import LoadConnector
from onyx.connectors.interfaces import PollConnector
from onyx.connectors.interfaces import SecondsSinceUnixEpoch
from onyx.connectors.interfaces import UnexpectedError
from onyx.connectors.models import ConnectorMissingCredentialError
from onyx.connectors.models import Document
from onyx.connectors.models import Section

View File

@ -305,6 +305,7 @@ class GmailConnector(LoadConnector, PollConnector, SlimConnector):
userId=user_email,
fields=THREAD_FIELDS,
id=thread["id"],
continue_on_404_or_403=True,
)
# full_threads is an iterator containing a single thread
# so we need to convert it to a list and grab the first element
@ -336,6 +337,7 @@ class GmailConnector(LoadConnector, PollConnector, SlimConnector):
userId=user_email,
fields=THREAD_LIST_FIELDS,
q=query,
continue_on_404_or_403=True,
):
doc_batch.append(
SlimDocument(

View File

@ -13,6 +13,9 @@ from googleapiclient.errors import HttpError # type: ignore
from onyx.configs.app_configs import INDEX_BATCH_SIZE
from onyx.configs.app_configs import MAX_FILE_SIZE_BYTES
from onyx.configs.constants import DocumentSource
from onyx.connectors.exceptions import ConnectorValidationError
from onyx.connectors.exceptions import CredentialExpiredError
from onyx.connectors.exceptions import InsufficientPermissionsError
from onyx.connectors.google_drive.doc_conversion import build_slim_document
from onyx.connectors.google_drive.doc_conversion import (
convert_drive_item_to_document,
@ -42,6 +45,7 @@ from onyx.connectors.interfaces import LoadConnector
from onyx.connectors.interfaces import PollConnector
from onyx.connectors.interfaces import SecondsSinceUnixEpoch
from onyx.connectors.interfaces import SlimConnector
from onyx.connectors.models import ConnectorMissingCredentialError
from onyx.indexing.indexing_heartbeat import IndexingHeartbeatInterface
from onyx.utils.logger import setup_logger
from onyx.utils.retry_wrapper import retry_builder
@ -137,7 +141,7 @@ class GoogleDriveConnector(LoadConnector, PollConnector, SlimConnector):
"Please visit the docs for help with the new setup: "
f"{SCOPE_DOC_URL}"
)
raise ValueError(
raise ConnectorValidationError(
"Google Drive connector received old input parameters. "
"Please visit the docs for help with the new setup: "
f"{SCOPE_DOC_URL}"
@ -151,7 +155,7 @@ class GoogleDriveConnector(LoadConnector, PollConnector, SlimConnector):
and not my_drive_emails
and not shared_drive_urls
):
raise ValueError(
raise ConnectorValidationError(
"Nothing to index. Please specify at least one of the following: "
"include_shared_drives, include_my_drives, include_files_shared_with_me, "
"shared_folder_urls, or my_drive_emails"
@ -609,3 +613,50 @@ class GoogleDriveConnector(LoadConnector, PollConnector, SlimConnector):
if MISSING_SCOPES_ERROR_STR in str(e):
raise PermissionError(ONYX_SCOPE_INSTRUCTIONS) from e
raise e
def validate_connector_settings(self) -> None:
if self._creds is None:
raise ConnectorMissingCredentialError(
"Google Drive credentials not loaded."
)
if self._primary_admin_email is None:
raise ConnectorValidationError(
"Primary admin email not found in credentials. "
"Ensure DB_CREDENTIALS_PRIMARY_ADMIN_KEY is set."
)
try:
drive_service = get_drive_service(self._creds, self._primary_admin_email)
drive_service.files().list(pageSize=1, fields="files(id)").execute()
if isinstance(self._creds, ServiceAccountCredentials):
retry_builder()(get_root_folder_id)(drive_service)
except HttpError as e:
status_code = e.resp.status if e.resp else None
if status_code == 401:
raise CredentialExpiredError(
"Invalid or expired Google Drive credentials (401)."
)
elif status_code == 403:
raise InsufficientPermissionsError(
"Google Drive app lacks required permissions (403). "
"Please ensure the necessary scopes are granted and Drive "
"apps are enabled."
)
else:
raise ConnectorValidationError(
f"Unexpected Google Drive error (status={status_code}): {e}"
)
except Exception as e:
# Check for scope-related hints from the error message
if MISSING_SCOPES_ERROR_STR in str(e):
raise InsufficientPermissionsError(
"Google Drive credentials are missing required scopes. "
f"{ONYX_SCOPE_INSTRUCTIONS}"
)
raise ConnectorValidationError(
f"Unexpected error during Google Drive validation: {e}"
)

View File

@ -87,16 +87,18 @@ class HubSpotConnector(LoadConnector, PollConnector):
contact = api_client.crm.contacts.basic_api.get_by_id(
contact_id=contact.id
)
associated_emails.append(contact.properties["email"])
email = contact.properties.get("email")
if email is not None:
associated_emails.append(email)
if notes:
for note in notes.results:
note = api_client.crm.objects.notes.basic_api.get_by_id(
note_id=note.id, properties=["content", "hs_body_preview"]
)
if note.properties["hs_body_preview"] is None:
continue
associated_notes.append(note.properties["hs_body_preview"])
preview = note.properties.get("hs_body_preview")
if preview is not None:
associated_notes.append(preview)
associated_emails_str = " ,".join(associated_emails)
associated_notes_str = " ".join(associated_notes)

View File

@ -146,46 +146,3 @@ class CheckpointConnector(BaseConnector):
```
"""
raise NotImplementedError
class ConnectorValidationError(Exception):
"""General exception for connector validation errors."""
def __init__(self, message: str):
self.message = message
super().__init__(self.message)
class UnexpectedError(Exception):
"""Raised when an unexpected error occurs during connector validation.
Unexpected errors don't necessarily mean the credential is invalid,
but rather that there was an error during the validation process
or we encountered a currently unhandled error case.
"""
def __init__(self, message: str = "Unexpected error during connector validation"):
super().__init__(message)
class CredentialInvalidError(ConnectorValidationError):
"""Raised when a connector's credential is invalid."""
def __init__(self, message: str = "Credential is invalid"):
super().__init__(message)
class CredentialExpiredError(ConnectorValidationError):
"""Raised when a connector's credential is expired."""
def __init__(self, message: str = "Credential has expired"):
super().__init__(message)
class InsufficientPermissionsError(ConnectorValidationError):
"""Raised when the credential does not have sufficient API permissions."""
def __init__(
self, message: str = "Insufficient permissions for the requested operation"
):
super().__init__(message)

View File

@ -16,10 +16,11 @@ from onyx.configs.constants import DocumentSource
from onyx.connectors.cross_connector_utils.rate_limit_wrapper import (
rl_requests,
)
from onyx.connectors.interfaces import ConnectorValidationError
from onyx.connectors.interfaces import CredentialExpiredError
from onyx.connectors.exceptions import ConnectorValidationError
from onyx.connectors.exceptions import CredentialExpiredError
from onyx.connectors.exceptions import InsufficientPermissionsError
from onyx.connectors.exceptions import UnexpectedError
from onyx.connectors.interfaces import GenerateDocumentsOutput
from onyx.connectors.interfaces import InsufficientPermissionsError
from onyx.connectors.interfaces import LoadConnector
from onyx.connectors.interfaces import PollConnector
from onyx.connectors.interfaces import SecondsSinceUnixEpoch
@ -670,12 +671,12 @@ class NotionConnector(LoadConnector, PollConnector):
"Please try again later."
)
else:
raise Exception(
raise UnexpectedError(
f"Unexpected Notion HTTP error (status={status_code}): {http_err}"
) from http_err
except Exception as exc:
raise Exception(
raise UnexpectedError(
f"Unexpected error during Notion settings validation: {exc}"
)

View File

@ -12,11 +12,11 @@ from onyx.configs.app_configs import JIRA_CONNECTOR_LABELS_TO_SKIP
from onyx.configs.app_configs import JIRA_CONNECTOR_MAX_TICKET_SIZE
from onyx.configs.constants import DocumentSource
from onyx.connectors.cross_connector_utils.miscellaneous_utils import time_str_to_utc
from onyx.connectors.interfaces import ConnectorValidationError
from onyx.connectors.interfaces import CredentialExpiredError
from onyx.connectors.exceptions import ConnectorValidationError
from onyx.connectors.exceptions import CredentialExpiredError
from onyx.connectors.exceptions import InsufficientPermissionsError
from onyx.connectors.interfaces import GenerateDocumentsOutput
from onyx.connectors.interfaces import GenerateSlimDocumentOutput
from onyx.connectors.interfaces import InsufficientPermissionsError
from onyx.connectors.interfaces import LoadConnector
from onyx.connectors.interfaces import PollConnector
from onyx.connectors.interfaces import SecondsSinceUnixEpoch

View File

@ -18,6 +18,10 @@ from slack_sdk.errors import SlackApiError
from onyx.configs.app_configs import ENABLE_EXPENSIVE_EXPERT_CALLS
from onyx.configs.app_configs import INDEX_BATCH_SIZE
from onyx.configs.constants import DocumentSource
from onyx.connectors.exceptions import ConnectorValidationError
from onyx.connectors.exceptions import CredentialExpiredError
from onyx.connectors.exceptions import InsufficientPermissionsError
from onyx.connectors.exceptions import UnexpectedError
from onyx.connectors.interfaces import CheckpointConnector
from onyx.connectors.interfaces import CheckpointOutput
from onyx.connectors.interfaces import GenerateSlimDocumentOutput
@ -82,14 +86,14 @@ def get_channels(
get_public: bool = True,
get_private: bool = True,
) -> list[ChannelType]:
"""Get all channels in the workspace"""
"""Get all channels in the workspace."""
channels: list[dict[str, Any]] = []
channel_types = []
if get_public:
channel_types.append("public_channel")
if get_private:
channel_types.append("private_channel")
# try getting private channels as well at first
# Try fetching both public and private channels first:
try:
channels = _collect_paginated_channels(
client=client,
@ -97,19 +101,19 @@ def get_channels(
channel_types=channel_types,
)
except SlackApiError as e:
logger.info(f"Unable to fetch private channels due to - {e}")
logger.info("trying again without private channels")
logger.info(
f"Unable to fetch private channels due to: {e}. Trying again without private channels."
)
if get_public:
channel_types = ["public_channel"]
else:
logger.warning("No channels to fetch")
logger.warning("No channels to fetch.")
return []
channels = _collect_paginated_channels(
client=client,
exclude_archived=exclude_archived,
channel_types=channel_types,
)
return channels
@ -666,6 +670,86 @@ class SlackConnector(SlimConnector, CheckpointConnector):
)
return checkpoint
def validate_connector_settings(self) -> None:
"""
1. Verify the bot token is valid for the workspace (via auth_test).
2. Ensure the bot has enough scope to list channels.
3. Check that every channel specified in self.channels exists.
"""
if self.client is None:
raise ConnectorMissingCredentialError("Slack credentials not loaded.")
try:
# 1) Validate connection to workspace
auth_response = self.client.auth_test()
if not auth_response.get("ok", False):
error_msg = auth_response.get(
"error", "Unknown error from Slack auth_test"
)
raise ConnectorValidationError(f"Failed Slack auth_test: {error_msg}")
# 2) Minimal test to confirm listing channels works
test_resp = self.client.conversations_list(
limit=1, types=["public_channel"]
)
if not test_resp.get("ok", False):
error_msg = test_resp.get("error", "Unknown error from Slack")
if error_msg == "invalid_auth":
raise ConnectorValidationError(
f"Invalid Slack bot token ({error_msg})."
)
elif error_msg == "not_authed":
raise CredentialExpiredError(
f"Invalid or expired Slack bot token ({error_msg})."
)
raise UnexpectedError(f"Slack API returned a failure: {error_msg}")
# 3) If channels are specified, verify each is accessible
if self.channels:
accessible_channels = get_channels(
client=self.client,
exclude_archived=True,
get_public=True,
get_private=True,
)
# For quick lookups by name or ID, build a map:
accessible_channel_names = {ch["name"] for ch in accessible_channels}
accessible_channel_ids = {ch["id"] for ch in accessible_channels}
for user_channel in self.channels:
if (
user_channel not in accessible_channel_names
and user_channel not in accessible_channel_ids
):
raise ConnectorValidationError(
f"Channel '{user_channel}' not found or inaccessible in this workspace."
)
except SlackApiError as e:
slack_error = e.response.get("error", "")
if slack_error == "missing_scope":
raise InsufficientPermissionsError(
"Slack bot token lacks the necessary scope to list/access channels. "
"Please ensure your Slack app has 'channels:read' (and/or 'groups:read' for private channels)."
)
elif slack_error == "invalid_auth":
raise CredentialExpiredError(
f"Invalid Slack bot token ({slack_error})."
)
elif slack_error == "not_authed":
raise CredentialExpiredError(
f"Invalid or expired Slack bot token ({slack_error})."
)
raise UnexpectedError(
f"Unexpected Slack error '{slack_error}' during settings validation."
)
except ConnectorValidationError as e:
raise e
except Exception as e:
raise UnexpectedError(
f"Unexpected error during Slack settings validation: {e}"
)
if __name__ == "__main__":
import os

View File

@ -5,6 +5,7 @@ from typing import Any
import msal # type: ignore
from office365.graph_client import GraphClient # type: ignore
from office365.runtime.client_request_exception import ClientRequestException # type: ignore
from office365.teams.channels.channel import Channel # type: ignore
from office365.teams.chats.messages.message import ChatMessage # type: ignore
from office365.teams.team import Team # type: ignore
@ -12,6 +13,10 @@ from office365.teams.team import Team # type: ignore
from onyx.configs.app_configs import INDEX_BATCH_SIZE
from onyx.configs.constants import DocumentSource
from onyx.connectors.cross_connector_utils.miscellaneous_utils import time_str_to_utc
from onyx.connectors.exceptions import ConnectorValidationError
from onyx.connectors.exceptions import CredentialExpiredError
from onyx.connectors.exceptions import InsufficientPermissionsError
from onyx.connectors.exceptions import UnexpectedError
from onyx.connectors.interfaces import GenerateDocumentsOutput
from onyx.connectors.interfaces import LoadConnector
from onyx.connectors.interfaces import PollConnector
@ -279,6 +284,50 @@ class TeamsConnector(LoadConnector, PollConnector):
end_datetime = datetime.fromtimestamp(end, timezone.utc)
return self._fetch_from_teams(start=start_datetime, end=end_datetime)
def validate_connector_settings(self) -> None:
if self.graph_client is None:
raise ConnectorMissingCredentialError("Teams credentials not loaded.")
try:
# Minimal call to confirm we can retrieve Teams
found_teams = self._get_all_teams()
except ClientRequestException as e:
status_code = e.response.status_code
if status_code == 401:
raise CredentialExpiredError(
"Invalid or expired Microsoft Teams credentials (401 Unauthorized)."
)
elif status_code == 403:
raise InsufficientPermissionsError(
"Your app lacks sufficient permissions to read Teams (403 Forbidden)."
)
raise UnexpectedError(f"Unexpected error retrieving teams: {e}")
except Exception as e:
error_str = str(e).lower()
if (
"unauthorized" in error_str
or "401" in error_str
or "invalid_grant" in error_str
):
raise CredentialExpiredError(
"Invalid or expired Microsoft Teams credentials."
)
elif "forbidden" in error_str or "403" in error_str:
raise InsufficientPermissionsError(
"App lacks required permissions to read from Microsoft Teams."
)
raise ConnectorValidationError(
f"Unexpected error during Teams validation: {e}"
)
if not found_teams:
raise ConnectorValidationError(
"No Teams found for the given credentials. "
"Either there are no Teams in this tenant, or your app does not have permission to view them."
)
if __name__ == "__main__":
connector = TeamsConnector(teams=os.environ["TEAMS"].split(","))

View File

@ -25,12 +25,12 @@ from onyx.configs.app_configs import WEB_CONNECTOR_OAUTH_CLIENT_SECRET
from onyx.configs.app_configs import WEB_CONNECTOR_OAUTH_TOKEN_URL
from onyx.configs.app_configs import WEB_CONNECTOR_VALIDATE_URLS
from onyx.configs.constants import DocumentSource
from onyx.connectors.interfaces import ConnectorValidationError
from onyx.connectors.interfaces import CredentialExpiredError
from onyx.connectors.exceptions import ConnectorValidationError
from onyx.connectors.exceptions import CredentialExpiredError
from onyx.connectors.exceptions import InsufficientPermissionsError
from onyx.connectors.exceptions import UnexpectedError
from onyx.connectors.interfaces import GenerateDocumentsOutput
from onyx.connectors.interfaces import InsufficientPermissionsError
from onyx.connectors.interfaces import LoadConnector
from onyx.connectors.interfaces import UnexpectedError
from onyx.connectors.models import Document
from onyx.connectors.models import Section
from onyx.file_processing.extract_file_text import read_pdf_file
@ -440,7 +440,10 @@ class WebConnector(LoadConnector):
"No URL configured. Please provide at least one valid URL."
)
if self.web_connector_type == WEB_CONNECTOR_VALID_SETTINGS.SITEMAP.value:
if (
self.web_connector_type == WEB_CONNECTOR_VALID_SETTINGS.SITEMAP.value
or self.web_connector_type == WEB_CONNECTOR_VALID_SETTINGS.RECURSIVE.value
):
return None
# We'll just test the first URL for connectivity and correctness

View File

@ -401,8 +401,8 @@ def add_credential_to_connector(
# If we are in the seeding flow, we shouldn't need to check if the credential belongs to the user
if seeding_flow:
credential = fetch_credential_by_id(
db_session=db_session,
credential_id=credential_id,
db_session=db_session,
)
else:
credential = fetch_credential_by_id_for_user(

View File

@ -169,8 +169,8 @@ def fetch_credential_by_id_for_user(
def fetch_credential_by_id(
db_session: Session,
credential_id: int,
db_session: Session,
) -> Credential | None:
stmt = select(Credential).distinct()
stmt = stmt.where(Credential.id == credential_id)
@ -422,8 +422,8 @@ def create_initial_public_credential(db_session: Session) -> None:
"There must exist an empty public credential for data connectors that do not require additional Auth."
)
first_credential = fetch_credential_by_id(
db_session=db_session,
credential_id=PUBLIC_CREDENTIAL_ID,
db_session=db_session,
)
if first_credential is not None:

View File

@ -25,8 +25,8 @@ from onyx.background.celery.versioned_apps.primary import app as primary_app
from onyx.background.indexing.models import IndexAttemptErrorPydantic
from onyx.configs.constants import OnyxCeleryPriority
from onyx.configs.constants import OnyxCeleryTask
from onyx.connectors.exceptions import ValidationError
from onyx.connectors.factory import validate_ccpair_for_user
from onyx.connectors.interfaces import ConnectorValidationError
from onyx.db.connector import delete_connector
from onyx.db.connector_credential_pair import add_credential_to_connector
from onyx.db.connector_credential_pair import (
@ -620,9 +620,7 @@ def associate_credential_to_connector(
)
try:
validate_ccpair_for_user(
connector_id, credential_id, db_session, user, tenant_id
)
validate_ccpair_for_user(connector_id, credential_id, db_session, tenant_id)
response = add_credential_to_connector(
db_session=db_session,
@ -649,7 +647,7 @@ def associate_credential_to_connector(
return response
except ConnectorValidationError as e:
except ValidationError as e:
# If validation fails, delete the connector and commit the changes
# Ensures we don't leave invalid connectors in the database
# NOTE: consensus is that it makes sense to unify connector and ccpair creation flows
@ -660,7 +658,6 @@ def associate_credential_to_connector(
raise HTTPException(
status_code=400, detail="Connector validation error: " + str(e)
)
except IntegrityError as e:
logger.error(f"IntegrityError: {e}")
raise HTTPException(status_code=400, detail="Name must be unique")

View File

@ -28,6 +28,7 @@ from onyx.configs.constants import FileOrigin
from onyx.configs.constants import MilestoneRecordType
from onyx.configs.constants import OnyxCeleryPriority
from onyx.configs.constants import OnyxCeleryTask
from onyx.connectors.exceptions import ConnectorValidationError
from onyx.connectors.factory import validate_ccpair_for_user
from onyx.connectors.google_utils.google_auth import (
get_google_oauth_creds,
@ -62,7 +63,6 @@ from onyx.connectors.google_utils.shared_constants import DB_CREDENTIALS_DICT_TO
from onyx.connectors.google_utils.shared_constants import (
GoogleOAuthAuthenticationMethod,
)
from onyx.connectors.interfaces import ConnectorValidationError
from onyx.db.connector import create_connector
from onyx.db.connector import delete_connector
from onyx.db.connector import fetch_connector_by_id
@ -854,7 +854,6 @@ def create_connector_with_mock_credential(
connector_id=connector_id,
credential_id=credential_id,
db_session=db_session,
user=user,
tenant_id=tenant_id,
)
response = add_credential_to_connector(

View File

@ -106,7 +106,6 @@ def swap_credentials_for_connector(
credential_swap_req.connector_id,
credential_swap_req.new_credential_id,
db_session,
user,
tenant_id,
)

View File

@ -253,3 +253,8 @@ def print_loggers() -> None:
print(f" Propagate: {logger.propagate}")
print()
def format_error_for_logging(e: Exception) -> str:
"""Clean error message by removing newlines for better logging."""
return str(e).replace("\n", " ")

View File

@ -508,6 +508,7 @@ def get_number_of_chunks_we_think_exist(
class VespaDebugging:
# Class for managing Vespa debugging actions.
def __init__(self, tenant_id: str | None = None):
CURRENT_TENANT_ID_CONTEXTVAR.set(tenant_id)
self.tenant_id = POSTGRES_DEFAULT_SCHEMA if not tenant_id else tenant_id
self.index_name = get_index_name(self.tenant_id)

View File

@ -3,7 +3,7 @@ import os
ADMIN_USER_NAME = "admin_user"
API_SERVER_PROTOCOL = os.getenv("API_SERVER_PROTOCOL") or "http"
API_SERVER_HOST = os.getenv("API_SERVER_HOST") or "127.0.0.1"
API_SERVER_HOST = os.getenv("API_SERVER_HOST") or "localhost"
API_SERVER_PORT = os.getenv("API_SERVER_PORT") or "8080"
API_SERVER_URL = f"{API_SERVER_PROTOCOL}://{API_SERVER_HOST}:{API_SERVER_PORT}"
MAX_DELAY = 45

View File

@ -1,13 +1,18 @@
"use client";
import { Button } from "@/components/ui/button";
import { CCPairFullInfo, ConnectorCredentialPairStatus } from "./types";
import {
CCPairFullInfo,
ConnectorCredentialPairStatus,
statusIsNotCurrentlyActive,
} from "./types";
import { usePopup } from "@/components/admin/connectors/Popup";
import { mutate } from "swr";
import { buildCCPairInfoUrl } from "./lib";
import { setCCPairStatus } from "@/lib/ccPair";
import { useState } from "react";
import { LoadingAnimation } from "@/components/Loading";
import { ConfirmEntityModal } from "@/components/modals/ConfirmEntityModal";
export function ModifyStatusButtonCluster({
ccPair,
@ -16,11 +21,24 @@ export function ModifyStatusButtonCluster({
}) {
const { popup, setPopup } = usePopup();
const [isUpdating, setIsUpdating] = useState(false);
const [showConfirmModal, setShowConfirmModal] = useState(false);
const handleStatusChange = async (
newStatus: ConnectorCredentialPairStatus
) => {
if (isUpdating) return; // Prevent double-clicks or multiple requests
if (
ccPair.status === ConnectorCredentialPairStatus.INVALID &&
newStatus === ConnectorCredentialPairStatus.ACTIVE
) {
setShowConfirmModal(true);
} else {
await updateStatus(newStatus);
}
};
const updateStatus = async (newStatus: ConnectorCredentialPairStatus) => {
setIsUpdating(true);
try {
@ -38,30 +56,23 @@ export function ModifyStatusButtonCluster({
};
// Compute the button text based on current state and backend status
const buttonText =
ccPair.status === ConnectorCredentialPairStatus.PAUSED
? "Re-Enable"
: "Pause";
const isNotActive = statusIsNotCurrentlyActive(ccPair.status);
const buttonText = isNotActive ? "Re-Enable" : "Pause";
const tooltip =
ccPair.status === ConnectorCredentialPairStatus.PAUSED
? "Click to start indexing again!"
: "When paused, the connector's documents will still be visible. However, no new documents will be indexed.";
const tooltip = isNotActive
? "Click to start indexing again!"
: "When paused, the connector's documents will still be visible. However, no new documents will be indexed.";
return (
<>
{popup}
<Button
className="flex items-center justify-center w-auto min-w-[100px] px-4 py-2"
variant={
ccPair.status === ConnectorCredentialPairStatus.PAUSED
? "success-reverse"
: "default"
}
variant={isNotActive ? "success-reverse" : "default"}
disabled={isUpdating}
onClick={() =>
handleStatusChange(
ccPair.status === ConnectorCredentialPairStatus.PAUSED
isNotActive
? ConnectorCredentialPairStatus.ACTIVE
: ConnectorCredentialPairStatus.PAUSED
)
@ -70,17 +81,27 @@ export function ModifyStatusButtonCluster({
>
{isUpdating ? (
<LoadingAnimation
text={
ccPair.status === ConnectorCredentialPairStatus.PAUSED
? "Resuming"
: "Pausing"
}
text={isNotActive ? "Resuming" : "Pausing"}
size="text-md"
/>
) : (
buttonText
)}
</Button>
{showConfirmModal && (
<ConfirmEntityModal
entityType="Invalid Connector"
entityName={ccPair.name}
onClose={() => setShowConfirmModal(false)}
onSubmit={() => {
setShowConfirmModal(false);
updateStatus(ConnectorCredentialPairStatus.ACTIVE);
}}
additionalDetails="This connector was previously marked as invalid. Please verify that your configuration is correct before re-enabling. Are you sure you want to proceed?"
actionButtonText="Re-Enable"
variant="action"
/>
)}
</>
);
}

View File

@ -123,7 +123,8 @@ export function ReIndexButton({
disabled={
isDisabled ||
ccPairStatus == ConnectorCredentialPairStatus.DELETING ||
ccPairStatus == ConnectorCredentialPairStatus.PAUSED
ccPairStatus == ConnectorCredentialPairStatus.PAUSED ||
ccPairStatus == ConnectorCredentialPairStatus.INVALID
}
tooltip={getCCPairStatusMessage(isDisabled, isIndexing, ccPairStatus)}
>

View File

@ -15,6 +15,18 @@ export enum ConnectorCredentialPairStatus {
INVALID = "INVALID",
}
/**
* Returns true if the status is not currently active (i.e. paused or invalid), but not deleting
*/
export function statusIsNotCurrentlyActive(
status: ConnectorCredentialPairStatus
): boolean {
return (
status === ConnectorCredentialPairStatus.PAUSED ||
status === ConnectorCredentialPairStatus.INVALID
);
}
export interface CCPairFullInfo {
id: number;
name: string;

View File

@ -168,7 +168,7 @@ export default function CredentialSection({
onClick={() => {
setShowModifyCredential(true);
}}
className="flex items-center gap-x-2 cursor-pointer bg-background-100 border-border border-2 hover:bg-border p-1.5 rounded-lg text-text-700"
className="flex items-center gap-x-2 cursor-pointer bg-neutral-800 border-neutral-600 border-2 hover:bg-neutral-700 p-1.5 rounded-lg text-neutral-300"
>
<FaSwatchbook />
Update Credentials

View File

@ -37,7 +37,7 @@ export const ConfirmEntityModal = ({
};
return (
<Modal width="rounded max-w-sm w-full" onOutsideClick={onClose}>
<Modal width="rounded max-w-md w-full" onOutsideClick={onClose}>
<>
<div className="flex mb-4">
<h2 className="my-auto text-2xl font-bold">