DAN-118 Jira connector (#102)

* Small confluence page QoL changes

* Prevent getting into a bad state with orphan connectors for Jira / Confluence

* Jira connector + admin page
---------

Co-authored-by: Weves <chrisweaver101@gmail.com>
This commit is contained in:
Yuhong Sun
2023-06-24 17:48:38 -07:00
committed by GitHub
parent 3701239283
commit 03006743ab
20 changed files with 595 additions and 49 deletions

View File

@@ -22,4 +22,5 @@ class DocumentSource(str, Enum):
GOOGLE_DRIVE = "google_drive"
GITHUB = "github"
CONFLUENCE = "confluence"
JIRA = "jira"
FILE = "file"

View File

@@ -0,0 +1,155 @@
from datetime import datetime
from datetime import timezone
from typing import Any
from urllib.parse import urlparse
from danswer.configs.app_configs import INDEX_BATCH_SIZE
from danswer.configs.constants import DocumentSource
from danswer.connectors.interfaces import GenerateDocumentsOutput
from danswer.connectors.interfaces import LoadConnector
from danswer.connectors.interfaces import PollConnector
from danswer.connectors.interfaces import SecondsSinceUnixEpoch
from danswer.connectors.models import Document
from danswer.connectors.models import Section
from danswer.utils.logging import setup_logger
from jira import JIRA
from jira.resources import Issue
logger = setup_logger()
PROJECT_URL_PAT = "projects"


def extract_jira_project(url: str) -> tuple[str, str]:
    """Parse a Jira project URL into (base server URL, project key).

    Expects a path containing a 'projects' segment immediately followed by
    the project key, e.g. https://org.atlassian.net/jira/projects/KEY.

    Raises:
        ValueError: if the 'projects' segment is absent, or present with no
            project key after it.
    """
    parsed = urlparse(url)
    jira_base = f"{parsed.scheme}://{parsed.netloc}"

    segments = parsed.path.split("/")
    if PROJECT_URL_PAT not in segments:
        raise ValueError("'projects' not found in the URL")

    # The project key is the path segment right after 'projects'
    key_index = segments.index(PROJECT_URL_PAT) + 1
    if key_index >= len(segments):
        raise ValueError("No project name found in the URL")

    return jira_base, segments[key_index]
def fetch_jira_issues_batch(
jql: str,
start_index: int,
jira_client: JIRA,
batch_size: int = INDEX_BATCH_SIZE,
) -> tuple[list[Document], int]:
doc_batch = []
batch = jira_client.search_issues(
jql,
startAt=start_index,
maxResults=batch_size,
)
for jira in batch:
if type(jira) != Issue:
logger.warning(f"Found Jira object not of type Issue {jira}")
continue
semantic_rep = (
f"Jira Ticket Summary: {jira.fields.summary}\n"
f"Description: {jira.fields.description}\n"
+ "\n".join(
[f"Comment: {comment.body}" for comment in jira.fields.comment.comments]
)
)
page_url = f"{jira_client.client_info()}/browse/{jira.key}"
doc_batch.append(
Document(
id=page_url,
sections=[Section(link=page_url, text=semantic_rep)],
source=DocumentSource.JIRA,
semantic_identifier=jira.fields.summary,
metadata={},
)
)
return doc_batch, len(batch)
class JiraConnector(LoadConnector, PollConnector):
    """Connector that indexes issues from a single Jira project.

    Supports full loads (every issue in the project) and polling
    (issues updated within a given time window).
    """

    def __init__(
        self,
        jira_project_url: str,
        batch_size: int = INDEX_BATCH_SIZE,
    ) -> None:
        self.batch_size = batch_size
        self.jira_base, self.jira_project = extract_jira_project(jira_project_url)
        # Populated by load_credentials; required before any fetching.
        self.jira_client: JIRA | None = None

    def load_credentials(self, credentials: dict[str, Any]) -> dict[str, Any] | None:
        """Authenticate against Jira using basic auth (user email + API token)."""
        email = credentials["jira_user_email"]
        api_token = credentials["jira_api_token"]
        self.jira_client = JIRA(basic_auth=(email, api_token), server=self.jira_base)
        return None

    def _fetch_all(self, jql: str) -> GenerateDocumentsOutput:
        """Page through every issue matching *jql*, yielding document batches.

        Shared pagination loop for load_from_state and poll_source. Like the
        callers, this is a generator, so the missing-credentials check fires
        on first iteration rather than at call time.

        Raises:
            PermissionError: if load_credentials has not been called.
        """
        if self.jira_client is None:
            raise PermissionError(
                "Jira Client is not set up, was load_credentials called?"
            )

        start_ind = 0
        while True:
            doc_batch, fetched_batch_size = fetch_jira_issues_batch(
                jql,
                start_ind,
                self.jira_client,
                self.batch_size,
            )

            if doc_batch:
                yield doc_batch

            start_ind += fetched_batch_size
            # A short page means Jira has no further results.
            if fetched_batch_size < self.batch_size:
                break

    def load_from_state(self) -> GenerateDocumentsOutput:
        """Yield batches of documents for every issue in the configured project."""
        yield from self._fetch_all(f"project = {self.jira_project}")

    def poll_source(
        self, start: SecondsSinceUnixEpoch, end: SecondsSinceUnixEpoch
    ) -> GenerateDocumentsOutput:
        """Yield batches of documents for issues updated in [start, end].

        Args:
            start: window start, seconds since the unix epoch (UTC).
            end: window end, seconds since the unix epoch (UTC).
        """
        # JQL 'updated' comparisons work at minute precision, hence the format.
        start_date_str = datetime.fromtimestamp(start, tz=timezone.utc).strftime(
            "%Y-%m-%d %H:%M"
        )
        end_date_str = datetime.fromtimestamp(end, tz=timezone.utc).strftime(
            "%Y-%m-%d %H:%M"
        )

        jql = (
            f"project = {self.jira_project} AND "
            f"updated >= '{start_date_str}' AND "
            f"updated <= '{end_date_str}'"
        )
        yield from self._fetch_all(jql)

View File

@@ -3,6 +3,7 @@ from typing import Type
from danswer.configs.constants import DocumentSource
from danswer.connectors.confluence.connector import ConfluenceConnector
from danswer.connectors.danswer_jira.connector import JiraConnector
from danswer.connectors.file.connector import LocalFileConnector
from danswer.connectors.github.connector import GithubConnector
from danswer.connectors.google_drive.connector import GoogleDriveConnector
@@ -36,6 +37,7 @@ def identify_connector_class(
DocumentSource.GITHUB: GithubConnector,
DocumentSource.GOOGLE_DRIVE: GoogleDriveConnector,
DocumentSource.CONFLUENCE: ConfluenceConnector,
DocumentSource.JIRA: JiraConnector,
}
connector_by_source = connector_map.get(source, {})

View File

@@ -12,6 +12,7 @@ google-auth-oauthlib==1.0.0
httpcore==0.16.3
httpx==0.23.3
httpx-oauth==0.11.2
jira==3.5.1
Mako==1.2.4
nltk==3.8.1
openai==0.27.6

View File

@@ -0,0 +1,27 @@
from danswer.configs.app_configs import TYPESENSE_DEFAULT_COLLECTION
from danswer.utils.clients import get_typesense_client
if __name__ == "__main__":
    # Dump every document in the default Typesense collection to stdout,
    # paging through the collection one search request at a time.
    ts_client = get_typesense_client()

    per_page = 100  # documents fetched per search request
    page_number = 1
    while True:
        search_params = {
            "q": "",
            "query_by": "content",
            "page": page_number,
            "per_page": per_page,
        }
        response = ts_client.collections[TYPESENSE_DEFAULT_COLLECTION].documents.search(
            search_params
        )

        hits = response.get("hits")
        if not hits:
            # An empty page means the collection is exhausted.
            break

        for hit in hits:
            print(hit)

        page_number += 1

View File

@@ -27,6 +27,11 @@ def wipe_all_rows(database: str) -> None:
table_names = cur.fetchall()
# have to delete from these first to not run into psycopg2.errors.ForeignKeyViolation
cur.execute(f"DELETE FROM connector_credential_pair")
cur.execute(f"DELETE FROM index_attempt")
conn.commit()
for table_name in table_names:
if table_name[0] == "alembic_version":
continue