From 3f92ed9d29e465ac1fb581e87be854e77f6010bf Mon Sep 17 00:00:00 2001
From: Chris Weaver <25087905+Weves@users.noreply.github.com>
Date: Mon, 30 Dec 2024 19:06:28 -0800
Subject: [PATCH] Airtable connector (#3564)

* Airtable connector
* Improvements
* improve
* Clean up test
* Add secrets
* Fix mypy + add access token
* Mock unstructured call
* Adjust comments
* Fix ID in test
---
 .../workflows/pr-python-connector-tests.yml     |   6 +-
 backend/onyx/configs/constants.py               |   1 +
 .../connectors/airtable/airtable_connector.py   | 266 ++++++++++++++++++
 backend/onyx/connectors/factory.py              |   2 +
 backend/requirements/default.txt                |   2 +
 .../airtable/test_airtable_basic.py             | 192 +++++++++++++
 web/public/Airtable.svg                         |  14 +
 web/src/components/icons/icons.tsx              |  15 +
 web/src/lib/connectors/connectors.tsx           |  24 +-
 web/src/lib/connectors/credentials.ts           |   7 +
 web/src/lib/sources.ts                          |  19 +-
 web/src/lib/types.ts                            |   1 +
 12 files changed, 541 insertions(+), 8 deletions(-)
 create mode 100644 backend/onyx/connectors/airtable/airtable_connector.py
 create mode 100644 backend/tests/daily/connectors/airtable/test_airtable_basic.py
 create mode 100644 web/public/Airtable.svg

diff --git a/.github/workflows/pr-python-connector-tests.yml b/.github/workflows/pr-python-connector-tests.yml
index f5067ad68..88c54b848 100644
--- a/.github/workflows/pr-python-connector-tests.yml
+++ b/.github/workflows/pr-python-connector-tests.yml
@@ -30,7 +30,11 @@ env:
   SF_USERNAME: ${{ secrets.SF_USERNAME }}
   SF_PASSWORD: ${{ secrets.SF_PASSWORD }}
   SF_SECURITY_TOKEN: ${{ secrets.SF_SECURITY_TOKEN }}
-
+  # Airtable
+  AIRTABLE_TEST_BASE_ID: ${{ secrets.AIRTABLE_TEST_BASE_ID }}
+  AIRTABLE_TEST_TABLE_ID: ${{ secrets.AIRTABLE_TEST_TABLE_ID }}
+  AIRTABLE_TEST_TABLE_NAME: ${{ secrets.AIRTABLE_TEST_TABLE_NAME }}
+  AIRTABLE_ACCESS_TOKEN: ${{ secrets.AIRTABLE_ACCESS_TOKEN }}
 jobs:
   connectors-check:
     # See https://runs-on.com/runners/linux/
diff --git a/backend/onyx/configs/constants.py b/backend/onyx/configs/constants.py
index 738b088fb..e104ddc4a 100644
--- a/backend/onyx/configs/constants.py
+++ b/backend/onyx/configs/constants.py
@@ -142,6 +142,7 @@ class DocumentSource(str, Enum):
     FRESHDESK = "freshdesk"
     FIREFLIES = "fireflies"
     EGNYTE = "egnyte"
+    AIRTABLE = "airtable"


 DocumentSourceRequiringTenantContext: list[DocumentSource] = [DocumentSource.FILE]
diff --git a/backend/onyx/connectors/airtable/airtable_connector.py b/backend/onyx/connectors/airtable/airtable_connector.py
new file mode 100644
index 000000000..2df356bf9
--- /dev/null
+++ b/backend/onyx/connectors/airtable/airtable_connector.py
@@ -0,0 +1,266 @@
+from io import BytesIO
+from typing import Any
+
+import requests
+from pyairtable import Api as AirtableApi
+from retry import retry
+
+from onyx.configs.app_configs import INDEX_BATCH_SIZE
+from onyx.configs.constants import DocumentSource
+from onyx.connectors.interfaces import GenerateDocumentsOutput
+from onyx.connectors.interfaces import LoadConnector
+from onyx.connectors.models import Document
+from onyx.connectors.models import Section
+from onyx.file_processing.extract_file_text import extract_file_text
+from onyx.file_processing.extract_file_text import get_file_ext
+from onyx.utils.logger import setup_logger
+
+logger = setup_logger()
+
+# NOTE: all are made lowercase to avoid case sensitivity issues
+# these are the field types that are considered metadata rather
+# than sections
+_METADATA_FIELD_TYPES = {
+    "singlecollaborator",
+    "collaborator",
+    "createdby",
+    "singleselect",
+    "multipleselects",
+    "checkbox",
+    "date",
+    "datetime",
+    "email",
+    "phone",
+    "url",
+    "number",
+    "currency",
+    "duration",
+    "percent",
+    "rating",
+    "createdtime",
+    "lastmodifiedtime",
+    "autonumber",
+    "rollup",
+    "lookup",
+    "count",
+    "formula",
+    "date",
+}
+
+
+class AirtableClientNotSetUpError(PermissionError):
+    def __init__(self) -> None:
+        super().__init__("Airtable Client is not set up, was load_credentials called?")
+
+
+class AirtableConnector(LoadConnector):
+    def __init__(
+        self,
+        base_id: str,
+        table_name_or_id: str,
+        batch_size: int = INDEX_BATCH_SIZE,
+    ) -> None:
+        self.base_id = base_id
+        self.table_name_or_id = table_name_or_id
+        self.batch_size = batch_size
+        self.airtable_client: AirtableApi | None = None
+
+    def load_credentials(self, credentials: dict[str, Any]) -> dict[str, Any] | None:
+        self.airtable_client = AirtableApi(credentials["airtable_access_token"])
+        return None
+
+    def _get_field_value(self, field_info: Any, field_type: str) -> list[str]:
+        """
+        Extract value(s) from a field regardless of its type.
+        Returns either a single string or list of strings for attachments.
+        """
+        if field_info is None:
+            return []
+
+        # skip references to other records for now (would need to do another
+        # request to get the actual record name/type)
+        # TODO: support this
+        if field_type == "multipleRecordLinks":
+            return []
+
+        if field_type == "multipleAttachments":
+            attachment_texts: list[str] = []
+            for attachment in field_info:
+                url = attachment.get("url")
+                filename = attachment.get("filename", "")
+                if not url:
+                    continue
+
+                @retry(
+                    tries=5,
+                    delay=1,
+                    backoff=2,
+                    max_delay=10,
+                )
+                def get_attachment_with_retry(url: str) -> bytes | None:
+                    attachment_response = requests.get(url)
+                    if attachment_response.status_code == 200:
+                        return attachment_response.content
+                    return None
+
+                attachment_content = get_attachment_with_retry(url)
+                if attachment_content:
+                    try:
+                        file_ext = get_file_ext(filename)
+                        attachment_text = extract_file_text(
+                            BytesIO(attachment_content),
+                            filename,
+                            break_on_unprocessable=False,
+                            extension=file_ext,
+                        )
+                        if attachment_text:
+                            attachment_texts.append(f"{filename}:\n{attachment_text}")
+                    except Exception as e:
+                        logger.warning(
+                            f"Failed to process attachment {filename}: {str(e)}"
+                        )
+            return attachment_texts
+
+        if field_type in ["singleCollaborator", "collaborator", "createdBy"]:
+            combined = []
+            collab_name = field_info.get("name")
+            collab_email = field_info.get("email")
+            if collab_name:
+                combined.append(collab_name)
+            if collab_email:
+                combined.append(f"({collab_email})")
+            return [" ".join(combined) if combined else str(field_info)]
+
+        if isinstance(field_info, list):
+            return [str(item) for item in field_info]
+
+        return [str(field_info)]
+
+    def _should_be_metadata(self, field_type: str) -> bool:
+        """Determine if a field type should be treated as metadata."""
+        return field_type.lower() in _METADATA_FIELD_TYPES
+
+    def _process_field(
+        self,
+        field_name: str,
+        field_info: Any,
+        field_type: str,
+        table_id: str,
+        record_id: str,
+    ) -> tuple[list[Section], dict[str, Any]]:
+        """
+        Process a single Airtable field and return sections or metadata.
+
+        Args:
+            field_name: Name of the field
+            field_info: Raw field information from Airtable
+            field_type: Airtable field type
+
+        Returns:
+            (list of Sections, dict of metadata)
+        """
+        if field_info is None:
+            return [], {}
+
+        # Get the value(s) for the field
+        field_values = self._get_field_value(field_info, field_type)
+        if len(field_values) == 0:
+            return [], {}
+
+        # Determine if it should be metadata or a section
+        if self._should_be_metadata(field_type):
+            if len(field_values) > 1:
+                return [], {field_name: field_values}
+            return [], {field_name: field_values[0]}
+
+        # Otherwise, create relevant sections
+        sections = [
+            Section(
+                link=f"https://airtable.com/{self.base_id}/{table_id}/{record_id}",
+                text=(
+                    f"{field_name}:\n"
+                    "------------------------\n"
+                    f"{text}\n"
+                    "------------------------"
+                ),
+            )
+            for text in field_values
+        ]
+        return sections, {}
+
+    def load_from_state(self) -> GenerateDocumentsOutput:
+        """
+        Fetch all records from the table.
+
+        NOTE: Airtable does not support filtering by time updated, so
+        we have to fetch all records every time.
+        """
+        if not self.airtable_client:
+            raise AirtableClientNotSetUpError()
+
+        table = self.airtable_client.table(self.base_id, self.table_name_or_id)
+        table_id = table.id
+        record_pages = table.iterate()
+
+        table_schema = table.schema()
+        # have to get the name from the schema, since the table object will
+        # give back the ID instead of the name if the ID is used to create
+        # the table object
+        table_name = table_schema.name
+        primary_field_name = None
+
+        # Find a primary field from the schema
+        for field in table_schema.fields:
+            if field.id == table_schema.primary_field_id:
+                primary_field_name = field.name
+                break
+
+        record_documents: list[Document] = []
+        for page in record_pages:
+            for record in page:
+                record_id = record["id"]
+                fields = record["fields"]
+                sections: list[Section] = []
+                metadata: dict[str, Any] = {}
+
+                # Possibly retrieve the primary field's value
+                primary_field_value = (
+                    fields.get(primary_field_name) if primary_field_name else None
+                )
+                for field_schema in table_schema.fields:
+                    field_name = field_schema.name
+                    field_val = fields.get(field_name)
+                    field_type = field_schema.type
+
+                    field_sections, field_metadata = self._process_field(
+                        field_name=field_name,
+                        field_info=field_val,
+                        field_type=field_type,
+                        table_id=table_id,
+                        record_id=record_id,
+                    )
+
+                    sections.extend(field_sections)
+                    metadata.update(field_metadata)
+
+                semantic_id = (
+                    f"{table_name}: {primary_field_value}"
+                    if primary_field_value
+                    else table_name
+                )
+
+                record_document = Document(
+                    id=f"airtable__{record_id}",
+                    sections=sections,
+                    source=DocumentSource.AIRTABLE,
+                    semantic_identifier=semantic_id,
+                    metadata=metadata,
+                )
+                record_documents.append(record_document)
+
+                if len(record_documents) >= self.batch_size:
+                    yield record_documents
+                    record_documents = []
+
+        if record_documents:
+            yield record_documents
diff --git a/backend/onyx/connectors/factory.py b/backend/onyx/connectors/factory.py
index 5da314d58..8a08689c7 100644
--- a/backend/onyx/connectors/factory.py
+++ b/backend/onyx/connectors/factory.py
@@ -5,6 +5,7 @@ from sqlalchemy.orm import Session

 from onyx.configs.constants import DocumentSource
 from onyx.configs.constants import DocumentSourceRequiringTenantContext
+from onyx.connectors.airtable.airtable_connector import AirtableConnector
 from onyx.connectors.asana.connector import AsanaConnector
 from onyx.connectors.axero.connector import AxeroConnector
 from onyx.connectors.blob.connector import BlobStorageConnector
@@ -103,6 +104,7 @@ def identify_connector_class(
         DocumentSource.FRESHDESK: FreshdeskConnector,
         DocumentSource.FIREFLIES: FirefliesConnector,
         DocumentSource.EGNYTE: EgnyteConnector,
+        DocumentSource.AIRTABLE: AirtableConnector,
     }

     connector_by_source = connector_map.get(source, {})
diff --git a/backend/requirements/default.txt b/backend/requirements/default.txt
index 959f8d38e..6ea72a018 100644
--- a/backend/requirements/default.txt
+++ b/backend/requirements/default.txt
@@ -23,6 +23,7 @@ httpcore==1.0.5
 httpx[http2]==0.27.0
 httpx-oauth==0.15.1
 huggingface-hub==0.20.1
+inflection==0.5.1
 jira==3.5.1
 jsonref==1.1.0
 trafilatura==1.12.2
@@ -43,6 +44,7 @@ openpyxl==3.1.2
 playwright==1.41.2
 psutil==5.9.5
 psycopg2-binary==2.9.9
+pyairtable==3.0.1
 pycryptodome==3.19.1
 pydantic==2.8.2
 PyGithub==1.58.2
diff --git a/backend/tests/daily/connectors/airtable/test_airtable_basic.py b/backend/tests/daily/connectors/airtable/test_airtable_basic.py
new file mode 100644
index 000000000..88cfceb68
--- /dev/null
+++ b/backend/tests/daily/connectors/airtable/test_airtable_basic.py
@@ -0,0 +1,192 @@
+import os
+from unittest.mock import MagicMock
+from unittest.mock import patch
+
+import pytest
+
+from onyx.configs.constants import DocumentSource
+from onyx.connectors.airtable.airtable_connector import AirtableConnector
+from onyx.connectors.models import Document
+from onyx.connectors.models import Section
+
+
+@pytest.fixture(
+    params=[
+        ("table_name", os.environ["AIRTABLE_TEST_TABLE_NAME"]),
+        ("table_id", os.environ["AIRTABLE_TEST_TABLE_ID"]),
+    ]
+)
+def airtable_connector(request: pytest.FixtureRequest) -> AirtableConnector:
+    param_type, table_identifier = request.param
+    connector = AirtableConnector(
+        base_id=os.environ["AIRTABLE_TEST_BASE_ID"],
+        table_name_or_id=table_identifier,
+    )
+
+    connector.load_credentials(
+        {
+            "airtable_access_token": os.environ["AIRTABLE_ACCESS_TOKEN"],
+        }
+    )
+    return connector
+
+
+def create_test_document(
+    id: str,
+    title: str,
+    description: str,
+    priority: str,
+    status: str,
+    # Link to another record is skipped for now
+    # category: str,
+    ticket_id: str,
+    created_time: str,
+    status_last_changed: str,
+    submitted_by: str,
+    assignee: str,
+    days_since_status_change: int | None,
+    attachments: list | None = None,
+) -> Document:
+    link_base = f"https://airtable.com/{os.environ['AIRTABLE_TEST_BASE_ID']}/{os.environ['AIRTABLE_TEST_TABLE_ID']}"
+    sections = [
+        Section(
+            text=f"Title:\n------------------------\n{title}\n------------------------",
+            link=f"{link_base}/{id}",
+        ),
+        Section(
+            text=f"Description:\n------------------------\n{description}\n------------------------",
+            link=f"{link_base}/{id}",
+        ),
+    ]
+
+    if attachments:
+        for attachment in attachments:
+            sections.append(
+                Section(
+                    text=f"Attachment:\n------------------------\n{attachment}\n------------------------",
+                    link=f"{link_base}/{id}",
+                ),
+            )
+
+    return Document(
+        id=f"airtable__{id}",
+        sections=sections,
+        source=DocumentSource.AIRTABLE,
+        semantic_identifier=f"{os.environ['AIRTABLE_TEST_TABLE_NAME']}: {title}",
+        metadata={
+            # "Category": category,
+            "Assignee": assignee,
+            "Submitted by": submitted_by,
+            "Priority": priority,
+            "Status": status,
+            "Created time": created_time,
+            "ID": ticket_id,
+            "Status last changed": status_last_changed,
+            **(
+                {"Days since status change": str(days_since_status_change)}
+                if days_since_status_change is not None
+                else {}
+            ),
+        },
+        doc_updated_at=None,
+        primary_owners=None,
+        secondary_owners=None,
+        title=None,
+        from_ingestion_api=False,
+        additional_info=None,
+    )
+
+
+@patch(
+    "onyx.file_processing.extract_file_text.get_unstructured_api_key",
+    return_value=None,
+)
+def test_airtable_connector_basic(
+    mock_get_api_key: MagicMock, airtable_connector: AirtableConnector
+) -> None:
+    doc_batch_generator = airtable_connector.load_from_state()
+
+    doc_batch = next(doc_batch_generator)
+    with pytest.raises(StopIteration):
+        next(doc_batch_generator)
+
+    assert len(doc_batch) == 2
+
+    expected_docs = [
+        create_test_document(
+            id="rec8BnxDLyWeegOuO",
+            title="Slow Internet",
+            description="The internet connection is very slow.",
+            priority="Medium",
+            status="In Progress",
+            # Link to another record is skipped for now
+            # category="Data Science",
+            ticket_id="2",
+            created_time="2024-12-24T21:02:49.000Z",
+            status_last_changed="2024-12-24T21:02:49.000Z",
+            days_since_status_change=0,
+            assignee="Chris Weaver (chris@onyx.app)",
+            submitted_by="Chris Weaver (chris@onyx.app)",
+        ),
+        create_test_document(
+            id="reccSlIA4pZEFxPBg",
+            title="Printer Issue",
+            description="The office printer is not working.",
+            priority="High",
+            status="Open",
+            # Link to another record is skipped for now
+            # category="Software Development",
+            ticket_id="1",
+            created_time="2024-12-24T21:02:49.000Z",
+            status_last_changed="2024-12-24T21:02:49.000Z",
+            days_since_status_change=0,
+            assignee="Chris Weaver (chris@onyx.app)",
+            submitted_by="Chris Weaver (chris@onyx.app)",
+            attachments=["Test.pdf:\ntesting!!!"],
+        ),
+    ]
+
+    # Compare each document field by field
+    for actual, expected in zip(doc_batch, expected_docs):
+        assert actual.id == expected.id, f"ID mismatch for document {actual.id}"
+        assert (
+            actual.source == expected.source
+        ), f"Source mismatch for document {actual.id}"
+        assert (
+            actual.semantic_identifier == expected.semantic_identifier
+        ), f"Semantic identifier mismatch for document {actual.id}"
+        assert (
+            actual.metadata == expected.metadata
+        ), f"Metadata mismatch for document {actual.id}"
+        assert (
+            actual.doc_updated_at == expected.doc_updated_at
+        ), f"Updated at mismatch for document {actual.id}"
+        assert (
+            actual.primary_owners == expected.primary_owners
+        ), f"Primary owners mismatch for document {actual.id}"
+        assert (
+            actual.secondary_owners == expected.secondary_owners
+        ), f"Secondary owners mismatch for document {actual.id}"
+        assert (
+            actual.title == expected.title
+        ), f"Title mismatch for document {actual.id}"
+        assert (
+            actual.from_ingestion_api == expected.from_ingestion_api
+        ), f"Ingestion API flag mismatch for document {actual.id}"
+        assert (
+            actual.additional_info == expected.additional_info
+        ), f"Additional info mismatch for document {actual.id}"
+
+        # Compare sections
+        assert len(actual.sections) == len(
+            expected.sections
+        ), f"Number of sections mismatch for document {actual.id}"
+        for i, (actual_section, expected_section) in enumerate(
+            zip(actual.sections, expected.sections)
+        ):
+            assert (
+                actual_section.text == expected_section.text
+            ), f"Section {i} text mismatch for document {actual.id}"
+            assert (
+                actual_section.link == expected_section.link
+            ), f"Section {i} link mismatch for document {actual.id}"
diff --git a/web/public/Airtable.svg b/web/public/Airtable.svg
new file mode 100644
index 000000000..a37923102
--- /dev/null
+++ b/web/public/Airtable.svg
@@ -0,0 +1,14 @@
+
+
+
\ No newline at end of file
diff --git a/web/src/components/icons/icons.tsx b/web/src/components/icons/icons.tsx
index d1c84ea75..b0fa46fef 100644
--- a/web/src/components/icons/icons.tsx
+++ b/web/src/components/icons/icons.tsx
@@ -68,6 +68,7 @@ import zendeskIcon from "../../../public/Zendesk.svg";
 import dropboxIcon from "../../../public/Dropbox.png";
 import egnyteIcon from "../../../public/Egnyte.png";
 import slackIcon from "../../../public/Slack.png";
+import airtableIcon from "../../../public/Airtable.svg";
 import s3Icon from "../../../public/S3.png";
 import r2Icon from "../../../public/r2.png";
@@ -2769,3 +2770,17 @@ export const EgnyteIcon = ({
   );
 };
+
+export const AirtableIcon = ({
+  size = 16,
+  className = defaultTailwindCSS,
+}: IconProps) => {
+  return (
+