diff --git a/backend/onyx/connectors/salesforce/connector.py b/backend/onyx/connectors/salesforce/connector.py
index 02f1aac0f..d150b3aed 100644
--- a/backend/onyx/connectors/salesforce/connector.py
+++ b/backend/onyx/connectors/salesforce/connector.py
@@ -1,5 +1,7 @@
 import gc
 import os
+import tempfile
+from pathlib import Path
 from typing import Any

 from simple_salesforce import Salesforce
@@ -101,53 +103,81 @@ class SalesforceConnector(LoadConnector, PollConnector, SlimConnector):
         logger.info("Starting to fetch CSVs for all object types")

         # This takes like 30 minutes first time and <2 minutes for updates
-        object_type_to_csv_path = fetch_all_csvs_in_parallel(
-            sf_client=self.sf_client,
-            object_types=all_object_types,
-            start=start,
-            end=end,
-        )
+        # CSVs land in a per-run temporary directory that is deleted
+        # automatically when the block exits, even on error.
+        with tempfile.TemporaryDirectory() as temp_dir:
+            object_type_to_csv_path = fetch_all_csvs_in_parallel(
+                sf_client=self.sf_client,
+                object_types=all_object_types,
+                start=start,
+                end=end,
+                target_dir=temp_dir,
+            )

-        gc.collect()
+            # Log per-file and aggregate CSV counts/sizes for observability
+            num_csvs = 0
+            num_bytes = 0
+            for object_type, csv_paths in object_type_to_csv_path.items():
+                if not csv_paths:
+                    continue

-        updated_ids: set[str] = set()
-        # This takes like 10 seconds
-        # This is for testing the rest of the functionality if data has
-        # already been fetched and put in sqlite
-        # from import onyx.connectors.salesforce.sf_db.sqlite_functions find_ids_by_type
-        # for object_type in self.parent_object_list:
-        #     updated_ids.update(list(find_ids_by_type(object_type)))
+                for csv_path in csv_paths:
+                    if not csv_path:
+                        continue

-        # This takes 10-70 minutes first time (idk why the range is so big)
-        total_types = len(object_type_to_csv_path)
-        logger.info(f"Starting to process {total_types} object types")
+                    file_path = Path(csv_path)
+                    file_size = file_path.stat().st_size
+                    num_csvs += 1
+                    num_bytes += file_size
+                    logger.info(
+                        f"CSV info: object_type={object_type} path={csv_path} bytes={file_size}"
+                    )

-        for i, (object_type, csv_paths) in enumerate(
-            object_type_to_csv_path.items(), 1
-        ):
-            logger.info(f"Processing object type {object_type} ({i}/{total_types})")
-            # If path is None, it means it failed to fetch the csv
-            if csv_paths is None:
-                continue
-            # Go through each csv path and use it to update the db
-            for csv_path in csv_paths:
-                logger.debug(f"Updating {object_type} with {csv_path}")
-                new_ids = update_sf_db_with_csv(
-                    object_type=object_type,
-                    csv_download_path=csv_path,
-                )
-                updated_ids.update(new_ids)
-                logger.debug(
-                    f"Added {len(new_ids)} new/updated records for {object_type}"
-                )
+            logger.info(
+                f"CSV info total: total_csvs={num_csvs} total_bytes={num_bytes}"
+            )

-            gc.collect()
+            gc.collect()
+
+            updated_ids: set[str] = set()
+            # This takes about 10 seconds
+            # This is for testing the rest of the functionality if data has
+            # already been fetched and put in sqlite
+            # from onyx.connectors.salesforce.sf_db.sqlite_functions import find_ids_by_type
+            # for object_type in self.parent_object_list:
+            #     updated_ids.update(list(find_ids_by_type(object_type)))
+
+            # This takes 10-70 minutes on the first run (unclear why the range is so wide)
+            total_types = len(object_type_to_csv_path)
+            logger.info(f"Starting to process {total_types} object types")
+
+            for i, (object_type, csv_paths) in enumerate(
+                object_type_to_csv_path.items(), 1
+            ):
+                logger.info(f"Processing object type {object_type} ({i}/{total_types})")
+                # If csv_paths is None, fetching the CSVs for this object type failed
+                if csv_paths is None:
+                    continue
+                # Go through each csv path and use it to update the db
+                for csv_path in csv_paths:
+                    logger.debug(f"Updating {object_type} with {csv_path}")
+                    new_ids = update_sf_db_with_csv(
+                        object_type=object_type,
+                        csv_download_path=csv_path,
+                    )
+                    updated_ids.update(new_ids)
+                    logger.debug(
+                        f"Added {len(new_ids)} new/updated records for {object_type}"
+                    )
+
+                gc.collect()

         logger.info(f"Found {len(updated_ids)} total updated records")
         logger.info(
             f"Starting to process parent objects of types: {self.parent_object_list}"
         )

+        num_object_batches = 0
         docs_to_yield: list[Document] = []
         docs_processed = 0
         # Takes 15-20 seconds per batch
@@ -155,8 +185,9 @@ class SalesforceConnector(LoadConnector, PollConnector, SlimConnector):
             updated_ids=list(updated_ids),
             parent_types=self.parent_object_list,
         ):
+            num_object_batches += 1
             logger.info(
-                f"Processing batch of {len(parent_id_batch)} {parent_type} objects"
+                f"Processing batch: index={num_object_batches} object_type={parent_type} len={len(parent_id_batch)}"
             )
             for parent_id in parent_id_batch:
                 if not (parent_object := get_record(parent_id, parent_type)):
@@ -222,7 +253,9 @@ if __name__ == "__main__":
             "sf_security_token": os.environ["SF_SECURITY_TOKEN"],
         }
     )
-    start_time = time.time()
+    # Use a monotonic clock so the elapsed-time measurement is unaffected
+    # by system clock adjustments.
+    start_time = time.monotonic()
     doc_count = 0
     section_count = 0
     text_count = 0
@@ -234,7 +267,7 @@ if __name__ == "__main__":
         for section in doc.sections:
             if isinstance(section, TextSection) and section.text is not None:
                 text_count += len(section.text)
-    end_time = time.time()
+    end_time = time.monotonic()

     print(f"Doc count: {doc_count}")
     print(f"Section count: {section_count}")
diff --git a/backend/onyx/connectors/salesforce/salesforce_calls.py b/backend/onyx/connectors/salesforce/salesforce_calls.py
index 858c240b3..1fcae2f66 100644
--- a/backend/onyx/connectors/salesforce/salesforce_calls.py
+++ b/backend/onyx/connectors/salesforce/salesforce_calls.py
@@ -133,15 +133,15 @@ def _build_bulk_query(sf_client: Salesforce, sf_type: str, time_filter: str) ->
 def _bulk_retrieve_from_salesforce(
-    sf_client: Salesforce,
-    sf_type: str,
-    time_filter: str,
+    sf_client: Salesforce, sf_type: str, time_filter: str, target_dir: str
 ) -> tuple[str, list[str] | None]:
     if not _check_if_object_type_is_empty(sf_client, sf_type, time_filter):
         return sf_type, None

-    if existing_csvs := _check_for_existing_csvs(sf_type):
-        return sf_type, existing_csvs
+    # Reuse of CSVs from earlier runs is disabled: downloads now land in a
+    # per-run temporary directory, so prior runs' files are no longer present.
+    # if existing_csvs := _check_for_existing_csvs(sf_type):
+    #     return sf_type, existing_csvs

     query = _build_bulk_query(sf_client, sf_type, time_filter)
@@ -165,7 +165,7 @@ def _bulk_retrieve_from_salesforce(
     # This downloads the file to a file in the target path with a random name
     results = bulk_2_type.download(
         query=query,
-        path=get_object_type_path(sf_type),
+        path=target_dir,
         max_records=1000000,
     )
     all_download_paths = [result["file"] for result in results]
@@ -181,6 +181,7 @@ def fetch_all_csvs_in_parallel(
     object_types: set[str],
     start: SecondsSinceUnixEpoch | None,
     end: SecondsSinceUnixEpoch | None,
+    target_dir: str,
 ) -> dict[str, list[str] | None]:
     """
     Fetches all the csvs in parallel for the given object types
@@ -207,6 +208,7 @@ def fetch_all_csvs_in_parallel(
             sf_client=sf_client,
             sf_type=object_type,
             time_filter=time_filter_for_each_object_type[object_type],
+            target_dir=target_dir,
         ),
         object_types,
     )
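
Note for reviewers: the connector change depends on `tempfile.TemporaryDirectory` deleting the directory, and every CSV inside it, when the `with` block exits (normally or via an exception), which is why all CSV consumption now happens inside that block. A minimal standalone sketch of that lifecycle; `download_csvs` is a hypothetical stand-in for `fetch_all_csvs_in_parallel`, not a function in this codebase:

    import tempfile
    from pathlib import Path

    def download_csvs(target_dir: str) -> list[str]:
        # Hypothetical stand-in: writes one CSV into the caller-supplied
        # directory, mirroring how fetch_all_csvs_in_parallel receives
        # target_dir in this patch.
        csv_path = Path(target_dir) / "Account.csv"
        csv_path.write_text("Id,Name\n001,Acme\n")
        return [str(csv_path)]

    with tempfile.TemporaryDirectory() as temp_dir:
        for path in download_csvs(target_dir=temp_dir):
            # Files exist and are readable only while the block is open.
            print(path, Path(path).stat().st_size)
    # On exit the directory and its contents are removed, even if an exception
    # was raised inside the block, so no CSVs leak between connector runs.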