add some gc

Richard Kuo (Onyx) 2025-03-13 09:43:34 -07:00
parent a2ac9f02fb
commit 6860a7458b
2 changed files with 19 additions and 4 deletions

View File

@@ -1,3 +1,4 @@
import gc
import os
from typing import Any
@@ -70,6 +71,9 @@ class SalesforceConnector(LoadConnector, PollConnector, SlimConnector):
end: SecondsSinceUnixEpoch | None = None,
) -> GenerateDocumentsOutput:
init_db()
gc.collect()
all_object_types: set[str] = set(self.parent_object_list)
logger.info(f"Starting with {len(self.parent_object_list)} parent object types")
@@ -91,8 +95,11 @@ class SalesforceConnector(LoadConnector, PollConnector, SlimConnector):
logger.info(f"Found total of {len(all_object_types)} object types to fetch")
logger.debug(f"All object types: {all_object_types}")
gc.collect()
# checkpoint - we've found all object types, now time to fetch the data
logger.info("Starting to fetch CSVs for all object types")
# This takes like 30 minutes first time and <2 minutes for updates
object_type_to_csv_path = fetch_all_csvs_in_parallel(
sf_client=self.sf_client,
@@ -101,6 +108,8 @@ class SalesforceConnector(LoadConnector, PollConnector, SlimConnector):
end=end,
)
gc.collect()
updated_ids: set[str] = set()
# This takes like 10 seconds
# This is for testing the rest of the functionality if data has
@@ -132,6 +141,8 @@ class SalesforceConnector(LoadConnector, PollConnector, SlimConnector):
f"Added {len(new_ids)} new/updated records for {object_type}"
)
gc.collect()
logger.info(f"Found {len(updated_ids)} total updated records")
logger.info(
f"Starting to process parent objects of types: {self.parent_object_list}"
@@ -166,6 +177,8 @@ class SalesforceConnector(LoadConnector, PollConnector, SlimConnector):
yield docs_to_yield
docs_to_yield = []
gc.collect()
yield docs_to_yield
def load_from_state(self) -> GenerateDocumentsOutput:
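
The pattern this file applies is to call gc.collect() explicitly after each memory-heavy phase of the Salesforce poll (object-type discovery, the parallel CSV fetch, per-object-type record updates, and each yielded document batch) so large temporary structures are reclaimed before the next phase begins. A minimal sketch of that pattern, using hypothetical phase callables rather than the connector's real helpers:

import gc
from collections.abc import Callable, Iterator

def process_in_phases(phases: list[Callable[[], list[dict]]]) -> Iterator[list[dict]]:
    # Run memory-heavy phases one at a time, collecting garbage between them.
    for run_phase in phases:
        batch = run_phase()   # each phase may build large temporary structures
        yield batch
        del batch             # drop the local reference so the batch becomes unreachable
        gc.collect()          # reclaim any cyclic garbage before the next phase starts

Note that gc.collect() only helps with objects kept alive by reference cycles; plain reference-counted objects are freed as soon as the last reference (here, the local batch) goes away.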

View File

@@ -583,10 +583,12 @@ def index_doc_batch(
for chunk in chunks_with_embeddings
]
logger.debug(
"Indexing the following chunks: "
f"{[chunk.to_short_descriptor() for chunk in access_aware_chunks]}"
)
short_descriptor_list = [
chunk.to_short_descriptor() for chunk in access_aware_chunks
]
short_descriptor_log = str(short_descriptor_list)[:1024]
logger.debug(f"Indexing the following chunks: {short_descriptor_log}")
# A document will not be spread across different batches, so all the
# documents with chunks in this set, are fully represented by the chunks
# in this set
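
The indexing change replaces a debug log that interpolated every chunk descriptor directly into one f-string with a version that stringifies the descriptor list first and truncates it to 1024 characters, so a large batch cannot produce an enormous log line just for a debug message. A small illustration of the same idea, with a made-up helper and made-up chunk identifiers rather than the pipeline's real types:

import logging

logger = logging.getLogger(__name__)

def log_truncated(label: str, items: list[str], limit: int = 1024) -> None:
    # Build the full repr once, then cap what actually reaches the log record.
    payload = str(items)[:limit]
    logger.debug(f"{label}: {payload}")

log_truncated("Indexing the following chunks", [f"doc_{i}__chunk_0" for i in range(10_000)])

The full string is still built before slicing, so the saving is in what gets formatted and written by the logging handlers, not in constructing the message itself.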