Asana connector (community originated) (#2485)

* initial Asana connector

* hint on how to get Asana workspace ID

* re-format with black

* re-order imports

* update asana connector for clarity

* minor robustification

* minor update to naming

* update for best practice

* update connector

---------

Co-authored-by: Daniel Naber <naber@danielnaber.de>
This commit is contained in:
pablodanswer 2024-09-19 16:54:18 -07:00 committed by GitHub
parent 8a8e2b310e
commit 9f179940f8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 425 additions and 1 deletions

View File

@ -99,6 +99,7 @@ class DocumentSource(str, Enum):
CLICKUP = "clickup" CLICKUP = "clickup"
MEDIAWIKI = "mediawiki" MEDIAWIKI = "mediawiki"
WIKIPEDIA = "wikipedia" WIKIPEDIA = "wikipedia"
ASANA = "asana"
S3 = "s3" S3 = "s3"
R2 = "r2" R2 = "r2"
GOOGLE_CLOUD_STORAGE = "google_cloud_storage" GOOGLE_CLOUD_STORAGE = "google_cloud_storage"

View File

@ -0,0 +1,233 @@
import time
from collections.abc import Iterator
from datetime import datetime
from typing import Dict
import asana # type: ignore
from danswer.utils.logger import setup_logger
logger = setup_logger()
# https://github.com/Asana/python-asana/tree/master?tab=readme-ov-file#documentation-for-api-endpoints
class AsanaTask:
    """Value holder for a single Asana task that has already been fetched.

    Instances only store the values handed to the constructor; no
    validation or conversion is performed.
    """

    def __init__(
        self,
        id: str,
        title: str,
        text: str,
        link: str,
        last_modified: datetime,
        project_gid: str,
        project_name: str,
    ) -> None:
        # Identity and content of the task itself.
        self.id, self.title = id, title
        self.text, self.link = text, link
        self.last_modified = last_modified
        # The project the task was fetched from.
        self.project_gid, self.project_name = project_gid, project_name

    def __str__(self) -> str:
        """Return a multi-line summary: id, title, last-modified time, text."""
        return (
            f"ID: {self.id}\n"
            f"Title: {self.title}\n"
            f"Last modified: {self.last_modified}\n"
            f"Text: {self.text}"
        )
class AsanaAPI:
def __init__(
self, api_token: str, workspace_gid: str, team_gid: str | None
) -> None:
self._user = None # type: ignore
self.workspace_gid = workspace_gid
self.team_gid = team_gid
self.configuration = asana.Configuration()
self.api_client = asana.ApiClient(self.configuration)
self.tasks_api = asana.TasksApi(self.api_client)
self.stories_api = asana.StoriesApi(self.api_client)
self.users_api = asana.UsersApi(self.api_client)
self.project_api = asana.ProjectsApi(self.api_client)
self.workspaces_api = asana.WorkspacesApi(self.api_client)
self.api_error_count = 0
self.configuration.access_token = api_token
self.task_count = 0
def get_tasks(
self, project_gids: list[str] | None, start_date: str
) -> Iterator[AsanaTask]:
"""Get all tasks from the projects with the given gids that were modified since the given date.
If project_gids is None, get all tasks from all projects in the workspace."""
logger.info("Starting to fetch Asana projects")
projects = self.project_api.get_projects(
opts={
"workspace": self.workspace_gid,
"opt_fields": "gid,name,archived,modified_at",
}
)
start_seconds = int(time.mktime(datetime.now().timetuple()))
projects_list = []
project_count = 0
for project_info in projects:
project_gid = project_info["gid"]
if project_gids is None or project_gid in project_gids:
projects_list.append(project_gid)
else:
logger.debug(
f"Skipping project: {project_gid} - not in accepted project_gids"
)
project_count += 1
if project_count % 100 == 0:
logger.info(f"Processed {project_count} projects")
logger.info(f"Found {len(projects_list)} projects to process")
for project_gid in projects_list:
for task in self._get_tasks_for_project(
project_gid, start_date, start_seconds
):
yield task
logger.info(f"Completed fetching {self.task_count} tasks from Asana")
if self.api_error_count > 0:
logger.warning(
f"Encountered {self.api_error_count} API errors during task fetching"
)
def _get_tasks_for_project(
self, project_gid: str, start_date: str, start_seconds: int
) -> Iterator[AsanaTask]:
project = self.project_api.get_project(project_gid, opts={})
if project["archived"]:
logger.info(f"Skipping archived project: {project['name']} ({project_gid})")
return []
if not project["team"] or not project["team"]["gid"]:
logger.info(
f"Skipping project without a team: {project['name']} ({project_gid})"
)
return []
if project["privacy_setting"] == "private":
if self.team_gid and project["team"]["gid"] != self.team_gid:
logger.info(
f"Skipping private project not in configured team: {project['name']} ({project_gid})"
)
return []
else:
logger.info(
f"Processing private project in configured team: {project['name']} ({project_gid})"
)
simple_start_date = start_date.split(".")[0].split("+")[0]
logger.info(
f"Fetching tasks modified since {simple_start_date} for project: {project['name']} ({project_gid})"
)
opts = {
"opt_fields": "name,memberships,memberships.project,completed_at,completed_by,created_at,"
"created_by,custom_fields,dependencies,due_at,due_on,external,html_notes,liked,likes,"
"modified_at,notes,num_hearts,parent,projects,resource_subtype,resource_type,start_on,"
"workspace,permalink_url",
"modified_since": start_date,
}
tasks_from_api = self.tasks_api.get_tasks_for_project(project_gid, opts)
for data in tasks_from_api:
self.task_count += 1
if self.task_count % 10 == 0:
end_seconds = time.mktime(datetime.now().timetuple())
runtime_seconds = end_seconds - start_seconds
if runtime_seconds > 0:
logger.info(
f"Processed {self.task_count} tasks in {runtime_seconds:.0f} seconds "
f"({self.task_count / runtime_seconds:.2f} tasks/second)"
)
logger.debug(f"Processing Asana task: {data['name']}")
text = self._construct_task_text(data)
try:
text += self._fetch_and_add_comments(data["gid"])
last_modified_date = self.format_date(data["modified_at"])
text += f"Last modified: {last_modified_date}\n"
task = AsanaTask(
id=data["gid"],
title=data["name"],
text=text,
link=data["permalink_url"],
last_modified=datetime.fromisoformat(data["modified_at"]),
project_gid=project_gid,
project_name=project["name"],
)
yield task
except Exception:
logger.error(
f"Error processing task {data['gid']} in project {project_gid}",
exc_info=True,
)
self.api_error_count += 1
def _construct_task_text(self, data: Dict) -> str:
text = f"{data['name']}\n\n"
if data["notes"]:
text += f"{data['notes']}\n\n"
if data["created_by"] and data["created_by"]["gid"]:
creator = self.get_user(data["created_by"]["gid"])["name"]
created_date = self.format_date(data["created_at"])
text += f"Created by: {creator} on {created_date}\n"
if data["due_on"]:
due_date = self.format_date(data["due_on"])
text += f"Due date: {due_date}\n"
if data["completed_at"]:
completed_date = self.format_date(data["completed_at"])
text += f"Completed on: {completed_date}\n"
text += "\n"
return text
def _fetch_and_add_comments(self, task_gid: str) -> str:
text = ""
stories_opts: Dict[str, str] = {}
story_start = time.time()
stories = self.stories_api.get_stories_for_task(task_gid, stories_opts)
story_count = 0
comment_count = 0
for story in stories:
story_count += 1
if story["resource_subtype"] == "comment_added":
comment = self.stories_api.get_story(
story["gid"], opts={"opt_fields": "text,created_by,created_at"}
)
commenter = self.get_user(comment["created_by"]["gid"])["name"]
text += f"Comment by {commenter}: {comment['text']}\n\n"
comment_count += 1
story_duration = time.time() - story_start
logger.debug(
f"Processed {story_count} stories (including {comment_count} comments) in {story_duration:.2f} seconds"
)
return text
def get_user(self, user_gid: str) -> Dict:
if self._user is not None:
return self._user
self._user = self.users_api.get_user(user_gid, {"opt_fields": "name,email"})
if not self._user:
logger.warning(f"Unable to fetch user information for user_gid: {user_gid}")
return {"name": "Unknown"}
return self._user
def format_date(self, date_str: str) -> str:
date = datetime.fromisoformat(date_str)
return time.strftime("%Y-%m-%d", date.timetuple())
def get_time(self) -> str:
return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())

