import os
from datetime import datetime
from datetime import timezone
from typing import Any
from typing import cast

import requests

from onyx.configs.app_configs import INDEX_BATCH_SIZE
from onyx.configs.app_configs import LINEAR_CLIENT_ID
from onyx.configs.app_configs import LINEAR_CLIENT_SECRET
from onyx.configs.constants import DocumentSource
from onyx.connectors.cross_connector_utils.miscellaneous_utils import (
    get_oauth_callback_uri,
)
from onyx.connectors.cross_connector_utils.miscellaneous_utils import time_str_to_utc
from onyx.connectors.interfaces import GenerateDocumentsOutput
from onyx.connectors.interfaces import LoadConnector
from onyx.connectors.interfaces import OAuthConnector
from onyx.connectors.interfaces import PollConnector
from onyx.connectors.interfaces import SecondsSinceUnixEpoch
from onyx.connectors.models import ConnectorMissingCredentialError
from onyx.connectors.models import Document
from onyx.connectors.models import Section
from onyx.utils.logger import setup_logger
from onyx.utils.retry_wrapper import request_with_retries


logger = setup_logger()

_NUM_RETRIES = 5
_TIMEOUT = 60
_LINEAR_GRAPHQL_URL = "https://api.linear.app/graphql"


def _make_query(request_body: dict[str, Any], api_key: str) -> requests.Response:
    headers = {
        "Authorization": api_key,
        "Content-Type": "application/json",
    }

    for i in range(_NUM_RETRIES):
        try:
            response = requests.post(
                _LINEAR_GRAPHQL_URL,
                headers=headers,
                json=request_body,
                timeout=_TIMEOUT,
            )
            if not response.ok:
                raise RuntimeError(
                    f"Error fetching issues from Linear: {response.text}"
                )

            return response
        except Exception as e:
            if i == _NUM_RETRIES - 1:
                raise e

            logger.warning(f"A Linear GraphQL error occurred: {e}. Retrying...")

    raise RuntimeError(
        "Unexpected execution when querying Linear. This should never happen."
    )
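
# A hedged usage sketch of _make_query (hypothetical values): the request body
# follows the standard GraphQL-over-HTTP shape, and the Authorization header is
# either a raw Linear personal API key or "Bearer <access_token>" obtained via
# the OAuth flow (see LinearConnector.load_credentials below).
#
#   body = {"query": "query { viewer { id } }", "variables": {}}
#   response = _make_query(body, api_key="lin_api_...")
#   data = response.json()["data"]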


class LinearConnector(LoadConnector, PollConnector, OAuthConnector):
    def __init__(
        self,
        batch_size: int = INDEX_BATCH_SIZE,
    ) -> None:
        self.batch_size = batch_size
        self.linear_api_key: str | None = None

    @classmethod
    def oauth_id(cls) -> DocumentSource:
        return DocumentSource.LINEAR

    @classmethod
    def oauth_authorization_url(
        cls, base_domain: str, state: str, additional_kwargs: dict[str, str]
    ) -> str:
        if not LINEAR_CLIENT_ID:
            raise ValueError("LINEAR_CLIENT_ID environment variable must be set")

        callback_uri = get_oauth_callback_uri(base_domain, DocumentSource.LINEAR.value)
        return (
            "https://linear.app/oauth/authorize"
            f"?client_id={LINEAR_CLIENT_ID}"
            f"&redirect_uri={callback_uri}"
            "&response_type=code"
            "&scope=read"
            f"&state={state}"
            "&prompt=consent"  # prompts user for access; allows choosing workspace
        )

    @classmethod
    def oauth_code_to_token(
        cls, base_domain: str, code: str, additional_kwargs: dict[str, str]
    ) -> dict[str, Any]:
        data = {
            "code": code,
            "redirect_uri": get_oauth_callback_uri(
                base_domain, DocumentSource.LINEAR.value
            ),
            "client_id": LINEAR_CLIENT_ID,
            "client_secret": LINEAR_CLIENT_SECRET,
            "grant_type": "authorization_code",
        }
        headers = {"Content-Type": "application/x-www-form-urlencoded"}

        response = request_with_retries(
            method="POST",
            url="https://api.linear.app/oauth/token",
            data=data,
            headers=headers,
            backoff=0,
            delay=0.1,
        )
        if not response.ok:
            raise RuntimeError(f"Failed to exchange code for token: {response.text}")

        token_data = response.json()

        return {
            "access_token": token_data["access_token"],
        }

    def load_credentials(self, credentials: dict[str, Any]) -> dict[str, Any] | None:
        if "linear_api_key" in credentials:
            self.linear_api_key = cast(str, credentials["linear_api_key"])
        elif "access_token" in credentials:
            self.linear_api_key = "Bearer " + cast(str, credentials["access_token"])
        else:
            # May need to handle the case in the future where the OAuth token expires
            raise ConnectorMissingCredentialError("Linear")

        return None

    def _process_issues(
        self, start_str: datetime | None = None, end_str: datetime | None = None
    ) -> GenerateDocumentsOutput:
        # Note: despite the "_str" names, these arguments are datetimes; they
        # are interpolated into the GraphQL updatedAt filter as strings below.
        if self.linear_api_key is None:
            raise ConnectorMissingCredentialError("Linear")

        lte_filter = f'lte: "{end_str}"' if end_str else ""
        gte_filter = f'gte: "{start_str}"' if start_str else ""
        updatedAtFilter = f"""
            {lte_filter}
            {gte_filter}
        """

        query = (
            """
            query IterateIssueBatches($first: Int, $after: String) {
                issues(
                    orderBy: updatedAt,
                    first: $first,
                    after: $after,
                    filter: {
                        updatedAt: {
            """
            + updatedAtFilter
            + """
                        },
                    }
                ) {
                    edges {
                        node {
                            id
                            createdAt
                            updatedAt
                            archivedAt
                            number
                            title
                            priority
                            estimate
                            sortOrder
                            startedAt
                            completedAt
                            startedTriageAt
                            triagedAt
                            canceledAt
                            autoClosedAt
                            autoArchivedAt
                            dueDate
                            slaStartedAt
                            slaBreachesAt
                            trashed
                            snoozedUntilAt
                            team {
                                name
                            }
                            previousIdentifiers
                            subIssueSortOrder
                            priorityLabel
                            identifier
                            url
                            branchName
                            customerTicketCount
                            description
                            comments {
                                nodes {
                                    url
                                    body
                                }
                            }
                        }
                    }
                    pageInfo {
                        hasNextPage
                        endCursor
                    }
                }
            }
            """
        )
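
        # For reference, with both bounds set the interpolated filter renders
        # roughly as (hypothetical timestamps; plain str(datetime) formatting):
        #   updatedAt: { lte: "2024-01-02 00:00:00+00:00"
        #                gte: "2024-01-01 00:00:00+00:00" }
        # GraphQL treats commas as insignificant, so the newline-separated
        # bounds parse correctly.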

        has_more = True
        endCursor = None
        while has_more:
            graphql_query = {
                "query": query,
                "variables": {
                    "first": self.batch_size,
                    "after": endCursor,
                },
            }
            logger.debug(f"Requesting issues from Linear with query: {graphql_query}")

            response = _make_query(graphql_query, self.linear_api_key)
            response_json = response.json()
            logger.debug(f"Raw response from Linear: {response_json}")
            edges = response_json["data"]["issues"]["edges"]

            documents: list[Document] = []
            for edge in edges:
                node = edge["node"]
                documents.append(
                    Document(
                        id=node["id"],
                        sections=[
                            Section(
                                link=node["url"],
                                text=node["description"] or "",
                            )
                        ]
                        + [
                            Section(
                                link=node["url"],
                                text=comment["body"] or "",
                            )
                            for comment in node["comments"]["nodes"]
                        ],
                        source=DocumentSource.LINEAR,
                        semantic_identifier=f"[{node['identifier']}] {node['title']}",
                        title=node["title"],
                        doc_updated_at=time_str_to_utc(node["updatedAt"]),
                        metadata={
                            "team": node["team"]["name"],
                        },
                    )
                )
            yield documents

            endCursor = response_json["data"]["issues"]["pageInfo"]["endCursor"]
            has_more = response_json["data"]["issues"]["pageInfo"]["hasNextPage"]

    def load_from_state(self) -> GenerateDocumentsOutput:
        yield from self._process_issues()

    def poll_source(
        self, start: SecondsSinceUnixEpoch, end: SecondsSinceUnixEpoch
    ) -> GenerateDocumentsOutput:
        start_time = datetime.fromtimestamp(start, tz=timezone.utc)
        end_time = datetime.fromtimestamp(end, tz=timezone.utc)

        yield from self._process_issues(start_str=start_time, end_str=end_time)


if __name__ == "__main__":
    connector = LinearConnector()
    connector.load_credentials({"linear_api_key": os.environ["LINEAR_API_KEY"]})

    document_batches = connector.load_from_state()
    print(next(document_batches))
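
    # A minimal incremental-polling sketch (hypothetical window; assumes the
    # same credentials are loaded): poll_source takes start/end as seconds
    # since the Unix epoch and yields batches of issues updated in that window.
    #
    #   one_day = 24 * 60 * 60
    #   now = datetime.now(tz=timezone.utc).timestamp()
    #   for batch in connector.poll_source(start=now - one_day, end=now):
    #       print(f"got {len(batch)} documents")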