mirror of
https://github.com/danswer-ai/danswer.git
synced 2025-09-26 20:08:38 +02:00
create a hubspot connector (#482)
This commit is contained in:
1
backend/.gitignore
vendored
1
backend/.gitignore
vendored
@@ -8,3 +8,4 @@ qdrant-data/
|
||||
typesense-data/
|
||||
.env
|
||||
vespa-app.zip
|
||||
dynamic_config_storage/
|
||||
|
@@ -59,6 +59,7 @@ class DocumentSource(str, Enum):
|
||||
NOTION = "notion"
|
||||
ZULIP = "zulip"
|
||||
LINEAR = "linear"
|
||||
HUBSPOT = "hubspot"
|
||||
|
||||
|
||||
class DocumentIndexType(str, Enum):
|
||||
|
@@ -22,6 +22,7 @@ from danswer.connectors.slack.connector import SlackLoadConnector
|
||||
from danswer.connectors.slack.connector import SlackPollConnector
|
||||
from danswer.connectors.web.connector import WebConnector
|
||||
from danswer.connectors.zulip.connector import ZulipConnector
|
||||
from danswer.connectors.hubspot.connector import HubSpotConnector
|
||||
|
||||
|
||||
class ConnectorMissingException(Exception):
|
||||
@@ -50,6 +51,7 @@ def identify_connector_class(
|
||||
DocumentSource.ZULIP: ZulipConnector,
|
||||
DocumentSource.GURU: GuruConnector,
|
||||
DocumentSource.LINEAR: LinearConnector,
|
||||
DocumentSource.HUBSPOT: HubSpotConnector,
|
||||
}
|
||||
connector_by_source = connector_map.get(source, {})
|
||||
|
||||
|
0
backend/danswer/connectors/hubspot/__init__.py
Normal file
0
backend/danswer/connectors/hubspot/__init__.py
Normal file
138
backend/danswer/connectors/hubspot/connector.py
Normal file
138
backend/danswer/connectors/hubspot/connector.py
Normal file
@@ -0,0 +1,138 @@
|
||||
import requests
|
||||
import json
|
||||
from typing import Any
|
||||
from datetime import datetime
|
||||
from hubspot import HubSpot
|
||||
from danswer.configs.app_configs import INDEX_BATCH_SIZE
|
||||
from danswer.configs.constants import DocumentSource
|
||||
from danswer.connectors.interfaces import GenerateDocumentsOutput
|
||||
from danswer.connectors.interfaces import LoadConnector
|
||||
from danswer.connectors.interfaces import PollConnector
|
||||
from danswer.connectors.interfaces import SecondsSinceUnixEpoch
|
||||
from danswer.connectors.models import ConnectorMissingCredentialError
|
||||
from danswer.connectors.models import Document
|
||||
from danswer.connectors.models import Section
|
||||
from danswer.utils.logger import setup_logger
|
||||
|
||||
HUBSPOT_BASE_URL = "https://app.hubspot.com/contacts/"
|
||||
HUBSPOT_API_URL = "https://api.hubapi.com/integrations/v1/me"
|
||||
|
||||
logger = setup_logger()
|
||||
|
||||
class HubSpotConnector(LoadConnector, PollConnector):
|
||||
def __init__(self, batch_size: int = INDEX_BATCH_SIZE, access_token: str | None = None) -> None:
|
||||
self.batch_size = batch_size
|
||||
self.access_token = access_token
|
||||
self.portal_id = None
|
||||
self.ticket_base_url = HUBSPOT_BASE_URL
|
||||
|
||||
def get_portal_id(self) -> str:
|
||||
headers = {
|
||||
'Authorization': f'Bearer {self.access_token}',
|
||||
'Content-Type': 'application/json'
|
||||
}
|
||||
|
||||
response = requests.get(HUBSPOT_API_URL, headers=headers)
|
||||
if response.status_code != 200:
|
||||
raise Exception("Error fetching portal ID")
|
||||
|
||||
data = response.json()
|
||||
return data["portalId"]
|
||||
|
||||
def load_credentials(self, credentials: dict[str, Any]) -> dict[str, Any] | None:
|
||||
self.access_token = credentials["hubspot_access_token"]
|
||||
|
||||
if self.access_token:
|
||||
self.portal_id = self.get_portal_id()
|
||||
self.ticket_base_url = f"{HUBSPOT_BASE_URL}{self.portal_id}/ticket/"
|
||||
|
||||
return None
|
||||
|
||||
def _process_tickets(
|
||||
self, start: datetime | None = None, end: datetime | None = None
|
||||
) -> GenerateDocumentsOutput:
|
||||
if self.access_token is None:
|
||||
raise ConnectorMissingCredentialError("HubSpot")
|
||||
|
||||
api_client = HubSpot(access_token=self.access_token)
|
||||
all_tickets = api_client.crm.tickets.get_all(associations=['contacts', 'notes'])
|
||||
|
||||
doc_batch: list[Document] = []
|
||||
|
||||
for ticket in all_tickets:
|
||||
updated_at = ticket.updated_at.replace(tzinfo=None)
|
||||
if start is not None and updated_at < start:
|
||||
continue
|
||||
if end is not None and updated_at > end:
|
||||
continue
|
||||
|
||||
title = ticket.properties["subject"]
|
||||
link = self.ticket_base_url + ticket.id
|
||||
content_text = title + "\n" + ticket.properties["content"]
|
||||
|
||||
associated_emails = []
|
||||
associated_notes = []
|
||||
|
||||
if ticket.associations:
|
||||
contacts = ticket.associations.get("contacts")
|
||||
notes = ticket.associations.get("notes")
|
||||
|
||||
if contacts:
|
||||
for contact in contacts.results:
|
||||
contact = api_client.crm.contacts.basic_api.get_by_id(contact_id=contact.id)
|
||||
associated_emails.append(contact.properties["email"])
|
||||
|
||||
if notes:
|
||||
for note in notes.results:
|
||||
note = api_client.crm.objects.notes.basic_api.get_by_id(note_id=note.id, properties=["content", "hs_body_preview"])
|
||||
associated_notes.append(note.properties["hs_body_preview"])
|
||||
|
||||
associated_emails = " ,".join(associated_emails)
|
||||
associated_notes = " ".join(associated_notes)
|
||||
|
||||
content_text = f"{content_text}\n emails: {associated_emails} \n notes: {associated_notes}"
|
||||
|
||||
doc_batch.append(
|
||||
Document(
|
||||
id=ticket.id,
|
||||
sections=[Section(link=link, text=content_text)],
|
||||
source=DocumentSource.HUBSPOT,
|
||||
semantic_identifier=title,
|
||||
metadata={},
|
||||
)
|
||||
)
|
||||
|
||||
if len(doc_batch) >= self.batch_size:
|
||||
yield doc_batch
|
||||
doc_batch = []
|
||||
|
||||
if doc_batch:
|
||||
yield doc_batch
|
||||
|
||||
def load_from_state(self) -> GenerateDocumentsOutput:
|
||||
return self._process_tickets()
|
||||
|
||||
def poll_source(
|
||||
self, start: SecondsSinceUnixEpoch, end: SecondsSinceUnixEpoch
|
||||
) -> GenerateDocumentsOutput:
|
||||
start_datetime = datetime.fromtimestamp(start)
|
||||
end_datetime = datetime.fromtimestamp(end)
|
||||
return self._process_tickets(start_datetime, end_datetime)
|
||||
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
import os
|
||||
import time
|
||||
test_connector = HubSpotConnector()
|
||||
test_connector.load_credentials({
|
||||
"hubspot_access_token": os.environ["HUBSPOT_ACCESS_TOKEN"]
|
||||
})
|
||||
all_docs = test_connector.load_from_state()
|
||||
|
||||
current = time.time()
|
||||
one_day_ago = current - 24 * 60 * 60 # 1 day
|
||||
latest_docs = test_connector.poll_source(one_day_ago, current)
|
||||
print(latest_docs)
|
||||
|
||||
|
@@ -53,3 +53,4 @@ transformers==4.30.1
|
||||
typesense==0.15.1
|
||||
uvicorn==0.21.1
|
||||
zulip==0.8.2
|
||||
hubspot-api-client==8.1.0
|
||||
|
Reference in New Issue
Block a user