restructure to signal activity while processing (#4712)
Co-authored-by: Richard Kuo (Onyx) <rkuo@onyx.app>
@@ -150,56 +150,6 @@ class SalesforceConnector(LoadConnector, PollConnector, SlimConnector):
         logger.info(f"CSV info total: total_csvs={num_csvs} total_bytes={num_bytes}")
 
-    @staticmethod
-    def _load_csvs_to_db(csv_directory: str, sf_db: OnyxSalesforceSQLite) -> set[str]:
-        updated_ids: set[str] = set()
-
-        object_type_to_csv_path = SalesforceConnector.reconstruct_object_types(
-            csv_directory
-        )
-
-        # NOTE(rkuo): this timing note is meaningless without a reference point in terms
-        # of number of records, etc
-        # This takes like 10 seconds
-
-        # This is for testing the rest of the functionality if data has
-        # already been fetched and put in sqlite
-        # from onyx.connectors.salesforce.sf_db.sqlite_functions import find_ids_by_type
-        # for object_type in self.parent_object_list:
-        #     updated_ids.update(list(find_ids_by_type(object_type)))
-
-        # This takes 10-70 minutes first time (idk why the range is so big)
-        total_types = len(object_type_to_csv_path)
-        logger.info(f"Starting to process {total_types} object types")
-
-        for i, (object_type, csv_paths) in enumerate(
-            object_type_to_csv_path.items(), 1
-        ):
-            logger.info(f"Processing object type {object_type} ({i}/{total_types})")
-            # If path is None, it means it failed to fetch the csv
-            if csv_paths is None:
-                continue
-
-            # Go through each csv path and use it to update the db
-            for csv_path in csv_paths:
-                logger.debug(
-                    f"Processing CSV: object_type={object_type} "
-                    f"csv={csv_path} "
-                    f"len={Path(csv_path).stat().st_size}"
-                )
-                new_ids = sf_db.update_from_csv(
-                    object_type=object_type,
-                    csv_download_path=csv_path,
-                )
-                updated_ids.update(new_ids)
-                logger.debug(
-                    f"Added {len(new_ids)} new/updated records for {object_type}"
-                )
-
-                os.remove(csv_path)
-
-        return updated_ids
-
     @staticmethod
     def _get_all_types(parent_types: list[str], sf_client: Salesforce) -> set[str]:
         all_types: set[str] = set(parent_types)
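Note on the deletion above: the helper is removed because a plain @staticmethod cannot emit heartbeat batches on behalf of the generator that calls it; in Python, only the function body that contains `yield` feeds the consuming iterator, so the loop is inlined into the fetch generator (see the +217,43 hunk below). A minimal sketch of the distinction, with hypothetical names:

from collections.abc import Iterator
import time

def slow_step(item: str) -> str:
    # Stand-in for the expensive per-CSV work (download, sqlite upsert).
    time.sleep(0.1)
    return item.upper()

def process_all(items: list[str]) -> Iterator[list[str]]:
    # The loop must live inside the generator: a non-generator helper
    # wrapping it (like the deleted _load_csvs_to_db) could not produce
    # interim values for this function's caller.
    results: list[str] = []
    for item in items:
        yield []  # heartbeat: still working, nothing to index yet
        results.append(slow_step(item))
    yield results  # the real batch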
@@ -236,6 +186,7 @@ class SalesforceConnector(LoadConnector, PollConnector, SlimConnector):
 
         updated_ids: set[str] = set()
         docs_processed = 0
+        docs_to_yield: list[Document] = []
 
         sf_db = OnyxSalesforceSQLite(os.path.join(temp_dir, "salesforce_db.sqlite"))
         sf_db.connect()
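Hoisting the `docs_to_yield` declaration here is what makes the mid-loop heartbeats in the next hunk safe: a `yield` that runs before the name is bound would fail. A tiny sketch of the failure mode, with hypothetical names:

from collections.abc import Iterator

def broken() -> Iterator[list[str]]:
    for _ in range(3):
        yield docs  # UnboundLocalError: 'docs' is assigned below, so it is local but unset here
    docs: list[str] = []

def fixed() -> Iterator[list[str]]:
    docs: list[str] = []  # declared up front, as the diff now does
    for _ in range(3):
        yield docs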
@@ -266,7 +217,43 @@ class SalesforceConnector(LoadConnector, PollConnector, SlimConnector):
             gc.collect()
 
         # Step 2 - load CSV's to sqlite
-        updated_ids = SalesforceConnector._load_csvs_to_db(temp_dir, sf_db)
+        object_type_to_csv_paths = SalesforceConnector.reconstruct_object_types(
+            temp_dir
+        )
+
+        total_types = len(object_type_to_csv_paths)
+        logger.info(f"Starting to process {total_types} object types")
+
+        for i, (object_type, csv_paths) in enumerate(
+            object_type_to_csv_paths.items(), 1
+        ):
+            logger.info(f"Processing object type {object_type} ({i}/{total_types})")
+            # If path is None, it means it failed to fetch the csv
+            if csv_paths is None:
+                continue
+
+            # Go through each csv path and use it to update the db
+            for csv_path in csv_paths:
+                logger.debug(
+                    f"Processing CSV: object_type={object_type} "
+                    f"csv={csv_path} "
+                    f"len={Path(csv_path).stat().st_size}"
+                )
+
+                # yield an empty list to keep the connector alive
+                yield docs_to_yield
+
+                new_ids = sf_db.update_from_csv(
+                    object_type=object_type,
+                    csv_download_path=csv_path,
+                )
+                updated_ids.update(new_ids)
+                logger.debug(
+                    f"Added {len(new_ids)} new/updated records for {object_type}"
+                )
+
+                os.remove(csv_path)
+
+                gc.collect()
 
         logger.info(f"Found {len(updated_ids)} total updated records")
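The `yield docs_to_yield` above is the point of the commit: even a zero-length batch proves the connector is alive during the long CSV-to-sqlite phase. A rough consumer-side sketch of that pattern follows; the watchdog is illustrative only, not Onyx's actual scheduler logic:

import time
from collections.abc import Iterable

def drain_with_watchdog(batches: Iterable[list[str]], timeout: float = 300.0) -> int:
    # Post-hoc liveness check: measure the gap between successive yields.
    # An empty batch carries no documents but still resets the clock,
    # which is exactly what the inlined Step 2 loop exploits.
    indexed = 0
    last = time.monotonic()
    for batch in batches:
        gap = time.monotonic() - last
        last = time.monotonic()
        if gap > timeout:
            raise TimeoutError(f"producer silent for {gap:.0f}s")
        indexed += len(batch)
    return indexed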
@@ -276,7 +263,6 @@ class SalesforceConnector(LoadConnector, PollConnector, SlimConnector):
 
         # Step 3 - extract and index docs
         batches_processed = 0
-        docs_to_yield: list[Document] = []
         docs_to_yield_bytes = 0
 
         # Takes 15-20 seconds per batch
@@ -103,7 +103,9 @@ def _summarize_image(
         return message_to_string(llm.invoke(messages))
 
     except Exception as e:
-        raise ValueError(f"Summarization failed. Messages: {messages}") from e
+        error_msg = f"Summarization failed. Messages: {messages}"
+        error_msg = error_msg[:1024]
+        raise ValueError(error_msg) from e
 
 
 def _encode_image_for_llm_prompt(image_data: bytes) -> str:
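This second change caps the exception text at 1024 characters: the messages passed to the LLM here can carry encoded image payloads (note `_encode_image_for_llm_prompt` just below), so interpolating them unbounded into a ValueError bloats logs and error reporting. A hedged generalization of the same idea, with a truncation marker appended (the `truncated` helper is ours, not the repo's):

def truncated(text: str, limit: int = 1024) -> str:
    # Keep errors loggable even when they embed large payloads,
    # e.g. base64 image data inside LLM message dicts.
    if len(text) <= limit:
        return text
    return text[:limit] + f"... [{len(text) - limit} chars truncated]"

# Usage mirroring the diff:
# raise ValueError(truncated(f"Summarization failed. Messages: {messages}")) from e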