diff --git a/backend/onyx/connectors/salesforce/connector.py b/backend/onyx/connectors/salesforce/connector.py
index 6fa17c327..02f1aac0f 100644
--- a/backend/onyx/connectors/salesforce/connector.py
+++ b/backend/onyx/connectors/salesforce/connector.py
@@ -1,3 +1,4 @@
+import gc
 import os
 from typing import Any
 
@@ -70,6 +71,9 @@ class SalesforceConnector(LoadConnector, PollConnector, SlimConnector):
         end: SecondsSinceUnixEpoch | None = None,
     ) -> GenerateDocumentsOutput:
         init_db()
+
+        gc.collect()
+
         all_object_types: set[str] = set(self.parent_object_list)
 
         logger.info(f"Starting with {len(self.parent_object_list)} parent object types")
@@ -91,8 +95,11 @@ class SalesforceConnector(LoadConnector, PollConnector, SlimConnector):
         logger.info(f"Found total of {len(all_object_types)} object types to fetch")
         logger.debug(f"All object types: {all_object_types}")
 
+        gc.collect()
+
         # checkpoint - we've found all object types, now time to fetch the data
         logger.info("Starting to fetch CSVs for all object types")
+
         # This takes like 30 minutes first time and <2 minutes for updates
         object_type_to_csv_path = fetch_all_csvs_in_parallel(
             sf_client=self.sf_client,
@@ -101,6 +108,8 @@ class SalesforceConnector(LoadConnector, PollConnector, SlimConnector):
             end=end,
         )
 
+        gc.collect()
+
         updated_ids: set[str] = set()
         # This takes like 10 seconds
         # This is for testing the rest of the functionality if data has
@@ -132,6 +141,8 @@ class SalesforceConnector(LoadConnector, PollConnector, SlimConnector):
                     f"Added {len(new_ids)} new/updated records for {object_type}"
                 )
 
+            gc.collect()
+
         logger.info(f"Found {len(updated_ids)} total updated records")
         logger.info(
             f"Starting to process parent objects of types: {self.parent_object_list}"
@@ -166,6 +177,8 @@ class SalesforceConnector(LoadConnector, PollConnector, SlimConnector):
                 yield docs_to_yield
                 docs_to_yield = []
 
+        gc.collect()
+
         yield docs_to_yield
 
     def load_from_state(self) -> GenerateDocumentsOutput:
diff --git a/backend/onyx/indexing/indexing_pipeline.py b/backend/onyx/indexing/indexing_pipeline.py
index f4a6e0075..45201d3db 100644
--- a/backend/onyx/indexing/indexing_pipeline.py
+++ b/backend/onyx/indexing/indexing_pipeline.py
@@ -583,10 +583,12 @@ def index_doc_batch(
         for chunk in chunks_with_embeddings
     ]
 
-    logger.debug(
-        "Indexing the following chunks: "
-        f"{[chunk.to_short_descriptor() for chunk in access_aware_chunks]}"
-    )
+    short_descriptor_list = [
+        chunk.to_short_descriptor() for chunk in access_aware_chunks
+    ]
+    short_descriptor_log = str(short_descriptor_list)[:1024]
+    logger.debug(f"Indexing the following chunks: {short_descriptor_log}")
+
     # A document will not be spread across different batches, so all the
     # documents with chunks in this set, are fully represented by the chunks
     # in this set
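
Reviewer note: the two changes above share a theme of bounding memory use during long sync runs: gc.collect() is called at phase boundaries of the connector, and the indexing pipeline truncates the chunk-descriptor string to 1024 characters before logging instead of interpolating the full list into the debug message. Below is a minimal, self-contained Python sketch of both patterns, not code from this repo: Chunk, log_chunk_descriptors, and process_batches are hypothetical stand-ins, and only to_short_descriptor and the [:1024] truncation mirror the diff itself.

import gc
import logging

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)


class Chunk:
    """Hypothetical stand-in for an indexing chunk with a short descriptor."""

    def __init__(self, doc_id: str, index: int) -> None:
        self.doc_id = doc_id
        self.index = index

    def to_short_descriptor(self) -> str:
        return f"{self.doc_id} chunk #{self.index}"


def log_chunk_descriptors(chunks: list[Chunk], max_len: int = 1024) -> None:
    # Build the descriptor list once, then truncate its string form so a
    # large batch cannot produce an arbitrarily long debug line.
    descriptors = [chunk.to_short_descriptor() for chunk in chunks]
    logger.debug(f"Indexing the following chunks: {str(descriptors)[:max_len]}")


def process_batches(batches: list[list[Chunk]]) -> None:
    for batch in batches:
        log_chunk_descriptors(batch)
        # ... per-batch work (fetching, embedding, writing) would go here ...
        # Calling the collector at a phase boundary mirrors the connector
        # change above; whether it meaningfully lowers peak memory depends
        # on the workload, so treat it as an experiment, not a guarantee.
        gc.collect()


if __name__ == "__main__":
    sample = [
        [Chunk("doc-1", i) for i in range(3)],
        [Chunk("doc-2", i) for i in range(2)],
    ]
    process_batches(sample)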