View File

@ -0,0 +1,120 @@
import datetime
from typing import Any
from danswer.configs.app_configs import CONTINUE_ON_CONNECTOR_FAILURE
from danswer.configs.app_configs import INDEX_BATCH_SIZE
from danswer.configs.constants import DocumentSource
from danswer.connectors.asana import asana_api
from danswer.connectors.interfaces import GenerateDocumentsOutput
from danswer.connectors.interfaces import LoadConnector
from danswer.connectors.interfaces import PollConnector
from danswer.connectors.interfaces import SecondsSinceUnixEpoch
from danswer.connectors.models import Document
from danswer.connectors.models import Section
from danswer.utils.logger import setup_logger
logger = setup_logger()
class AsanaConnector(LoadConnector, PollConnector):
    """Connector that turns Asana tasks into ``Document`` batches.

    ``load_credentials`` must be called before ``poll_source`` or
    ``load_from_state``, because those read the stored API token.
    """

    def __init__(
        self,
        asana_workspace_id: str,
        asana_project_ids: str | None = None,
        asana_team_id: str | None = None,
        batch_size: int = INDEX_BATCH_SIZE,
        continue_on_failure: bool = CONTINUE_ON_CONNECTOR_FAILURE,
    ) -> None:
        """Store the connector configuration.

        Args:
            asana_workspace_id: Workspace to index.
            asana_project_ids: Comma-separated project gids. None, an empty
                string, or a string with no non-blank entries means "all
                projects in the workspace".
            asana_team_id: Optional team gid forwarded to the API wrapper.
            batch_size: Maximum number of documents per yielded batch.
            continue_on_failure: Stored on the instance; not read elsewhere
                in this class.
        """
        self.workspace_id = asana_workspace_id
        self.project_ids_to_index: list[str] | None = self._parse_project_ids(
            asana_project_ids
        )
        self.asana_team_id = asana_team_id
        self.batch_size = batch_size
        self.continue_on_failure = continue_on_failure
        logger.info(
            f"AsanaConnector initialized with workspace_id: {asana_workspace_id}"
        )

    @staticmethod
    def _parse_project_ids(raw_ids: str | None) -> list[str] | None:
        """Split a comma-separated gid string, tolerating spaces and blanks.

        Returns None when no usable gid remains, so that an empty form
        field selects every project instead of matching nothing.
        """
        if raw_ids is None:
            return None
        cleaned = [part.strip() for part in raw_ids.split(",")]
        project_ids = [part for part in cleaned if part]
        return project_ids or None

    def load_credentials(self, credentials: dict[str, Any]) -> dict[str, Any] | None:
        """Read ``asana_api_token_secret`` from ``credentials`` and build an API client.

        Raises:
            KeyError: If the token key is missing from ``credentials``.
        """
        self.api_token = credentials["asana_api_token_secret"]
        self.asana_client = asana_api.AsanaAPI(
            api_token=self.api_token,
            workspace_gid=self.workspace_id,
            team_gid=self.asana_team_id,
        )
        logger.info("Asana credentials loaded and API client initialized")
        return None

    def poll_source(
        self, start: SecondsSinceUnixEpoch, end: SecondsSinceUnixEpoch | None
    ) -> GenerateDocumentsOutput:
        """Yield batches of documents for tasks modified since ``start``.

        Args:
            start: Lower bound, in seconds since the Unix epoch.
            end: Accepted for interface compatibility; it is not used to
                filter tasks.

        Yields:
            Lists of at most ``batch_size`` documents.
        """
        # Epoch seconds are UTC by definition, so format them as an aware
        # UTC timestamp; a naive local-time string would shift the cutoff
        # by the host's UTC offset.
        start_time = datetime.datetime.fromtimestamp(
            start, tz=datetime.timezone.utc
        ).isoformat()
        logger.info(f"Starting Asana poll from {start_time}")
        # A fresh client per poll starts with empty counters and caches.
        asana = asana_api.AsanaAPI(
            api_token=self.api_token,
            workspace_gid=self.workspace_id,
            team_gid=self.asana_team_id,
        )
        docs_batch: list[Document] = []
        tasks = asana.get_tasks(self.project_ids_to_index, start_time)

        for task in tasks:
            doc = self._message_to_doc(task)
            docs_batch.append(doc)

            if len(docs_batch) >= self.batch_size:
                logger.info(f"Yielding batch of {len(docs_batch)} documents")
                yield docs_batch
                docs_batch = []

        if docs_batch:
            logger.info(f"Yielding final batch of {len(docs_batch)} documents")
            yield docs_batch

        logger.info("Asana poll completed")

    def load_from_state(self) -> GenerateDocumentsOutput:
        """Run a full index by polling from epoch 0 with no end bound."""
        logger.notice("Starting full index of all Asana tasks")
        return self.poll_source(start=0, end=None)

    def _message_to_doc(self, task: asana_api.AsanaTask) -> Document:
        """Convert one ``AsanaTask`` into a ``Document`` with a single section."""
        logger.debug(f"Converting Asana task {task.id} to Document")
        return Document(
            id=task.id,
            sections=[Section(link=task.link, text=task.text)],
            doc_updated_at=task.last_modified,
            source=DocumentSource.ASANA,
            semantic_identifier=task.title,
            metadata={
                "group": task.project_gid,
                "project": task.project_name,
            },
        )
