Repository: https://github.com/danswer-ai/danswer.git
Add Github Polling and Issues (#424)
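This change adds a poll_source entry point to the GitHub connector for incremental indexing, along with optional issue indexing controlled by new include_prs (default True) and include_issues (default False) flags. Fetching is factored into _fetch_from_github, which retrieves pull requests and issues sorted by update time in descending order and stops as soon as results fall before the requested start time. The GenerateDocumentsOutput alias in the connector interfaces is also loosened from Generator[list[Document], None, None] to Iterator[list[Document]].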
@@ -1,8 +1,11 @@
 import itertools
-from collections.abc import Generator
+from collections.abc import Iterator
+from datetime import datetime
 from typing import Any
+from typing import cast

 from github import Github
+from github.Issue import Issue
 from github.PaginatedList import PaginatedList
 from github.PullRequest import PullRequest

@@ -10,6 +13,7 @@ from danswer.configs.app_configs import INDEX_BATCH_SIZE
 from danswer.configs.constants import DocumentSource
 from danswer.connectors.interfaces import GenerateDocumentsOutput
 from danswer.connectors.interfaces import LoadConnector
+from danswer.connectors.interfaces import SecondsSinceUnixEpoch
 from danswer.connectors.models import ConnectorMissingCredentialError
 from danswer.connectors.models import Document
 from danswer.connectors.models import Section
@@ -19,10 +23,10 @@ from danswer.utils.logger import setup_logger
 logger = setup_logger()


-def get_pr_batches(
-    pull_requests: PaginatedList, batch_size: int
-) -> Generator[list[PullRequest], None, None]:
-    it = iter(pull_requests)
+def _batch_github_objects(
+    git_objs: PaginatedList, batch_size: int
+) -> Iterator[list[PullRequest | Issue]]:
+    it = iter(git_objs)
     while True:
         batch = list(itertools.islice(it, batch_size))
         if not batch:
@@ -30,6 +34,40 @@ def get_pr_batches(
         yield batch


+def _convert_pr_to_document(pull_request: PullRequest) -> Document:
+    full_context = f"Pull-Request {pull_request.title}\n{pull_request.body}"
+    return Document(
+        id=pull_request.html_url,
+        sections=[Section(link=pull_request.html_url, text=full_context)],
+        source=DocumentSource.GITHUB,
+        semantic_identifier=pull_request.title,
+        metadata={
+            "last_modified": pull_request.last_modified,
+            "merged": pull_request.merged,
+            "state": pull_request.state,
+        },
+    )
+
+
+def _fetch_issue_comments(issue: Issue) -> str:
+    comments = issue.get_comments()
+    return "\nComment: ".join(comment.body for comment in comments)
+
+
+def _convert_issue_to_document(issue: Issue) -> Document:
+    full_context = f"Issue {issue.title}\n{issue.body}"
+    return Document(
+        id=issue.html_url,
+        sections=[Section(link=issue.html_url, text=full_context)],
+        source=DocumentSource.GITHUB,
+        semantic_identifier=issue.title,
+        metadata={
+            "last_modified": issue.updated_at,
+            "state": issue.state,
+        },
+    )
+
+
 class GithubConnector(LoadConnector):
     def __init__(
         self,
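For context, the _batch_github_objects helper above uses the standard itertools.islice chunking pattern: each call pulls at most batch_size items from a shared iterator, so GitHub's paginated results are consumed lazily, and an empty slice signals exhaustion. A standalone sketch of the same pattern, with generic names that are not part of the commit:

import itertools
from collections.abc import Iterator
from typing import TypeVar

T = TypeVar("T")


def batch_generic(items: Iterator[T], batch_size: int) -> Iterator[list[T]]:
    it = iter(items)
    while True:
        # islice pulls at most batch_size items; nothing is pre-loaded
        chunk = list(itertools.islice(it, batch_size))
        if not chunk:  # the underlying iterator is exhausted
            break
        yield chunk


print(list(batch_generic(iter(range(7)), 3)))  # [[0, 1, 2], [3, 4, 5], [6]]

One detail worth noting in _fetch_issue_comments: str.join only inserts the separator between elements, so the first comment body ends up without a "Comment: " prefix.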
@@ -37,40 +75,85 @@ class GithubConnector(LoadConnector):
         repo_name: str,
         batch_size: int = INDEX_BATCH_SIZE,
         state_filter: str = "all",
+        include_prs: bool = True,
+        include_issues: bool = False,
     ) -> None:
         self.repo_owner = repo_owner
         self.repo_name = repo_name
         self.batch_size = batch_size
         self.state_filter = state_filter
+        self.include_prs = include_prs
+        self.include_issues = include_issues
         self.github_client: Github | None = None

     def load_credentials(self, credentials: dict[str, Any]) -> dict[str, Any] | None:
         self.github_client = Github(credentials["github_access_token"])
         return None

-    def load_from_state(self) -> GenerateDocumentsOutput:
+    def _fetch_from_github(
+        self, start: datetime | None = None, end: datetime | None = None
+    ) -> GenerateDocumentsOutput:
         if self.github_client is None:
             raise ConnectorMissingCredentialError("GitHub")
-        repo = self.github_client.get_repo(f"{self.repo_owner}/{self.repo_name}")
-        pull_requests = repo.get_pulls(state=self.state_filter)
-        for pr_batch in get_pr_batches(pull_requests, self.batch_size):
-            doc_batch = []
-            for pull_request in pr_batch:
-                full_context = f"Pull-Request {pull_request.title} {pull_request.body}"
-                doc_batch.append(
-                    Document(
-                        id=pull_request.html_url,
-                        sections=[
-                            Section(link=pull_request.html_url, text=full_context)
-                        ],
-                        source=DocumentSource.GITHUB,
-                        semantic_identifier=pull_request.title,
-                        metadata={
-                            "last_modified": pull_request.last_modified,
-                            "merged": pull_request.merged,
-                            "state": pull_request.state,
-                        },
-                    )
-                )
-
-            yield doc_batch
+
+        repo = self.github_client.get_repo(f"{self.repo_owner}/{self.repo_name}")
+
+        if self.include_prs:
+            pull_requests = repo.get_pulls(
+                state=self.state_filter, sort="updated", direction="desc"
+            )
+
+            for pr_batch in _batch_github_objects(pull_requests, self.batch_size):
+                doc_batch: list[Document] = []
+                for pr in pr_batch:
+                    if start is not None and pr.updated_at < start:
+                        yield doc_batch
+                        return
+                    if end is not None and pr.updated_at > end:
+                        continue
+                    doc_batch.append(_convert_pr_to_document(cast(PullRequest, pr)))
+                yield doc_batch
+
+        if self.include_issues:
+            issues = repo.get_issues(
+                state=self.state_filter, sort="updated", direction="desc"
+            )
+
+            for issue_batch in _batch_github_objects(issues, self.batch_size):
+                doc_batch = []
+                for issue in issue_batch:
+                    issue = cast(Issue, issue)
+                    if start is not None and issue.updated_at < start:
+                        yield doc_batch
+                        return
+                    if end is not None and issue.updated_at > end:
+                        continue
+                    if issue.pull_request is not None:
+                        # PRs are handled separately
+                        continue
+                    doc_batch.append(_convert_issue_to_document(issue))
+                yield doc_batch
+
+    def load_from_state(self) -> GenerateDocumentsOutput:
+        return self._fetch_from_github()
+
+    def poll_source(
+        self, start: SecondsSinceUnixEpoch, end: SecondsSinceUnixEpoch
+    ) -> GenerateDocumentsOutput:
+        start_datetime = datetime.fromtimestamp(start)
+        end_datetime = datetime.fromtimestamp(end)
+        return self._fetch_from_github(start_datetime, end_datetime)
+
+
+if __name__ == "__main__":
+    import os
+
+    connector = GithubConnector(
+        repo_owner=os.environ["REPO_OWNER"],
+        repo_name=os.environ["REPO_NAME"],
+    )
+    connector.load_credentials(
+        {"github_access_token": os.environ["GITHUB_ACCESS_TOKEN"]}
+    )
+    document_batches = connector.load_from_state()
+    print(next(document_batches))
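A minimal sketch of how the new poll_source entry point might be driven, assuming a context where GithubConnector is importable; the one-hour window and the env-var names (borrowed from the __main__ block above) are illustrative:

import os
import time

connector = GithubConnector(
    repo_owner=os.environ["REPO_OWNER"],
    repo_name=os.environ["REPO_NAME"],
    include_issues=True,  # also index issues, off by default
)
connector.load_credentials({"github_access_token": os.environ["GITHUB_ACCESS_TOKEN"]})

# poll_source takes plain floats (seconds since the Unix epoch)
end = time.time()
start = end - 3600  # everything updated in the last hour
for doc_batch in connector.poll_source(start, end):
    for doc in doc_batch:
        print(doc.semantic_identifier)

Because results arrive sorted by update time descending, the connector can stop paging as soon as it sees an object older than start, rather than scanning the whole repository.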
In the connector interfaces module:

@@ -1,5 +1,5 @@
 import abc
-from collections.abc import Generator
+from collections.abc import Iterator
 from typing import Any

 from danswer.connectors.models import Document
@@ -7,7 +7,7 @@ from danswer.connectors.models import Document

 SecondsSinceUnixEpoch = float

-GenerateDocumentsOutput = Generator[list[Document], None, None]
+GenerateDocumentsOutput = Iterator[list[Document]]


 class BaseConnector(abc.ABC):
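The alias change is safe because Generator[Y, S, R] is a subtype of Iterator[Y], and none of the connector generators use the send or return channels (both were None), so Iterator[list[Document]] expresses the same contract with less noise. Any generator function still satisfies the looser alias, as in this small illustration (not part of the commit):

from collections.abc import Iterator


def gen_batches() -> Iterator[list[str]]:
    # a generator function type-checks as an Iterator of its yield type
    yield ["a"]
    yield ["b", "c"]


for batch in gen_batches():
    print(batch)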