evan-danswer 9b0cba367e
small linear connector improvements ()
* small linear connector improvements

* add todo for url handling
2025-02-07 01:31:49 +00:00

288 lines
9.7 KiB
Python

import os
import time
from datetime import datetime
from datetime import timezone
from typing import Any
from typing import cast
from urllib.parse import urlencode

import requests

from onyx.configs.app_configs import INDEX_BATCH_SIZE
from onyx.configs.app_configs import LINEAR_CLIENT_ID
from onyx.configs.app_configs import LINEAR_CLIENT_SECRET
from onyx.configs.constants import DocumentSource
from onyx.connectors.cross_connector_utils.miscellaneous_utils import (
    get_oauth_callback_uri,
)
from onyx.connectors.cross_connector_utils.miscellaneous_utils import time_str_to_utc
from onyx.connectors.interfaces import GenerateDocumentsOutput
from onyx.connectors.interfaces import LoadConnector
from onyx.connectors.interfaces import OAuthConnector
from onyx.connectors.interfaces import PollConnector
from onyx.connectors.interfaces import SecondsSinceUnixEpoch
from onyx.connectors.models import ConnectorMissingCredentialError
from onyx.connectors.models import Document
from onyx.connectors.models import Section
from onyx.utils.logger import setup_logger
from onyx.utils.retry_wrapper import request_with_retries
logger = setup_logger()

# How many times _make_query attempts a Linear API call before giving up.
_NUM_RETRIES: int = 5
# Per-request HTTP timeout handed to ``requests`` (seconds).
_TIMEOUT: int = 60
# Linear exposes a single GraphQL endpoint; every data query is POSTed here.
_LINEAR_GRAPHQL_URL: str = "https://api.linear.app/graphql"
def _make_query(request_body: dict[str, Any], api_key: str) -> requests.Response:
    """POST a GraphQL request to the Linear API, retrying on failure.

    Up to ``_NUM_RETRIES`` attempts are made. Between attempts the function
    sleeps with capped exponential backoff (1s, 2s, 4s, ... at most 30s) so
    transient network failures and rate limiting have a chance to clear.

    Args:
        request_body: JSON-serializable GraphQL payload (``query`` and
            ``variables``).
        api_key: Sent verbatim as the ``Authorization`` header value, e.g. a
            raw Linear API key or ``"Bearer <access_token>"``.

    Returns:
        The first response whose status code is below 400 (``response.ok``).

    Raises:
        RuntimeError: If the final attempt receives a response with status
            code >= 400.
        requests.RequestException: If the final attempt fails at the
            transport level (connection error, timeout, ...).
    """
    headers = {
        "Authorization": api_key,
        "Content-Type": "application/json",
    }
    for attempt in range(_NUM_RETRIES):
        try:
            response = requests.post(
                _LINEAR_GRAPHQL_URL,
                headers=headers,
                json=request_body,
                timeout=_TIMEOUT,
            )
            if not response.ok:
                raise RuntimeError(
                    f"Error fetching issues from Linear: {response.text}"
                )
            return response
        # Only transport failures and the HTTP-error RuntimeError above are
        # retryable; anything else is a programming error and should surface
        # immediately instead of being retried.
        except (requests.RequestException, RuntimeError) as e:
            if attempt == _NUM_RETRIES - 1:
                raise
            delay_seconds = min(2**attempt, 30)
            logger.warning(
                f"A Linear GraphQL error occurred: {e}. "
                f"Retrying in {delay_seconds}s..."
            )
            time.sleep(delay_seconds)
    # Only reachable if _NUM_RETRIES <= 0.
    raise RuntimeError(
        "Unexpected execution when querying Linear. This should never happen."
    )
