diff --git a/backend/onyx/connectors/salesforce/connector.py b/backend/onyx/connectors/salesforce/connector.py index 0fc9a811c4c..2462b1ab0fc 100644 --- a/backend/onyx/connectors/salesforce/connector.py +++ b/backend/onyx/connectors/salesforce/connector.py @@ -145,10 +145,12 @@ class SalesforceConnector(LoadConnector, PollConnector, SlimConnector): num_csvs += 1 num_bytes += file_size logger.info( - f"CSV info: object_type={object_type} path={csv_path} bytes={file_size}" + f"CSV download: object_type={object_type} path={csv_path} bytes={file_size}" ) - logger.info(f"CSV info total: total_csvs={num_csvs} total_bytes={num_bytes}") + logger.info( + f"CSV download total: total_csvs={num_csvs} total_bytes={num_bytes}" + ) @staticmethod def _get_all_types(parent_types: list[str], sf_client: Salesforce) -> set[str]: @@ -253,6 +255,7 @@ class SalesforceConnector(LoadConnector, PollConnector, SlimConnector): ) os.remove(csv_path) + gc.collect() gc.collect() diff --git a/backend/onyx/connectors/salesforce/salesforce_calls.py b/backend/onyx/connectors/salesforce/salesforce_calls.py index 51ff6dc6a9e..d6855934f93 100644 --- a/backend/onyx/connectors/salesforce/salesforce_calls.py +++ b/backend/onyx/connectors/salesforce/salesforce_calls.py @@ -1,3 +1,4 @@ +import gc import os from concurrent.futures import ThreadPoolExecutor from datetime import datetime @@ -138,18 +139,29 @@ def _bulk_retrieve_from_salesforce( query = _build_bulk_query(sf_client, sf_type, time_filter) - bulk_2_handler = SFBulk2Handler( + bulk_2_handler: SFBulk2Handler | None = SFBulk2Handler( session_id=sf_client.session_id, bulk2_url=sf_client.bulk2_url, proxies=sf_client.proxies, session=sf_client.session, ) - bulk_2_type = SFBulk2Type( + if not bulk_2_handler: + return sf_type, None + + # NOTE(rkuo): there are signs this download is allocating large + # amounts of memory instead of streaming the results to disk. + # we're doing a gc.collect to try and mitigate this. 
+ + # see https://github.com/simple-salesforce/simple-salesforce/issues/428 for a + # possible solution + bulk_2_type: SFBulk2Type | None = SFBulk2Type( object_name=sf_type, bulk2_url=bulk_2_handler.bulk2_url, headers=bulk_2_handler.headers, session=bulk_2_handler.session, ) + if not bulk_2_type: + return sf_type, None logger.info(f"Downloading {sf_type}") @@ -160,7 +172,7 @@ def _bulk_retrieve_from_salesforce( results = bulk_2_type.download( query=query, path=target_dir, - max_records=1000000, + max_records=500000, ) # prepend each downloaded csv with the object type (delimiter = '.') @@ -172,14 +184,19 @@ def _bulk_retrieve_from_salesforce( new_file_path = os.path.join(directory, new_filename) os.rename(original_file_path, new_file_path) all_download_paths.append(new_file_path) - logger.info(f"Downloaded {sf_type} to {all_download_paths}") - return sf_type, all_download_paths except Exception as e: logger.error( f"Failed to download salesforce csv for object type {sf_type}: {e}" ) logger.warning(f"Exceptioning query for object type {sf_type}: {query}") return sf_type, None + finally: + bulk_2_handler = None + bulk_2_type = None + gc.collect() + + logger.info(f"Downloaded {sf_type} to {all_download_paths}") + return sf_type, all_download_paths def fetch_all_csvs_in_parallel( @@ -229,7 +246,8 @@ def fetch_all_csvs_in_parallel( time_filter_for_each_object_type[sf_type] = last_modified_time_filter # Run the bulk retrieve in parallel - with ThreadPoolExecutor() as executor: + # limit to 4 to help with memory usage + with ThreadPoolExecutor(max_workers=4) as executor: results = executor.map( lambda object_type: _bulk_retrieve_from_salesforce( sf_client=sf_client, diff --git a/backend/tests/daily/connectors/web/test_web_connector.py b/backend/tests/daily/connectors/web/test_web_connector.py index 7c987013470..7f852eaccbb 100644 --- a/backend/tests/daily/connectors/web/test_web_connector.py +++ b/backend/tests/daily/connectors/web/test_web_connector.py @@ -53,6 +53,12 @@ 
def test_web_connector_no_scroll(quotes_to_scroll_web_connector: WebConnector) - MERCURY_EXPECTED_QUOTE = "How can we help?" +@pytest.mark.xfail( + reason=( + "flaky. maybe we can improve how we avoid triggering bot protection or " + "maybe this is just how it has to be." + ), +) def test_web_connector_bot_protection() -> None: connector = WebConnector( base_url="https://support.mercury.com/hc",