mirror of
https://github.com/danswer-ai/danswer.git
synced 2025-09-19 12:03:54 +02:00
Product board connector (#228)
Also fixes misc mypy issues across the repo
This commit is contained in:
@@ -26,5 +26,6 @@ class DocumentSource(str, Enum):
|
||||
CONFLUENCE = "confluence"
|
||||
SLAB = "slab"
|
||||
JIRA = "jira"
|
||||
PRODUCTBOARD = "productboard"
|
||||
FILE = "file"
|
||||
NOTION = "notion"
|
||||
|
@@ -1,13 +1,18 @@
|
||||
from typing import Any
|
||||
|
||||
import requests
|
||||
|
||||
|
||||
class BookStackClientRequestFailedError(ConnectionError):
|
||||
def __init__(self, status: int, error: str) -> None:
|
||||
super().__init__(
|
||||
"BookStack Client request failed with status {status}: {error}".format(status=status, error=error)
|
||||
"BookStack Client request failed with status {status}: {error}".format(
|
||||
status=status, error=error
|
||||
)
|
||||
)
|
||||
|
||||
class BookStackApiClient:
|
||||
|
||||
class BookStackApiClient:
|
||||
def __init__(
|
||||
self,
|
||||
base_url: str,
|
||||
@@ -18,7 +23,7 @@ class BookStackApiClient:
|
||||
self.token_id = token_id
|
||||
self.token_secret = token_secret
|
||||
|
||||
def get(self, endpoint: str, params: dict[str, str]):
|
||||
def get(self, endpoint: str, params: dict[str, str]) -> dict[str, Any]:
|
||||
url: str = self._build_url(endpoint)
|
||||
headers = self._build_headers()
|
||||
response = requests.get(url, headers=headers, params=params)
|
||||
@@ -38,15 +43,15 @@ class BookStackApiClient:
|
||||
|
||||
return json
|
||||
|
||||
def _build_headers(self):
|
||||
auth = 'Token ' + self.token_id + ':' + self.token_secret
|
||||
def _build_headers(self) -> dict[str, str]:
|
||||
auth = "Token " + self.token_id + ":" + self.token_secret
|
||||
return {
|
||||
'Authorization': auth,
|
||||
'Accept': 'application/json',
|
||||
"Authorization": auth,
|
||||
"Accept": "application/json",
|
||||
}
|
||||
|
||||
def _build_url(self, endpoint: str):
|
||||
return self.base_url.rstrip('/') + '/api/' + endpoint.lstrip('/')
|
||||
def _build_url(self, endpoint: str) -> str:
|
||||
return self.base_url.rstrip("/") + "/api/" + endpoint.lstrip("/")
|
||||
|
||||
def build_app_url(self, endpoint: str):
|
||||
return self.base_url.rstrip('/') + '/' + endpoint.lstrip('/')
|
||||
def build_app_url(self, endpoint: str) -> str:
|
||||
return self.base_url.rstrip("/") + "/" + endpoint.lstrip("/")
|
||||
|
@@ -8,20 +8,18 @@ from bs4 import BeautifulSoup
|
||||
from danswer.configs.app_configs import INDEX_BATCH_SIZE
|
||||
from danswer.configs.constants import DocumentSource
|
||||
from danswer.configs.constants import HTML_SEPARATOR
|
||||
from danswer.connectors.bookstack.client import BookStackApiClient
|
||||
from danswer.connectors.interfaces import GenerateDocumentsOutput
|
||||
from danswer.connectors.interfaces import LoadConnector
|
||||
from danswer.connectors.interfaces import PollConnector
|
||||
from danswer.connectors.interfaces import SecondsSinceUnixEpoch
|
||||
from danswer.connectors.bookstack.client import BookStackApiClient
|
||||
from danswer.connectors.models import Document
|
||||
from danswer.connectors.models import Section
|
||||
|
||||
|
||||
class BookstackClientNotSetUpError(PermissionError):
|
||||
def __init__(self) -> None:
|
||||
super().__init__(
|
||||
"BookStack Client is not set up, was load_credentials called?"
|
||||
)
|
||||
super().__init__("BookStack Client is not set up, was load_credentials called?")
|
||||
|
||||
|
||||
class BookstackConnector(LoadConnector, PollConnector):
|
||||
@@ -40,10 +38,12 @@ class BookstackConnector(LoadConnector, PollConnector):
|
||||
)
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def _get_doc_batch(
|
||||
self,
|
||||
batch_size: int,
|
||||
bookstack_client: BookStackApiClient,
|
||||
endpoint: str,
|
||||
transformer: Callable[[dict], Document],
|
||||
transformer: Callable[[BookStackApiClient, dict], Document],
|
||||
start_ind: int,
|
||||
start: SecondsSinceUnixEpoch | None = None,
|
||||
end: SecondsSinceUnixEpoch | None = None,
|
||||
@@ -51,70 +51,90 @@ class BookstackConnector(LoadConnector, PollConnector):
|
||||
doc_batch: list[Document] = []
|
||||
|
||||
params = {
|
||||
"count": str(self.batch_size),
|
||||
"count": str(batch_size),
|
||||
"offset": str(start_ind),
|
||||
"sort": "+id"
|
||||
"sort": "+id",
|
||||
}
|
||||
|
||||
if start:
|
||||
params["filter[updated_at:gte]"] = datetime.utcfromtimestamp(start).strftime('%Y-%m-%d %H:%M:%S')
|
||||
params["filter[updated_at:gte]"] = datetime.utcfromtimestamp(
|
||||
start
|
||||
).strftime("%Y-%m-%d %H:%M:%S")
|
||||
|
||||
if end:
|
||||
params["filter[updated_at:lte]"] = datetime.utcfromtimestamp(end).strftime('%Y-%m-%d %H:%M:%S')
|
||||
params["filter[updated_at:lte]"] = datetime.utcfromtimestamp(end).strftime(
|
||||
"%Y-%m-%d %H:%M:%S"
|
||||
)
|
||||
|
||||
batch = self.bookstack_client.get(endpoint, params=params).get("data", [])
|
||||
batch = bookstack_client.get(endpoint, params=params).get("data", [])
|
||||
for item in batch:
|
||||
doc_batch.append(transformer(item))
|
||||
doc_batch.append(transformer(bookstack_client, item))
|
||||
|
||||
return doc_batch, len(batch)
|
||||
|
||||
def _book_to_document(self, book: dict):
|
||||
url = self.bookstack_client.build_app_url("/books/" + book.get("slug"))
|
||||
@staticmethod
|
||||
def _book_to_document(
|
||||
bookstack_client: BookStackApiClient, book: dict[str, Any]
|
||||
) -> Document:
|
||||
url = bookstack_client.build_app_url("/books/" + str(book.get("slug")))
|
||||
text = book.get("name", "") + "\n" + book.get("description", "")
|
||||
return Document(
|
||||
id="book:" + str(book.get("id")),
|
||||
sections=[Section(link=url, text=text)],
|
||||
source=DocumentSource.BOOKSTACK,
|
||||
semantic_identifier="Book: " + book.get("name"),
|
||||
metadata={
|
||||
"type": "book",
|
||||
"updated_at": book.get("updated_at")
|
||||
},
|
||||
semantic_identifier="Book: " + str(book.get("name")),
|
||||
metadata={"type": "book", "updated_at": str(book.get("updated_at"))},
|
||||
)
|
||||
|
||||
def _chapter_to_document(self, chapter: dict):
|
||||
url = self.bookstack_client.build_app_url("/books/" + chapter.get("book_slug") + "/chapter/" + chapter.get("slug"))
|
||||
@staticmethod
|
||||
def _chapter_to_document(
|
||||
bookstack_client: BookStackApiClient, chapter: dict[str, Any]
|
||||
) -> Document:
|
||||
url = bookstack_client.build_app_url(
|
||||
"/books/"
|
||||
+ str(chapter.get("book_slug"))
|
||||
+ "/chapter/"
|
||||
+ str(chapter.get("slug"))
|
||||
)
|
||||
text = chapter.get("name", "") + "\n" + chapter.get("description", "")
|
||||
return Document(
|
||||
id="chapter:" + str(chapter.get("id")),
|
||||
sections=[Section(link=url, text=text)],
|
||||
source=DocumentSource.BOOKSTACK,
|
||||
semantic_identifier="Chapter: " + chapter.get("name"),
|
||||
metadata={
|
||||
"type": "chapter",
|
||||
"updated_at": chapter.get("updated_at")
|
||||
},
|
||||
semantic_identifier="Chapter: " + str(chapter.get("name")),
|
||||
metadata={"type": "chapter", "updated_at": str(chapter.get("updated_at"))},
|
||||
)
|
||||
|
||||
def _shelf_to_document(self, shelf: dict):
|
||||
url = self.bookstack_client.build_app_url("/shelves/" + shelf.get("slug"))
|
||||
@staticmethod
|
||||
def _shelf_to_document(
|
||||
bookstack_client: BookStackApiClient, shelf: dict[str, Any]
|
||||
) -> Document:
|
||||
url = bookstack_client.build_app_url("/shelves/" + str(shelf.get("slug")))
|
||||
text = shelf.get("name", "") + "\n" + shelf.get("description", "")
|
||||
return Document(
|
||||
id="shelf:" + str(shelf.get("id")),
|
||||
sections=[Section(link=url, text=text)],
|
||||
source=DocumentSource.BOOKSTACK,
|
||||
semantic_identifier="Shelf: " + shelf.get("name"),
|
||||
metadata={
|
||||
"type": "shelf",
|
||||
"updated_at": shelf.get("updated_at")
|
||||
},
|
||||
semantic_identifier="Shelf: " + str(shelf.get("name")),
|
||||
metadata={"type": "shelf", "updated_at": shelf.get("updated_at")},
|
||||
)
|
||||
|
||||
def _page_to_document(self, page: dict):
|
||||
@staticmethod
|
||||
def _page_to_document(
|
||||
bookstack_client: BookStackApiClient, page: dict[str, Any]
|
||||
) -> Document:
|
||||
page_id = str(page.get("id"))
|
||||
page_data = self.bookstack_client.get("/pages/" + page_id, {})
|
||||
url = self.bookstack_client.build_app_url("/books/" + page.get("book_slug") + "/page/" + page_data.get("slug"))
|
||||
page_html = "<h1>" + html.escape(page_data.get("name")) + "</h1>" + page_data.get("html")
|
||||
page_name = str(page.get("name"))
|
||||
page_data = bookstack_client.get("/pages/" + page_id, {})
|
||||
url = bookstack_client.build_app_url(
|
||||
"/books/"
|
||||
+ str(page.get("book_slug"))
|
||||
+ "/page/"
|
||||
+ str(page_data.get("slug"))
|
||||
)
|
||||
page_html = (
|
||||
"<h1>" + html.escape(page_name) + "</h1>" + str(page_data.get("html"))
|
||||
)
|
||||
soup = BeautifulSoup(page_html, "html.parser")
|
||||
text = soup.get_text(HTML_SEPARATOR)
|
||||
time.sleep(0.1)
|
||||
@@ -122,11 +142,8 @@ class BookstackConnector(LoadConnector, PollConnector):
|
||||
id="page:" + page_id,
|
||||
sections=[Section(link=url, text=text)],
|
||||
source=DocumentSource.BOOKSTACK,
|
||||
semantic_identifier="Page: " + page_data.get("name"),
|
||||
metadata={
|
||||
"type": "page",
|
||||
"updated_at": page_data.get("updated_at")
|
||||
},
|
||||
semantic_identifier="Page: " + str(page_name),
|
||||
metadata={"type": "page", "updated_at": page_data.get("updated_at")},
|
||||
)
|
||||
|
||||
def load_from_state(self) -> GenerateDocumentsOutput:
|
||||
@@ -141,7 +158,9 @@ class BookstackConnector(LoadConnector, PollConnector):
|
||||
if self.bookstack_client is None:
|
||||
raise BookstackClientNotSetUpError()
|
||||
|
||||
transform_by_endpoint: dict[str, Callable[[dict], Document]] = {
|
||||
transform_by_endpoint: dict[
|
||||
str, Callable[[BookStackApiClient, dict], Document]
|
||||
] = {
|
||||
"/books": self._book_to_document,
|
||||
"/chapters": self._chapter_to_document,
|
||||
"/shelves": self._shelf_to_document,
|
||||
@@ -151,7 +170,15 @@ class BookstackConnector(LoadConnector, PollConnector):
|
||||
for endpoint, transform in transform_by_endpoint.items():
|
||||
start_ind = 0
|
||||
while True:
|
||||
doc_batch, num_results = self._get_doc_batch(endpoint, transform, start_ind, start, end)
|
||||
doc_batch, num_results = self._get_doc_batch(
|
||||
batch_size=self.batch_size,
|
||||
bookstack_client=self.bookstack_client,
|
||||
endpoint=endpoint,
|
||||
transformer=transform,
|
||||
start_ind=start_ind,
|
||||
start=start,
|
||||
end=end,
|
||||
)
|
||||
start_ind += num_results
|
||||
if doc_batch:
|
||||
yield doc_batch
|
||||
|
@@ -14,6 +14,7 @@ from danswer.connectors.interfaces import EventConnector
|
||||
from danswer.connectors.interfaces import LoadConnector
|
||||
from danswer.connectors.interfaces import PollConnector
|
||||
from danswer.connectors.models import InputType
|
||||
from danswer.connectors.productboard.connector import ProductboardConnector
|
||||
from danswer.connectors.slab.connector import SlabConnector
|
||||
from danswer.connectors.slack.connector import SlackLoadConnector
|
||||
from danswer.connectors.slack.connector import SlackPollConnector
|
||||
@@ -42,6 +43,7 @@ def identify_connector_class(
|
||||
DocumentSource.BOOKSTACK: BookstackConnector,
|
||||
DocumentSource.CONFLUENCE: ConfluenceConnector,
|
||||
DocumentSource.JIRA: JiraConnector,
|
||||
DocumentSource.PRODUCTBOARD: ProductboardConnector,
|
||||
DocumentSource.SLAB: SlabConnector,
|
||||
DocumentSource.NOTION: NotionConnector,
|
||||
}
|
||||
|
@@ -1,6 +1,7 @@
|
||||
import datetime
|
||||
import io
|
||||
from collections.abc import Generator
|
||||
from collections.abc import Sequence
|
||||
from itertools import chain
|
||||
from typing import Any
|
||||
|
||||
@@ -170,11 +171,12 @@ class GoogleDriveConnector(LoadConnector, PollConnector):
|
||||
folder_names = path.split("/")
|
||||
parent_id = "root"
|
||||
for folder_name in folder_names:
|
||||
parent_id = get_folder_id(
|
||||
found_parent_id = get_folder_id(
|
||||
service=service, parent_id=parent_id, folder_name=folder_name
|
||||
)
|
||||
if parent_id is None:
|
||||
if found_parent_id is None:
|
||||
raise ValueError(f"Folder path '{path}' not found in Google Drive")
|
||||
parent_id = found_parent_id
|
||||
folder_ids.append(parent_id)
|
||||
|
||||
return folder_ids
|
||||
@@ -199,7 +201,9 @@ class GoogleDriveConnector(LoadConnector, PollConnector):
|
||||
raise PermissionError("Not logged into Google Drive")
|
||||
|
||||
service = discovery.build("drive", "v3", credentials=self.creds)
|
||||
folder_ids = self._process_folder_paths(service, self.folder_paths)
|
||||
folder_ids: Sequence[str | None] = self._process_folder_paths(
|
||||
service, self.folder_paths
|
||||
)
|
||||
if not folder_ids:
|
||||
folder_ids = [None]
|
||||
|
||||
|
@@ -18,7 +18,7 @@ class Document:
|
||||
sections: list[Section]
|
||||
source: DocumentSource
|
||||
semantic_identifier: str | None
|
||||
metadata: dict[str, Any] | None
|
||||
metadata: dict[str, Any]
|
||||
|
||||
|
||||
class InputType(str, Enum):
|
||||
|
@@ -1,10 +1,13 @@
|
||||
"""Notion reader."""
|
||||
import time
|
||||
from dataclasses import dataclass, fields
|
||||
from typing import Any, Dict, List, Optional
|
||||
from dataclasses import dataclass
|
||||
from dataclasses import fields
|
||||
from typing import Any
|
||||
from typing import Dict
|
||||
from typing import List
|
||||
from typing import Optional
|
||||
|
||||
import requests
|
||||
|
||||
from danswer.configs.app_configs import INDEX_BATCH_SIZE
|
||||
from danswer.configs.constants import DocumentSource
|
||||
from danswer.connectors.interfaces import GenerateDocumentsOutput
|
||||
@@ -26,7 +29,7 @@ class NotionPage:
|
||||
properties: Dict[str, Any]
|
||||
url: str
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
def __init__(self, **kwargs: dict[str, Any]) -> None:
|
||||
names = set([f.name for f in fields(self)])
|
||||
for k, v in kwargs.items():
|
||||
if k in names:
|
||||
@@ -41,7 +44,7 @@ class NotionSearchResponse:
|
||||
next_cursor: Optional[str]
|
||||
has_more: bool = False
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
def __init__(self, **kwargs: dict[str, Any]) -> None:
|
||||
names = set([f.name for f in fields(self)])
|
||||
for k, v in kwargs.items():
|
||||
if k in names:
|
||||
|
0
backend/danswer/connectors/productboard/__init__.py
Normal file
0
backend/danswer/connectors/productboard/__init__.py
Normal file
253
backend/danswer/connectors/productboard/connector.py
Normal file
253
backend/danswer/connectors/productboard/connector.py
Normal file
@@ -0,0 +1,253 @@
|
||||
from collections.abc import Generator
|
||||
from itertools import chain
|
||||
from typing import Any
|
||||
from typing import cast
|
||||
|
||||
import requests
|
||||
from bs4 import BeautifulSoup
|
||||
from danswer.configs.app_configs import INDEX_BATCH_SIZE
|
||||
from danswer.configs.constants import DocumentSource
|
||||
from danswer.connectors.interfaces import GenerateDocumentsOutput
|
||||
from danswer.connectors.interfaces import PollConnector
|
||||
from danswer.connectors.interfaces import SecondsSinceUnixEpoch
|
||||
from danswer.connectors.models import Document
|
||||
from danswer.connectors.models import Section
|
||||
from danswer.utils.logger import setup_logger
|
||||
from dateutil import parser
|
||||
from retry import retry
|
||||
|
||||
|
||||
# Module-level logger obtained from the project's setup_logger helper.
logger = setup_logger()
|
||||
|
||||
|
||||
# Root of the Productboard REST API. Collection paths such as "/features",
# "/components", "/products" and "/objectives" are appended to it to form the
# first page link for each entity type.
_PRODUCT_BOARD_BASE_URL = "https://api.productboard.com"
|
||||
|
||||
|
||||
class ProductboardApiError(Exception):
    """Raised when a request to the Productboard API gets a non-OK response."""
|
||||
|
||||
|
||||
class ProductboardConnector(PollConnector):
    """Poll connector that turns Productboard entities into ``Document`` batches.

    Features, components, products and objectives are read from the
    Productboard REST API, converted to ``Document`` objects and yielded in
    lists of at most ``batch_size`` items. ``load_credentials`` must be called
    before ``poll_source`` is iterated.
    """

    # Per-request timeout in seconds. ``requests`` waits indefinitely when no
    # timeout is given, which would let a single stalled connection hang the
    # whole poll; with a timeout the failure surfaces and goes through the
    # retry wrapper in ``_fetch_documents`` instead.
    _REQUEST_TIMEOUT_SECONDS = 60

    def __init__(
        self,
        batch_size: int = INDEX_BATCH_SIZE,
    ) -> None:
        """Store the batch size; the access token is set by ``load_credentials``."""
        self.batch_size = batch_size
        self.access_token: str | None = None

    def load_credentials(self, credentials: dict[str, Any]) -> dict[str, Any] | None:
        """Read the access token from ``credentials``.

        Raises:
            KeyError: if ``"productboard_access_token"`` is not present.

        Returns:
            Always ``None``.
        """
        self.access_token = credentials["productboard_access_token"]
        return None

    def _build_headers(self) -> dict[str, str]:
        """Return the HTTP headers sent with every Productboard request."""
        return {
            "Authorization": f"Bearer {self.access_token}",
            "X-Version": "1",
        }

    @staticmethod
    def _parse_description_html(description_html: str) -> str:
        """Strip the HTML markup from a description and return its text."""
        soup = BeautifulSoup(description_html, "html.parser")
        return soup.get_text()

    @staticmethod
    def _get_owner_email(productboard_obj: dict[str, Any]) -> str | None:
        """Return the ``owner.email`` value of an entity, or ``None`` if absent."""
        owner_dict = cast(dict[str, str] | None, productboard_obj.get("owner"))
        if not owner_dict:
            return None
        return owner_dict.get("email")

    def _fetch_documents(
        self,
        initial_link: str,
    ) -> Generator[dict[str, Any], None, None]:
        """Yield every entity of a paginated Productboard collection.

        Starting at ``initial_link``, each page's ``"data"`` entries are
        yielded one by one and the ``links.next`` value of the response is
        followed until it is missing or empty.

        Raises:
            ProductboardApiError: if a page still returns a non-OK status
                after the retries are exhausted.
        """
        headers = self._build_headers()
        timeout = self._REQUEST_TIMEOUT_SECONDS

        @retry(tries=3, delay=1, backoff=2)
        def fetch(link: str) -> dict[str, Any]:
            response = requests.get(link, headers=headers, timeout=timeout)
            if not response.ok:
                # rate-limiting is at 50 requests per second.
                # The delay in this retry should handle this while this is
                # not parallelized.
                raise ProductboardApiError(
                    "Failed to fetch from productboard - status code:"
                    f" {response.status_code} - response: {response.text}"
                )

            return response.json()

        curr_link = initial_link
        while True:
            response_json = fetch(curr_link)
            for entity in response_json["data"]:
                yield entity

            curr_link = response_json.get("links", {}).get("next")
            if not curr_link:
                break

    def _entity_to_document(
        self,
        entity: dict[str, Any],
        entity_type: str,
        extra_metadata: dict[str, Any] | None = None,
    ) -> Document:
        """Build the ``Document`` shared by all Productboard entity kinds.

        The section text is ``"<name> - <description text>"`` and the metadata
        always carries the entity type, the owner email (may be ``None``) and
        the ``updatedAt`` value; ``extra_metadata`` is placed between the
        entity type and the owner.
        """
        name = entity["name"]
        metadata: dict[str, Any] = {
            "productboard_entity_type": entity_type,
            **(extra_metadata or {}),
            "owner": self._get_owner_email(entity),
            "updated_at": entity["updatedAt"],
        }
        return Document(
            id=entity["id"],
            sections=[
                Section(
                    link=entity["links"]["html"],
                    text=" - ".join(
                        (name, self._parse_description_html(entity["description"]))
                    ),
                )
            ],
            semantic_identifier=name,
            source=DocumentSource.PRODUCTBOARD,
            metadata=metadata,
        )

    def _get_features(self) -> Generator[Document, None, None]:
        """A Feature is like a ticket in Jira"""
        for feature in self._fetch_documents(
            initial_link=f"{_PRODUCT_BOARD_BASE_URL}/features"
        ):
            yield self._entity_to_document(
                feature,
                entity_type=feature["type"],
                extra_metadata={"status": feature["status"]["name"]},
            )

    def _get_components(self) -> Generator[Document, None, None]:
        """A Component is like an epic in Jira. It contains Features"""
        for component in self._fetch_documents(
            initial_link=f"{_PRODUCT_BOARD_BASE_URL}/components"
        ):
            yield self._entity_to_document(component, entity_type="component")

    def _get_products(self) -> Generator[Document, None, None]:
        """A Product is the highest level of organization.
        A Product contains components, which contains features."""
        for product in self._fetch_documents(
            initial_link=f"{_PRODUCT_BOARD_BASE_URL}/products"
        ):
            yield self._entity_to_document(product, entity_type="product")

    def _get_objectives(self) -> Generator[Document, None, None]:
        """Yield one Document per entity returned by the ``/objectives`` endpoint."""
        for objective in self._fetch_documents(
            initial_link=f"{_PRODUCT_BOARD_BASE_URL}/objectives"
        ):
            # These entities come from /objectives, so they are tagged as
            # "objective"; releases are a separate concept that this
            # connector does not fetch.
            yield self._entity_to_document(
                objective,
                entity_type="objective",
                extra_metadata={"state": objective["state"]},
            )

    def _is_updated_at_out_of_time_range(
        self,
        document: Document,
        start: SecondsSinceUnixEpoch,
        end: SecondsSinceUnixEpoch,
    ) -> bool:
        """Return ``True`` when the document's ``updated_at`` is outside [start, end].

        A document without an ``updated_at`` metadata value is logged as an
        error and treated as in range (``False``), so it is not dropped.
        """
        updated_at = cast(str, document.metadata.get("updated_at", ""))
        if not updated_at:
            logger.error(f"Unable to find updated_at for document '{document.id}'")
            return False

        updated_at_timestamp = parser.parse(updated_at).timestamp()
        return updated_at_timestamp < start or updated_at_timestamp > end

    def poll_source(
        self, start: SecondsSinceUnixEpoch, end: SecondsSinceUnixEpoch
    ) -> GenerateDocumentsOutput:
        """Yield batches of documents whose ``updated_at`` lies within [start, end].

        All entities are fetched and filtered client-side on their
        ``updated_at`` metadata. Each yielded list holds at most
        ``batch_size`` documents; a final shorter list is yielded if needed.

        Raises:
            PermissionError: if no access token has been loaded. Because this
                is a generator, the error is raised on first iteration.
        """
        if self.access_token is None:
            raise PermissionError(
                "Access token is not set up, was load_credentials called?"
            )

        document_batch: list[Document] = []

        # NOTE: there is a concept of a "Note" in productboard, however
        # there is no read API for it atm. Additionally, comments are not
        # included with features. Finally, "Releases" are not fetched atm,
        # since they do not provide an updatedAt.
        all_documents = chain(
            self._get_features(),
            self._get_components(),
            self._get_products(),
            self._get_objectives(),
        )
        for document in all_documents:
            # skip documents that are not in the time range
            if self._is_updated_at_out_of_time_range(document, start, end):
                continue

            document_batch.append(document)
            if len(document_batch) >= self.batch_size:
                yield document_batch
                document_batch = []

        if document_batch:
            yield document_batch
|
@@ -60,6 +60,6 @@ def log_generator_function_time(
|
||||
f"{func_name or func.__name__} took {time.time() - start_time} seconds"
|
||||
)
|
||||
|
||||
return cast(F, wrapped_func)
|
||||
return cast(FG, wrapped_func)
|
||||
|
||||
return timing_wrapper
|
||||
|
Reference in New Issue
Block a user