From a44f289aedc3aaf9d639b0e0e61141dfe03d1bd2 Mon Sep 17 00:00:00 2001
From: rkuo-danswer
Date: Wed, 14 May 2025 22:23:11 -0700
Subject: [PATCH] restructure to signal activity while processing (#4712)

Co-authored-by: Richard Kuo (Onyx)
---
 .../onyx/connectors/salesforce/connector.py   | 90 ++++++++-----------
 .../file_processing/image_summarization.py    |  4 +-
 2 files changed, 41 insertions(+), 53 deletions(-)

diff --git a/backend/onyx/connectors/salesforce/connector.py b/backend/onyx/connectors/salesforce/connector.py
index 9141e8fb242..0fc9a811c4c 100644
--- a/backend/onyx/connectors/salesforce/connector.py
+++ b/backend/onyx/connectors/salesforce/connector.py
@@ -150,56 +150,6 @@ class SalesforceConnector(LoadConnector, PollConnector, SlimConnector):
 
         logger.info(f"CSV info total: total_csvs={num_csvs} total_bytes={num_bytes}")
 
-    @staticmethod
-    def _load_csvs_to_db(csv_directory: str, sf_db: OnyxSalesforceSQLite) -> set[str]:
-        updated_ids: set[str] = set()
-
-        object_type_to_csv_path = SalesforceConnector.reconstruct_object_types(
-            csv_directory
-        )
-
-        # NOTE(rkuo): this timing note is meaningless without a reference point in terms
-        # of number of records, etc
-        # This takes like 10 seconds
-
-        # This is for testing the rest of the functionality if data has
-        # already been fetched and put in sqlite
-        # from import onyx.connectors.salesforce.sf_db.sqlite_functions find_ids_by_type
-        # for object_type in self.parent_object_list:
-        #     updated_ids.update(list(find_ids_by_type(object_type)))
-
-        # This takes 10-70 minutes first time (idk why the range is so big)
-        total_types = len(object_type_to_csv_path)
-        logger.info(f"Starting to process {total_types} object types")
-
-        for i, (object_type, csv_paths) in enumerate(
-            object_type_to_csv_path.items(), 1
-        ):
-            logger.info(f"Processing object type {object_type} ({i}/{total_types})")
-            # If path is None, it means it failed to fetch the csv
-            if csv_paths is None:
-                continue
-
-            # Go through each csv path and use it to update the db
-            for csv_path in csv_paths:
-                logger.debug(
-                    f"Processing CSV: object_type={object_type} "
-                    f"csv={csv_path} "
-                    f"len={Path(csv_path).stat().st_size}"
-                )
-                new_ids = sf_db.update_from_csv(
-                    object_type=object_type,
-                    csv_download_path=csv_path,
-                )
-                updated_ids.update(new_ids)
-                logger.debug(
-                    f"Added {len(new_ids)} new/updated records for {object_type}"
-                )
-
-                os.remove(csv_path)
-
-        return updated_ids
-
     @staticmethod
     def _get_all_types(parent_types: list[str], sf_client: Salesforce) -> set[str]:
         all_types: set[str] = set(parent_types)
@@ -236,6 +186,7 @@ class SalesforceConnector(LoadConnector, PollConnector, SlimConnector):
 
         updated_ids: set[str] = set()
         docs_processed = 0
+        docs_to_yield: list[Document] = []
 
         sf_db = OnyxSalesforceSQLite(os.path.join(temp_dir, "salesforce_db.sqlite"))
         sf_db.connect()
@@ -266,7 +217,43 @@ class SalesforceConnector(LoadConnector, PollConnector, SlimConnector):
             gc.collect()
 
         # Step 2 - load CSV's to sqlite
-        updated_ids = SalesforceConnector._load_csvs_to_db(temp_dir, sf_db)
+        object_type_to_csv_paths = SalesforceConnector.reconstruct_object_types(
+            temp_dir
+        )
+
+        total_types = len(object_type_to_csv_paths)
+        logger.info(f"Starting to process {total_types} object types")
+
+        for i, (object_type, csv_paths) in enumerate(
+            object_type_to_csv_paths.items(), 1
+        ):
+            logger.info(f"Processing object type {object_type} ({i}/{total_types})")
+            # csv_paths is None when fetching the CSVs for this object type failed
+            if csv_paths is None:
+                continue
+
+            # Go through each csv path and use it to update the db
+            for csv_path in csv_paths:
+                logger.debug(
+                    f"Processing CSV: object_type={object_type} "
+                    f"csv={csv_path} "
+                    f"len={Path(csv_path).stat().st_size}"
+                )
+
+                # yield an empty batch to signal activity and keep the connector alive
+                yield docs_to_yield
+
+                new_ids = sf_db.update_from_csv(
+                    object_type=object_type,
+                    csv_download_path=csv_path,
+                )
+                updated_ids.update(new_ids)
+                logger.debug(
+                    f"Added {len(new_ids)} new/updated records for {object_type}"
+                )
+
+                os.remove(csv_path)
+            gc.collect()
 
         logger.info(f"Found {len(updated_ids)} total updated records")
 
@@ -276,7 +263,6 @@ class SalesforceConnector(LoadConnector, PollConnector, SlimConnector):
 
         # Step 3 - extract and index docs
         batches_processed = 0
-        docs_to_yield: list[Document] = []
         docs_to_yield_bytes = 0
 
         # Takes 15-20 seconds per batch
diff --git a/backend/onyx/file_processing/image_summarization.py b/backend/onyx/file_processing/image_summarization.py
index 69d4c3999c7..c8c6add996a 100644
--- a/backend/onyx/file_processing/image_summarization.py
+++ b/backend/onyx/file_processing/image_summarization.py
@@ -103,7 +103,9 @@ def _summarize_image(
         return message_to_string(llm.invoke(messages))
 
     except Exception as e:
-        raise ValueError(f"Summarization failed. Messages: {messages}") from e
+        error_msg = f"Summarization failed. Messages: {messages}"
+        error_msg = error_msg[:1024]
+        raise ValueError(error_msg) from e
 
 
 def _encode_image_for_llm_prompt(image_data: bytes) -> str:
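
Note on the keep-alive change: Onyx connectors are generators that yield
batches of Document objects, and a long stretch without a yield can make a
run look stalled to the caller. Loading the Salesforce CSVs into sqlite
took 10-70 minutes on a first run (per the comment removed above), so the
restructured loop yields an empty batch before each CSV, and docs_to_yield
is hoisted out of step 3 so the empty list already exists during step 2.
A minimal sketch of the pattern, with hypothetical names and time.sleep
standing in for the per-CSV sqlite update:

    import time
    from collections.abc import Iterator

    def load_with_heartbeat(csv_paths: list[str]) -> Iterator[list[str]]:
        docs_to_yield: list[str] = []  # stays empty during the load phase
        for csv_path in csv_paths:
            # An empty yield is a no-op for the consumer, but it hands
            # control back so the caller can record activity before the
            # next slow step.
            yield docs_to_yield
            time.sleep(1)  # stand-in for sf_db.update_from_csv(...)

    # The consumer sees one iteration per CSV even though no documents are
    # produced yet, so its activity tracking never goes quiet:
    for batch in load_with_heartbeat(["a.csv", "b.csv"]):
        pass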
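
Note on the image_summarization.py hunk: the messages passed to the LLM
embed the base64-encoded image (see _encode_image_for_llm_prompt just
below the changed lines), so interpolating them into a ValueError could
produce an exception string megabytes long. Truncating to 1024 characters
bounds whatever downstream code logs or re-raises. The same idea as a
reusable helper; bounded_error is hypothetical, not part of the codebase:

    def bounded_error(prefix: str, payload: object, limit: int = 1024) -> str:
        # Payloads such as LLM prompts can carry base64 image data, so cap
        # the rendered message to keep logs and tracebacks a sane size.
        return f"{prefix} {payload}"[:limit]

    # usage (hypothetical):
    #     raise ValueError(bounded_error("Summarization failed. Messages:", messages)) from e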