small refactoring for temp directories

Richard Kuo (Onyx) 2025-03-13 19:46:02 -07:00
parent 6860a7458b
commit 27d39ff341
2 changed files with 74 additions and 45 deletions


@@ -1,5 +1,7 @@
import gc
import os
import tempfile
from pathlib import Path
from typing import Any
from simple_salesforce import Salesforce
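The newly imported tempfile and pathlib.Path are used further down to scope downloaded CSVs to a per-run directory and to read file sizes. A minimal standalone sketch of that pattern, separate from the connector code (the file name and contents here are made up):

import tempfile
from pathlib import Path

with tempfile.TemporaryDirectory() as temp_dir:
    # Anything written under temp_dir exists only for the lifetime of this block.
    csv_path = Path(temp_dir) / "Account.csv"  # hypothetical file name
    csv_path.write_text("Id,Name\n001,Acme\n")
    print(f"bytes={csv_path.stat().st_size}")
# On exit the directory and everything in it is deleted automatically.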
@@ -101,53 +103,79 @@ class SalesforceConnector(LoadConnector, PollConnector, SlimConnector):
logger.info("Starting to fetch CSVs for all object types")
# This takes like 30 minutes first time and <2 minutes for updates
object_type_to_csv_path = fetch_all_csvs_in_parallel(
sf_client=self.sf_client,
object_types=all_object_types,
start=start,
end=end,
)
with tempfile.TemporaryDirectory() as temp_dir:
object_type_to_csv_path = fetch_all_csvs_in_parallel(
sf_client=self.sf_client,
object_types=all_object_types,
start=start,
end=end,
target_dir=temp_dir,
)
gc.collect()
# print useful information
num_csvs = 0
num_bytes = 0
for object_type, csv_paths in object_type_to_csv_path.items():
if not csv_paths:
continue
updated_ids: set[str] = set()
# This takes like 10 seconds
# This is for testing the rest of the functionality if data has
# already been fetched and put in sqlite
# from onyx.connectors.salesforce.sf_db.sqlite_functions import find_ids_by_type
# for object_type in self.parent_object_list:
# updated_ids.update(list(find_ids_by_type(object_type)))
for csv_path in csv_paths:
if not csv_path:
continue
# This takes 10-70 minutes first time (idk why the range is so big)
total_types = len(object_type_to_csv_path)
logger.info(f"Starting to process {total_types} object types")
file_path = Path(csv_path)
file_size = file_path.stat().st_size
num_csvs += 1
num_bytes += file_size
logger.info(
f"CSV info: object_type={object_type} path={csv_path} bytes={file_size}"
)
for i, (object_type, csv_paths) in enumerate(
object_type_to_csv_path.items(), 1
):
logger.info(f"Processing object type {object_type} ({i}/{total_types})")
# If path is None, it means it failed to fetch the csv
if csv_paths is None:
continue
# Go through each csv path and use it to update the db
for csv_path in csv_paths:
logger.debug(f"Updating {object_type} with {csv_path}")
new_ids = update_sf_db_with_csv(
object_type=object_type,
csv_download_path=csv_path,
)
updated_ids.update(new_ids)
logger.debug(
f"Added {len(new_ids)} new/updated records for {object_type}"
)
logger.info(
f"CSV info total: total_csvs={num_csvs} total_bytes={num_bytes}"
)
gc.collect()
gc.collect()
updated_ids: set[str] = set()
# This takes like 10 seconds
# This is for testing the rest of the functionality if data has
# already been fetched and put in sqlite
# from onyx.connectors.salesforce.sf_db.sqlite_functions import find_ids_by_type
# for object_type in self.parent_object_list:
# updated_ids.update(list(find_ids_by_type(object_type)))
# This takes 10-70 minutes first time (idk why the range is so big)
total_types = len(object_type_to_csv_path)
logger.info(f"Starting to process {total_types} object types")
for i, (object_type, csv_paths) in enumerate(
object_type_to_csv_path.items(), 1
):
logger.info(f"Processing object type {object_type} ({i}/{total_types})")
# If path is None, it means it failed to fetch the csv
if csv_paths is None:
continue
# Go through each csv path and use it to update the db
for csv_path in csv_paths:
logger.debug(f"Updating {object_type} with {csv_path}")
new_ids = update_sf_db_with_csv(
object_type=object_type,
csv_download_path=csv_path,
)
updated_ids.update(new_ids)
logger.debug(
f"Added {len(new_ids)} new/updated records for {object_type}"
)
gc.collect()
logger.info(f"Found {len(updated_ids)} total updated records")
logger.info(
f"Starting to process parent objects of types: {self.parent_object_list}"
)
num_object_batches = 0
docs_to_yield: list[Document] = []
docs_processed = 0
# Takes 15-20 seconds per batch
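This hunk moves both the CSV download and the per-file accounting inside a with tempfile.TemporaryDirectory() block, so every CSV has to be consumed before the block exits and the directory is deleted. A simplified sketch of that shape, with download_csvs standing in for fetch_all_csvs_in_parallel (hypothetical helper and data):

import tempfile
from pathlib import Path


def download_csvs(target_dir: str) -> dict[str, list[str] | None]:
    # Hypothetical stand-in for fetch_all_csvs_in_parallel: writes one dummy CSV.
    dummy = Path(target_dir) / "Account.csv"
    dummy.write_text("Id\n001\n")
    return {"Account": [str(dummy)], "Contact": None}  # None marks a failed fetch


with tempfile.TemporaryDirectory() as temp_dir:
    object_type_to_csv_path = download_csvs(target_dir=temp_dir)
    num_csvs = 0
    num_bytes = 0
    for object_type, csv_paths in object_type_to_csv_path.items():
        if not csv_paths:  # skip object types whose fetch failed
            continue
        for csv_path in csv_paths:
            num_csvs += 1
            num_bytes += Path(csv_path).stat().st_size
    # Any processing of the CSVs has to happen here, inside the with block;
    # the directory and every file in it are removed when the block exits.
    print(f"total_csvs={num_csvs} total_bytes={num_bytes}")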
@@ -155,8 +183,9 @@ class SalesforceConnector(LoadConnector, PollConnector, SlimConnector):
updated_ids=list(updated_ids),
parent_types=self.parent_object_list,
):
num_object_batches += 1
logger.info(
f"Processing batch of {len(parent_id_batch)} {parent_type} objects"
f"Processing batch: index={num_object_batches} object_type={parent_type} len={len(parent_id_batch)}"
)
for parent_id in parent_id_batch:
if not (parent_object := get_record(parent_id, parent_type)):
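The reworked log message switches to key=value fields plus a running batch counter, which is easier to grep and parse than free-form text. A small illustration of the pattern with made-up batch data:

import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

parent_type = "Account"                # hypothetical values
batches = [["001", "002"], ["003"]]
num_object_batches = 0
for parent_id_batch in batches:
    num_object_batches += 1
    logger.info(
        f"Processing batch: index={num_object_batches} "
        f"object_type={parent_type} len={len(parent_id_batch)}"
    )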
@@ -222,7 +251,7 @@ if __name__ == "__main__":
"sf_security_token": os.environ["SF_SECURITY_TOKEN"],
}
)
start_time = time.time()
start_time = time.monotonic()
doc_count = 0
section_count = 0
text_count = 0
@@ -234,7 +263,7 @@ if __name__ == "__main__":
for section in doc.sections:
if isinstance(section, TextSection) and section.text is not None:
text_count += len(section.text)
end_time = time.time()
end_time = time.monotonic()
print(f"Doc count: {doc_count}")
print(f"Section count: {section_count}")


@@ -133,15 +133,13 @@ def _build_bulk_query(sf_client: Salesforce, sf_type: str, time_filter: str) ->
def _bulk_retrieve_from_salesforce(
sf_client: Salesforce,
sf_type: str,
time_filter: str,
sf_client: Salesforce, sf_type: str, time_filter: str, target_dir: str
) -> tuple[str, list[str] | None]:
if not _check_if_object_type_is_empty(sf_client, sf_type, time_filter):
return sf_type, None
if existing_csvs := _check_for_existing_csvs(sf_type):
return sf_type, existing_csvs
# if existing_csvs := _check_for_existing_csvs(sf_type):
# return sf_type, existing_csvs
query = _build_bulk_query(sf_client, sf_type, time_filter)
@@ -165,7 +163,7 @@ def _bulk_retrieve_from_salesforce(
# This downloads the file to a file in the target path with a random name
results = bulk_2_type.download(
query=query,
path=get_object_type_path(sf_type),
path=target_dir,
max_records=1000000,
)
all_download_paths = [result["file"] for result in results]
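With target_dir threaded in, the Bulk 2.0 download writes its CSVs wherever the caller chooses instead of a fixed per-object-type path. A sketch that mirrors the call shape shown in this hunk; the wrapper function, object type, and query below are illustrative, not the module's code:

from simple_salesforce import Salesforce


def download_object_csvs(sf_client: Salesforce, sf_type: str, target_dir: str) -> list[str]:
    # Obtaining the Bulk 2.0 handle via getattr and the query text are assumptions.
    bulk_2_type = getattr(sf_client.bulk2, sf_type)
    results = bulk_2_type.download(
        query=f"SELECT Id FROM {sf_type}",
        path=target_dir,  # CSVs land in the caller-supplied directory
        max_records=1000000,
    )
    # Each result dict carries the path of one downloaded CSV under "file".
    return [result["file"] for result in results]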
@@ -181,6 +179,7 @@ def fetch_all_csvs_in_parallel(
object_types: set[str],
start: SecondsSinceUnixEpoch | None,
end: SecondsSinceUnixEpoch | None,
target_dir: str,
) -> dict[str, list[str] | None]:
"""
Fetches all the csvs in parallel for the given object types
@@ -207,6 +206,7 @@ def fetch_all_csvs_in_parallel(
sf_client=sf_client,
sf_type=object_type,
time_filter=time_filter_for_each_object_type[object_type],
target_dir=target_dir,
),
object_types,
)
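fetch_all_csvs_in_parallel now forwards the same target_dir to every per-object-type worker. A sketch of that fan-out shape; the thread pool and helper names below are assumptions for illustration, not the module's actual parallelism utility:

from concurrent.futures import ThreadPoolExecutor
from functools import partial


def _retrieve_one(sf_type: str, target_dir: str) -> tuple[str, list[str] | None]:
    # Hypothetical stand-in for _bulk_retrieve_from_salesforce.
    return sf_type, [f"{target_dir}/{sf_type}.csv"]


def fetch_all(object_types: set[str], target_dir: str) -> dict[str, list[str] | None]:
    # Every worker receives the same caller-supplied target_dir.
    with ThreadPoolExecutor(max_workers=8) as pool:
        results = pool.map(partial(_retrieve_one, target_dir=target_dir), object_types)
    return dict(results)


print(fetch_all({"Account", "Contact"}, "/tmp/sf_csvs"))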