Add GitHub admin page + adjust the way index APIs work

Weves
2023-05-13 23:17:57 -07:00
committed by Chris Weaver
parent 3d1fffb38b
commit 5c98310b79
19 changed files with 379 additions and 179 deletions

View File

@@ -2,6 +2,8 @@ import time
from typing import cast
from danswer.configs.constants import DocumentSource
+ from danswer.connectors.factory import build_connector
+ from danswer.connectors.models import InputType
from danswer.connectors.slack.config import get_pull_frequency
from danswer.connectors.slack.pull import PeriodicSlackLoader
from danswer.connectors.web.pull import WebLoader
@@ -35,6 +37,8 @@ def run_update() -> None:
current_time = int(time.time())
# Slack
+ # TODO (chris): make Slack use the same approach as other connectors /
+ # make other connectors periodic
try:
pull_frequency = get_pull_frequency()
except ConfigNotFoundError:
@@ -56,17 +60,15 @@ def run_update() -> None:
indexing_pipeline(doc_batch)
dynamic_config_store.store(last_slack_pull_key, current_time)
# Web
# TODO (chris): make this more efficient / in a single transaction to
# prevent race conditions across multiple background jobs. For now,
# this assumes we only ever run a single background job at a time
# TODO (chris): make this generic for all pull connectors (not just web)
not_started_index_attempts = fetch_index_attempts(
- sources=[DocumentSource.WEB], statuses=[IndexingStatus.NOT_STARTED]
+ input_types=[InputType.PULL], statuses=[IndexingStatus.NOT_STARTED]
)
for not_started_index_attempt in not_started_index_attempts:
logger.info(
"Attempting to index website with IndexAttempt id: "
"Attempting to index with IndexAttempt id: "
f"{not_started_index_attempt.id}, source: "
f"{not_started_index_attempt.source}, input_type: "
f"{not_started_index_attempt.input_type}, and connector_specific_config: "
@@ -78,17 +80,25 @@ def run_update() -> None:
)
error_msg = None
- base_url = not_started_index_attempt.connector_specific_config["url"]
try:
# TODO (chris): spawn processes to parallelize / take advantage of
# multiple cores + implement retries
+ connector = build_connector(
+ source=not_started_index_attempt.source,
+ input_type=InputType.PULL,
+ connector_specific_config=not_started_index_attempt.connector_specific_config,
+ )
document_ids: list[str] = []
- for doc_batch in WebLoader(base_url=base_url).load():
+ for doc_batch in connector.load():
chunks = indexing_pipeline(doc_batch)
document_ids.extend([chunk.source_document.id for chunk in chunks])
except Exception as e:
logger.exception(
"Failed to index website with url %s due to: %s", base_url, e
"Failed to index for source %s with config %s due to: %s",
not_started_index_attempt.source,
not_started_index_attempt.connector_specific_config,
e,
)
error_msg = str(e)
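Taken together, these hunks turn run_update's web-only branch into a generic loop over every pull connector. Below is a condensed sketch of that flow, assuming the indexing_pipeline callable and the IndexAttempt bookkeeping from the rest of this file; the helper name run_pull_attempts is illustrative, not code from this commit.

from danswer.connectors.factory import build_connector
from danswer.connectors.models import InputType
from danswer.db.index_attempt import fetch_index_attempts
from danswer.db.models import IndexingStatus
from danswer.utils.logging import setup_logger

logger = setup_logger()


def run_pull_attempts(indexing_pipeline) -> None:
    not_started = fetch_index_attempts(
        input_types=[InputType.PULL], statuses=[IndexingStatus.NOT_STARTED]
    )
    for attempt in not_started:
        try:
            # the factory picks the loader from (source, input_type) and forwards
            # connector_specific_config as that loader's constructor kwargs
            connector = build_connector(
                source=attempt.source,
                input_type=InputType.PULL,
                connector_specific_config=attempt.connector_specific_config,
            )
            # collected for the status/result bookkeeping the original function does next
            document_ids: list[str] = []
            for doc_batch in connector.load():
                chunks = indexing_pipeline(doc_batch)
                document_ids.extend(chunk.source_document.id for chunk in chunks)
        except Exception as e:
            # a failure is recorded per attempt instead of aborting the whole run
            logger.exception("Failed to index source %s: %s", attempt.source, e)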

View File

@@ -0,0 +1,40 @@
from typing import Any
from danswer.configs.constants import DocumentSource
from danswer.connectors.github.batch import BatchGithubLoader
from danswer.connectors.google_drive.batch import BatchGoogleDriveLoader
from danswer.connectors.interfaces import PullLoader
from danswer.connectors.interfaces import RangePullLoader
from danswer.connectors.models import InputType
from danswer.connectors.slack.batch import BatchSlackLoader
from danswer.connectors.slack.pull import PeriodicSlackLoader
from danswer.connectors.web.pull import WebLoader
class ConnectorMissingException(Exception):
pass
def build_connector(
source: DocumentSource,
input_type: InputType,
connector_specific_config: dict[str, Any],
) -> PullLoader | RangePullLoader:
if source == DocumentSource.SLACK:
if input_type == InputType.PULL:
return PeriodicSlackLoader(**connector_specific_config)
if input_type == InputType.LOAD_STATE:
return BatchSlackLoader(**connector_specific_config)
elif source == DocumentSource.GOOGLE_DRIVE:
if input_type == InputType.PULL:
return BatchGoogleDriveLoader(**connector_specific_config)
elif source == DocumentSource.GITHUB:
if input_type == InputType.PULL:
return BatchGithubLoader(**connector_specific_config)
elif source == DocumentSource.WEB:
if input_type == InputType.PULL:
return WebLoader(**connector_specific_config)
raise ConnectorMissingException(
f"Connector not found for source={source}, input_type={input_type}"
)
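Because connector_specific_config is splatted straight into the selected loader's constructor, callers only supply a source, an input type, and a config dict. A usage sketch follows; the GitHub config keys are illustrative guesses, not necessarily the kwargs BatchGithubLoader actually accepts.

from danswer.configs.constants import DocumentSource
from danswer.connectors.factory import ConnectorMissingException, build_connector
from danswer.connectors.models import InputType

try:
    connector = build_connector(
        source=DocumentSource.GITHUB,
        input_type=InputType.PULL,
        # forwarded as **kwargs to BatchGithubLoader; key names are illustrative
        connector_specific_config={"repo_owner": "some-org", "repo_name": "some-repo"},
    )
except ConnectorMissingException:
    # raised for unknown sources and for unsupported (source, input_type) pairs alike
    raise

for doc_batch in connector.load():
    ...  # hand each batch to the indexing pipeline

Note that a recognized source paired with an unsupported input_type falls through to the same ConnectorMissingException rather than returning None.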

View File

@@ -9,6 +9,7 @@ from danswer.connectors.models import Document
SecondsSinceUnixEpoch = float
# TODO (chris): rename from Loader -> Connector
class PullLoader:
@abc.abstractmethod
def load(self) -> Generator[List[Document], None, None]:
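Any class that yields batches of Document from load() satisfies this interface and can be handed out by the factory. A minimal hypothetical implementation (not part of this commit) that just replays pre-built documents:

from typing import Generator, List

from danswer.connectors.interfaces import PullLoader
from danswer.connectors.models import Document


class InMemoryLoader(PullLoader):
    """Illustrative PullLoader that yields already-built documents in fixed-size batches."""

    def __init__(self, documents: List[Document], batch_size: int = 16) -> None:
        self._documents = documents
        self._batch_size = batch_size

    def load(self) -> Generator[List[Document], None, None]:
        for start in range(0, len(self._documents), self._batch_size):
            yield self._documents[start : start + self._batch_size]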

View File

@@ -1,4 +1,5 @@
from danswer.configs.constants import DocumentSource
+ from danswer.connectors.models import InputType
from danswer.db.engine import build_engine
from danswer.db.models import IndexAttempt
from danswer.db.models import IndexingStatus
@@ -20,6 +21,7 @@ def fetch_index_attempts(
*,
sources: list[DocumentSource] | None = None,
statuses: list[IndexingStatus] | None = None,
+ input_types: list[InputType] | None = None,
) -> list[IndexAttempt]:
with Session(build_engine(), future=True, expire_on_commit=False) as session:
stmt = select(IndexAttempt)
@@ -27,6 +29,8 @@ def fetch_index_attempts(
stmt = stmt.where(IndexAttempt.source.in_(sources))
if statuses:
stmt = stmt.where(IndexAttempt.status.in_(statuses))
+ if input_types:
+ stmt = stmt.where(IndexAttempt.input_type.in_(input_types))
results = session.scalars(stmt)
return list(results.all())
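The new input_types filter is what lets the background job stay source-agnostic; all three filters are optional and combine with AND. For example:

from danswer.configs.constants import DocumentSource
from danswer.connectors.models import InputType
from danswer.db.index_attempt import fetch_index_attempts
from danswer.db.models import IndexingStatus

# every queued pull-style attempt, regardless of source (what the background job now fetches)
queued = fetch_index_attempts(
    input_types=[InputType.PULL], statuses=[IndexingStatus.NOT_STARTED]
)

# all attempts for one source, any status (what the admin list endpoint effectively does)
github_attempts = fetch_index_attempts(sources=[DocumentSource.GITHUB])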

View File

@@ -1,6 +1,9 @@
from typing import Any
from danswer.auth.users import current_admin_user
from danswer.configs.constants import DocumentSource
from danswer.configs.constants import NO_AUTH_USER
from danswer.connectors.factory import build_connector
from danswer.connectors.google_drive.connector_auth import get_auth_url
from danswer.connectors.google_drive.connector_auth import get_drive_tokens
from danswer.connectors.google_drive.connector_auth import save_access_tokens
@@ -19,11 +22,10 @@ from danswer.server.models import AuthStatus
from danswer.server.models import AuthUrl
from danswer.server.models import GDriveCallback
from danswer.server.models import IndexAttemptSnapshot
- from danswer.server.models import ListWebsiteIndexAttemptsResponse
- from danswer.server.models import WebIndexAttemptRequest
from danswer.utils.logging import setup_logger
from fastapi import APIRouter
from fastapi import Depends
+ from pydantic import BaseModel
router = APIRouter(prefix="/admin")
@@ -67,29 +69,51 @@ def modify_slack_config(
update_slack_config(slack_config)
@router.post("/connectors/web/index-attempt", status_code=201)
def index_website(
web_index_attempt_request: WebIndexAttemptRequest,
class IndexAttemptRequest(BaseModel):
input_type: InputType = InputType.PULL
connector_specific_config: dict[str, Any]
@router.post("/connectors/{source}/index-attempt", status_code=201)
def index(
source: DocumentSource,
index_attempt_request: IndexAttemptRequest,
_: User = Depends(current_admin_user),
) -> None:
- index_request = IndexAttempt(
- source=DocumentSource.WEB,
- input_type=InputType.PULL,
- connector_specific_config={"url": web_index_attempt_request.url},
- status=IndexingStatus.NOT_STARTED,
+ # validate that the connector specified by the source / input_type combination
+ # exists AND that the connector_specific_config is valid for that connector type
+ build_connector(
+ source=source,
+ input_type=index_attempt_request.input_type,
+ connector_specific_config=index_attempt_request.connector_specific_config,
+ )
+ # once validated, insert the index attempt into the database where it will
+ # get picked up by a background job
+ insert_index_attempt(
+ index_attempt=IndexAttempt(
+ source=source,
+ input_type=index_attempt_request.input_type,
+ connector_specific_config=index_attempt_request.connector_specific_config,
+ status=IndexingStatus.NOT_STARTED,
+ )
)
- insert_index_attempt(index_request)
@router.get("/connectors/web/index-attempt")
def list_website_index_attempts(
class ListIndexAttemptsResponse(BaseModel):
index_attempts: list[IndexAttemptSnapshot]
@router.get("/connectors/{source}/index-attempt")
def list_index_attempts(
source: DocumentSource,
_: User = Depends(current_admin_user),
- ) -> ListWebsiteIndexAttemptsResponse:
- index_attempts = fetch_index_attempts(sources=[DocumentSource.WEB])
- return ListWebsiteIndexAttemptsResponse(
+ ) -> ListIndexAttemptsResponse:
+ index_attempts = fetch_index_attempts(sources=[source])
+ return ListIndexAttemptsResponse(
index_attempts=[
IndexAttemptSnapshot(
- url=index_attempt.connector_specific_config["url"],
+ connector_specific_config=index_attempt.connector_specific_config,
status=index_attempt.status,
time_created=index_attempt.time_created,
time_updated=index_attempt.time_updated,
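With {source} as a path parameter, this one pair of routes now covers web, GitHub, Google Drive, and Slack alike. A hedged client-side sketch using requests follows; the base URL, the lowercase enum serialization, the GitHub config keys, and the omission of auth headers are all assumptions for illustration.

import requests

BASE = "http://localhost:8080/admin"  # assumed deployment address

# queue a GitHub index attempt; the config dict must match the GitHub loader's kwargs
resp = requests.post(
    f"{BASE}/connectors/github/index-attempt",
    json={
        "input_type": "pull",  # assumes InputType values serialize as lowercase strings
        "connector_specific_config": {"repo_owner": "some-org", "repo_name": "some-repo"},
    },
)
resp.raise_for_status()  # expect 201 Created

# list all index attempts recorded for that source
attempts = requests.get(f"{BASE}/connectors/github/index-attempt").json()["index_attempts"]
for attempt in attempts:
    print(attempt["status"], attempt["connector_specific_config"])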

View File

@@ -1,4 +1,5 @@
from datetime import datetime
+ from typing import Any
from danswer.datastores.interfaces import DatastoreFilter
from danswer.db.models import IndexingStatus
@@ -49,12 +50,8 @@ class UserByEmail(BaseModel):
user_email: str
- class WebIndexAttemptRequest(BaseModel):
- url: str
class IndexAttemptSnapshot(BaseModel):
- url: str
+ connector_specific_config: dict[str, Any]
status: IndexingStatus
time_created: datetime
time_updated: datetime
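Clients that previously read url off each snapshot now pull source-specific details out of connector_specific_config. A small illustrative helper (not part of this commit); "url" is the key the web connector stores, other sources carry their own keys.

from danswer.server.models import IndexAttemptSnapshot


def describe_attempt(attempt: IndexAttemptSnapshot) -> str:
    cfg = attempt.connector_specific_config
    # fall back to showing the whole config when no url-style key is present
    label = cfg.get("url", str(cfg))
    return f"{label}: {attempt.status} (updated {attempt.time_updated})"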