Bugfix/salesforce (#4335)

* add some gc

* small refactoring for temp directories

* WIP

* add some gc collects and size calculations

* un-xfail

* fix salesforce test

* loose check for number of docs

* adjust test again

* cleanup

* nuke directory param, remove using sqlite db to cache email / id mappings

---------

Co-authored-by: Richard Kuo (Onyx) <rkuo@onyx.app>
Authored by rkuo-danswer on 2025-04-04 09:21:34 -07:00, committed by GitHub
parent 68f9f157a6
commit 839c8611b7
15 changed files with 632 additions and 217 deletions

View File

@ -51,9 +51,9 @@ def _get_objects_access_for_user_email_from_salesforce(
# This is cached in the function so the first query takes an extra 0.1-0.3 seconds
# but subsequent queries by the same user are essentially instant
start_time = time.time()
start_time = time.monotonic()
user_id = get_salesforce_user_id_from_email(salesforce_client, user_email)
end_time = time.time()
end_time = time.monotonic()
logger.info(
f"Time taken to get Salesforce user ID: {end_time - start_time} seconds"
)
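time.monotonic() is the right clock for measuring elapsed time because, unlike time.time(), it never jumps backwards when the wall clock is adjusted. A minimal sketch of the pattern used here (names are illustrative):

# illustrative sketch (not from this commit)
import time

def timed_call(fn, *args, **kwargs):
    """Run fn and report how long it took using a monotonic clock."""
    start = time.monotonic()  # immune to wall-clock adjustments (NTP, DST)
    result = fn(*args, **kwargs)
    elapsed = time.monotonic() - start
    print(f"{getattr(fn, '__name__', 'call')} took {elapsed:.3f} seconds")
    return result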

View File

@ -1,10 +1,6 @@
from simple_salesforce import Salesforce
from sqlalchemy.orm import Session
from onyx.connectors.salesforce.sqlite_functions import get_user_id_by_email
from onyx.connectors.salesforce.sqlite_functions import init_db
from onyx.connectors.salesforce.sqlite_functions import NULL_ID_STRING
from onyx.connectors.salesforce.sqlite_functions import update_email_to_id_table
from onyx.db.connector_credential_pair import get_connector_credential_pair_from_id
from onyx.db.document import get_cc_pairs_for_document
from onyx.utils.logger import setup_logger
@ -28,6 +24,8 @@ def get_any_salesforce_client_for_doc_id(
E.g. there are 2 different credential sets for 2 different salesforce cc_pairs
but only one has access to the permissions needed for the query.
"""
# NOTE: this global seems very very bad
global _ANY_SALESFORCE_CLIENT
if _ANY_SALESFORCE_CLIENT is None:
cc_pairs = get_cc_pairs_for_document(db_session, doc_id)
@ -84,35 +82,21 @@ def get_salesforce_user_id_from_email(
salesforce database. (Around 0.1-0.3 seconds)
If it's cached or stored in the local salesforce database, it's fast (<0.001 seconds).
"""
# NOTE: this global seems bad
global _CACHED_SF_EMAIL_TO_ID_MAP
if user_email in _CACHED_SF_EMAIL_TO_ID_MAP:
if _CACHED_SF_EMAIL_TO_ID_MAP[user_email] is not None:
return _CACHED_SF_EMAIL_TO_ID_MAP[user_email]
db_exists = True
try:
# Check if the user is already in the database
user_id = get_user_id_by_email(user_email)
except Exception:
init_db()
try:
user_id = get_user_id_by_email(user_email)
except Exception as e:
logger.error(f"Error checking if user is in database: {e}")
user_id = None
db_exists = False
# some caching via sqlite existed here before ... check history if interested
# ...query Salesforce and store the result in the database
user_id = _query_salesforce_user_id(sf_client, user_email)
# If no entry is found in the database (indicated by user_id being None)...
if user_id is None:
# ...query Salesforce and store the result in the database
user_id = _query_salesforce_user_id(sf_client, user_email)
if db_exists:
update_email_to_id_table(user_email, user_id)
return user_id
elif user_id is None:
return None
elif user_id == NULL_ID_STRING:
return None
# If the found user_id is real, cache it
_CACHED_SF_EMAIL_TO_ID_MAP[user_email] = user_id
return user_id
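With the sqlite-backed email/ID table removed from this path, lookups are memoized only in the module-level dict. A minimal sketch of that caching shape, assuming the _query_salesforce_user_id helper shown above:

# illustrative sketch (not from this commit)
_cached_email_to_id: dict[str, str] = {}

def lookup_user_id(sf_client, email: str) -> str | None:
    if email in _cached_email_to_id:
        return _cached_email_to_id[email]
    user_id = _query_salesforce_user_id(sf_client, email)  # hits the Salesforce API
    if user_id is None:
        return None  # misses are not cached, so they will be retried
    _cached_email_to_id[email] = user_id
    return user_id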

View File

@ -1,3 +1,4 @@
import sys
from datetime import datetime
from enum import Enum
from typing import Any
@ -40,6 +41,9 @@ class TextSection(Section):
text: str
link: str | None = None
def __sizeof__(self) -> int:
return sys.getsizeof(self.text) + sys.getsizeof(self.link)
class ImageSection(Section):
"""Section containing an image reference"""
@ -47,6 +51,9 @@ class ImageSection(Section):
image_file_name: str
link: str | None = None
def __sizeof__(self) -> int:
return sys.getsizeof(self.image_file_name) + sys.getsizeof(self.link)
class BasicExpertInfo(BaseModel):
"""Basic Information for the owner of a document, any of the fields can be left as None
@ -110,6 +117,14 @@ class BasicExpertInfo(BaseModel):
)
)
def __sizeof__(self) -> int:
size = sys.getsizeof(self.display_name)
size += sys.getsizeof(self.first_name)
size += sys.getsizeof(self.middle_initial)
size += sys.getsizeof(self.last_name)
size += sys.getsizeof(self.email)
return size
class DocumentBase(BaseModel):
"""Used for Onyx ingestion api, the ID is inferred before use if not provided"""
@ -163,6 +178,32 @@ class DocumentBase(BaseModel):
attributes.append(k + INDEX_SEPARATOR + v)
return attributes
def __sizeof__(self) -> int:
size = sys.getsizeof(self.id)
for section in self.sections:
size += sys.getsizeof(section)
size += sys.getsizeof(self.source)
size += sys.getsizeof(self.semantic_identifier)
size += sys.getsizeof(self.doc_updated_at)
size += sys.getsizeof(self.chunk_count)
if self.primary_owners is not None:
for primary_owner in self.primary_owners:
size += sys.getsizeof(primary_owner)
else:
size += sys.getsizeof(self.primary_owners)
if self.secondary_owners is not None:
for secondary_owner in self.secondary_owners:
size += sys.getsizeof(secondary_owner)
else:
size += sys.getsizeof(self.secondary_owners)
size += sys.getsizeof(self.title)
size += sys.getsizeof(self.from_ingestion_api)
size += sys.getsizeof(self.additional_info)
return size
def get_text_content(self) -> str:
return " ".join([section.text for section in self.sections if section.text])
@ -194,6 +235,12 @@ class Document(DocumentBase):
from_ingestion_api=base.from_ingestion_api,
)
def __sizeof__(self) -> int:
size = super().__sizeof__()
size += sys.getsizeof(self.id)
size += sys.getsizeof(self.source)
return size
class IndexingDocument(Document):
"""Document with processed sections for indexing"""

View File

@ -1,4 +1,9 @@
import gc
import os
import sys
import tempfile
from collections import defaultdict
from pathlib import Path
from typing import Any
from simple_salesforce import Salesforce
@ -21,9 +26,13 @@ from onyx.connectors.salesforce.salesforce_calls import get_all_children_of_sf_t
from onyx.connectors.salesforce.sqlite_functions import get_affected_parent_ids_by_type
from onyx.connectors.salesforce.sqlite_functions import get_record
from onyx.connectors.salesforce.sqlite_functions import init_db
from onyx.connectors.salesforce.sqlite_functions import sqlite_log_stats
from onyx.connectors.salesforce.sqlite_functions import update_sf_db_with_csv
from onyx.connectors.salesforce.utils import BASE_DATA_PATH
from onyx.connectors.salesforce.utils import get_sqlite_db_path
from onyx.indexing.indexing_heartbeat import IndexingHeartbeatInterface
from onyx.utils.logger import setup_logger
from shared_configs.configs import MULTI_TENANT
logger = setup_logger()
@ -32,6 +41,8 @@ _DEFAULT_PARENT_OBJECT_TYPES = ["Account"]
class SalesforceConnector(LoadConnector, PollConnector, SlimConnector):
MAX_BATCH_BYTES = 1024 * 1024
def __init__(
self,
batch_size: int = INDEX_BATCH_SIZE,
@ -64,22 +75,45 @@ class SalesforceConnector(LoadConnector, PollConnector, SlimConnector):
raise ConnectorMissingCredentialError("Salesforce")
return self._sf_client
def _fetch_from_salesforce(
self,
@staticmethod
def reconstruct_object_types(directory: str) -> dict[str, list[str] | None]:
"""
Scans the given directory for all CSV files and reconstructs the available object types.
Assumes filenames are formatted as "ObjectType.filename.csv" or "ObjectType.csv".
Args:
directory (str): The path to the directory containing CSV files.
Returns:
dict[str, list[str]]: A dictionary mapping object types to lists of file paths.
"""
object_types = defaultdict(list)
for filename in os.listdir(directory):
if filename.endswith(".csv"):
parts = filename.split(".", 1) # Split on the first period
object_type = parts[0] # Take the first part as the object type
object_types[object_type].append(os.path.join(directory, filename))
return dict(object_types)
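Because the download step (further down) renames each CSV to start with its object type, this mapping can be rebuilt from the directory listing alone. A hedged usage sketch with hypothetical filenames:

# illustrative sketch (not from this commit); filenames are hypothetical
# directory contents: Account.7f3a.csv, Account.9c21.csv, User.1b4d.csv, notes.txt
mapping = SalesforceConnector.reconstruct_object_types("/tmp/sf_csvs")
# -> {"Account": ["/tmp/sf_csvs/Account.7f3a.csv", "/tmp/sf_csvs/Account.9c21.csv"],
#     "User": ["/tmp/sf_csvs/User.1b4d.csv"]}
# non-CSV files are ignored; the first "." delimits the object type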
@staticmethod
def _download_object_csvs(
directory: str,
parent_object_list: list[str],
sf_client: Salesforce,
start: SecondsSinceUnixEpoch | None = None,
end: SecondsSinceUnixEpoch | None = None,
) -> GenerateDocumentsOutput:
init_db()
all_object_types: set[str] = set(self.parent_object_list)
) -> None:
all_object_types: set[str] = set(parent_object_list)
logger.info(f"Starting with {len(self.parent_object_list)} parent object types")
logger.debug(f"Parent object types: {self.parent_object_list}")
logger.info(
f"Parent object types: num={len(parent_object_list)} list={parent_object_list}"
)
# This takes like 20 seconds
for parent_object_type in self.parent_object_list:
child_types = get_all_children_of_sf_type(
self.sf_client, parent_object_type
)
for parent_object_type in parent_object_list:
child_types = get_all_children_of_sf_type(sf_client, parent_object_type)
all_object_types.update(child_types)
logger.debug(
f"Found {len(child_types)} child types for {parent_object_type}"
@ -88,20 +122,53 @@ class SalesforceConnector(LoadConnector, PollConnector, SlimConnector):
# Always want to make sure user is grabbed for permissioning purposes
all_object_types.add("User")
logger.info(f"Found total of {len(all_object_types)} object types to fetch")
logger.debug(f"All object types: {all_object_types}")
logger.info(
f"All object types: num={len(all_object_types)} list={all_object_types}"
)
# gc.collect()
# checkpoint - we've found all object types, now time to fetch the data
logger.info("Starting to fetch CSVs for all object types")
logger.info("Fetching CSVs for all object types")
# This takes like 30 minutes first time and <2 minutes for updates
object_type_to_csv_path = fetch_all_csvs_in_parallel(
sf_client=self.sf_client,
sf_client=sf_client,
object_types=all_object_types,
start=start,
end=end,
target_dir=directory,
)
# print useful information
num_csvs = 0
num_bytes = 0
for object_type, csv_paths in object_type_to_csv_path.items():
if not csv_paths:
continue
for csv_path in csv_paths:
if not csv_path:
continue
file_path = Path(csv_path)
file_size = file_path.stat().st_size
num_csvs += 1
num_bytes += file_size
logger.info(
f"CSV info: object_type={object_type} path={csv_path} bytes={file_size}"
)
logger.info(f"CSV info total: total_csvs={num_csvs} total_bytes={num_bytes}")
@staticmethod
def _load_csvs_to_db(csv_directory: str, db_directory: str) -> set[str]:
updated_ids: set[str] = set()
object_type_to_csv_path = SalesforceConnector.reconstruct_object_types(
csv_directory
)
# This takes like 10 seconds
# This is for testing the rest of the functionality if data has
# already been fetched and put in sqlite
@ -120,10 +187,16 @@ class SalesforceConnector(LoadConnector, PollConnector, SlimConnector):
# If path is None, it means it failed to fetch the csv
if csv_paths is None:
continue
# Go through each csv path and use it to update the db
for csv_path in csv_paths:
logger.debug(f"Updating {object_type} with {csv_path}")
logger.debug(
f"Processing CSV: object_type={object_type} "
f"csv={csv_path} "
f"len={Path(csv_path).stat().st_size}"
)
new_ids = update_sf_db_with_csv(
db_directory,
object_type=object_type,
csv_download_path=csv_path,
)
@ -132,49 +205,127 @@ class SalesforceConnector(LoadConnector, PollConnector, SlimConnector):
f"Added {len(new_ids)} new/updated records for {object_type}"
)
os.remove(csv_path)
return updated_ids
def _fetch_from_salesforce(
self,
temp_dir: str,
start: SecondsSinceUnixEpoch | None = None,
end: SecondsSinceUnixEpoch | None = None,
) -> GenerateDocumentsOutput:
logger.info("_fetch_from_salesforce starting.")
if not self._sf_client:
raise RuntimeError("self._sf_client is None!")
init_db(temp_dir)
sqlite_log_stats(temp_dir)
# Step 1 - download
SalesforceConnector._download_object_csvs(
temp_dir, self.parent_object_list, self._sf_client, start, end
)
gc.collect()
# Step 2 - load CSV's to sqlite
updated_ids = SalesforceConnector._load_csvs_to_db(temp_dir, temp_dir)
gc.collect()
logger.info(f"Found {len(updated_ids)} total updated records")
logger.info(
f"Starting to process parent objects of types: {self.parent_object_list}"
)
docs_to_yield: list[Document] = []
# Step 3 - extract and index docs
batches_processed = 0
docs_processed = 0
docs_to_yield: list[Document] = []
docs_to_yield_bytes = 0
# Takes 15-20 seconds per batch
for parent_type, parent_id_batch in get_affected_parent_ids_by_type(
temp_dir,
updated_ids=list(updated_ids),
parent_types=self.parent_object_list,
):
batches_processed += 1
logger.info(
f"Processing batch of {len(parent_id_batch)} {parent_type} objects"
f"Processing batch: index={batches_processed} "
f"object_type={parent_type} "
f"len={len(parent_id_batch)} "
f"processed={docs_processed} "
f"remaining={len(updated_ids) - docs_processed}"
)
for parent_id in parent_id_batch:
if not (parent_object := get_record(parent_id, parent_type)):
if not (parent_object := get_record(temp_dir, parent_id, parent_type)):
logger.warning(
f"Failed to get parent object {parent_id} for {parent_type}"
)
continue
docs_to_yield.append(
convert_sf_object_to_doc(
sf_object=parent_object,
sf_instance=self.sf_client.sf_instance,
)
doc = convert_sf_object_to_doc(
temp_dir,
sf_object=parent_object,
sf_instance=self.sf_client.sf_instance,
)
doc_sizeof = sys.getsizeof(doc)
docs_to_yield_bytes += doc_sizeof
docs_to_yield.append(doc)
docs_processed += 1
if len(docs_to_yield) >= self.batch_size:
# memory usage is sensitive to the input length, so we're yielding immediately
# if the batch exceeds a certain byte length
if (
len(docs_to_yield) >= self.batch_size
or docs_to_yield_bytes > SalesforceConnector.MAX_BATCH_BYTES
):
yield docs_to_yield
docs_to_yield = []
docs_to_yield_bytes = 0
# observed a memory leak / size issue with the account table if we don't gc.collect here.
gc.collect()
yield docs_to_yield
logger.info(
f"Final processing stats: "
f"processed={docs_processed} "
f"remaining={len(updated_ids) - docs_processed}"
)
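Batches are now flushed when they reach batch_size documents or when the accumulated __sizeof__ estimate exceeds MAX_BATCH_BYTES (1 MiB), whichever comes first, which bounds peak memory when individual documents are large. A minimal sketch of the same generator pattern with illustrative names:

# illustrative sketch (not from this commit)
import sys
from collections.abc import Iterable, Iterator

MAX_BATCH_BYTES = 1024 * 1024  # 1 MiB

def batch_by_count_or_bytes(
    items: Iterable[object], max_count: int
) -> Iterator[list[object]]:
    batch: list[object] = []
    batch_bytes = 0
    for item in items:
        batch.append(item)
        batch_bytes += sys.getsizeof(item)
        # flush on whichever limit is hit first
        if len(batch) >= max_count or batch_bytes > MAX_BATCH_BYTES:
            yield batch
            batch, batch_bytes = [], 0
    if batch:
        yield batch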
def load_from_state(self) -> GenerateDocumentsOutput:
return self._fetch_from_salesforce()
if MULTI_TENANT:
# if multi tenant, we cannot expect the sqlite db to be cached/present
with tempfile.TemporaryDirectory() as temp_dir:
return self._fetch_from_salesforce(temp_dir)
# nuke the db since we're starting from scratch
sqlite_db_path = get_sqlite_db_path(BASE_DATA_PATH)
if os.path.exists(sqlite_db_path):
logger.info(f"load_from_state: Removing db at {sqlite_db_path}.")
os.remove(sqlite_db_path)
return self._fetch_from_salesforce(BASE_DATA_PATH)
def poll_source(
self, start: SecondsSinceUnixEpoch, end: SecondsSinceUnixEpoch
) -> GenerateDocumentsOutput:
return self._fetch_from_salesforce(start=start, end=end)
if MULTI_TENANT:
# if multi tenant, we cannot expect the sqlite db to be cached/present
with tempfile.TemporaryDirectory() as temp_dir:
return self._fetch_from_salesforce(temp_dir, start=start, end=end)
if start == 0:
# nuke the db if we're starting from scratch
sqlite_db_path = get_sqlite_db_path(BASE_DATA_PATH)
if os.path.exists(sqlite_db_path):
logger.info(
f"poll_source: Starting at time 0, removing db at {sqlite_db_path}."
)
os.remove(sqlite_db_path)
return self._fetch_from_salesforce(BASE_DATA_PATH)
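Both entry points now pick the working directory up front: a throwaway temp directory in multi-tenant mode, otherwise BASE_DATA_PATH with the cached sqlite db deleted when starting from scratch. A minimal sketch of that branch, assuming a fetch(directory) callable:

# illustrative sketch (not from this commit)
import os
import tempfile

def run_fetch(fetch, base_dir: str, multi_tenant: bool, start_from_scratch: bool):
    if multi_tenant:
        # no shared cache across tenants: use a directory that is wiped afterwards
        with tempfile.TemporaryDirectory() as temp_dir:
            return fetch(temp_dir)
    if start_from_scratch:
        db_path = os.path.join(base_dir, "salesforce_db.sqlite")
        if os.path.exists(db_path):
            os.remove(db_path)  # nuke any cached db before a full re-sync
    return fetch(base_dir)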
def retrieve_all_slim_documents(
self,
@ -209,7 +360,7 @@ if __name__ == "__main__":
"sf_security_token": os.environ["SF_SECURITY_TOKEN"],
}
)
start_time = time.time()
start_time = time.monotonic()
doc_count = 0
section_count = 0
text_count = 0
@ -221,7 +372,7 @@ if __name__ == "__main__":
for section in doc.sections:
if isinstance(section, TextSection) and section.text is not None:
text_count += len(section.text)
end_time = time.time()
end_time = time.monotonic()
print(f"Doc count: {doc_count}")
print(f"Section count: {section_count}")

View File

@ -124,13 +124,14 @@ def _extract_section(salesforce_object: SalesforceObject, base_url: str) -> Text
def _extract_primary_owners(
directory: str,
sf_object: SalesforceObject,
) -> list[BasicExpertInfo] | None:
object_dict = sf_object.data
if not (last_modified_by_id := object_dict.get("LastModifiedById")):
logger.warning(f"No LastModifiedById found for {sf_object.id}")
return None
if not (last_modified_by := get_record(last_modified_by_id)):
if not (last_modified_by := get_record(directory, last_modified_by_id)):
logger.warning(f"No LastModifiedBy found for {last_modified_by_id}")
return None
@ -159,6 +160,7 @@ def _extract_primary_owners(
def convert_sf_object_to_doc(
directory: str,
sf_object: SalesforceObject,
sf_instance: str,
) -> Document:
@ -170,8 +172,8 @@ def convert_sf_object_to_doc(
extracted_semantic_identifier = object_dict.get("Name", "Unknown Object")
sections = [_extract_section(sf_object, base_url)]
for id in get_child_ids(sf_object.id):
if not (child_object := get_record(id)):
for id in get_child_ids(directory, sf_object.id):
if not (child_object := get_record(directory, id)):
continue
sections.append(_extract_section(child_object, base_url))
@ -181,7 +183,7 @@ def convert_sf_object_to_doc(
source=DocumentSource.SALESFORCE,
semantic_identifier=extracted_semantic_identifier,
doc_updated_at=extracted_doc_updated_at,
primary_owners=_extract_primary_owners(sf_object),
primary_owners=_extract_primary_owners(directory, sf_object),
metadata={},
)
return doc

View File

@ -11,13 +11,12 @@ from simple_salesforce.bulk2 import SFBulk2Type
from onyx.connectors.interfaces import SecondsSinceUnixEpoch
from onyx.connectors.salesforce.sqlite_functions import has_at_least_one_object_of_type
from onyx.connectors.salesforce.utils import get_object_type_path
from onyx.utils.logger import setup_logger
logger = setup_logger()
def _build_time_filter_for_salesforce(
def _build_last_modified_time_filter_for_salesforce(
start: SecondsSinceUnixEpoch | None, end: SecondsSinceUnixEpoch | None
) -> str:
if start is None or end is None:
@ -30,6 +29,19 @@ def _build_time_filter_for_salesforce(
)
def _build_created_date_time_filter_for_salesforce(
start: SecondsSinceUnixEpoch | None, end: SecondsSinceUnixEpoch | None
) -> str:
if start is None or end is None:
return ""
start_datetime = datetime.fromtimestamp(start, UTC)
end_datetime = datetime.fromtimestamp(end, UTC)
return (
f" WHERE CreatedDate > {start_datetime.isoformat()} "
f"AND CreatedDate < {end_datetime.isoformat()}"
)
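Object types without a LastModifiedDate field get their poll window filtered on CreatedDate instead (see the created_date_types set further down). A worked example of the string this helper produces, assuming start=0 and end=1700000000:

# illustrative sketch (not from this commit)
# _build_created_date_time_filter_for_salesforce(0, 1700000000) evaluates to:
#   " WHERE CreatedDate > 1970-01-01T00:00:00+00:00 "
#   "AND CreatedDate < 2023-11-14T22:13:20+00:00"
# and returns "" if either bound is None (i.e. no time filter).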
def _get_sf_type_object_json(sf_client: Salesforce, type_name: str) -> Any:
sf_object = SFType(type_name, sf_client.session_id, sf_client.sf_instance)
return sf_object.describe()
@ -109,23 +121,6 @@ def _check_if_object_type_is_empty(
return True
def _check_for_existing_csvs(sf_type: str) -> list[str] | None:
# Check if the csv already exists
if os.path.exists(get_object_type_path(sf_type)):
existing_csvs = [
os.path.join(get_object_type_path(sf_type), f)
for f in os.listdir(get_object_type_path(sf_type))
if f.endswith(".csv")
]
# If the csv already exists, return the path
# This is likely due to a previous run that failed
# after downloading the csv but before the data was
# written to the db
if existing_csvs:
return existing_csvs
return None
def _build_bulk_query(sf_client: Salesforce, sf_type: str, time_filter: str) -> str:
queryable_fields = _get_all_queryable_fields_of_sf_type(sf_client, sf_type)
query = f"SELECT {', '.join(queryable_fields)} FROM {sf_type}{time_filter}"
@ -133,16 +128,15 @@ def _build_bulk_query(sf_client: Salesforce, sf_type: str, time_filter: str) ->
def _bulk_retrieve_from_salesforce(
sf_client: Salesforce,
sf_type: str,
time_filter: str,
sf_client: Salesforce, sf_type: str, time_filter: str, target_dir: str
) -> tuple[str, list[str] | None]:
"""Returns a tuple of
1. the salesforce object type
2. the list of CSV's
"""
if not _check_if_object_type_is_empty(sf_client, sf_type, time_filter):
return sf_type, None
if existing_csvs := _check_for_existing_csvs(sf_type):
return sf_type, existing_csvs
query = _build_bulk_query(sf_client, sf_type, time_filter)
bulk_2_handler = SFBulk2Handler(
@ -159,20 +153,33 @@ def _bulk_retrieve_from_salesforce(
)
logger.info(f"Downloading {sf_type}")
logger.info(f"Query: {query}")
logger.debug(f"Query: {query}")
try:
# This downloads the file to a file in the target path with a random name
results = bulk_2_type.download(
query=query,
path=get_object_type_path(sf_type),
path=target_dir,
max_records=1000000,
)
all_download_paths = [result["file"] for result in results]
# prepend each downloaded csv with the object type (delimiter = '.')
all_download_paths: list[str] = []
for result in results:
original_file_path = result["file"]
directory, filename = os.path.split(original_file_path)
new_filename = f"{sf_type}.{filename}"
new_file_path = os.path.join(directory, new_filename)
os.rename(original_file_path, new_file_path)
all_download_paths.append(new_file_path)
logger.info(f"Downloaded {sf_type} to {all_download_paths}")
return sf_type, all_download_paths
except Exception as e:
logger.info(f"Failed to download salesforce csv for object type {sf_type}: {e}")
logger.error(
f"Failed to download salesforce csv for object type {sf_type}: {e}"
)
logger.warning(f"Exceptioning query for object type {sf_type}: {query}")
return sf_type, None
@ -181,12 +188,35 @@ def fetch_all_csvs_in_parallel(
object_types: set[str],
start: SecondsSinceUnixEpoch | None,
end: SecondsSinceUnixEpoch | None,
target_dir: str,
) -> dict[str, list[str] | None]:
"""
Fetches all the csvs in parallel for the given object types
Returns a dict of (sf_type, full_download_path)
"""
time_filter = _build_time_filter_for_salesforce(start, end)
# these types don't query properly and need looking at
# problem_types: set[str] = {
# "ContentDocumentLink",
# "RecordActionHistory",
# "PendingOrderSummary",
# "UnifiedActivityRelation",
# }
# these types don't have a LastModifiedDate field and instead use CreatedDate
created_date_types: set[str] = {
"AccountHistory",
"AccountTag",
"EntitySubscription",
}
last_modified_time_filter = _build_last_modified_time_filter_for_salesforce(
start, end
)
created_date_time_filter = _build_created_date_time_filter_for_salesforce(
start, end
)
time_filter_for_each_object_type = {}
# We do this outside of the thread pool executor because this requires
# a database connection and we don't want to block the thread pool
@ -195,8 +225,11 @@ def fetch_all_csvs_in_parallel(
"""Only add time filter if there is at least one object of the type
in the database. We aren't worried about partially completed object update runs
because this occurs after we check for existing csvs which covers this case"""
if has_at_least_one_object_of_type(sf_type):
time_filter_for_each_object_type[sf_type] = time_filter
if has_at_least_one_object_of_type(target_dir, sf_type):
if sf_type in created_date_types:
time_filter_for_each_object_type[sf_type] = created_date_time_filter
else:
time_filter_for_each_object_type[sf_type] = last_modified_time_filter
else:
time_filter_for_each_object_type[sf_type] = ""
@ -207,6 +240,7 @@ def fetch_all_csvs_in_parallel(
sf_client=sf_client,
sf_type=object_type,
time_filter=time_filter_for_each_object_type[object_type],
target_dir=target_dir,
),
object_types,
)
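With target_dir threaded through, the caller decides where the CSVs (and the sqlite db used for the per-type existence check) live. A hedged usage sketch, assuming init_db from sqlite_functions has prepared the directory and that sf_client is an already-authenticated simple_salesforce client:

# illustrative sketch (not from this commit)
import tempfile

with tempfile.TemporaryDirectory() as target_dir:
    init_db(target_dir)  # the per-type existence check reads the sqlite db here
    object_type_to_csvs = fetch_all_csvs_in_parallel(
        sf_client=sf_client,
        object_types={"Account", "Contact", "User"},
        start=None,  # no time filter -> full download
        end=None,
        target_dir=target_dir,
    )
    for object_type, csv_paths in object_type_to_csvs.items():
        print(object_type, csv_paths)  # csv_paths is None if the download failed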

View File

@ -2,8 +2,10 @@ import csv
import json
import os
import sqlite3
import time
from collections.abc import Iterator
from contextlib import contextmanager
from pathlib import Path
from onyx.connectors.salesforce.utils import get_sqlite_db_path
from onyx.connectors.salesforce.utils import SalesforceObject
@ -16,6 +18,7 @@ logger = setup_logger()
@contextmanager
def get_db_connection(
directory: str,
isolation_level: str | None = None,
) -> Iterator[sqlite3.Connection]:
"""Get a database connection with proper isolation level and error handling.
@ -25,7 +28,7 @@ def get_db_connection(
can be "IMMEDIATE" or "EXCLUSIVE" for more strict isolation.
"""
# 60 second timeout for locks
conn = sqlite3.connect(get_sqlite_db_path(), timeout=60.0)
conn = sqlite3.connect(get_sqlite_db_path(directory), timeout=60.0)
if isolation_level is not None:
conn.isolation_level = isolation_level
@ -38,17 +41,41 @@ def get_db_connection(
conn.close()
def init_db() -> None:
def sqlite_log_stats(directory: str) -> None:
with get_db_connection(directory, "EXCLUSIVE") as conn:
cache_pages = conn.execute("PRAGMA cache_size").fetchone()[0]
page_size = conn.execute("PRAGMA page_size").fetchone()[0]
if cache_pages >= 0:
cache_bytes = cache_pages * page_size
else:
cache_bytes = abs(cache_pages * 1024)
logger.info(
f"SQLite stats: sqlite_version={sqlite3.sqlite_version} "
f"cache_pages={cache_pages} "
f"page_size={page_size} "
f"cache_bytes={cache_bytes}"
)
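The cache_size pragma is a page count when non-negative and a KiB figure when negative (SQLite's default is -2000, i.e. roughly 2 MiB), which is why the helper branches before computing cache_bytes. A standalone sketch of the same conversion:

# illustrative sketch (not from this commit)
import sqlite3

conn = sqlite3.connect(":memory:")
cache_pages = conn.execute("PRAGMA cache_size").fetchone()[0]  # default is -2000
page_size = conn.execute("PRAGMA page_size").fetchone()[0]     # default is 4096
if cache_pages >= 0:
    cache_bytes = cache_pages * page_size  # positive: number of pages
else:
    cache_bytes = abs(cache_pages) * 1024  # negative: -N means N KiB
print(cache_pages, page_size, cache_bytes)
conn.close()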
def init_db(directory: str) -> None:
"""Initialize the SQLite database with required tables if they don't exist."""
# Create database directory if it doesn't exist
os.makedirs(os.path.dirname(get_sqlite_db_path()), exist_ok=True)
start = time.monotonic()
with get_db_connection("EXCLUSIVE") as conn:
os.makedirs(os.path.dirname(get_sqlite_db_path(directory)), exist_ok=True)
with get_db_connection(directory, "EXCLUSIVE") as conn:
cursor = conn.cursor()
db_exists = os.path.exists(get_sqlite_db_path())
db_exists = os.path.exists(get_sqlite_db_path(directory))
if db_exists:
file_path = Path(get_sqlite_db_path(directory))
file_size = file_path.stat().st_size
logger.info(f"init_db - found existing sqlite db: len={file_size}")
else:
# why is this only if the db doesn't exist?
if not db_exists:
# Enable WAL mode for better concurrent access and write performance
cursor.execute("PRAGMA journal_mode=WAL")
cursor.execute("PRAGMA synchronous=NORMAL")
@ -143,16 +170,31 @@ def init_db() -> None:
""",
)
elapsed = time.monotonic() - start
logger.info(f"init_db - create tables and indices: elapsed={elapsed:.2f}")
# Analyze tables to help query planner
cursor.execute("ANALYZE relationships")
cursor.execute("ANALYZE salesforce_objects")
cursor.execute("ANALYZE relationship_types")
cursor.execute("ANALYZE user_email_map")
# NOTE(rkuo): skip ANALYZE - it takes too long and we likely don't have
# complicated queries that need this
# start = time.monotonic()
# cursor.execute("ANALYZE relationships")
# cursor.execute("ANALYZE salesforce_objects")
# cursor.execute("ANALYZE relationship_types")
# cursor.execute("ANALYZE user_email_map")
# elapsed = time.monotonic() - start
# logger.info(f"init_db - analyze: elapsed={elapsed:.2f}")
# If database already existed but user_email_map needs to be populated
start = time.monotonic()
cursor.execute("SELECT COUNT(*) FROM user_email_map")
elapsed = time.monotonic() - start
logger.info(f"init_db - count user_email_map: elapsed={elapsed:.2f}")
start = time.monotonic()
if cursor.fetchone()[0] == 0:
_update_user_email_map(conn)
elapsed = time.monotonic() - start
logger.info(f"init_db - update_user_email_map: elapsed={elapsed:.2f}")
conn.commit()
@ -240,15 +282,15 @@ def _update_user_email_map(conn: sqlite3.Connection) -> None:
def update_sf_db_with_csv(
directory: str,
object_type: str,
csv_download_path: str,
delete_csv_after_use: bool = True,
) -> list[str]:
"""Update the SF DB with a CSV file using SQLite storage."""
updated_ids = []
# Use IMMEDIATE to get a write lock at the start of the transaction
with get_db_connection("IMMEDIATE") as conn:
with get_db_connection(directory, "IMMEDIATE") as conn:
cursor = conn.cursor()
with open(csv_download_path, "r", newline="", encoding="utf-8") as f:
@ -295,17 +337,12 @@ def update_sf_db_with_csv(
conn.commit()
if delete_csv_after_use:
# Remove the csv file after it has been used
# to successfully update the db
os.remove(csv_download_path)
return updated_ids
def get_child_ids(parent_id: str) -> set[str]:
def get_child_ids(directory: str, parent_id: str) -> set[str]:
"""Get all child IDs for a given parent ID."""
with get_db_connection() as conn:
with get_db_connection(directory) as conn:
cursor = conn.cursor()
# Force index usage with INDEXED BY
@ -317,9 +354,9 @@ def get_child_ids(parent_id: str) -> set[str]:
return child_ids
def get_type_from_id(object_id: str) -> str | None:
def get_type_from_id(directory: str, object_id: str) -> str | None:
"""Get the type of an object from its ID."""
with get_db_connection() as conn:
with get_db_connection(directory) as conn:
cursor = conn.cursor()
cursor.execute(
"SELECT object_type FROM salesforce_objects WHERE id = ?", (object_id,)
@ -332,15 +369,15 @@ def get_type_from_id(object_id: str) -> str | None:
def get_record(
object_id: str, object_type: str | None = None
directory: str, object_id: str, object_type: str | None = None
) -> SalesforceObject | None:
"""Retrieve the record and return it as a SalesforceObject."""
if object_type is None:
object_type = get_type_from_id(object_id)
object_type = get_type_from_id(directory, object_id)
if not object_type:
return None
with get_db_connection() as conn:
with get_db_connection(directory) as conn:
cursor = conn.cursor()
cursor.execute("SELECT data FROM salesforce_objects WHERE id = ?", (object_id,))
result = cursor.fetchone()
@ -352,9 +389,9 @@ def get_record(
return SalesforceObject(id=object_id, type=object_type, data=data)
def find_ids_by_type(object_type: str) -> list[str]:
def find_ids_by_type(directory: str, object_type: str) -> list[str]:
"""Find all object IDs for rows of the specified type."""
with get_db_connection() as conn:
with get_db_connection(directory) as conn:
cursor = conn.cursor()
cursor.execute(
"SELECT id FROM salesforce_objects WHERE object_type = ?", (object_type,)
@ -363,6 +400,7 @@ def find_ids_by_type(object_type: str) -> list[str]:
def get_affected_parent_ids_by_type(
directory: str,
updated_ids: list[str],
parent_types: list[str],
batch_size: int = 500,
@ -374,7 +412,7 @@ def get_affected_parent_ids_by_type(
updated_ids_batches = batch_list(updated_ids, batch_size)
updated_parent_ids: set[str] = set()
with get_db_connection() as conn:
with get_db_connection(directory) as conn:
cursor = conn.cursor()
for batch_ids in updated_ids_batches:
@ -419,7 +457,7 @@ def get_affected_parent_ids_by_type(
yield parent_type, new_affected_ids
def has_at_least_one_object_of_type(object_type: str) -> bool:
def has_at_least_one_object_of_type(directory: str, object_type: str) -> bool:
"""Check if there is at least one object of the specified type in the database.
Args:
@ -428,7 +466,7 @@ def has_at_least_one_object_of_type(object_type: str) -> bool:
Returns:
bool: True if at least one object exists, False otherwise
"""
with get_db_connection() as conn:
with get_db_connection(directory) as conn:
cursor = conn.cursor()
cursor.execute(
"SELECT COUNT(*) FROM salesforce_objects WHERE object_type = ?",
@ -443,7 +481,7 @@ def has_at_least_one_object_of_type(object_type: str) -> bool:
NULL_ID_STRING = "N/A"
def get_user_id_by_email(email: str) -> str | None:
def get_user_id_by_email(directory: str, email: str) -> str | None:
"""Get the Salesforce User ID for a given email address.
Args:
@ -454,7 +492,7 @@ def get_user_id_by_email(email: str) -> str | None:
- was_found: True if the email exists in the table, False if not found
- user_id: The Salesforce User ID if exists, None otherwise
"""
with get_db_connection() as conn:
with get_db_connection(directory) as conn:
cursor = conn.cursor()
cursor.execute("SELECT user_id FROM user_email_map WHERE email = ?", (email,))
result = cursor.fetchone()
@ -463,10 +501,10 @@ def get_user_id_by_email(email: str) -> str | None:
return result[0]
def update_email_to_id_table(email: str, id: str | None) -> None:
def update_email_to_id_table(directory: str, email: str, id: str | None) -> None:
"""Update the email to ID map table with a new email and ID."""
id_to_use = id or NULL_ID_STRING
with get_db_connection() as conn:
with get_db_connection(directory) as conn:
cursor = conn.cursor()
cursor.execute(
"INSERT OR REPLACE INTO user_email_map (email, user_id) VALUES (?, ?)",

View File

@ -30,9 +30,9 @@ class SalesforceObject:
BASE_DATA_PATH = os.path.join(os.path.dirname(__file__), "data")
def get_sqlite_db_path() -> str:
def get_sqlite_db_path(directory: str) -> str:
"""Get the path to the sqlite db file."""
return os.path.join(BASE_DATA_PATH, "salesforce_db.sqlite")
return os.path.join(directory, "salesforce_db.sqlite")
def get_object_type_path(object_type: str) -> str:

View File

@ -227,16 +227,13 @@ class SearchPipeline:
# If ee is enabled, censor the chunk sections based on user access
# Otherwise, return the retrieved chunks
censored_chunks = cast(
list[InferenceChunk],
fetch_ee_implementation_or_noop(
"onyx.external_permissions.post_query_censoring",
"_post_query_chunk_censoring",
retrieved_chunks,
)(
chunks=retrieved_chunks,
user=self.user,
),
censored_chunks: list[InferenceChunk] = fetch_ee_implementation_or_noop(
"onyx.external_permissions.post_query_censoring",
"_post_query_chunk_censoring",
retrieved_chunks,
)(
chunks=retrieved_chunks,
user=self.user,
)
above = self.search_query.chunks_above

View File

@ -459,10 +459,6 @@ def process_image_sections(documents: list[Document]) -> list[IndexingDocument]:
llm = get_default_llm_with_vision()
if not llm:
logger.warning(
"No vision-capable LLM available. Image sections will not be processed."
)
# Even without LLM, we still convert to IndexingDocument with base Sections
return [
IndexingDocument(
@ -929,10 +925,12 @@ def index_doc_batch(
for chunk_num, chunk in enumerate(chunks_with_embeddings)
]
logger.debug(
"Indexing the following chunks: "
f"{[chunk.to_short_descriptor() for chunk in access_aware_chunks]}"
)
short_descriptor_list = [
chunk.to_short_descriptor() for chunk in access_aware_chunks
]
short_descriptor_log = str(short_descriptor_list)[:1024]
logger.debug(f"Indexing the following chunks: {short_descriptor_log}")
# A document will not be spread across different batches, so all the
# documents with chunks in this set, are fully represented by the chunks
# in this set
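The debug log previously rendered every chunk descriptor into one string; it is now stringified once and truncated to 1024 characters so a large batch cannot blow up the log line. A tiny sketch of the pattern:

# illustrative sketch (not from this commit)
descriptors = [f"doc-{i}/chunk-{j}" for i in range(1000) for j in range(4)]
short_descriptor_log = str(descriptors)[:1024]  # cap the rendered size before logging
print(f"Indexing the following chunks: {short_descriptor_log}")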

View File

@ -202,8 +202,8 @@ class EmbeddingModel:
end_time = time.time()
processing_time = end_time - start_time
logger.info(
f"Batch {batch_idx} processing time: {processing_time:.2f} seconds"
logger.debug(
f"EmbeddingModel.process_batch: Batch {batch_idx} processing time: {processing_time:.2f} seconds"
)
return batch_idx, response.embeddings

View File

@ -35,23 +35,22 @@ def salesforce_connector() -> SalesforceConnector:
connector = SalesforceConnector(
requested_objects=["Account", "Contact", "Opportunity"],
)
username = os.environ["SF_USERNAME"]
password = os.environ["SF_PASSWORD"]
security_token = os.environ["SF_SECURITY_TOKEN"]
connector.load_credentials(
{
"sf_username": os.environ["SF_USERNAME"],
"sf_password": os.environ["SF_PASSWORD"],
"sf_security_token": os.environ["SF_SECURITY_TOKEN"],
"sf_username": username,
"sf_password": password,
"sf_security_token": security_token,
}
)
return connector
# TODO: make the credentials not expire
@pytest.mark.xfail(
reason=(
"Credentials change over time, so this test will fail if run when "
"the credentials expire."
)
)
def test_salesforce_connector_basic(salesforce_connector: SalesforceConnector) -> None:
test_data = load_test_data()
target_test_doc: Document | None = None
@ -61,21 +60,26 @@ def test_salesforce_connector_basic(salesforce_connector: SalesforceConnector) -
all_docs.append(doc)
if doc.id == test_data["id"]:
target_test_doc = doc
break
# The number of docs here seems to change actively so do a very loose check
# as of 2025-03-28 it was around 32472
assert len(all_docs) > 32000
assert len(all_docs) < 40000
assert len(all_docs) == 6
assert target_test_doc is not None
# Set of received links
received_links: set[str] = set()
# List of received text fields, which contain key-value pairs separated by newlines
recieved_text: list[str] = []
received_text: list[str] = []
# Iterate over the sections of the target test doc to extract the links and text
for section in target_test_doc.sections:
assert section.link
assert section.text
received_links.add(section.link)
recieved_text.append(section.text)
received_text.append(section.text)
# Check that the received links match the expected links from the test data json
expected_links = set(test_data["expected_links"])
@ -85,8 +89,9 @@ def test_salesforce_connector_basic(salesforce_connector: SalesforceConnector) -
expected_text = test_data["expected_text"]
if not isinstance(expected_text, list):
raise ValueError("Expected text is not a list")
unparsed_expected_key_value_pairs: list[str] = expected_text
received_key_value_pairs = extract_key_value_pairs_to_set(recieved_text)
received_key_value_pairs = extract_key_value_pairs_to_set(received_text)
expected_key_value_pairs = extract_key_value_pairs_to_set(
unparsed_expected_key_value_pairs
)
@ -96,13 +101,21 @@ def test_salesforce_connector_basic(salesforce_connector: SalesforceConnector) -
assert target_test_doc.source == DocumentSource.SALESFORCE
assert target_test_doc.semantic_identifier == test_data["semantic_identifier"]
assert target_test_doc.metadata == test_data["metadata"]
assert target_test_doc.primary_owners == test_data["primary_owners"]
assert target_test_doc.primary_owners is not None
primary_owner = target_test_doc.primary_owners[0]
expected_primary_owner = test_data["primary_owners"]
assert isinstance(expected_primary_owner, dict)
assert primary_owner.email == expected_primary_owner["email"]
assert primary_owner.first_name == expected_primary_owner["first_name"]
assert primary_owner.last_name == expected_primary_owner["last_name"]
assert target_test_doc.secondary_owners == test_data["secondary_owners"]
assert target_test_doc.title == test_data["title"]
# TODO: make the credentials not expire
@pytest.mark.xfail(
@pytest.mark.skip(
reason=(
"Credentials change over time, so this test will fail if run when "
"the credentials expire."

View File

@ -1,20 +1,162 @@
{
"id": "SALESFORCE_001fI000005drUcQAI",
"id": "SALESFORCE_001bm00000eu6n5AAA",
"expected_links": [
"https://customization-ruby-2195.my.salesforce.com/001fI000005drUcQAI",
"https://customization-ruby-2195.my.salesforce.com/003fI000001jiCPQAY",
"https://customization-ruby-2195.my.salesforce.com/017fI00000T7hvsQAB",
"https://customization-ruby-2195.my.salesforce.com/006fI000000rDvBQAU"
"https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESpEeAAL",
"https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESqd3AAD",
"https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESoKiAAL",
"https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESvDSAA1",
"https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESrmHAAT",
"https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESrl2AAD",
"https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESvejAAD",
"https://danswer-dev-ed.develop.my.salesforce.com/003bm00000EStlvAAD",
"https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESpPfAAL",
"https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESrP9AAL",
"https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESvlMAAT",
"https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESt3JAAT",
"https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESoBkAAL",
"https://danswer-dev-ed.develop.my.salesforce.com/003bm00000EStw2AAD",
"https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESrkMAAT",
"https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESojKAAT",
"https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESuLEAA1",
"https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESoSIAA1",
"https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESu2YAAT",
"https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESvgSAAT",
"https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESurnAAD",
"https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESrnqAAD",
"https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESoB5AAL",
"https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESuJuAAL",
"https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESrfyAAD",
"https://danswer-dev-ed.develop.my.salesforce.com/001bm00000eu6n5AAA",
"https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESpUHAA1",
"https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESsgGAAT",
"https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESr7UAAT",
"https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESu1BAAT",
"https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESpqzAAD",
"https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESplZAAT",
"https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESvJ3AAL",
"https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESurKAAT",
"https://danswer-dev-ed.develop.my.salesforce.com/003bm00000EStSiAAL",
"https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESuJFAA1",
"https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESu8xAAD",
"https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESqfzAAD",
"https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESqsrAAD",
"https://danswer-dev-ed.develop.my.salesforce.com/003bm00000EStoZAAT",
"https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESsIUAA1",
"https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESsAGAA1",
"https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESv8GAAT",
"https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESrOKAA1",
"https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESoUmAAL",
"https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESudKAAT",
"https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESuJ8AAL",
"https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESvf2AAD",
"https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESw3qAAD",
"https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESugRAAT",
"https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESr18AAD",
"https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESqV1AAL",
"https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESuLVAA1",
"https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESpjoAAD",
"https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESqULAA1",
"https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESuCAAA1",
"https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESrfpAAD",
"https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESp5YAAT",
"https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESrMNAA1",
"https://danswer-dev-ed.develop.my.salesforce.com/003bm00000EStaUAAT",
"https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESt5LAAT",
"https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESrtcAAD",
"https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESomaAAD",
"https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESrtIAAT",
"https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESoToAAL",
"https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESuWLAA1",
"https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESrWvAAL",
"https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESsJEAA1",
"https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESsxwAAD",
"https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESvUgAAL",
"https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESvWjAAL",
"https://danswer-dev-ed.develop.my.salesforce.com/003bm00000EStBuAAL",
"https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESpZiAAL",
"https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESuhYAAT",
"https://danswer-dev-ed.develop.my.salesforce.com/003bm00000ESuWAAA1"
],
"expected_text": [
"BillingPostalCode: 60601\nType: Prospect\nWebsite: www.globalistindustries.com\nBillingCity: Chicago\nDescription: Globalist company\nIsDeleted: false\nIsPartner: false\nPhone: (312) 555-0456\nShippingCountry: USA\nShippingState: IL\nIsBuyer: false\nBillingCountry: USA\nBillingState: IL\nShippingPostalCode: 60601\nBillingStreet: 456 Market St\nIsCustomerPortal: false\nPersonActiveTrackerCount: 0\nShippingCity: Chicago\nShippingStreet: 456 Market St",
"FirstName: Michael\nMailingCountry: USA\nActiveTrackerCount: 0\nEmail: m.brown@globalindustries.com\nMailingState: IL\nMailingStreet: 456 Market St\nMailingCity: Chicago\nLastName: Brown\nTitle: CTO\nIsDeleted: false\nPhone: (312) 555-0456\nHasOptedOutOfEmail: false\nIsEmailBounced: false\nMailingPostalCode: 60601",
"ForecastCategory: Closed\nName: Global Industries Equipment Sale\nIsDeleted: false\nForecastCategoryName: Closed\nFiscalYear: 2024\nFiscalQuarter: 4\nIsClosed: true\nIsWon: true\nAmount: 5000000.0\nProbability: 100.0\nPushCount: 0\nHasOverdueTask: false\nStageName: Closed Won\nHasOpenActivity: false\nHasOpportunityLineItem: false",
"Field: created\nDataType: Text\nIsDeleted: false"
"IsDeleted: false\nBillingCity: Shaykh al \u00e1\u00b8\u00a8ad\u00c4\u00abd\nName: Voonder\nCleanStatus: Pending\nBillingStreet: 12 Cambridge Parkway",
"Email: eslayqzs@icio.us\nIsDeleted: false\nLastName: Slay\nIsEmailBounced: false\nFirstName: Ebeneser\nIsPriorityRecord: false\nCleanStatus: Pending",
"Email: ptweedgdh@umich.edu\nIsDeleted: false\nLastName: Tweed\nIsEmailBounced: false\nFirstName: Paulita\nIsPriorityRecord: false\nCleanStatus: Pending",
"Email: ehurnellnlx@facebook.com\nIsDeleted: false\nLastName: Hurnell\nIsEmailBounced: false\nFirstName: Eliot\nIsPriorityRecord: false\nCleanStatus: Pending",
"Email: ccarik4q4@google.it\nIsDeleted: false\nLastName: Carik\nIsEmailBounced: false\nFirstName: Chadwick\nIsPriorityRecord: false\nCleanStatus: Pending",
"Email: cvannozziina6@moonfruit.com\nIsDeleted: false\nLastName: Vannozzii\nIsEmailBounced: false\nFirstName: Christophorus\nIsPriorityRecord: false\nCleanStatus: Pending",
"Email: mikringill2kz@hugedomains.com\nIsDeleted: false\nLastName: Ikringill\nIsEmailBounced: false\nFirstName: Meghann\nIsPriorityRecord: false\nCleanStatus: Pending",
"Email: bgrinvalray@fda.gov\nIsDeleted: false\nLastName: Grinval\nIsEmailBounced: false\nFirstName: Berti\nIsPriorityRecord: false\nCleanStatus: Pending",
"Email: aollanderhr7@cam.ac.uk\nIsDeleted: false\nLastName: Ollander\nIsEmailBounced: false\nFirstName: Annemarie\nIsPriorityRecord: false\nCleanStatus: Pending",
"Email: rwhitesideq38@gravatar.com\nIsDeleted: false\nLastName: Whiteside\nIsEmailBounced: false\nFirstName: Rolando\nIsPriorityRecord: false\nCleanStatus: Pending",
"Email: vkrafthmz@techcrunch.com\nIsDeleted: false\nLastName: Kraft\nIsEmailBounced: false\nFirstName: Vidovik\nIsPriorityRecord: false\nCleanStatus: Pending",
"Email: jhillaut@4shared.com\nIsDeleted: false\nLastName: Hill\nIsEmailBounced: false\nFirstName: Janel\nIsPriorityRecord: false\nCleanStatus: Pending",
"Email: lralstonycs@discovery.com\nIsDeleted: false\nLastName: Ralston\nIsEmailBounced: false\nFirstName: Lorrayne\nIsPriorityRecord: false\nCleanStatus: Pending",
"Email: blyttlewba@networkadvertising.org\nIsDeleted: false\nLastName: Lyttle\nIsEmailBounced: false\nFirstName: Ban\nIsPriorityRecord: false\nCleanStatus: Pending",
"Email: pplummernvf@technorati.com\nIsDeleted: false\nLastName: Plummer\nIsEmailBounced: false\nFirstName: Pete\nIsPriorityRecord: false\nCleanStatus: Pending",
"Email: babrahamoffxpb@theatlantic.com\nIsDeleted: false\nLastName: Abrahamoff\nIsEmailBounced: false\nFirstName: Brander\nIsPriorityRecord: false\nCleanStatus: Pending",
"Email: ahargieym0@homestead.com\nIsDeleted: false\nLastName: Hargie\nIsEmailBounced: false\nFirstName: Aili\nIsPriorityRecord: false\nCleanStatus: Pending",
"Email: hstotthp2@yelp.com\nIsDeleted: false\nLastName: Stott\nIsEmailBounced: false\nFirstName: Hartley\nIsPriorityRecord: false\nCleanStatus: Pending",
"Email: jganniclifftuvj@blinklist.com\nIsDeleted: false\nLastName: Ganniclifft\nIsEmailBounced: false\nFirstName: Jamima\nIsPriorityRecord: false\nCleanStatus: Pending",
"Email: ldodelly8q@ed.gov\nIsDeleted: false\nLastName: Dodell\nIsEmailBounced: false\nFirstName: Lynde\nIsPriorityRecord: false\nCleanStatus: Pending",
"Email: rmilner3cp@smh.com.au\nIsDeleted: false\nLastName: Milner\nIsEmailBounced: false\nFirstName: Ralph\nIsPriorityRecord: false\nCleanStatus: Pending",
"Email: gghiriardellic19@state.tx.us\nIsDeleted: false\nLastName: Ghiriardelli\nIsEmailBounced: false\nFirstName: Garv\nIsPriorityRecord: false\nCleanStatus: Pending",
"Email: rhubatschfpu@nature.com\nIsDeleted: false\nLastName: Hubatsch\nIsEmailBounced: false\nFirstName: Rose\nIsPriorityRecord: false\nCleanStatus: Pending",
"Email: mtrenholme1ws@quantcast.com\nIsDeleted: false\nLastName: Trenholme\nIsEmailBounced: false\nFirstName: Mariejeanne\nIsPriorityRecord: false\nCleanStatus: Pending",
"Email: jmussettpbd@over-blog.com\nIsDeleted: false\nLastName: Mussett\nIsEmailBounced: false\nFirstName: Juliann\nIsPriorityRecord: false\nCleanStatus: Pending",
"Email: bgoroni145@illinois.edu\nIsDeleted: false\nLastName: Goroni\nIsEmailBounced: false\nFirstName: Bernarr\nIsPriorityRecord: false\nCleanStatus: Pending",
"Email: afalls3ph@theguardian.com\nIsDeleted: false\nLastName: Falls\nIsEmailBounced: false\nFirstName: Angelia\nIsPriorityRecord: false\nCleanStatus: Pending",
"Email: lswettjoi@go.com\nIsDeleted: false\nLastName: Swett\nIsEmailBounced: false\nFirstName: Levon\nIsPriorityRecord: false\nCleanStatus: Pending",
"Email: emullinsz38@dailymotion.com\nIsDeleted: false\nLastName: Mullins\nIsEmailBounced: false\nFirstName: Elsa\nIsPriorityRecord: false\nCleanStatus: Pending",
"Email: ibernettehco@ebay.co.uk\nIsDeleted: false\nLastName: Bernette\nIsEmailBounced: false\nFirstName: Ingrid\nIsPriorityRecord: false\nCleanStatus: Pending",
"Email: trisleybtt@simplemachines.org\nIsDeleted: false\nLastName: Risley\nIsEmailBounced: false\nFirstName: Toma\nIsPriorityRecord: false\nCleanStatus: Pending",
"Email: rgypsonqx1@goodreads.com\nIsDeleted: false\nLastName: Gypson\nIsEmailBounced: false\nFirstName: Reed\nIsPriorityRecord: false\nCleanStatus: Pending",
"Email: cposvneri28@jiathis.com\nIsDeleted: false\nLastName: Posvner\nIsEmailBounced: false\nFirstName: Culley\nIsPriorityRecord: false\nCleanStatus: Pending",
"Email: awilmut2rz@geocities.jp\nIsDeleted: false\nLastName: Wilmut\nIsEmailBounced: false\nFirstName: Andy\nIsPriorityRecord: false\nCleanStatus: Pending",
"Email: aluckwellra5@exblog.jp\nIsDeleted: false\nLastName: Luckwell\nIsEmailBounced: false\nFirstName: Andreana\nIsPriorityRecord: false\nCleanStatus: Pending",
"Email: irollings26j@timesonline.co.uk\nIsDeleted: false\nLastName: Rollings\nIsEmailBounced: false\nFirstName: Ibrahim\nIsPriorityRecord: false\nCleanStatus: Pending",
"Email: gspireqpd@g.co\nIsDeleted: false\nLastName: Spire\nIsEmailBounced: false\nFirstName: Gaelan\nIsPriorityRecord: false\nCleanStatus: Pending",
"Email: sbezleyk2y@acquirethisname.com\nIsDeleted: false\nLastName: Bezley\nIsEmailBounced: false\nFirstName: Sindee\nIsPriorityRecord: false\nCleanStatus: Pending",
"Email: icollerrr@flickr.com\nIsDeleted: false\nLastName: Coller\nIsEmailBounced: false\nFirstName: Inesita\nIsPriorityRecord: false\nCleanStatus: Pending",
"Email: kfolliott1bo@nature.com\nIsDeleted: false\nLastName: Folliott\nIsEmailBounced: false\nFirstName: Kennan\nIsPriorityRecord: false\nCleanStatus: Pending",
"Email: kroofjfo@gnu.org\nIsDeleted: false\nLastName: Roof\nIsEmailBounced: false\nFirstName: Karlik\nIsPriorityRecord: false\nCleanStatus: Pending",
"Email: lcovotti8s4@rediff.com\nIsDeleted: false\nLastName: Covotti\nIsEmailBounced: false\nFirstName: Lucho\nIsPriorityRecord: false\nCleanStatus: Pending",
"Email: gpatriskson1rs@census.gov\nIsDeleted: false\nLastName: Patriskson\nIsEmailBounced: false\nFirstName: Gardener\nIsPriorityRecord: false\nCleanStatus: Pending",
"Email: spidgleyqvw@usgs.gov\nIsDeleted: false\nLastName: Pidgley\nIsEmailBounced: false\nFirstName: Simona\nIsPriorityRecord: false\nCleanStatus: Pending",
"Email: cbecarrak0i@over-blog.com\nIsDeleted: false\nLastName: Becarra\nIsEmailBounced: false\nFirstName: Cally\nIsPriorityRecord: false\nCleanStatus: Pending",
"Email: aparkman9td@bbc.co.uk\nIsDeleted: false\nLastName: Parkman\nIsEmailBounced: false\nFirstName: Agneta\nIsPriorityRecord: false\nCleanStatus: Pending",
"Email: bboddingtonhn@quantcast.com\nIsDeleted: false\nLastName: Boddington\nIsEmailBounced: false\nFirstName: Betta\nIsPriorityRecord: false\nCleanStatus: Pending",
"Email: dcasementx0p@cafepress.com\nIsDeleted: false\nLastName: Casement\nIsEmailBounced: false\nFirstName: Dannie\nIsPriorityRecord: false\nCleanStatus: Pending",
"Email: hzornbhe@latimes.com\nIsDeleted: false\nLastName: Zorn\nIsEmailBounced: false\nFirstName: Haleigh\nIsPriorityRecord: false\nCleanStatus: Pending",
"Email: cfifieldbjb@blogspot.com\nIsDeleted: false\nLastName: Fifield\nIsEmailBounced: false\nFirstName: Christalle\nIsPriorityRecord: false\nCleanStatus: Pending",
"Email: ddewerson4t3@skype.com\nIsDeleted: false\nLastName: Dewerson\nIsEmailBounced: false\nFirstName: Dyann\nIsPriorityRecord: false\nCleanStatus: Pending",
"Email: khullock52p@sohu.com\nIsDeleted: false\nLastName: Hullock\nIsEmailBounced: false\nFirstName: Kellina\nIsPriorityRecord: false\nCleanStatus: Pending",
"Email: tfremantle32n@bandcamp.com\nIsDeleted: false\nLastName: Fremantle\nIsEmailBounced: false\nFirstName: Turner\nIsPriorityRecord: false\nCleanStatus: Pending",
"Email: sbernardtylp@nps.gov\nIsDeleted: false\nLastName: Bernardt\nIsEmailBounced: false\nFirstName: Selina\nIsPriorityRecord: false\nCleanStatus: Pending",
"Email: smcgettigan8kk@slideshare.net\nIsDeleted: false\nLastName: McGettigan\nIsEmailBounced: false\nFirstName: Sada\nIsPriorityRecord: false\nCleanStatus: Pending",
"Email: wdelafontvgn@businesswire.com\nIsDeleted: false\nLastName: Delafont\nIsEmailBounced: false\nFirstName: West\nIsPriorityRecord: false\nCleanStatus: Pending",
"Email: lbelsher9ne@indiatimes.com\nIsDeleted: false\nLastName: Belsher\nIsEmailBounced: false\nFirstName: Lou\nIsPriorityRecord: false\nCleanStatus: Pending",
"Email: cgoody27y@blogtalkradio.com\nIsDeleted: false\nLastName: Goody\nIsEmailBounced: false\nFirstName: Colene\nIsPriorityRecord: false\nCleanStatus: Pending",
"Email: cstodejzz@ucoz.ru\nIsDeleted: false\nLastName: Stode\nIsEmailBounced: false\nFirstName: Curcio\nIsPriorityRecord: false\nCleanStatus: Pending",
"Email: abromidgejb@china.com.cn\nIsDeleted: false\nLastName: Bromidge\nIsEmailBounced: false\nFirstName: Ariela\nIsPriorityRecord: false\nCleanStatus: Pending",
"Email: ldelgardilloqvp@xrea.com\nIsDeleted: false\nLastName: Delgardillo\nIsEmailBounced: false\nFirstName: Lauralee\nIsPriorityRecord: false\nCleanStatus: Pending",
"Email: dcroal9t4@businessinsider.com\nIsDeleted: false\nLastName: Croal\nIsEmailBounced: false\nFirstName: Devlin\nIsPriorityRecord: false\nCleanStatus: Pending",
"Email: dclarageqzb@wordpress.com\nIsDeleted: false\nLastName: Clarage\nIsEmailBounced: false\nFirstName: Dre\nIsPriorityRecord: false\nCleanStatus: Pending",
"Email: dthirlwall3jf@taobao.com\nIsDeleted: false\nLastName: Thirlwall\nIsEmailBounced: false\nFirstName: Dareen\nIsPriorityRecord: false\nCleanStatus: Pending",
"Email: tkeddie2lj@wiley.com\nIsDeleted: false\nLastName: Keddie\nIsEmailBounced: false\nFirstName: Tandi\nIsPriorityRecord: false\nCleanStatus: Pending",
"Email: jrimingtoni3i@istockphoto.com\nIsDeleted: false\nLastName: Rimington\nIsEmailBounced: false\nFirstName: Judy\nIsPriorityRecord: false\nCleanStatus: Pending",
"Email: gtroynet@slashdot.org\nIsDeleted: false\nLastName: Troy\nIsEmailBounced: false\nFirstName: Gail\nIsPriorityRecord: false\nCleanStatus: Pending",
"Email: ebunneyh0n@meetup.com\nIsDeleted: false\nLastName: Bunney\nIsEmailBounced: false\nFirstName: Efren\nIsPriorityRecord: false\nCleanStatus: Pending",
"Email: yhaken8p3@slate.com\nIsDeleted: false\nLastName: Haken\nIsEmailBounced: false\nFirstName: Yard\nIsPriorityRecord: false\nCleanStatus: Pending",
"Email: nolliffeq6q@biblegateway.com\nIsDeleted: false\nLastName: Olliffe\nIsEmailBounced: false\nFirstName: Nani\nIsPriorityRecord: false\nCleanStatus: Pending",
"Email: bgalia9jz@odnoklassniki.ru\nIsDeleted: false\nLastName: Galia\nIsEmailBounced: false\nFirstName: Berrie\nIsPriorityRecord: false\nCleanStatus: Pending",
"Email: djedrzej3v1@google.com\nIsDeleted: false\nLastName: Jedrzej\nIsEmailBounced: false\nFirstName: Deanne\nIsPriorityRecord: false\nCleanStatus: Pending",
"Email: mcamiesh1t@fc2.com\nIsDeleted: false\nLastName: Camies\nIsEmailBounced: false\nFirstName: Mikaela\nIsPriorityRecord: false\nCleanStatus: Pending",
"Email: csunshineqni@state.tx.us\nIsDeleted: false\nLastName: Sunshine\nIsEmailBounced: false\nFirstName: Curtis\nIsPriorityRecord: false\nCleanStatus: Pending",
"Email: fiannellib46@marriott.com\nIsDeleted: false\nLastName: Iannelli\nIsEmailBounced: false\nFirstName: Felicio\nIsPriorityRecord: false\nCleanStatus: Pending"
],
"semantic_identifier": "Unknown Object",
"semantic_identifier": "Voonder",
"metadata": {},
"primary_owners": null,
"primary_owners": {"email": "hagen@danswer.ai", "first_name": "Hagen", "last_name": "oneill"},
"secondary_owners": null,
"title": null
}

View File

@ -5,8 +5,11 @@ from ee.onyx.external_permissions.salesforce.postprocessing import (
)
from onyx.configs.app_configs import BLURB_SIZE
from onyx.configs.constants import DocumentSource
from onyx.connectors.salesforce.utils import BASE_DATA_PATH
from onyx.context.search.models import InferenceChunk
SQLITE_DIR = BASE_DATA_PATH
def create_test_chunk(
doc_id: str,
@ -39,6 +42,7 @@ def create_test_chunk(
def test_validate_salesforce_access_single_object() -> None:
"""Test filtering when chunk has a single Salesforce object reference"""
section = "This is a test document about a Salesforce object."
test_content = section
test_chunk = create_test_chunk(

View File

@ -113,15 +113,18 @@ _VALID_SALESFORCE_IDS = [
]
def _clear_sf_db() -> None:
def _clear_sf_db(directory: str) -> None:
"""
Clears the SF DB by deleting all files in the data directory.
"""
shutil.rmtree(BASE_DATA_PATH, ignore_errors=True)
shutil.rmtree(directory, ignore_errors=True)
def _create_csv_file(
object_type: str, records: list[dict], filename: str = "test_data.csv"
directory: str,
object_type: str,
records: list[dict],
filename: str = "test_data.csv",
) -> None:
"""
Creates a CSV file for the given object type and records.
@ -149,10 +152,10 @@ def _create_csv_file(
writer.writerow(record)
# Update the database with the CSV
update_sf_db_with_csv(object_type, csv_path)
update_sf_db_with_csv(directory, object_type, csv_path)
def _create_csv_with_example_data() -> None:
def _create_csv_with_example_data(directory: str) -> None:
"""
Creates CSV files with example data, organized by object type.
"""
@ -342,10 +345,10 @@ def _create_csv_with_example_data() -> None:
# Create CSV files for each object type
for object_type, records in example_data.items():
_create_csv_file(object_type, records)
_create_csv_file(directory, object_type, records)
def _test_query() -> None:
def _test_query(directory: str) -> None:
"""
Tests querying functionality by verifying:
1. All expected Account IDs are found
@ -401,7 +404,7 @@ def _test_query() -> None:
}
# Get all Account IDs
account_ids = find_ids_by_type("Account")
account_ids = find_ids_by_type(directory, "Account")
# Verify we found all expected accounts
assert len(account_ids) == len(
@ -413,7 +416,7 @@ def _test_query() -> None:
# Verify each account's data
for acc_id in account_ids:
combined = get_record(acc_id)
combined = get_record(directory, acc_id)
assert combined is not None, f"Could not find account {acc_id}"
expected = expected_accounts[acc_id]
@ -428,7 +431,7 @@ def _test_query() -> None:
print("All query tests passed successfully!")
def _test_upsert() -> None:
def _test_upsert(directory: str) -> None:
"""
Tests upsert functionality by:
1. Updating an existing account
@ -453,10 +456,10 @@ def _test_upsert() -> None:
},
]
_create_csv_file("Account", update_data, "update_data.csv")
_create_csv_file(directory, "Account", update_data, "update_data.csv")
# Verify the update worked
updated_record = get_record(_VALID_SALESFORCE_IDS[0])
updated_record = get_record(directory, _VALID_SALESFORCE_IDS[0])
assert updated_record is not None, "Updated record not found"
assert updated_record.data["Name"] == "Acme Inc. Updated", "Name not updated"
assert (
@ -464,7 +467,7 @@ def _test_upsert() -> None:
), "Description not added"
# Verify the new record was created
new_record = get_record(_VALID_SALESFORCE_IDS[2])
new_record = get_record(directory, _VALID_SALESFORCE_IDS[2])
assert new_record is not None, "New record not found"
assert new_record.data["Name"] == "New Company Inc.", "New record name incorrect"
assert new_record.data["AnnualRevenue"] == "1000000", "New record revenue incorrect"
@ -472,7 +475,7 @@ def _test_upsert() -> None:
print("All upsert tests passed successfully!")
def _test_relationships() -> None:
def _test_relationships(directory: str) -> None:
"""
Tests relationship shelf updates and queries by:
1. Creating test data with relationships
@ -513,11 +516,11 @@ def _test_relationships() -> None:
# Create and update CSV files for each object type
for object_type, records in test_data.items():
_create_csv_file(object_type, records, "relationship_test.csv")
_create_csv_file(directory, object_type, records, "relationship_test.csv")
# Test relationship queries
# All these objects should be children of Acme Inc.
child_ids = get_child_ids(_VALID_SALESFORCE_IDS[0])
child_ids = get_child_ids(directory, _VALID_SALESFORCE_IDS[0])
assert len(child_ids) == 4, f"Expected 4 child objects, found {len(child_ids)}"
assert _VALID_SALESFORCE_IDS[13] in child_ids, "Case 1 not found in relationship"
assert _VALID_SALESFORCE_IDS[14] in child_ids, "Case 2 not found in relationship"
@ -527,7 +530,7 @@ def _test_relationships() -> None:
), "Opportunity not found in relationship"
# Test querying relationships for a different account (should be empty)
other_account_children = get_child_ids(_VALID_SALESFORCE_IDS[1])
other_account_children = get_child_ids(directory, _VALID_SALESFORCE_IDS[1])
assert (
len(other_account_children) == 0
), "Expected no children for different account"
@ -535,7 +538,7 @@ def _test_relationships() -> None:
print("All relationship tests passed successfully!")
def _test_account_with_children() -> None:
def _test_account_with_children(directory: str) -> None:
"""
Tests querying all accounts and retrieving their child objects.
This test verifies that:
@ -544,16 +547,16 @@ def _test_account_with_children() -> None:
3. Child object data is complete and accurate
"""
# First get all account IDs
account_ids = find_ids_by_type("Account")
account_ids = find_ids_by_type(directory, "Account")
assert len(account_ids) > 0, "No accounts found"
# For each account, get its children and verify the data
for account_id in account_ids:
account = get_record(account_id)
account = get_record(directory, account_id)
assert account is not None, f"Could not find account {account_id}"
# Get all child objects
child_ids = get_child_ids(account_id)
child_ids = get_child_ids(directory, account_id)
# For Acme Inc., verify specific relationships
if account_id == _VALID_SALESFORCE_IDS[0]: # Acme Inc.
@ -564,7 +567,7 @@ def _test_account_with_children() -> None:
# Get all child records
child_records = []
for child_id in child_ids:
child_record = get_record(child_id)
child_record = get_record(directory, child_id)
if child_record is not None:
child_records.append(child_record)
# Verify Cases
@ -599,7 +602,7 @@ def _test_account_with_children() -> None:
print("All account with children tests passed successfully!")
def _test_relationship_updates() -> None:
def _test_relationship_updates(directory: str) -> None:
"""
Tests that relationships are properly updated when a child object's parent reference changes.
This test verifies:
@ -616,10 +619,10 @@ def _test_relationship_updates() -> None:
"LastName": "Contact",
}
]
_create_csv_file("Contact", initial_contact, "initial_contact.csv")
_create_csv_file(directory, "Contact", initial_contact, "initial_contact.csv")
# Verify initial relationship
acme_children = get_child_ids(_VALID_SALESFORCE_IDS[0])
acme_children = get_child_ids(directory, _VALID_SALESFORCE_IDS[0])
assert (
_VALID_SALESFORCE_IDS[40] in acme_children
), "Initial relationship not created"
@ -633,22 +636,22 @@ def _test_relationship_updates() -> None:
"LastName": "Contact",
}
]
_create_csv_file("Contact", updated_contact, "updated_contact.csv")
_create_csv_file(directory, "Contact", updated_contact, "updated_contact.csv")
# Verify old relationship is removed
acme_children = get_child_ids(_VALID_SALESFORCE_IDS[0])
acme_children = get_child_ids(directory, _VALID_SALESFORCE_IDS[0])
assert (
_VALID_SALESFORCE_IDS[40] not in acme_children
), "Old relationship not removed"
# Verify new relationship is created
globex_children = get_child_ids(_VALID_SALESFORCE_IDS[1])
globex_children = get_child_ids(directory, _VALID_SALESFORCE_IDS[1])
assert _VALID_SALESFORCE_IDS[40] in globex_children, "New relationship not created"
print("All relationship update tests passed successfully!")
def _test_get_affected_parent_ids() -> None:
def _test_get_affected_parent_ids(directory: str) -> None:
"""
Tests get_affected_parent_ids functionality by verifying:
1. IDs that are directly in the parent_types list are included
@ -683,13 +686,13 @@ def _test_get_affected_parent_ids() -> None:
# Create and update CSV files for test data
for object_type, records in test_data.items():
_create_csv_file(object_type, records)
_create_csv_file(directory, object_type, records)
# Test Case 1: Account directly in updated_ids and parent_types
updated_ids = [_VALID_SALESFORCE_IDS[1]] # Parent Account 2
parent_types = ["Account"]
affected_ids_by_type = dict(
get_affected_parent_ids_by_type(updated_ids, parent_types)
get_affected_parent_ids_by_type(directory, updated_ids, parent_types)
)
assert "Account" in affected_ids_by_type, "Account type not in affected_ids_by_type"
assert (
@ -700,7 +703,7 @@ def _test_get_affected_parent_ids() -> None:
updated_ids = [_VALID_SALESFORCE_IDS[40]] # Child Contact
parent_types = ["Account"]
affected_ids_by_type = dict(
get_affected_parent_ids_by_type(updated_ids, parent_types)
get_affected_parent_ids_by_type(directory, updated_ids, parent_types)
)
assert "Account" in affected_ids_by_type, "Account type not in affected_ids_by_type"
assert (
@ -711,7 +714,7 @@ def _test_get_affected_parent_ids() -> None:
updated_ids = [_VALID_SALESFORCE_IDS[1], _VALID_SALESFORCE_IDS[40]] # Both cases
parent_types = ["Account"]
affected_ids_by_type = dict(
get_affected_parent_ids_by_type(updated_ids, parent_types)
get_affected_parent_ids_by_type(directory, updated_ids, parent_types)
)
assert "Account" in affected_ids_by_type, "Account type not in affected_ids_by_type"
affected_ids = affected_ids_by_type["Account"]
@ -726,7 +729,7 @@ def _test_get_affected_parent_ids() -> None:
updated_ids = [_VALID_SALESFORCE_IDS[40]] # Child Contact
parent_types = ["Opportunity"] # Wrong type
affected_ids_by_type = dict(
get_affected_parent_ids_by_type(updated_ids, parent_types)
get_affected_parent_ids_by_type(directory, updated_ids, parent_types)
)
assert len(affected_ids_by_type) == 0, "Should return empty dict when no matches"
@ -734,13 +737,15 @@ def _test_get_affected_parent_ids() -> None:
def test_salesforce_sqlite() -> None:
_clear_sf_db()
init_db()
_create_csv_with_example_data()
_test_query()
_test_upsert()
_test_relationships()
_test_account_with_children()
_test_relationship_updates()
_test_get_affected_parent_ids()
_clear_sf_db()
directory = BASE_DATA_PATH
_clear_sf_db(directory)
init_db(directory)
_create_csv_with_example_data(directory)
_test_query(directory)
_test_upsert(directory)
_test_relationships(directory)
_test_account_with_children(directory)
_test_relationship_updates(directory)
_test_get_affected_parent_ids(directory)
_clear_sf_db(directory)
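Taken together, the hunks above thread an explicit directory argument through every sqlite helper instead of relying on a module-level data path. A rough usage sketch of the same flow against a throwaway directory follows; the function signatures mirror the calls shown above, while the import path, CSV columns, and record values are assumptions for illustration.

import csv
import os
import tempfile

# Assumed import path, based on the sqlite_functions module used elsewhere
# in this PR; adjust to wherever these helpers actually live.
from onyx.connectors.salesforce.sqlite_functions import (
    find_ids_by_type,
    get_child_ids,
    get_record,
    init_db,
    update_sf_db_with_csv,
)

# Illustrative only: exercise the directory-scoped helpers the same way
# test_salesforce_sqlite does, but against a temporary directory that is
# cleaned up automatically.
with tempfile.TemporaryDirectory() as directory:
    init_db(directory)

    # Write a one-row Account CSV (fake 18-character Id, made-up name).
    csv_path = os.path.join(directory, "accounts.csv")
    with open(csv_path, "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=["Id", "Name"])
        writer.writeheader()
        writer.writerow({"Id": "001000000000000AAA", "Name": "Example Account"})

    # Upsert the CSV, then read back what was stored.
    update_sf_db_with_csv(directory, "Account", csv_path)
    for account_id in find_ids_by_type(directory, "Account"):
        record = get_record(directory, account_id)
        child_ids = get_child_ids(directory, account_id)
        print(account_id, record.data if record else None, child_ids)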