class LinearConnector(LoadConnector, PollConnector, OAuthConnector):
    """Index Linear issues (description plus comments) as Onyx documents.

    Supports a full load (``load_from_state``), polling by issue ``updatedAt``
    window (``poll_source``), and an OAuth flow for obtaining an access
    token. Credentials are either a Linear API key (``linear_api_key``) or
    an OAuth ``access_token``.
    """

    def __init__(
        self,
        batch_size: int = INDEX_BATCH_SIZE,
    ) -> None:
        # Issues requested per GraphQL page; each page becomes one yielded
        # batch of Documents.
        self.batch_size = batch_size
        # Complete ``Authorization`` header value; set by load_credentials.
        self.linear_api_key: str | None = None

    @classmethod
    def oauth_id(cls) -> DocumentSource:
        return DocumentSource.LINEAR

    @classmethod
    def oauth_authorization_url(
        cls, base_domain: str, state: str, additional_kwargs: dict[str, str]
    ) -> str:
        """Build the Linear OAuth consent URL the user is redirected to.

        All query parameters are percent-encoded, so the callback URI (which
        contains ``:`` and ``/``) and an arbitrary ``state`` value reach
        Linear intact.

        Raises:
            ValueError: If ``LINEAR_CLIENT_ID`` is not configured.
        """
        if not LINEAR_CLIENT_ID:
            raise ValueError("LINEAR_CLIENT_ID environment variable must be set")
        callback_uri = get_oauth_callback_uri(base_domain, DocumentSource.LINEAR.value)
        query_params = {
            "client_id": LINEAR_CLIENT_ID,
            "redirect_uri": callback_uri,
            "response_type": "code",
            "scope": "read",
            "state": state,
            # prompts user for access; allows choosing workspace
            "prompt": "consent",
        }
        return f"https://linear.app/oauth/authorize?{urlencode(query_params)}"

    @classmethod
    def oauth_code_to_token(
        cls, base_domain: str, code: str, additional_kwargs: dict[str, str]
    ) -> dict[str, Any]:
        """Exchange an OAuth authorization code for a Linear access token.

        Returns:
            ``{"access_token": <token>}``, the credential shape accepted by
            ``load_credentials``.

        Raises:
            ValueError: If the Linear OAuth client id or secret is not set.
            RuntimeError: If Linear rejects the exchange or the response has
                no access token.
        """
        if not LINEAR_CLIENT_ID or not LINEAR_CLIENT_SECRET:
            raise ValueError(
                "LINEAR_CLIENT_ID and LINEAR_CLIENT_SECRET environment "
                "variables must be set"
            )
        data = {
            "code": code,
            "redirect_uri": get_oauth_callback_uri(
                base_domain, DocumentSource.LINEAR.value
            ),
            "client_id": LINEAR_CLIENT_ID,
            "client_secret": LINEAR_CLIENT_SECRET,
            "grant_type": "authorization_code",
        }
        headers = {"Content-Type": "application/x-www-form-urlencoded"}
        response = request_with_retries(
            method="POST",
            url="https://api.linear.app/oauth/token",
            data=data,
            headers=headers,
            backoff=0,
            delay=0.1,
        )
        if not response.ok:
            raise RuntimeError(f"Failed to exchange code for token: {response.text}")
        token_data = response.json()
        access_token = token_data.get("access_token")
        if not access_token:
            raise RuntimeError("Linear token response did not include an access token")
        return {
            "access_token": access_token,
        }

    def load_credentials(self, credentials: dict[str, Any]) -> dict[str, Any] | None:
        """Store the ``Authorization`` header value derived from credentials.

        A raw ``linear_api_key`` is used as-is; an OAuth ``access_token`` is
        sent with a ``"Bearer "`` prefix. Always returns ``None``.

        Raises:
            ConnectorMissingCredentialError: If neither key is present.
        """
        if "linear_api_key" in credentials:
            self.linear_api_key = cast(str, credentials["linear_api_key"])
        elif "access_token" in credentials:
            self.linear_api_key = "Bearer " + cast(str, credentials["access_token"])
        else:
            # May need to handle case in the future if the OAuth flow expires
            raise ConnectorMissingCredentialError("Linear")
        return None

    @staticmethod
    def _build_issues_query(start: datetime | None, end: datetime | None) -> str:
        """Return the paginated issues query, filtered on ``updatedAt``.

        Bounds are rendered in explicit ISO 8601 form
        (``YYYY-MM-DDTHH:MM:SS.mmm+00:00``) rather than via ``str(datetime)``,
        whose space separator is not ISO 8601. An omitted bound produces an
        empty comparator line.
        """
        lte_filter = (
            f'lte: "{end.isoformat(timespec="milliseconds")}"' if end is not None else ""
        )
        gte_filter = (
            f'gte: "{start.isoformat(timespec="milliseconds")}"'
            if start is not None
            else ""
        )
        updated_at_filter = f"\n{lte_filter}\n{gte_filter}\n"
        # NOTE(review): ``comments`` is not paginated here, so issues with more
        # comments than the API's default page size may be truncated -- confirm
        # against Linear's connection defaults.
        return (
            """
            query IterateIssueBatches($first: Int, $after: String) {
                issues(
                    orderBy: updatedAt,
                    first: $first,
                    after: $after,
                    filter: {
                        updatedAt: {
            """
            + updated_at_filter
            + """
                        },
                    }
                ) {
                    edges {
                        node {
                            id
                            createdAt
                            updatedAt
                            archivedAt
                            number
                            title
                            priority
                            estimate
                            sortOrder
                            startedAt
                            completedAt
                            startedTriageAt
                            triagedAt
                            canceledAt
                            autoClosedAt
                            autoArchivedAt
                            dueDate
                            slaStartedAt
                            slaBreachesAt
                            trashed
                            snoozedUntilAt
                            team {
                                name
                            }
                            previousIdentifiers
                            subIssueSortOrder
                            priorityLabel
                            identifier
                            url
                            branchName
                            customerTicketCount
                            description
                            comments {
                                nodes {
                                    url
                                    body
                                }
                            }
                        }
                    }
                    pageInfo {
                        hasNextPage
                        endCursor
                    }
                }
            }
            """
        )

    @staticmethod
    def _issue_node_to_document(node: dict[str, Any]) -> Document:
        """Convert one ``issues.edges[].node`` payload into a Document.

        The description is the first section, and each fetched comment body
        becomes another section. Every section links to the issue URL.
        """
        issue_url = node["url"]
        sections = [Section(link=issue_url, text=node["description"] or "")]
        sections.extend(
            Section(link=issue_url, text=comment["body"] or "")
            for comment in node["comments"]["nodes"]
        )
        return Document(
            id=node["id"],
            sections=sections,
            source=DocumentSource.LINEAR,
            semantic_identifier=f"[{node['identifier']}] {node['title']}",
            title=node["title"],
            doc_updated_at=time_str_to_utc(node["updatedAt"]),
            metadata={
                "team": node["team"]["name"],
            },
        )

    def _process_issues(
        self, start_str: datetime | None = None, end_str: datetime | None = None
    ) -> GenerateDocumentsOutput:
        """Page through Linear issues, yielding one list of Documents per page.

        Args:
            start_str: Lower bound (``gte``) on issue ``updatedAt``, or
                ``None`` for no lower bound. Despite its name, a ``datetime``.
            end_str: Upper bound (``lte``) on ``updatedAt``, or ``None``.

        Raises:
            ConnectorMissingCredentialError: If credentials were not loaded.
            RuntimeError: If a response carries no ``data.issues``, for
                example because the query failed with GraphQL errors.
        """
        if self.linear_api_key is None:
            raise ConnectorMissingCredentialError("Linear")

        query = self._build_issues_query(start_str, end_str)
        end_cursor: str | None = None
        has_more = True
        while has_more:
            graphql_query = {
                "query": query,
                "variables": {
                    "first": self.batch_size,
                    "after": end_cursor,
                },
            }
            logger.debug(f"Requesting issues from Linear with query: {graphql_query}")
            response_json = _make_query(graphql_query, self.linear_api_key).json()
            logger.debug(f"Raw response from Linear: {response_json}")

            # GraphQL errors can come back with ``data`` null; fail with the
            # reported errors instead of an opaque TypeError/KeyError.
            issues = (response_json.get("data") or {}).get("issues")
            if issues is None:
                raise RuntimeError(
                    "Linear issues query returned no data; "
                    f"errors: {response_json.get('errors')}"
                )

            yield [self._issue_node_to_document(edge["node"]) for edge in issues["edges"]]

            page_info = issues["pageInfo"]
            end_cursor = page_info["endCursor"]
            has_more = page_info["hasNextPage"]

    def load_from_state(self) -> GenerateDocumentsOutput:
        """Yield every issue, unfiltered, in per-page batches."""
        yield from self._process_issues()

    def poll_source(
        self, start: SecondsSinceUnixEpoch, end: SecondsSinceUnixEpoch
    ) -> GenerateDocumentsOutput:
        """Yield issues whose ``updatedAt`` falls within [start, end] (UTC)."""
        start_time = datetime.fromtimestamp(start, tz=timezone.utc)
        end_time = datetime.fromtimestamp(end, tz=timezone.utc)
        yield from self._process_issues(start_str=start_time, end_str=end_time)
if __name__ == "__main__":
    # Manual smoke test: read an API key from the environment and print the
    # first batch of issue documents.
    linear_connector = LinearConnector()
    linear_connector.load_credentials(
        {"linear_api_key": os.environ["LINEAR_API_KEY"]}
    )
    batch_iter = linear_connector.load_from_state()
    first_batch = next(batch_iter)
    print(first_batch)