if __name__ == "__main__":
    import time
    import os

    # Manual smoke test driven entirely by environment variables:
    # WORKSPACE_ID, PROJECT_IDS, TEAM_ID and API_TOKEN must all be set.
    logger.notice("Starting Asana connector test")
    env = os.environ
    connector = AsanaConnector(
        env["WORKSPACE_ID"],
        env["PROJECT_IDS"],
        env["TEAM_ID"],
    )
    connector.load_credentials({"asana_api_token_secret": env["API_TOKEN"]})

    logger.info("Loading all documents from Asana")
    # The result is kept but never iterated by this script.
    all_docs = connector.load_from_state()

    current = time.time()
    seconds_per_day = 24 * 60 * 60
    one_day_ago = current - seconds_per_day

    logger.info("Polling for documents updated in the last 24 hours")
    for batch in connector.poll_source(one_day_ago, current):
        for doc in batch:
            print(doc.id)

    logger.notice("Asana connector test completed")

View File

@ -4,6 +4,7 @@ from typing import Type
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
from danswer.configs.constants import DocumentSource from danswer.configs.constants import DocumentSource
from danswer.connectors.asana.connector import AsanaConnector
from danswer.connectors.axero.connector import AxeroConnector from danswer.connectors.axero.connector import AxeroConnector
from danswer.connectors.blob.connector import BlobStorageConnector from danswer.connectors.blob.connector import BlobStorageConnector
from danswer.connectors.bookstack.connector import BookstackConnector from danswer.connectors.bookstack.connector import BookstackConnector
@ -91,6 +92,7 @@ def identify_connector_class(
DocumentSource.CLICKUP: ClickupConnector, DocumentSource.CLICKUP: ClickupConnector,
DocumentSource.MEDIAWIKI: MediaWikiConnector, DocumentSource.MEDIAWIKI: MediaWikiConnector,
DocumentSource.WIKIPEDIA: WikipediaConnector, DocumentSource.WIKIPEDIA: WikipediaConnector,
DocumentSource.ASANA: AsanaConnector,
DocumentSource.S3: BlobStorageConnector, DocumentSource.S3: BlobStorageConnector,
DocumentSource.R2: BlobStorageConnector, DocumentSource.R2: BlobStorageConnector,
DocumentSource.GOOGLE_CLOUD_STORAGE: BlobStorageConnector, DocumentSource.GOOGLE_CLOUD_STORAGE: BlobStorageConnector,

View File

@ -70,6 +70,7 @@ transformers==4.39.2
uvicorn==0.21.1 uvicorn==0.21.1
zulip==0.8.2 zulip==0.8.2
hubspot-api-client==8.1.0 hubspot-api-client==8.1.0
asana==5.0.8
zenpy==2.0.41 zenpy==2.0.41
dropbox==11.36.2 dropbox==11.36.2
boto3-stubs[s3]==1.34.133 boto3-stubs[s3]==1.34.133

BIN
web/public/Asana.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 6.6 KiB

View File

@ -52,7 +52,7 @@ import litellmIcon from "../../../public/LiteLLM.jpg";
import awsWEBP from "../../../public/Amazon.webp"; import awsWEBP from "../../../public/Amazon.webp";
import azureIcon from "../../../public/Azure.png"; import azureIcon from "../../../public/Azure.png";
import asanaIcon from "../../../public/Asana.png";
import anthropicSVG from "../../../public/Anthropic.svg"; import anthropicSVG from "../../../public/Anthropic.svg";
import nomicSVG from "../../../public/nomic.svg"; import nomicSVG from "../../../public/nomic.svg";
import microsoftIcon from "../../../public/microsoft.png"; import microsoftIcon from "../../../public/microsoft.png";
@ -2811,3 +2811,14 @@ export const WindowsIcon = ({
</svg> </svg>
); );
}; };
/**
 * Asana logo rendered inside a square wrapper.
 *
 * `size` is the wrapper's edge length in pixels (default 16); `className`
 * is appended after the generated width/height classes.
 */
