From 839c8611b764f9c2517374e0c9b6600489cec3d7 Mon Sep 17 00:00:00 2001 From: rkuo-danswer Date: Fri, 4 Apr 2025 09:21:34 -0700 Subject: [PATCH] Bugfix/salesforce (#4335) * add some gc * small refactoring for temp directories * WIP * add some gc collects and size calculations * un-xfail * fix salesforce test * loose check for number of docs * adjust test again * cleanup * nuke directory param, remove using sqlite db to cache email / id mappings --------- Co-authored-by: Richard Kuo (Onyx) --- .../salesforce/postprocessing.py | 4 +- .../external_permissions/salesforce/utils.py | 34 +-- backend/onyx/connectors/models.py | 47 ++++ .../onyx/connectors/salesforce/connector.py | 209 +++++++++++++++--- .../connectors/salesforce/doc_conversion.py | 10 +- .../connectors/salesforce/salesforce_calls.py | 98 +++++--- .../connectors/salesforce/sqlite_functions.py | 104 ++++++--- backend/onyx/connectors/salesforce/utils.py | 4 +- backend/onyx/context/search/pipeline.py | 17 +- backend/onyx/indexing/indexing_pipeline.py | 14 +- .../search_nlp_models.py | 4 +- .../salesforce/test_salesforce_connector.py | 43 ++-- .../salesforce/test_salesforce_data.json | 164 +++++++++++++- .../salesforce/test_postprocessing.py | 4 + .../salesforce/test_salesforce_sqlite.py | 93 ++++---- 15 files changed, 632 insertions(+), 217 deletions(-) diff --git a/backend/ee/onyx/external_permissions/salesforce/postprocessing.py b/backend/ee/onyx/external_permissions/salesforce/postprocessing.py index 67d79fea3..e4e29d677 100644 --- a/backend/ee/onyx/external_permissions/salesforce/postprocessing.py +++ b/backend/ee/onyx/external_permissions/salesforce/postprocessing.py @@ -51,9 +51,9 @@ def _get_objects_access_for_user_email_from_salesforce( # This is cached in the function so the first query takes an extra 0.1-0.3 seconds # but subsequent queries by the same user are essentially instant - start_time = time.time() + start_time = time.monotonic() user_id = get_salesforce_user_id_from_email(salesforce_client, user_email) - end_time = time.time() + end_time = time.monotonic() logger.info( f"Time taken to get Salesforce user ID: {end_time - start_time} seconds" ) diff --git a/backend/ee/onyx/external_permissions/salesforce/utils.py b/backend/ee/onyx/external_permissions/salesforce/utils.py index 18571ee52..a1fd692be 100644 --- a/backend/ee/onyx/external_permissions/salesforce/utils.py +++ b/backend/ee/onyx/external_permissions/salesforce/utils.py @@ -1,10 +1,6 @@ from simple_salesforce import Salesforce from sqlalchemy.orm import Session -from onyx.connectors.salesforce.sqlite_functions import get_user_id_by_email -from onyx.connectors.salesforce.sqlite_functions import init_db -from onyx.connectors.salesforce.sqlite_functions import NULL_ID_STRING -from onyx.connectors.salesforce.sqlite_functions import update_email_to_id_table from onyx.db.connector_credential_pair import get_connector_credential_pair_from_id from onyx.db.document import get_cc_pairs_for_document from onyx.utils.logger import setup_logger @@ -28,6 +24,8 @@ def get_any_salesforce_client_for_doc_id( E.g. there are 2 different credential sets for 2 different salesforce cc_pairs but only one has the permissions to access the permissions needed for the query. """ + + # NOTE: this global seems very very bad global _ANY_SALESFORCE_CLIENT if _ANY_SALESFORCE_CLIENT is None: cc_pairs = get_cc_pairs_for_document(db_session, doc_id) @@ -84,35 +82,21 @@ def get_salesforce_user_id_from_email( salesforce database. 
(Around 0.1-0.3 seconds) If it's cached or stored in the local salesforce database, it's fast (<0.001 seconds). """ + + # NOTE: this global seems bad global _CACHED_SF_EMAIL_TO_ID_MAP if user_email in _CACHED_SF_EMAIL_TO_ID_MAP: if _CACHED_SF_EMAIL_TO_ID_MAP[user_email] is not None: return _CACHED_SF_EMAIL_TO_ID_MAP[user_email] - db_exists = True - try: - # Check if the user is already in the database - user_id = get_user_id_by_email(user_email) - except Exception: - init_db() - try: - user_id = get_user_id_by_email(user_email) - except Exception as e: - logger.error(f"Error checking if user is in database: {e}") - user_id = None - db_exists = False + # some caching via sqlite existed here before ... check history if interested + + # ...query Salesforce and store the result in the database + user_id = _query_salesforce_user_id(sf_client, user_email) - # If no entry is found in the database (indicated by user_id being None)... if user_id is None: - # ...query Salesforce and store the result in the database - user_id = _query_salesforce_user_id(sf_client, user_email) - if db_exists: - update_email_to_id_table(user_email, user_id) - return user_id - elif user_id is None: - return None - elif user_id == NULL_ID_STRING: return None + # If the found user_id is real, cache it _CACHED_SF_EMAIL_TO_ID_MAP[user_email] = user_id return user_id diff --git a/backend/onyx/connectors/models.py b/backend/onyx/connectors/models.py index ac3fa42bd..4dfadb1d9 100644 --- a/backend/onyx/connectors/models.py +++ b/backend/onyx/connectors/models.py @@ -1,3 +1,4 @@ +import sys from datetime import datetime from enum import Enum from typing import Any @@ -40,6 +41,9 @@ class TextSection(Section): text: str link: str | None = None + def __sizeof__(self) -> int: + return sys.getsizeof(self.text) + sys.getsizeof(self.link) + class ImageSection(Section): """Section containing an image reference""" @@ -47,6 +51,9 @@ class ImageSection(Section): image_file_name: str link: str | None = None + def __sizeof__(self) -> int: + return sys.getsizeof(self.image_file_name) + sys.getsizeof(self.link) + class BasicExpertInfo(BaseModel): """Basic Information for the owner of a document, any of the fields can be left as None @@ -110,6 +117,14 @@ class BasicExpertInfo(BaseModel): ) ) + def __sizeof__(self) -> int: + size = sys.getsizeof(self.display_name) + size += sys.getsizeof(self.first_name) + size += sys.getsizeof(self.middle_initial) + size += sys.getsizeof(self.last_name) + size += sys.getsizeof(self.email) + return size + class DocumentBase(BaseModel): """Used for Onyx ingestion api, the ID is inferred before use if not provided""" @@ -163,6 +178,32 @@ class DocumentBase(BaseModel): attributes.append(k + INDEX_SEPARATOR + v) return attributes + def __sizeof__(self) -> int: + size = sys.getsizeof(self.id) + for section in self.sections: + size += sys.getsizeof(section) + size += sys.getsizeof(self.source) + size += sys.getsizeof(self.semantic_identifier) + size += sys.getsizeof(self.doc_updated_at) + size += sys.getsizeof(self.chunk_count) + + if self.primary_owners is not None: + for primary_owner in self.primary_owners: + size += sys.getsizeof(primary_owner) + else: + size += sys.getsizeof(self.primary_owners) + + if self.secondary_owners is not None: + for secondary_owner in self.secondary_owners: + size += sys.getsizeof(secondary_owner) + else: + size += sys.getsizeof(self.secondary_owners) + + size += sys.getsizeof(self.title) + size += sys.getsizeof(self.from_ingestion_api) + size += sys.getsizeof(self.additional_info) + 
return size + def get_text_content(self) -> str: return " ".join([section.text for section in self.sections if section.text]) @@ -194,6 +235,12 @@ class Document(DocumentBase): from_ingestion_api=base.from_ingestion_api, ) + def __sizeof__(self) -> int: + size = super().__sizeof__() + size += sys.getsizeof(self.id) + size += sys.getsizeof(self.source) + return size + class IndexingDocument(Document): """Document with processed sections for indexing""" diff --git a/backend/onyx/connectors/salesforce/connector.py b/backend/onyx/connectors/salesforce/connector.py index 6fa17c327..520a6f9d3 100644 --- a/backend/onyx/connectors/salesforce/connector.py +++ b/backend/onyx/connectors/salesforce/connector.py @@ -1,4 +1,9 @@ +import gc import os +import sys +import tempfile +from collections import defaultdict +from pathlib import Path from typing import Any from simple_salesforce import Salesforce @@ -21,9 +26,13 @@ from onyx.connectors.salesforce.salesforce_calls import get_all_children_of_sf_t from onyx.connectors.salesforce.sqlite_functions import get_affected_parent_ids_by_type from onyx.connectors.salesforce.sqlite_functions import get_record from onyx.connectors.salesforce.sqlite_functions import init_db +from onyx.connectors.salesforce.sqlite_functions import sqlite_log_stats from onyx.connectors.salesforce.sqlite_functions import update_sf_db_with_csv +from onyx.connectors.salesforce.utils import BASE_DATA_PATH +from onyx.connectors.salesforce.utils import get_sqlite_db_path from onyx.indexing.indexing_heartbeat import IndexingHeartbeatInterface from onyx.utils.logger import setup_logger +from shared_configs.configs import MULTI_TENANT logger = setup_logger() @@ -32,6 +41,8 @@ _DEFAULT_PARENT_OBJECT_TYPES = ["Account"] class SalesforceConnector(LoadConnector, PollConnector, SlimConnector): + MAX_BATCH_BYTES = 1024 * 1024 + def __init__( self, batch_size: int = INDEX_BATCH_SIZE, @@ -64,22 +75,45 @@ class SalesforceConnector(LoadConnector, PollConnector, SlimConnector): raise ConnectorMissingCredentialError("Salesforce") return self._sf_client - def _fetch_from_salesforce( - self, + @staticmethod + def reconstruct_object_types(directory: str) -> dict[str, list[str] | None]: + """ + Scans the given directory for all CSV files and reconstructs the available object types. + Assumes filenames are formatted as "ObjectType.filename.csv" or "ObjectType.csv". + + Args: + directory (str): The path to the directory containing CSV files. + + Returns: + dict[str, list[str]]: A dictionary mapping object types to lists of file paths. 
+ """ + object_types = defaultdict(list) + + for filename in os.listdir(directory): + if filename.endswith(".csv"): + parts = filename.split(".", 1) # Split on the first period + object_type = parts[0] # Take the first part as the object type + object_types[object_type].append(os.path.join(directory, filename)) + + return dict(object_types) + + @staticmethod + def _download_object_csvs( + directory: str, + parent_object_list: list[str], + sf_client: Salesforce, start: SecondsSinceUnixEpoch | None = None, end: SecondsSinceUnixEpoch | None = None, - ) -> GenerateDocumentsOutput: - init_db() - all_object_types: set[str] = set(self.parent_object_list) + ) -> None: + all_object_types: set[str] = set(parent_object_list) - logger.info(f"Starting with {len(self.parent_object_list)} parent object types") - logger.debug(f"Parent object types: {self.parent_object_list}") + logger.info( + f"Parent object types: num={len(parent_object_list)} list={parent_object_list}" + ) # This takes like 20 seconds - for parent_object_type in self.parent_object_list: - child_types = get_all_children_of_sf_type( - self.sf_client, parent_object_type - ) + for parent_object_type in parent_object_list: + child_types = get_all_children_of_sf_type(sf_client, parent_object_type) all_object_types.update(child_types) logger.debug( f"Found {len(child_types)} child types for {parent_object_type}" @@ -88,20 +122,53 @@ class SalesforceConnector(LoadConnector, PollConnector, SlimConnector): # Always want to make sure user is grabbed for permissioning purposes all_object_types.add("User") - logger.info(f"Found total of {len(all_object_types)} object types to fetch") - logger.debug(f"All object types: {all_object_types}") + logger.info( + f"All object types: num={len(all_object_types)} list={all_object_types}" + ) + + # gc.collect() # checkpoint - we've found all object types, now time to fetch the data - logger.info("Starting to fetch CSVs for all object types") + logger.info("Fetching CSVs for all object types") + # This takes like 30 minutes first time and <2 minutes for updates object_type_to_csv_path = fetch_all_csvs_in_parallel( - sf_client=self.sf_client, + sf_client=sf_client, object_types=all_object_types, start=start, end=end, + target_dir=directory, ) + # print useful information + num_csvs = 0 + num_bytes = 0 + for object_type, csv_paths in object_type_to_csv_path.items(): + if not csv_paths: + continue + + for csv_path in csv_paths: + if not csv_path: + continue + + file_path = Path(csv_path) + file_size = file_path.stat().st_size + num_csvs += 1 + num_bytes += file_size + logger.info( + f"CSV info: object_type={object_type} path={csv_path} bytes={file_size}" + ) + + logger.info(f"CSV info total: total_csvs={num_csvs} total_bytes={num_bytes}") + + @staticmethod + def _load_csvs_to_db(csv_directory: str, db_directory: str) -> set[str]: updated_ids: set[str] = set() + + object_type_to_csv_path = SalesforceConnector.reconstruct_object_types( + csv_directory + ) + # This takes like 10 seconds # This is for testing the rest of the functionality if data has # already been fetched and put in sqlite @@ -120,10 +187,16 @@ class SalesforceConnector(LoadConnector, PollConnector, SlimConnector): # If path is None, it means it failed to fetch the csv if csv_paths is None: continue + # Go through each csv path and use it to update the db for csv_path in csv_paths: - logger.debug(f"Updating {object_type} with {csv_path}") + logger.debug( + f"Processing CSV: object_type={object_type} " + f"csv={csv_path} " + 
f"len={Path(csv_path).stat().st_size}" + ) new_ids = update_sf_db_with_csv( + db_directory, object_type=object_type, csv_download_path=csv_path, ) @@ -132,49 +205,127 @@ class SalesforceConnector(LoadConnector, PollConnector, SlimConnector): f"Added {len(new_ids)} new/updated records for {object_type}" ) + os.remove(csv_path) + + return updated_ids + + def _fetch_from_salesforce( + self, + temp_dir: str, + start: SecondsSinceUnixEpoch | None = None, + end: SecondsSinceUnixEpoch | None = None, + ) -> GenerateDocumentsOutput: + logger.info("_fetch_from_salesforce starting.") + if not self._sf_client: + raise RuntimeError("self._sf_client is None!") + + init_db(temp_dir) + + sqlite_log_stats(temp_dir) + + # Step 1 - download + SalesforceConnector._download_object_csvs( + temp_dir, self.parent_object_list, self._sf_client, start, end + ) + gc.collect() + + # Step 2 - load CSV's to sqlite + updated_ids = SalesforceConnector._load_csvs_to_db(temp_dir, temp_dir) + gc.collect() + logger.info(f"Found {len(updated_ids)} total updated records") logger.info( f"Starting to process parent objects of types: {self.parent_object_list}" ) - docs_to_yield: list[Document] = [] + # Step 3 - extract and index docs + batches_processed = 0 docs_processed = 0 + docs_to_yield: list[Document] = [] + docs_to_yield_bytes = 0 + # Takes 15-20 seconds per batch for parent_type, parent_id_batch in get_affected_parent_ids_by_type( + temp_dir, updated_ids=list(updated_ids), parent_types=self.parent_object_list, ): + batches_processed += 1 logger.info( - f"Processing batch of {len(parent_id_batch)} {parent_type} objects" + f"Processing batch: index={batches_processed} " + f"object_type={parent_type} " + f"len={len(parent_id_batch)} " + f"processed={docs_processed} " + f"remaining={len(updated_ids) - docs_processed}" ) for parent_id in parent_id_batch: - if not (parent_object := get_record(parent_id, parent_type)): + if not (parent_object := get_record(temp_dir, parent_id, parent_type)): logger.warning( f"Failed to get parent object {parent_id} for {parent_type}" ) continue - docs_to_yield.append( - convert_sf_object_to_doc( - sf_object=parent_object, - sf_instance=self.sf_client.sf_instance, - ) + doc = convert_sf_object_to_doc( + temp_dir, + sf_object=parent_object, + sf_instance=self.sf_client.sf_instance, ) + doc_sizeof = sys.getsizeof(doc) + docs_to_yield_bytes += doc_sizeof + docs_to_yield.append(doc) docs_processed += 1 - if len(docs_to_yield) >= self.batch_size: + # memory usage is sensitive to the input length, so we're yielding immediately + # if the batch exceeds a certain byte length + if ( + len(docs_to_yield) >= self.batch_size + or docs_to_yield_bytes > SalesforceConnector.MAX_BATCH_BYTES + ): yield docs_to_yield docs_to_yield = [] + docs_to_yield_bytes = 0 + + # observed a memory leak / size issue with the account table if we don't gc.collect here. 
+ gc.collect() yield docs_to_yield + logger.info( + f"Final processing stats: " + f"processed={docs_processed} " + f"remaining={len(updated_ids) - docs_processed}" + ) def load_from_state(self) -> GenerateDocumentsOutput: - return self._fetch_from_salesforce() + if MULTI_TENANT: + # if multi tenant, we cannot expect the sqlite db to be cached/present + with tempfile.TemporaryDirectory() as temp_dir: + return self._fetch_from_salesforce(temp_dir) + + # nuke the db since we're starting from scratch + sqlite_db_path = get_sqlite_db_path(BASE_DATA_PATH) + if os.path.exists(sqlite_db_path): + logger.info(f"load_from_state: Removing db at {sqlite_db_path}.") + os.remove(sqlite_db_path) + return self._fetch_from_salesforce(BASE_DATA_PATH) def poll_source( self, start: SecondsSinceUnixEpoch, end: SecondsSinceUnixEpoch ) -> GenerateDocumentsOutput: - return self._fetch_from_salesforce(start=start, end=end) + if MULTI_TENANT: + # if multi tenant, we cannot expect the sqlite db to be cached/present + with tempfile.TemporaryDirectory() as temp_dir: + return self._fetch_from_salesforce(temp_dir, start=start, end=end) + + if start == 0: + # nuke the db if we're starting from scratch + sqlite_db_path = get_sqlite_db_path(BASE_DATA_PATH) + if os.path.exists(sqlite_db_path): + logger.info( + f"poll_source: Starting at time 0, removing db at {sqlite_db_path}." + ) + os.remove(sqlite_db_path) + + return self._fetch_from_salesforce(BASE_DATA_PATH) def retrieve_all_slim_documents( self, @@ -209,7 +360,7 @@ if __name__ == "__main__": "sf_security_token": os.environ["SF_SECURITY_TOKEN"], } ) - start_time = time.time() + start_time = time.monotonic() doc_count = 0 section_count = 0 text_count = 0 @@ -221,7 +372,7 @@ if __name__ == "__main__": for section in doc.sections: if isinstance(section, TextSection) and section.text is not None: text_count += len(section.text) - end_time = time.time() + end_time = time.monotonic() print(f"Doc count: {doc_count}") print(f"Section count: {section_count}") diff --git a/backend/onyx/connectors/salesforce/doc_conversion.py b/backend/onyx/connectors/salesforce/doc_conversion.py index 1e83b5dd1..287e5ce93 100644 --- a/backend/onyx/connectors/salesforce/doc_conversion.py +++ b/backend/onyx/connectors/salesforce/doc_conversion.py @@ -124,13 +124,14 @@ def _extract_section(salesforce_object: SalesforceObject, base_url: str) -> Text def _extract_primary_owners( + directory: str, sf_object: SalesforceObject, ) -> list[BasicExpertInfo] | None: object_dict = sf_object.data if not (last_modified_by_id := object_dict.get("LastModifiedById")): logger.warning(f"No LastModifiedById found for {sf_object.id}") return None - if not (last_modified_by := get_record(last_modified_by_id)): + if not (last_modified_by := get_record(directory, last_modified_by_id)): logger.warning(f"No LastModifiedBy found for {last_modified_by_id}") return None @@ -159,6 +160,7 @@ def _extract_primary_owners( def convert_sf_object_to_doc( + directory: str, sf_object: SalesforceObject, sf_instance: str, ) -> Document: @@ -170,8 +172,8 @@ def convert_sf_object_to_doc( extracted_semantic_identifier = object_dict.get("Name", "Unknown Object") sections = [_extract_section(sf_object, base_url)] - for id in get_child_ids(sf_object.id): - if not (child_object := get_record(id)): + for id in get_child_ids(directory, sf_object.id): + if not (child_object := get_record(directory, id)): continue sections.append(_extract_section(child_object, base_url)) @@ -181,7 +183,7 @@ def convert_sf_object_to_doc( 
source=DocumentSource.SALESFORCE, semantic_identifier=extracted_semantic_identifier, doc_updated_at=extracted_doc_updated_at, - primary_owners=_extract_primary_owners(sf_object), + primary_owners=_extract_primary_owners(directory, sf_object), metadata={}, ) return doc diff --git a/backend/onyx/connectors/salesforce/salesforce_calls.py b/backend/onyx/connectors/salesforce/salesforce_calls.py index 858c240b3..7273a526f 100644 --- a/backend/onyx/connectors/salesforce/salesforce_calls.py +++ b/backend/onyx/connectors/salesforce/salesforce_calls.py @@ -11,13 +11,12 @@ from simple_salesforce.bulk2 import SFBulk2Type from onyx.connectors.interfaces import SecondsSinceUnixEpoch from onyx.connectors.salesforce.sqlite_functions import has_at_least_one_object_of_type -from onyx.connectors.salesforce.utils import get_object_type_path from onyx.utils.logger import setup_logger logger = setup_logger() -def _build_time_filter_for_salesforce( +def _build_last_modified_time_filter_for_salesforce( start: SecondsSinceUnixEpoch | None, end: SecondsSinceUnixEpoch | None ) -> str: if start is None or end is None: @@ -30,6 +29,19 @@ def _build_time_filter_for_salesforce( ) +def _build_created_date_time_filter_for_salesforce( + start: SecondsSinceUnixEpoch | None, end: SecondsSinceUnixEpoch | None +) -> str: + if start is None or end is None: + return "" + start_datetime = datetime.fromtimestamp(start, UTC) + end_datetime = datetime.fromtimestamp(end, UTC) + return ( + f" WHERE CreatedDate > {start_datetime.isoformat()} " + f"AND CreatedDate < {end_datetime.isoformat()}" + ) + + def _get_sf_type_object_json(sf_client: Salesforce, type_name: str) -> Any: sf_object = SFType(type_name, sf_client.session_id, sf_client.sf_instance) return sf_object.describe() @@ -109,23 +121,6 @@ def _check_if_object_type_is_empty( return True -def _check_for_existing_csvs(sf_type: str) -> list[str] | None: - # Check if the csv already exists - if os.path.exists(get_object_type_path(sf_type)): - existing_csvs = [ - os.path.join(get_object_type_path(sf_type), f) - for f in os.listdir(get_object_type_path(sf_type)) - if f.endswith(".csv") - ] - # If the csv already exists, return the path - # This is likely due to a previous run that failed - # after downloading the csv but before the data was - # written to the db - if existing_csvs: - return existing_csvs - return None - - def _build_bulk_query(sf_client: Salesforce, sf_type: str, time_filter: str) -> str: queryable_fields = _get_all_queryable_fields_of_sf_type(sf_client, sf_type) query = f"SELECT {', '.join(queryable_fields)} FROM {sf_type}{time_filter}" @@ -133,16 +128,15 @@ def _build_bulk_query(sf_client: Salesforce, sf_type: str, time_filter: str) -> def _bulk_retrieve_from_salesforce( - sf_client: Salesforce, - sf_type: str, - time_filter: str, + sf_client: Salesforce, sf_type: str, time_filter: str, target_dir: str ) -> tuple[str, list[str] | None]: + """Returns a tuple of + 1. the salesforce object type + 2. 
the list of CSV's + """ if not _check_if_object_type_is_empty(sf_client, sf_type, time_filter): return sf_type, None - if existing_csvs := _check_for_existing_csvs(sf_type): - return sf_type, existing_csvs - query = _build_bulk_query(sf_client, sf_type, time_filter) bulk_2_handler = SFBulk2Handler( @@ -159,20 +153,33 @@ def _bulk_retrieve_from_salesforce( ) logger.info(f"Downloading {sf_type}") - logger.info(f"Query: {query}") + + logger.debug(f"Query: {query}") try: # This downloads the file to a file in the target path with a random name results = bulk_2_type.download( query=query, - path=get_object_type_path(sf_type), + path=target_dir, max_records=1000000, ) - all_download_paths = [result["file"] for result in results] + + # prepend each downloaded csv with the object type (delimiter = '.') + all_download_paths: list[str] = [] + for result in results: + original_file_path = result["file"] + directory, filename = os.path.split(original_file_path) + new_filename = f"{sf_type}.{filename}" + new_file_path = os.path.join(directory, new_filename) + os.rename(original_file_path, new_file_path) + all_download_paths.append(new_file_path) logger.info(f"Downloaded {sf_type} to {all_download_paths}") return sf_type, all_download_paths except Exception as e: - logger.info(f"Failed to download salesforce csv for object type {sf_type}: {e}") + logger.error( + f"Failed to download salesforce csv for object type {sf_type}: {e}" + ) + logger.warning(f"Exceptioning query for object type {sf_type}: {query}") return sf_type, None @@ -181,12 +188,35 @@ def fetch_all_csvs_in_parallel( object_types: set[str], start: SecondsSinceUnixEpoch | None, end: SecondsSinceUnixEpoch | None, + target_dir: str, ) -> dict[str, list[str] | None]: """ Fetches all the csvs in parallel for the given object types Returns a dict of (sf_type, full_download_path) """ - time_filter = _build_time_filter_for_salesforce(start, end) + + # these types don't query properly and need looking at + # problem_types: set[str] = { + # "ContentDocumentLink", + # "RecordActionHistory", + # "PendingOrderSummary", + # "UnifiedActivityRelation", + # } + + # these types don't have a LastModifiedDate field and instead use CreatedDate + created_date_types: set[str] = { + "AccountHistory", + "AccountTag", + "EntitySubscription", + } + + last_modified_time_filter = _build_last_modified_time_filter_for_salesforce( + start, end + ) + created_date_time_filter = _build_created_date_time_filter_for_salesforce( + start, end + ) + time_filter_for_each_object_type = {} # We do this outside of the thread pool executor because this requires # a database connection and we don't want to block the thread pool @@ -195,8 +225,11 @@ def fetch_all_csvs_in_parallel( """Only add time filter if there is at least one object of the type in the database. 
We aren't worried about partially completed object update runs because this occurs after we check for existing csvs which covers this case""" - if has_at_least_one_object_of_type(sf_type): - time_filter_for_each_object_type[sf_type] = time_filter + if has_at_least_one_object_of_type(target_dir, sf_type): + if sf_type in created_date_types: + time_filter_for_each_object_type[sf_type] = created_date_time_filter + else: + time_filter_for_each_object_type[sf_type] = last_modified_time_filter else: time_filter_for_each_object_type[sf_type] = "" @@ -207,6 +240,7 @@ def fetch_all_csvs_in_parallel( sf_client=sf_client, sf_type=object_type, time_filter=time_filter_for_each_object_type[object_type], + target_dir=target_dir, ), object_types, ) diff --git a/backend/onyx/connectors/salesforce/sqlite_functions.py b/backend/onyx/connectors/salesforce/sqlite_functions.py index 029b4e123..786a8ce90 100644 --- a/backend/onyx/connectors/salesforce/sqlite_functions.py +++ b/backend/onyx/connectors/salesforce/sqlite_functions.py @@ -2,8 +2,10 @@ import csv import json import os import sqlite3 +import time from collections.abc import Iterator from contextlib import contextmanager +from pathlib import Path from onyx.connectors.salesforce.utils import get_sqlite_db_path from onyx.connectors.salesforce.utils import SalesforceObject @@ -16,6 +18,7 @@ logger = setup_logger() @contextmanager def get_db_connection( + directory: str, isolation_level: str | None = None, ) -> Iterator[sqlite3.Connection]: """Get a database connection with proper isolation level and error handling. @@ -25,7 +28,7 @@ def get_db_connection( can be "IMMEDIATE" or "EXCLUSIVE" for more strict isolation. """ # 60 second timeout for locks - conn = sqlite3.connect(get_sqlite_db_path(), timeout=60.0) + conn = sqlite3.connect(get_sqlite_db_path(directory), timeout=60.0) if isolation_level is not None: conn.isolation_level = isolation_level @@ -38,17 +41,41 @@ def get_db_connection( conn.close() -def init_db() -> None: +def sqlite_log_stats(directory: str) -> None: + with get_db_connection(directory, "EXCLUSIVE") as conn: + cache_pages = conn.execute("PRAGMA cache_size").fetchone()[0] + page_size = conn.execute("PRAGMA page_size").fetchone()[0] + if cache_pages >= 0: + cache_bytes = cache_pages * page_size + else: + cache_bytes = abs(cache_pages * 1024) + logger.info( + f"SQLite stats: sqlite_version={sqlite3.sqlite_version} " + f"cache_pages={cache_pages} " + f"page_size={page_size} " + f"cache_bytes={cache_bytes}" + ) + + +def init_db(directory: str) -> None: """Initialize the SQLite database with required tables if they don't exist.""" # Create database directory if it doesn't exist - os.makedirs(os.path.dirname(get_sqlite_db_path()), exist_ok=True) + start = time.monotonic() - with get_db_connection("EXCLUSIVE") as conn: + os.makedirs(os.path.dirname(get_sqlite_db_path(directory)), exist_ok=True) + + with get_db_connection(directory, "EXCLUSIVE") as conn: cursor = conn.cursor() - db_exists = os.path.exists(get_sqlite_db_path()) + db_exists = os.path.exists(get_sqlite_db_path(directory)) + + if db_exists: + file_path = Path(get_sqlite_db_path(directory)) + file_size = file_path.stat().st_size + logger.info(f"init_db - found existing sqlite db: len={file_size}") + else: + # why is this only if the db doesn't exist? 
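            # (Likely answer: journal_mode=WAL is persisted in the database file itself,
            # but synchronous=NORMAL is a per-connection setting, so guarding both behind
            # "db doesn't exist" means an existing db never gets the latter re-applied.)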
- if not db_exists: # Enable WAL mode for better concurrent access and write performance cursor.execute("PRAGMA journal_mode=WAL") cursor.execute("PRAGMA synchronous=NORMAL") @@ -143,16 +170,31 @@ def init_db() -> None: """, ) + elapsed = time.monotonic() - start + logger.info(f"init_db - create tables and indices: elapsed={elapsed:.2f}") + # Analyze tables to help query planner - cursor.execute("ANALYZE relationships") - cursor.execute("ANALYZE salesforce_objects") - cursor.execute("ANALYZE relationship_types") - cursor.execute("ANALYZE user_email_map") + # NOTE(rkuo): skip ANALYZE - it takes too long and we likely don't have + # complicated queries that need this + # start = time.monotonic() + # cursor.execute("ANALYZE relationships") + # cursor.execute("ANALYZE salesforce_objects") + # cursor.execute("ANALYZE relationship_types") + # cursor.execute("ANALYZE user_email_map") + # elapsed = time.monotonic() - start + # logger.info(f"init_db - analyze: elapsed={elapsed:.2f}") # If database already existed but user_email_map needs to be populated + start = time.monotonic() cursor.execute("SELECT COUNT(*) FROM user_email_map") + elapsed = time.monotonic() - start + logger.info(f"init_db - count user_email_map: elapsed={elapsed:.2f}") + + start = time.monotonic() if cursor.fetchone()[0] == 0: _update_user_email_map(conn) + elapsed = time.monotonic() - start + logger.info(f"init_db - update_user_email_map: elapsed={elapsed:.2f}") conn.commit() @@ -240,15 +282,15 @@ def _update_user_email_map(conn: sqlite3.Connection) -> None: def update_sf_db_with_csv( + directory: str, object_type: str, csv_download_path: str, - delete_csv_after_use: bool = True, ) -> list[str]: """Update the SF DB with a CSV file using SQLite storage.""" updated_ids = [] # Use IMMEDIATE to get a write lock at the start of the transaction - with get_db_connection("IMMEDIATE") as conn: + with get_db_connection(directory, "IMMEDIATE") as conn: cursor = conn.cursor() with open(csv_download_path, "r", newline="", encoding="utf-8") as f: @@ -295,17 +337,12 @@ def update_sf_db_with_csv( conn.commit() - if delete_csv_after_use: - # Remove the csv file after it has been used - # to successfully update the db - os.remove(csv_download_path) - return updated_ids -def get_child_ids(parent_id: str) -> set[str]: +def get_child_ids(directory: str, parent_id: str) -> set[str]: """Get all child IDs for a given parent ID.""" - with get_db_connection() as conn: + with get_db_connection(directory) as conn: cursor = conn.cursor() # Force index usage with INDEXED BY @@ -317,9 +354,9 @@ def get_child_ids(parent_id: str) -> set[str]: return child_ids -def get_type_from_id(object_id: str) -> str | None: +def get_type_from_id(directory: str, object_id: str) -> str | None: """Get the type of an object from its ID.""" - with get_db_connection() as conn: + with get_db_connection(directory) as conn: cursor = conn.cursor() cursor.execute( "SELECT object_type FROM salesforce_objects WHERE id = ?", (object_id,) @@ -332,15 +369,15 @@ def get_type_from_id(object_id: str) -> str | None: def get_record( - object_id: str, object_type: str | None = None + directory: str, object_id: str, object_type: str | None = None ) -> SalesforceObject | None: """Retrieve the record and return it as a SalesforceObject.""" if object_type is None: - object_type = get_type_from_id(object_id) + object_type = get_type_from_id(directory, object_id) if not object_type: return None - with get_db_connection() as conn: + with get_db_connection(directory) as conn: cursor = conn.cursor() 
cursor.execute("SELECT data FROM salesforce_objects WHERE id = ?", (object_id,)) result = cursor.fetchone() @@ -352,9 +389,9 @@ def get_record( return SalesforceObject(id=object_id, type=object_type, data=data) -def find_ids_by_type(object_type: str) -> list[str]: +def find_ids_by_type(directory: str, object_type: str) -> list[str]: """Find all object IDs for rows of the specified type.""" - with get_db_connection() as conn: + with get_db_connection(directory) as conn: cursor = conn.cursor() cursor.execute( "SELECT id FROM salesforce_objects WHERE object_type = ?", (object_type,) @@ -363,6 +400,7 @@ def find_ids_by_type(object_type: str) -> list[str]: def get_affected_parent_ids_by_type( + directory: str, updated_ids: list[str], parent_types: list[str], batch_size: int = 500, @@ -374,7 +412,7 @@ def get_affected_parent_ids_by_type( updated_ids_batches = batch_list(updated_ids, batch_size) updated_parent_ids: set[str] = set() - with get_db_connection() as conn: + with get_db_connection(directory) as conn: cursor = conn.cursor() for batch_ids in updated_ids_batches: @@ -419,7 +457,7 @@ def get_affected_parent_ids_by_type( yield parent_type, new_affected_ids -def has_at_least_one_object_of_type(object_type: str) -> bool: +def has_at_least_one_object_of_type(directory: str, object_type: str) -> bool: """Check if there is at least one object of the specified type in the database. Args: @@ -428,7 +466,7 @@ def has_at_least_one_object_of_type(object_type: str) -> bool: Returns: bool: True if at least one object exists, False otherwise """ - with get_db_connection() as conn: + with get_db_connection(directory) as conn: cursor = conn.cursor() cursor.execute( "SELECT COUNT(*) FROM salesforce_objects WHERE object_type = ?", @@ -443,7 +481,7 @@ def has_at_least_one_object_of_type(object_type: str) -> bool: NULL_ID_STRING = "N/A" -def get_user_id_by_email(email: str) -> str | None: +def get_user_id_by_email(directory: str, email: str) -> str | None: """Get the Salesforce User ID for a given email address. 
Args: @@ -454,7 +492,7 @@ def get_user_id_by_email(email: str) -> str | None: - was_found: True if the email exists in the table, False if not found - user_id: The Salesforce User ID if exists, None otherwise """ - with get_db_connection() as conn: + with get_db_connection(directory) as conn: cursor = conn.cursor() cursor.execute("SELECT user_id FROM user_email_map WHERE email = ?", (email,)) result = cursor.fetchone() @@ -463,10 +501,10 @@ def get_user_id_by_email(email: str) -> str | None: return result[0] -def update_email_to_id_table(email: str, id: str | None) -> None: +def update_email_to_id_table(directory: str, email: str, id: str | None) -> None: """Update the email to ID map table with a new email and ID.""" id_to_use = id or NULL_ID_STRING - with get_db_connection() as conn: + with get_db_connection(directory) as conn: cursor = conn.cursor() cursor.execute( "INSERT OR REPLACE INTO user_email_map (email, user_id) VALUES (?, ?)", diff --git a/backend/onyx/connectors/salesforce/utils.py b/backend/onyx/connectors/salesforce/utils.py index 11ada9d01..c544d1807 100644 --- a/backend/onyx/connectors/salesforce/utils.py +++ b/backend/onyx/connectors/salesforce/utils.py @@ -30,9 +30,9 @@ class SalesforceObject: BASE_DATA_PATH = os.path.join(os.path.dirname(__file__), "data") -def get_sqlite_db_path() -> str: +def get_sqlite_db_path(directory: str) -> str: """Get the path to the sqlite db file.""" - return os.path.join(BASE_DATA_PATH, "salesforce_db.sqlite") + return os.path.join(directory, "salesforce_db.sqlite") def get_object_type_path(object_type: str) -> str: diff --git a/backend/onyx/context/search/pipeline.py b/backend/onyx/context/search/pipeline.py index f1b1cf81d..135225250 100644 --- a/backend/onyx/context/search/pipeline.py +++ b/backend/onyx/context/search/pipeline.py @@ -227,16 +227,13 @@ class SearchPipeline: # If ee is enabled, censor the chunk sections based on user access # Otherwise, return the retrieved chunks - censored_chunks = cast( - list[InferenceChunk], - fetch_ee_implementation_or_noop( - "onyx.external_permissions.post_query_censoring", - "_post_query_chunk_censoring", - retrieved_chunks, - )( - chunks=retrieved_chunks, - user=self.user, - ), + censored_chunks: list[InferenceChunk] = fetch_ee_implementation_or_noop( + "onyx.external_permissions.post_query_censoring", + "_post_query_chunk_censoring", + retrieved_chunks, + )( + chunks=retrieved_chunks, + user=self.user, ) above = self.search_query.chunks_above diff --git a/backend/onyx/indexing/indexing_pipeline.py b/backend/onyx/indexing/indexing_pipeline.py index 7af0b3999..dbc6ad8c2 100644 --- a/backend/onyx/indexing/indexing_pipeline.py +++ b/backend/onyx/indexing/indexing_pipeline.py @@ -459,10 +459,6 @@ def process_image_sections(documents: list[Document]) -> list[IndexingDocument]: llm = get_default_llm_with_vision() if not llm: - logger.warning( - "No vision-capable LLM available. Image sections will not be processed." 
- ) - # Even without LLM, we still convert to IndexingDocument with base Sections return [ IndexingDocument( @@ -929,10 +925,12 @@ def index_doc_batch( for chunk_num, chunk in enumerate(chunks_with_embeddings) ] - logger.debug( - "Indexing the following chunks: " - f"{[chunk.to_short_descriptor() for chunk in access_aware_chunks]}" - ) + short_descriptor_list = [ + chunk.to_short_descriptor() for chunk in access_aware_chunks + ] + short_descriptor_log = str(short_descriptor_list)[:1024] + logger.debug(f"Indexing the following chunks: {short_descriptor_log}") + # A document will not be spread across different batches, so all the # documents with chunks in this set, are fully represented by the chunks # in this set diff --git a/backend/onyx/natural_language_processing/search_nlp_models.py b/backend/onyx/natural_language_processing/search_nlp_models.py index ad8ac25f9..91b7d310e 100644 --- a/backend/onyx/natural_language_processing/search_nlp_models.py +++ b/backend/onyx/natural_language_processing/search_nlp_models.py @@ -202,8 +202,8 @@ class EmbeddingModel: end_time = time.time() processing_time = end_time - start_time - logger.info( - f"Batch {batch_idx} processing time: {processing_time:.2f} seconds" + logger.debug( + f"EmbeddingModel.process_batch: Batch {batch_idx} processing time: {processing_time:.2f} seconds" ) return batch_idx, response.embeddings diff --git a/backend/tests/daily/connectors/salesforce/test_salesforce_connector.py b/backend/tests/daily/connectors/salesforce/test_salesforce_connector.py index 500c6ea8c..7c5395660 100644 --- a/backend/tests/daily/connectors/salesforce/test_salesforce_connector.py +++ b/backend/tests/daily/connectors/salesforce/test_salesforce_connector.py @@ -35,23 +35,22 @@ def salesforce_connector() -> SalesforceConnector: connector = SalesforceConnector( requested_objects=["Account", "Contact", "Opportunity"], ) + + username = os.environ["SF_USERNAME"] + password = os.environ["SF_PASSWORD"] + security_token = os.environ["SF_SECURITY_TOKEN"] + connector.load_credentials( { - "sf_username": os.environ["SF_USERNAME"], - "sf_password": os.environ["SF_PASSWORD"], - "sf_security_token": os.environ["SF_SECURITY_TOKEN"], + "sf_username": username, + "sf_password": password, + "sf_security_token": security_token, } ) return connector # TODO: make the credentials not expire -@pytest.mark.xfail( - reason=( - "Credentials change over time, so this test will fail if run when " - "the credentials expire." 
- ) -) def test_salesforce_connector_basic(salesforce_connector: SalesforceConnector) -> None: test_data = load_test_data() target_test_doc: Document | None = None @@ -61,21 +60,26 @@ def test_salesforce_connector_basic(salesforce_connector: SalesforceConnector) - all_docs.append(doc) if doc.id == test_data["id"]: target_test_doc = doc + break + + # The number of docs here seems to change actively so do a very loose check + # as of 2025-03-28 it was around 32472 + assert len(all_docs) > 32000 + assert len(all_docs) < 40000 - assert len(all_docs) == 6 assert target_test_doc is not None # Set of received links received_links: set[str] = set() # List of received text fields, which contain key-value pairs seperated by newlines - recieved_text: list[str] = [] + received_text: list[str] = [] # Iterate over the sections of the target test doc to extract the links and text for section in target_test_doc.sections: assert section.link assert section.text received_links.add(section.link) - recieved_text.append(section.text) + received_text.append(section.text) # Check that the received links match the expected links from the test data json expected_links = set(test_data["expected_links"]) @@ -85,8 +89,9 @@ def test_salesforce_connector_basic(salesforce_connector: SalesforceConnector) - expected_text = test_data["expected_text"] if not isinstance(expected_text, list): raise ValueError("Expected text is not a list") + unparsed_expected_key_value_pairs: list[str] = expected_text - received_key_value_pairs = extract_key_value_pairs_to_set(recieved_text) + received_key_value_pairs = extract_key_value_pairs_to_set(received_text) expected_key_value_pairs = extract_key_value_pairs_to_set( unparsed_expected_key_value_pairs ) @@ -96,13 +101,21 @@ def test_salesforce_connector_basic(salesforce_connector: SalesforceConnector) - assert target_test_doc.source == DocumentSource.SALESFORCE assert target_test_doc.semantic_identifier == test_data["semantic_identifier"] assert target_test_doc.metadata == test_data["metadata"] - assert target_test_doc.primary_owners == test_data["primary_owners"] + + assert target_test_doc.primary_owners is not None + primary_owner = target_test_doc.primary_owners[0] + expected_primary_owner = test_data["primary_owners"] + assert isinstance(expected_primary_owner, dict) + assert primary_owner.email == expected_primary_owner["email"] + assert primary_owner.first_name == expected_primary_owner["first_name"] + assert primary_owner.last_name == expected_primary_owner["last_name"] + assert target_test_doc.secondary_owners == test_data["secondary_owners"] assert target_test_doc.title == test_data["title"] # TODO: make the credentials not expire -@pytest.mark.xfail( +@pytest.mark.skip( reason=( "Credentials change over time, so this test will fail if run when " "the credentials expire." 
diff --git a/backend/tests/daily/connectors/salesforce/test_salesforce_data.json b/backend/tests/daily/connectors/salesforce/test_salesforce_data.json index 24980d123..048b4156c 100644 --- a/backend/tests/daily/connectors/salesforce/test_salesforce_data.json +++ b/backend/tests/daily/connectors/salesforce/test_salesforce_data.json @@ -1,20 +1,162 @@ { - "id": "SALESFORCE_001fI000005drUcQAI", + "id": "SALESFORCE_001bm00000eu6n5AAA", "expected_links": [ - "https://customization-ruby-2195.my.salesforce.com/001fI000005drUcQAI", - "https://customization-ruby-2195.my.salesforce.com/003fI000001jiCPQAY", - "https://customization-ruby-2195.my.salesforce.com/017fI00000T7hvsQAB", - "https://customization-ruby-2195.my.salesforce.com/006fI000000rDvBQAU" + "https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESpEeAAL", + "https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESqd3AAD", + "https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESoKiAAL", + "https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESvDSAA1", + "https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESrmHAAT", + "https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESrl2AAD", + "https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESvejAAD", + "https://danswer-dev-ed.develop.my.salesforce.com/003bm00000EStlvAAD", + "https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESpPfAAL", + "https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESrP9AAL", + "https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESvlMAAT", + "https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESt3JAAT", + "https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESoBkAAL", + "https://danswer-dev-ed.develop.my.salesforce.com/003bm00000EStw2AAD", + "https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESrkMAAT", + "https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESojKAAT", + "https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESuLEAA1", + "https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESoSIAA1", + "https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESu2YAAT", + "https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESvgSAAT", + "https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESurnAAD", + "https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESrnqAAD", + "https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESoB5AAL", + "https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESuJuAAL", + "https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESrfyAAD", + "https://danswer-dev-ed.develop.my.salesforce.com/001bm00000eu6n5AAA", + "https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESpUHAA1", + "https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESsgGAAT", + "https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESr7UAAT", + "https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESu1BAAT", + "https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESpqzAAD", + "https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESplZAAT", + "https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESvJ3AAL", + "https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESurKAAT", + "https://danswer-dev-ed.develop.my.salesforce.com/003bm00000EStSiAAL", + "https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESuJFAA1", + "https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESu8xAAD", + "https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESqfzAAD", + 
"https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESqsrAAD", + "https://danswer-dev-ed.develop.my.salesforce.com/003bm00000EStoZAAT", + "https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESsIUAA1", + "https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESsAGAA1", + "https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESv8GAAT", + "https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESrOKAA1", + "https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESoUmAAL", + "https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESudKAAT", + "https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESuJ8AAL", + "https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESvf2AAD", + "https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESw3qAAD", + "https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESugRAAT", + "https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESr18AAD", + "https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESqV1AAL", + "https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESuLVAA1", + "https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESpjoAAD", + "https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESqULAA1", + "https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESuCAAA1", + "https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESrfpAAD", + "https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESp5YAAT", + "https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESrMNAA1", + "https://danswer-dev-ed.develop.my.salesforce.com/003bm00000EStaUAAT", + "https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESt5LAAT", + "https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESrtcAAD", + "https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESomaAAD", + "https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESrtIAAT", + "https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESoToAAL", + "https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESuWLAA1", + "https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESrWvAAL", + "https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESsJEAA1", + "https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESsxwAAD", + "https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESvUgAAL", + "https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESvWjAAL", + "https://danswer-dev-ed.develop.my.salesforce.com/003bm00000EStBuAAL", + "https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESpZiAAL", + "https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESuhYAAT", + "https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESuWAAA1" ], "expected_text": [ - "BillingPostalCode: 60601\nType: Prospect\nWebsite: www.globalistindustries.com\nBillingCity: Chicago\nDescription: Globalist company\nIsDeleted: false\nIsPartner: false\nPhone: (312) 555-0456\nShippingCountry: USA\nShippingState: IL\nIsBuyer: false\nBillingCountry: USA\nBillingState: IL\nShippingPostalCode: 60601\nBillingStreet: 456 Market St\nIsCustomerPortal: false\nPersonActiveTrackerCount: 0\nShippingCity: Chicago\nShippingStreet: 456 Market St", - "FirstName: Michael\nMailingCountry: USA\nActiveTrackerCount: 0\nEmail: m.brown@globalindustries.com\nMailingState: IL\nMailingStreet: 456 Market St\nMailingCity: Chicago\nLastName: Brown\nTitle: CTO\nIsDeleted: false\nPhone: (312) 555-0456\nHasOptedOutOfEmail: false\nIsEmailBounced: false\nMailingPostalCode: 60601", - "ForecastCategory: Closed\nName: Global Industries 
Equipment Sale\nIsDeleted: false\nForecastCategoryName: Closed\nFiscalYear: 2024\nFiscalQuarter: 4\nIsClosed: true\nIsWon: true\nAmount: 5000000.0\nProbability: 100.0\nPushCount: 0\nHasOverdueTask: false\nStageName: Closed Won\nHasOpenActivity: false\nHasOpportunityLineItem: false", - "Field: created\nDataType: Text\nIsDeleted: false" + "IsDeleted: false\nBillingCity: Shaykh al \u00e1\u00b8\u00a8ad\u00c4\u00abd\nName: Voonder\nCleanStatus: Pending\nBillingStreet: 12 Cambridge Parkway", + "Email: eslayqzs@icio.us\nIsDeleted: false\nLastName: Slay\nIsEmailBounced: false\nFirstName: Ebeneser\nIsPriorityRecord: false\nCleanStatus: Pending", + "Email: ptweedgdh@umich.edu\nIsDeleted: false\nLastName: Tweed\nIsEmailBounced: false\nFirstName: Paulita\nIsPriorityRecord: false\nCleanStatus: Pending", + "Email: ehurnellnlx@facebook.com\nIsDeleted: false\nLastName: Hurnell\nIsEmailBounced: false\nFirstName: Eliot\nIsPriorityRecord: false\nCleanStatus: Pending", + "Email: ccarik4q4@google.it\nIsDeleted: false\nLastName: Carik\nIsEmailBounced: false\nFirstName: Chadwick\nIsPriorityRecord: false\nCleanStatus: Pending", + "Email: cvannozziina6@moonfruit.com\nIsDeleted: false\nLastName: Vannozzii\nIsEmailBounced: false\nFirstName: Christophorus\nIsPriorityRecord: false\nCleanStatus: Pending", + "Email: mikringill2kz@hugedomains.com\nIsDeleted: false\nLastName: Ikringill\nIsEmailBounced: false\nFirstName: Meghann\nIsPriorityRecord: false\nCleanStatus: Pending", + "Email: bgrinvalray@fda.gov\nIsDeleted: false\nLastName: Grinval\nIsEmailBounced: false\nFirstName: Berti\nIsPriorityRecord: false\nCleanStatus: Pending", + "Email: aollanderhr7@cam.ac.uk\nIsDeleted: false\nLastName: Ollander\nIsEmailBounced: false\nFirstName: Annemarie\nIsPriorityRecord: false\nCleanStatus: Pending", + "Email: rwhitesideq38@gravatar.com\nIsDeleted: false\nLastName: Whiteside\nIsEmailBounced: false\nFirstName: Rolando\nIsPriorityRecord: false\nCleanStatus: Pending", + "Email: vkrafthmz@techcrunch.com\nIsDeleted: false\nLastName: Kraft\nIsEmailBounced: false\nFirstName: Vidovik\nIsPriorityRecord: false\nCleanStatus: Pending", + "Email: jhillaut@4shared.com\nIsDeleted: false\nLastName: Hill\nIsEmailBounced: false\nFirstName: Janel\nIsPriorityRecord: false\nCleanStatus: Pending", + "Email: lralstonycs@discovery.com\nIsDeleted: false\nLastName: Ralston\nIsEmailBounced: false\nFirstName: Lorrayne\nIsPriorityRecord: false\nCleanStatus: Pending", + "Email: blyttlewba@networkadvertising.org\nIsDeleted: false\nLastName: Lyttle\nIsEmailBounced: false\nFirstName: Ban\nIsPriorityRecord: false\nCleanStatus: Pending", + "Email: pplummernvf@technorati.com\nIsDeleted: false\nLastName: Plummer\nIsEmailBounced: false\nFirstName: Pete\nIsPriorityRecord: false\nCleanStatus: Pending", + "Email: babrahamoffxpb@theatlantic.com\nIsDeleted: false\nLastName: Abrahamoff\nIsEmailBounced: false\nFirstName: Brander\nIsPriorityRecord: false\nCleanStatus: Pending", + "Email: ahargieym0@homestead.com\nIsDeleted: false\nLastName: Hargie\nIsEmailBounced: false\nFirstName: Aili\nIsPriorityRecord: false\nCleanStatus: Pending", + "Email: hstotthp2@yelp.com\nIsDeleted: false\nLastName: Stott\nIsEmailBounced: false\nFirstName: Hartley\nIsPriorityRecord: false\nCleanStatus: Pending", + "Email: jganniclifftuvj@blinklist.com\nIsDeleted: false\nLastName: Ganniclifft\nIsEmailBounced: false\nFirstName: Jamima\nIsPriorityRecord: false\nCleanStatus: Pending", + "Email: ldodelly8q@ed.gov\nIsDeleted: false\nLastName: Dodell\nIsEmailBounced: false\nFirstName: 
Lynde\nIsPriorityRecord: false\nCleanStatus: Pending", + "Email: rmilner3cp@smh.com.au\nIsDeleted: false\nLastName: Milner\nIsEmailBounced: false\nFirstName: Ralph\nIsPriorityRecord: false\nCleanStatus: Pending", + "Email: gghiriardellic19@state.tx.us\nIsDeleted: false\nLastName: Ghiriardelli\nIsEmailBounced: false\nFirstName: Garv\nIsPriorityRecord: false\nCleanStatus: Pending", + "Email: rhubatschfpu@nature.com\nIsDeleted: false\nLastName: Hubatsch\nIsEmailBounced: false\nFirstName: Rose\nIsPriorityRecord: false\nCleanStatus: Pending", + "Email: mtrenholme1ws@quantcast.com\nIsDeleted: false\nLastName: Trenholme\nIsEmailBounced: false\nFirstName: Mariejeanne\nIsPriorityRecord: false\nCleanStatus: Pending", + "Email: jmussettpbd@over-blog.com\nIsDeleted: false\nLastName: Mussett\nIsEmailBounced: false\nFirstName: Juliann\nIsPriorityRecord: false\nCleanStatus: Pending", + "Email: bgoroni145@illinois.edu\nIsDeleted: false\nLastName: Goroni\nIsEmailBounced: false\nFirstName: Bernarr\nIsPriorityRecord: false\nCleanStatus: Pending", + "Email: afalls3ph@theguardian.com\nIsDeleted: false\nLastName: Falls\nIsEmailBounced: false\nFirstName: Angelia\nIsPriorityRecord: false\nCleanStatus: Pending", + "Email: lswettjoi@go.com\nIsDeleted: false\nLastName: Swett\nIsEmailBounced: false\nFirstName: Levon\nIsPriorityRecord: false\nCleanStatus: Pending", + "Email: emullinsz38@dailymotion.com\nIsDeleted: false\nLastName: Mullins\nIsEmailBounced: false\nFirstName: Elsa\nIsPriorityRecord: false\nCleanStatus: Pending", + "Email: ibernettehco@ebay.co.uk\nIsDeleted: false\nLastName: Bernette\nIsEmailBounced: false\nFirstName: Ingrid\nIsPriorityRecord: false\nCleanStatus: Pending", + "Email: trisleybtt@simplemachines.org\nIsDeleted: false\nLastName: Risley\nIsEmailBounced: false\nFirstName: Toma\nIsPriorityRecord: false\nCleanStatus: Pending", + "Email: rgypsonqx1@goodreads.com\nIsDeleted: false\nLastName: Gypson\nIsEmailBounced: false\nFirstName: Reed\nIsPriorityRecord: false\nCleanStatus: Pending", + "Email: cposvneri28@jiathis.com\nIsDeleted: false\nLastName: Posvner\nIsEmailBounced: false\nFirstName: Culley\nIsPriorityRecord: false\nCleanStatus: Pending", + "Email: awilmut2rz@geocities.jp\nIsDeleted: false\nLastName: Wilmut\nIsEmailBounced: false\nFirstName: Andy\nIsPriorityRecord: false\nCleanStatus: Pending", + "Email: aluckwellra5@exblog.jp\nIsDeleted: false\nLastName: Luckwell\nIsEmailBounced: false\nFirstName: Andreana\nIsPriorityRecord: false\nCleanStatus: Pending", + "Email: irollings26j@timesonline.co.uk\nIsDeleted: false\nLastName: Rollings\nIsEmailBounced: false\nFirstName: Ibrahim\nIsPriorityRecord: false\nCleanStatus: Pending", + "Email: gspireqpd@g.co\nIsDeleted: false\nLastName: Spire\nIsEmailBounced: false\nFirstName: Gaelan\nIsPriorityRecord: false\nCleanStatus: Pending", + "Email: sbezleyk2y@acquirethisname.com\nIsDeleted: false\nLastName: Bezley\nIsEmailBounced: false\nFirstName: Sindee\nIsPriorityRecord: false\nCleanStatus: Pending", + "Email: icollerrr@flickr.com\nIsDeleted: false\nLastName: Coller\nIsEmailBounced: false\nFirstName: Inesita\nIsPriorityRecord: false\nCleanStatus: Pending", + "Email: kfolliott1bo@nature.com\nIsDeleted: false\nLastName: Folliott\nIsEmailBounced: false\nFirstName: Kennan\nIsPriorityRecord: false\nCleanStatus: Pending", + "Email: kroofjfo@gnu.org\nIsDeleted: false\nLastName: Roof\nIsEmailBounced: false\nFirstName: Karlik\nIsPriorityRecord: false\nCleanStatus: Pending", + "Email: lcovotti8s4@rediff.com\nIsDeleted: false\nLastName: Covotti\nIsEmailBounced: 
false\nFirstName: Lucho\nIsPriorityRecord: false\nCleanStatus: Pending", + "Email: gpatriskson1rs@census.gov\nIsDeleted: false\nLastName: Patriskson\nIsEmailBounced: false\nFirstName: Gardener\nIsPriorityRecord: false\nCleanStatus: Pending", + "Email: spidgleyqvw@usgs.gov\nIsDeleted: false\nLastName: Pidgley\nIsEmailBounced: false\nFirstName: Simona\nIsPriorityRecord: false\nCleanStatus: Pending", + "Email: cbecarrak0i@over-blog.com\nIsDeleted: false\nLastName: Becarra\nIsEmailBounced: false\nFirstName: Cally\nIsPriorityRecord: false\nCleanStatus: Pending", + "Email: aparkman9td@bbc.co.uk\nIsDeleted: false\nLastName: Parkman\nIsEmailBounced: false\nFirstName: Agneta\nIsPriorityRecord: false\nCleanStatus: Pending", + "Email: bboddingtonhn@quantcast.com\nIsDeleted: false\nLastName: Boddington\nIsEmailBounced: false\nFirstName: Betta\nIsPriorityRecord: false\nCleanStatus: Pending", + "Email: dcasementx0p@cafepress.com\nIsDeleted: false\nLastName: Casement\nIsEmailBounced: false\nFirstName: Dannie\nIsPriorityRecord: false\nCleanStatus: Pending", + "Email: hzornbhe@latimes.com\nIsDeleted: false\nLastName: Zorn\nIsEmailBounced: false\nFirstName: Haleigh\nIsPriorityRecord: false\nCleanStatus: Pending", + "Email: cfifieldbjb@blogspot.com\nIsDeleted: false\nLastName: Fifield\nIsEmailBounced: false\nFirstName: Christalle\nIsPriorityRecord: false\nCleanStatus: Pending", + "Email: ddewerson4t3@skype.com\nIsDeleted: false\nLastName: Dewerson\nIsEmailBounced: false\nFirstName: Dyann\nIsPriorityRecord: false\nCleanStatus: Pending", + "Email: khullock52p@sohu.com\nIsDeleted: false\nLastName: Hullock\nIsEmailBounced: false\nFirstName: Kellina\nIsPriorityRecord: false\nCleanStatus: Pending", + "Email: tfremantle32n@bandcamp.com\nIsDeleted: false\nLastName: Fremantle\nIsEmailBounced: false\nFirstName: Turner\nIsPriorityRecord: false\nCleanStatus: Pending", + "Email: sbernardtylp@nps.gov\nIsDeleted: false\nLastName: Bernardt\nIsEmailBounced: false\nFirstName: Selina\nIsPriorityRecord: false\nCleanStatus: Pending", + "Email: smcgettigan8kk@slideshare.net\nIsDeleted: false\nLastName: McGettigan\nIsEmailBounced: false\nFirstName: Sada\nIsPriorityRecord: false\nCleanStatus: Pending", + "Email: wdelafontvgn@businesswire.com\nIsDeleted: false\nLastName: Delafont\nIsEmailBounced: false\nFirstName: West\nIsPriorityRecord: false\nCleanStatus: Pending", + "Email: lbelsher9ne@indiatimes.com\nIsDeleted: false\nLastName: Belsher\nIsEmailBounced: false\nFirstName: Lou\nIsPriorityRecord: false\nCleanStatus: Pending", + "Email: cgoody27y@blogtalkradio.com\nIsDeleted: false\nLastName: Goody\nIsEmailBounced: false\nFirstName: Colene\nIsPriorityRecord: false\nCleanStatus: Pending", + "Email: cstodejzz@ucoz.ru\nIsDeleted: false\nLastName: Stode\nIsEmailBounced: false\nFirstName: Curcio\nIsPriorityRecord: false\nCleanStatus: Pending", + "Email: abromidgejb@china.com.cn\nIsDeleted: false\nLastName: Bromidge\nIsEmailBounced: false\nFirstName: Ariela\nIsPriorityRecord: false\nCleanStatus: Pending", + "Email: ldelgardilloqvp@xrea.com\nIsDeleted: false\nLastName: Delgardillo\nIsEmailBounced: false\nFirstName: Lauralee\nIsPriorityRecord: false\nCleanStatus: Pending", + "Email: dcroal9t4@businessinsider.com\nIsDeleted: false\nLastName: Croal\nIsEmailBounced: false\nFirstName: Devlin\nIsPriorityRecord: false\nCleanStatus: Pending", + "Email: dclarageqzb@wordpress.com\nIsDeleted: false\nLastName: Clarage\nIsEmailBounced: false\nFirstName: Dre\nIsPriorityRecord: false\nCleanStatus: Pending", + "Email: dthirlwall3jf@taobao.com\nIsDeleted: 
+    "Email: tkeddie2lj@wiley.com\nIsDeleted: false\nLastName: Keddie\nIsEmailBounced: false\nFirstName: Tandi\nIsPriorityRecord: false\nCleanStatus: Pending",
+    "Email: jrimingtoni3i@istockphoto.com\nIsDeleted: false\nLastName: Rimington\nIsEmailBounced: false\nFirstName: Judy\nIsPriorityRecord: false\nCleanStatus: Pending",
+    "Email: gtroynet@slashdot.org\nIsDeleted: false\nLastName: Troy\nIsEmailBounced: false\nFirstName: Gail\nIsPriorityRecord: false\nCleanStatus: Pending",
+    "Email: ebunneyh0n@meetup.com\nIsDeleted: false\nLastName: Bunney\nIsEmailBounced: false\nFirstName: Efren\nIsPriorityRecord: false\nCleanStatus: Pending",
+    "Email: yhaken8p3@slate.com\nIsDeleted: false\nLastName: Haken\nIsEmailBounced: false\nFirstName: Yard\nIsPriorityRecord: false\nCleanStatus: Pending",
+    "Email: nolliffeq6q@biblegateway.com\nIsDeleted: false\nLastName: Olliffe\nIsEmailBounced: false\nFirstName: Nani\nIsPriorityRecord: false\nCleanStatus: Pending",
+    "Email: bgalia9jz@odnoklassniki.ru\nIsDeleted: false\nLastName: Galia\nIsEmailBounced: false\nFirstName: Berrie\nIsPriorityRecord: false\nCleanStatus: Pending",
+    "Email: djedrzej3v1@google.com\nIsDeleted: false\nLastName: Jedrzej\nIsEmailBounced: false\nFirstName: Deanne\nIsPriorityRecord: false\nCleanStatus: Pending",
+    "Email: mcamiesh1t@fc2.com\nIsDeleted: false\nLastName: Camies\nIsEmailBounced: false\nFirstName: Mikaela\nIsPriorityRecord: false\nCleanStatus: Pending",
+    "Email: csunshineqni@state.tx.us\nIsDeleted: false\nLastName: Sunshine\nIsEmailBounced: false\nFirstName: Curtis\nIsPriorityRecord: false\nCleanStatus: Pending",
+    "Email: fiannellib46@marriott.com\nIsDeleted: false\nLastName: Iannelli\nIsEmailBounced: false\nFirstName: Felicio\nIsPriorityRecord: false\nCleanStatus: Pending"
   ],
-  "semantic_identifier": "Unknown Object",
+  "semantic_identifier": "Voonder",
   "metadata": {},
-  "primary_owners": null,
+  "primary_owners": [{"email": "hagen@danswer.ai", "first_name": "Hagen", "last_name": "oneill"}],
   "secondary_owners": null,
   "title": null
 }
diff --git a/backend/tests/unit/ee/onyx/external_permissions/salesforce/test_postprocessing.py b/backend/tests/unit/ee/onyx/external_permissions/salesforce/test_postprocessing.py
index a8a913336..615f8102d 100644
--- a/backend/tests/unit/ee/onyx/external_permissions/salesforce/test_postprocessing.py
+++ b/backend/tests/unit/ee/onyx/external_permissions/salesforce/test_postprocessing.py
@@ -5,8 +5,11 @@ from ee.onyx.external_permissions.salesforce.postprocessing import (
 )
 from onyx.configs.app_configs import BLURB_SIZE
 from onyx.configs.constants import DocumentSource
+from onyx.connectors.salesforce.utils import BASE_DATA_PATH
 from onyx.context.search.models import InferenceChunk
 
+SQLITE_DIR = BASE_DATA_PATH
+
 
 def create_test_chunk(
     doc_id: str,
@@ -39,6 +42,7 @@ def create_test_chunk(
 
 def test_validate_salesforce_access_single_object() -> None:
     """Test filtering when chunk has a single Salesforce object reference"""
+    section = "This is a test document about a Salesforce object."
     test_content = section
     test_chunk = create_test_chunk(
diff --git a/backend/tests/unit/onyx/connectors/salesforce/test_salesforce_sqlite.py b/backend/tests/unit/onyx/connectors/salesforce/test_salesforce_sqlite.py
index 3afc4f117..f8610bf75 100644
--- a/backend/tests/unit/onyx/connectors/salesforce/test_salesforce_sqlite.py
+++ b/backend/tests/unit/onyx/connectors/salesforce/test_salesforce_sqlite.py
@@ -113,15 +113,18 @@ _VALID_SALESFORCE_IDS = [
 ]
 
 
-def _clear_sf_db() -> None:
+def _clear_sf_db(directory: str) -> None:
     """
     Clears the SF DB by deleting all files in the data directory.
     """
-    shutil.rmtree(BASE_DATA_PATH, ignore_errors=True)
+    shutil.rmtree(directory, ignore_errors=True)
 
 
 def _create_csv_file(
-    object_type: str, records: list[dict], filename: str = "test_data.csv"
+    directory: str,
+    object_type: str,
+    records: list[dict],
+    filename: str = "test_data.csv",
 ) -> None:
     """
     Creates a CSV file for the given object type and records.
@@ -149,10 +152,10 @@ def _create_csv_file(
         writer.writerow(record)
 
     # Update the database with the CSV
-    update_sf_db_with_csv(object_type, csv_path)
+    update_sf_db_with_csv(directory, object_type, csv_path)
 
 
-def _create_csv_with_example_data() -> None:
+def _create_csv_with_example_data(directory: str) -> None:
     """
     Creates CSV files with example data, organized by object type.
     """
@@ -342,10 +345,10 @@ def _create_csv_with_example_data() -> None:
 
     # Create CSV files for each object type
     for object_type, records in example_data.items():
-        _create_csv_file(object_type, records)
+        _create_csv_file(directory, object_type, records)
 
 
-def _test_query() -> None:
+def _test_query(directory: str) -> None:
     """
     Tests querying functionality by verifying:
     1. All expected Account IDs are found
@@ -401,7 +404,7 @@ def _test_query() -> None:
     }
 
     # Get all Account IDs
-    account_ids = find_ids_by_type("Account")
+    account_ids = find_ids_by_type(directory, "Account")
 
     # Verify we found all expected accounts
     assert len(account_ids) == len(
@@ -413,7 +416,7 @@ def _test_query() -> None:
 
     # Verify each account's data
     for acc_id in account_ids:
-        combined = get_record(acc_id)
+        combined = get_record(directory, acc_id)
         assert combined is not None, f"Could not find account {acc_id}"
 
         expected = expected_accounts[acc_id]
@@ -428,7 +431,7 @@ def _test_query() -> None:
     print("All query tests passed successfully!")
 
 
-def _test_upsert() -> None:
+def _test_upsert(directory: str) -> None:
     """
     Tests upsert functionality by:
     1. Updating an existing account
@@ -453,10 +456,10 @@ def _test_upsert() -> None:
         },
     ]
 
-    _create_csv_file("Account", update_data, "update_data.csv")
+    _create_csv_file(directory, "Account", update_data, "update_data.csv")
 
     # Verify the update worked
-    updated_record = get_record(_VALID_SALESFORCE_IDS[0])
+    updated_record = get_record(directory, _VALID_SALESFORCE_IDS[0])
     assert updated_record is not None, "Updated record not found"
     assert updated_record.data["Name"] == "Acme Inc. Updated", "Name not updated"
     assert (
@@ -464,7 +467,7 @@ def _test_upsert() -> None:
     ), "Description not added"
 
     # Verify the new record was created
-    new_record = get_record(_VALID_SALESFORCE_IDS[2])
+    new_record = get_record(directory, _VALID_SALESFORCE_IDS[2])
     assert new_record is not None, "New record not found"
     assert new_record.data["Name"] == "New Company Inc.", "New record name incorrect"
     assert new_record.data["AnnualRevenue"] == "1000000", "New record revenue incorrect"
@@ -472,7 +475,7 @@ def _test_upsert() -> None:
     print("All upsert tests passed successfully!")
 
 
-def _test_relationships() -> None:
+def _test_relationships(directory: str) -> None:
     """
     Tests relationship shelf updates and queries by:
    1. Creating test data with relationships
@@ -513,11 +516,11 @@ def _test_relationships() -> None:
 
     # Create and update CSV files for each object type
     for object_type, records in test_data.items():
-        _create_csv_file(object_type, records, "relationship_test.csv")
+        _create_csv_file(directory, object_type, records, "relationship_test.csv")
 
     # Test relationship queries
     # All these objects should be children of Acme Inc.
-    child_ids = get_child_ids(_VALID_SALESFORCE_IDS[0])
+    child_ids = get_child_ids(directory, _VALID_SALESFORCE_IDS[0])
     assert len(child_ids) == 4, f"Expected 4 child objects, found {len(child_ids)}"
     assert _VALID_SALESFORCE_IDS[13] in child_ids, "Case 1 not found in relationship"
     assert _VALID_SALESFORCE_IDS[14] in child_ids, "Case 2 not found in relationship"
@@ -527,7 +530,7 @@ def _test_relationships() -> None:
     ), "Opportunity not found in relationship"
 
     # Test querying relationships for a different account (should be empty)
-    other_account_children = get_child_ids(_VALID_SALESFORCE_IDS[1])
+    other_account_children = get_child_ids(directory, _VALID_SALESFORCE_IDS[1])
     assert (
         len(other_account_children) == 0
     ), "Expected no children for different account"
@@ -535,7 +538,7 @@ def _test_relationships() -> None:
     print("All relationship tests passed successfully!")
 
 
-def _test_account_with_children() -> None:
+def _test_account_with_children(directory: str) -> None:
     """
     Tests querying all accounts and retrieving their child objects.
     This test verifies that:
     2. Each account can be retrieved
     3. Child object data is complete and accurate
     """
     # First get all account IDs
-    account_ids = find_ids_by_type("Account")
+    account_ids = find_ids_by_type(directory, "Account")
     assert len(account_ids) > 0, "No accounts found"
 
     # For each account, get its children and verify the data
     for account_id in account_ids:
-        account = get_record(account_id)
+        account = get_record(directory, account_id)
         assert account is not None, f"Could not find account {account_id}"
 
         # Get all child objects
-        child_ids = get_child_ids(account_id)
+        child_ids = get_child_ids(directory, account_id)
 
         # For Acme Inc., verify specific relationships
         if account_id == _VALID_SALESFORCE_IDS[0]:  # Acme Inc.
@@ -564,7 +567,7 @@ def _test_account_with_children() -> None:
             # Get all child records
             child_records = []
             for child_id in child_ids:
-                child_record = get_record(child_id)
+                child_record = get_record(directory, child_id)
                 if child_record is not None:
                     child_records.append(child_record)
             # Verify Cases
@@ -599,7 +602,7 @@ def _test_account_with_children() -> None:
     print("All account with children tests passed successfully!")
 
 
-def _test_relationship_updates() -> None:
+def _test_relationship_updates(directory: str) -> None:
     """
     Tests that relationships are properly updated when a child object's
     parent reference changes. This test verifies:
@@ -616,10 +619,10 @@ def _test_relationship_updates() -> None:
             "LastName": "Contact",
         }
     ]
-    _create_csv_file("Contact", initial_contact, "initial_contact.csv")
+    _create_csv_file(directory, "Contact", initial_contact, "initial_contact.csv")
 
     # Verify initial relationship
-    acme_children = get_child_ids(_VALID_SALESFORCE_IDS[0])
+    acme_children = get_child_ids(directory, _VALID_SALESFORCE_IDS[0])
     assert (
         _VALID_SALESFORCE_IDS[40] in acme_children
     ), "Initial relationship not created"
@@ -633,22 +636,22 @@ def _test_relationship_updates() -> None:
             "LastName": "Contact",
         }
     ]
-    _create_csv_file("Contact", updated_contact, "updated_contact.csv")
+    _create_csv_file(directory, "Contact", updated_contact, "updated_contact.csv")
 
     # Verify old relationship is removed
-    acme_children = get_child_ids(_VALID_SALESFORCE_IDS[0])
+    acme_children = get_child_ids(directory, _VALID_SALESFORCE_IDS[0])
     assert (
         _VALID_SALESFORCE_IDS[40] not in acme_children
     ), "Old relationship not removed"
 
     # Verify new relationship is created
-    globex_children = get_child_ids(_VALID_SALESFORCE_IDS[1])
+    globex_children = get_child_ids(directory, _VALID_SALESFORCE_IDS[1])
     assert _VALID_SALESFORCE_IDS[40] in globex_children, "New relationship not created"
 
     print("All relationship update tests passed successfully!")
 
 
-def _test_get_affected_parent_ids() -> None:
+def _test_get_affected_parent_ids(directory: str) -> None:
     """
     Tests get_affected_parent_ids functionality by verifying:
     1. IDs that are directly in the parent_types list are included
@@ -683,13 +686,13 @@
     # Create and update CSV files for test data
     for object_type, records in test_data.items():
-        _create_csv_file(object_type, records)
+        _create_csv_file(directory, object_type, records)
 
     # Test Case 1: Account directly in updated_ids and parent_types
     updated_ids = [_VALID_SALESFORCE_IDS[1]]  # Parent Account 2
     parent_types = ["Account"]
     affected_ids_by_type = dict(
-        get_affected_parent_ids_by_type(updated_ids, parent_types)
+        get_affected_parent_ids_by_type(directory, updated_ids, parent_types)
     )
     assert "Account" in affected_ids_by_type, "Account type not in affected_ids_by_type"
     assert (
@@ -700,7 +703,7 @@
     updated_ids = [_VALID_SALESFORCE_IDS[40]]  # Child Contact
     parent_types = ["Account"]
     affected_ids_by_type = dict(
-        get_affected_parent_ids_by_type(updated_ids, parent_types)
+        get_affected_parent_ids_by_type(directory, updated_ids, parent_types)
     )
     assert "Account" in affected_ids_by_type, "Account type not in affected_ids_by_type"
     assert (
@@ -711,7 +714,7 @@
     updated_ids = [_VALID_SALESFORCE_IDS[1], _VALID_SALESFORCE_IDS[40]]  # Both cases
     parent_types = ["Account"]
     affected_ids_by_type = dict(
-        get_affected_parent_ids_by_type(updated_ids, parent_types)
+        get_affected_parent_ids_by_type(directory, updated_ids, parent_types)
    )
     assert "Account" in affected_ids_by_type, "Account type not in affected_ids_by_type"
     affected_ids = affected_ids_by_type["Account"]
@@ -726,7 +729,7 @@
     updated_ids = [_VALID_SALESFORCE_IDS[40]]  # Child Contact
     parent_types = ["Opportunity"]  # Wrong type
     affected_ids_by_type = dict(
-        get_affected_parent_ids_by_type(updated_ids, parent_types)
+        get_affected_parent_ids_by_type(directory, updated_ids, parent_types)
     )
     assert len(affected_ids_by_type) == 0, "Should return empty dict when no matches"
@@ -734,13 +737,15 @@
 
 
 def test_salesforce_sqlite() -> None:
-    _clear_sf_db()
-    init_db()
-    _create_csv_with_example_data()
-    _test_query()
-    _test_upsert()
-    _test_relationships()
-    _test_account_with_children()
-    _test_relationship_updates()
-    _test_get_affected_parent_ids()
-    _clear_sf_db()
+    directory = BASE_DATA_PATH
+
+    _clear_sf_db(directory)
+    init_db(directory)
+    _create_csv_with_example_data(directory)
+    _test_query(directory)
+    _test_upsert(directory)
+    _test_relationships(directory)
+    _test_account_with_children(directory)
+    _test_relationship_updates(directory)
+    _test_get_affected_parent_ids(directory)
+    _clear_sf_db(directory)
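
Usage note (outside the diff above): because every helper in test_salesforce_sqlite.py now takes an explicit directory argument, the suite is no longer hard-wired to BASE_DATA_PATH. A minimal sketch of pointing the same helpers at a pytest-managed temp directory instead; this tmp_path-based variant is illustrative only and not part of this patch:

    from pathlib import Path

    def test_salesforce_sqlite_tmp_dir(tmp_path: Path) -> None:
        # Hypothetical variant: any writable directory satisfies the new
        # `directory: str` parameters introduced in this change.
        directory = str(tmp_path)

        _clear_sf_db(directory)
        init_db(directory)
        _create_csv_with_example_data(directory)
        _test_query(directory)
        _clear_sf_db(directory)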