export const AsanaIcon = ({
  size = 16,
  className = defaultTailwindCSS,
}: IconProps) => {
  // The inline style and the generated class names use the same edge length.
  const edge = `${size}px`;
  return (
    <div
      style={{ width: edge, height: edge }}
      className={`w-[${size}px] h-[${size}px] ${className}`}
    >
      <Image src={asanaIcon} alt="Logo" width="96" height="96" />
    </div>
  );
};

View File

@ -763,6 +763,38 @@ For example, specifying .*-support.* as a "channel" will cause the connector to
}, },
], ],
}, },
asana: {
description: "Configure Asana connector",
values: [
{
type: "text",
query: "Enter your Asana workspace ID:",
label: "Workspace ID",
name: "asana_workspace_id",
optional: false,
description:
"The ID of the Asana workspace to index. You can find this at https://app.asana.com/api/1.0/workspaces. It's a number that looks like 1234567890123456.",
},
{
type: "text",
query: "Enter project IDs to index (optional):",
label: "Project IDs",
name: "asana_project_ids",
description:
"IDs of specific Asana projects to index, separated by commas. Leave empty to index all projects in the workspace. Example: 1234567890123456,2345678901234567",
optional: true,
},
{
type: "text",
query: "Enter the Team ID (optional):",
label: "Team ID",
name: "asana_team_id",
optional: true,
description:
"ID of a team to use for accessing team-visible tasks. This allows indexing of team-visible tasks in addition to public tasks. Leave empty if you don't want to use this feature.",
},
],
},
mediawiki: { mediawiki: {
description: "Configure MediaWiki connector", description: "Configure MediaWiki connector",
values: [ values: [
@ -1056,6 +1088,12 @@ export interface MediaWikiBaseConfig {
recurse_depth?: number; recurse_depth?: number;
} }
/** Connector-specific settings for the Asana connector. */
export interface AsanaConfig {
  // ID of the Asana workspace to index (required).
  asana_workspace_id: string;
  // Comma-separated project IDs to index (optional).
  asana_project_ids?: string;
  // ID of a team (optional).
  asana_team_id?: string;
}
export interface MediaWikiConfig extends MediaWikiBaseConfig { export interface MediaWikiConfig extends MediaWikiBaseConfig {
hostname: string; hostname: string;
} }

View File

@ -166,6 +166,10 @@ export interface SharepointCredentialJson {
sp_directory_id: string; sp_directory_id: string;
} }
/** Credential payload for the Asana connector. */
export interface AsanaCredentialJson {
  // Asana API token.
  asana_api_token_secret: string;
}
export interface TeamsCredentialJson { export interface TeamsCredentialJson {
teams_client_id: string; teams_client_id: string;
teams_client_secret: string; teams_client_secret: string;
@ -241,6 +245,9 @@ export const credentialTemplates: Record<ValidSources, any> = {
sp_client_secret: "", sp_client_secret: "",
sp_directory_id: "", sp_directory_id: "",
} as SharepointCredentialJson, } as SharepointCredentialJson,
asana: {
asana_api_token_secret: "",
} as AsanaCredentialJson,
teams: { teams: {
teams_client_id: "", teams_client_id: "",
teams_client_secret: "", teams_client_secret: "",
@ -412,6 +419,9 @@ export const credentialDisplayNames: Record<string, string> = {
sp_client_secret: "SharePoint Client Secret", sp_client_secret: "SharePoint Client Secret",
sp_directory_id: "SharePoint Directory ID", sp_directory_id: "SharePoint Directory ID",
// Asana
asana_api_token_secret: "Asana API Token",
// Teams // Teams
teams_client_id: "Microsoft Teams Client ID", teams_client_id: "Microsoft Teams Client ID",
teams_client_secret: "Microsoft Teams Client Secret", teams_client_secret: "Microsoft Teams Client Secret",

View File

@ -32,6 +32,7 @@ import {
ZulipIcon, ZulipIcon,
MediaWikiIcon, MediaWikiIcon,
WikipediaIcon, WikipediaIcon,
AsanaIcon,
S3Icon, S3Icon,
OCIStorageIcon, OCIStorageIcon,
GoogleStorageIcon, GoogleStorageIcon,
@ -230,6 +231,12 @@ const SOURCE_METADATA_MAP: SourceMap = {
category: SourceCategory.Wiki, category: SourceCategory.Wiki,
docs: "https://docs.danswer.dev/connectors/wikipedia", docs: "https://docs.danswer.dev/connectors/wikipedia",
}, },
asana: {
icon: AsanaIcon,
displayName: "Asana",
category: SourceCategory.ProjectManagement,
docs: "https://docs.danswer.dev/connectors/asana",
},
mediawiki: { mediawiki: {
icon: MediaWikiIcon, icon: MediaWikiIcon,
displayName: "MediaWiki", displayName: "MediaWiki",

View File

@ -247,6 +247,7 @@ const validSources = [
"clickup", "clickup",
"wikipedia", "wikipedia",
"mediawiki", "mediawiki",
"asana",
"s3", "s3",
"r2", "r2",
"google_cloud_storage", "google_cloud_storage",