mirror of
https://github.com/danswer-ai/danswer.git
synced 2025-09-20 13:05:49 +02:00
Make slack periodic use the DB
This commit is contained in:
@@ -3,12 +3,15 @@ from typing import cast
|
||||
|
||||
from danswer.configs.constants import DocumentSource
|
||||
from danswer.connectors.factory import build_connector
|
||||
from danswer.connectors.factory import build_pull_connector
|
||||
from danswer.connectors.models import InputType
|
||||
from danswer.connectors.slack.config import get_pull_frequency
|
||||
from danswer.connectors.slack.pull import PeriodicSlackLoader
|
||||
from danswer.connectors.web.pull import WebLoader
|
||||
from danswer.db.index_attempt import fetch_index_attempts
|
||||
from danswer.db.index_attempt import insert_index_attempt
|
||||
from danswer.db.index_attempt import update_index_attempt
|
||||
from danswer.db.models import IndexAttempt
|
||||
from danswer.db.models import IndexingStatus
|
||||
from danswer.dynamic_configs import get_dynamic_config_store
|
||||
from danswer.dynamic_configs.interface import ConfigNotFoundError
|
||||
@@ -51,9 +54,21 @@ def run_update() -> None:
|
||||
if last_pull is None or _check_should_run(
|
||||
current_time, last_pull, pull_frequency
|
||||
):
|
||||
logger.info(f"Running slack pull from {last_pull or 0} to {current_time}")
|
||||
for doc_batch in PeriodicSlackLoader().load(last_pull or 0, current_time):
|
||||
indexing_pipeline(doc_batch)
|
||||
# TODO (chris): go back to only fetching messages that have changed
|
||||
# since the last pull. Not supported for now due to how we compute the
|
||||
# number of documents indexed for the admin dashboard (only look at latest)
|
||||
logger.info("Scheduling periodic slack pull")
|
||||
insert_index_attempt(
|
||||
IndexAttempt(
|
||||
source=DocumentSource.SLACK,
|
||||
input_type=InputType.PULL,
|
||||
status=IndexingStatus.NOT_STARTED,
|
||||
connector_specific_config={},
|
||||
)
|
||||
)
|
||||
# not 100% accurate, but the inaccuracy will result in more
|
||||
# frequent pulling rather than less frequent, which is fine
|
||||
# for now
|
||||
dynamic_config_store.store(last_slack_pull_key, current_time)
|
||||
|
||||
# TODO (chris): make this more efficient / in a single transaction to
|
||||
@@ -79,9 +94,8 @@ def run_update() -> None:
|
||||
try:
|
||||
# TODO (chris): spawn processes to parallelize / take advantage of
|
||||
# multiple cores + implement retries
|
||||
connector = build_connector(
|
||||
connector = build_pull_connector(
|
||||
source=not_started_index_attempt.source,
|
||||
input_type=InputType.PULL,
|
||||
connector_specific_config=not_started_index_attempt.connector_specific_config,
|
||||
)
|
||||
|
||||
|
@@ -1,3 +1,5 @@
|
||||
import time
|
||||
from collections.abc import Generator
|
||||
from typing import Any
|
||||
|
||||
from danswer.configs.constants import DocumentSource
|
||||
@@ -5,11 +7,14 @@ from danswer.connectors.github.batch import BatchGithubLoader
|
||||
from danswer.connectors.google_drive.batch import BatchGoogleDriveLoader
|
||||
from danswer.connectors.interfaces import PullLoader
|
||||
from danswer.connectors.interfaces import RangePullLoader
|
||||
from danswer.connectors.models import Document
|
||||
from danswer.connectors.models import InputType
|
||||
from danswer.connectors.slack.batch import BatchSlackLoader
|
||||
from danswer.connectors.slack.pull import PeriodicSlackLoader
|
||||
from danswer.connectors.web.pull import WebLoader
|
||||
|
||||
_NUM_SECONDS_IN_DAY = 86400
|
||||
|
||||
|
||||
class ConnectorMissingException(Exception):
|
||||
pass
|
||||
@@ -38,3 +43,23 @@ def build_connector(
|
||||
raise ConnectorMissingException(
|
||||
f"Connector not found for source={source}, input_type={input_type}"
|
||||
)
|
||||
|
||||
|
||||
def build_pull_connector(
|
||||
source: DocumentSource, connector_specific_config: dict[str, Any]
|
||||
) -> PullLoader:
|
||||
return _range_pull_to_pull(
|
||||
build_connector(source, InputType.PULL, connector_specific_config)
|
||||
)
|
||||
|
||||
|
||||
def _range_pull_to_pull(range_pull_connector: RangePullLoader) -> PullLoader:
|
||||
class _Connector(PullLoader):
|
||||
def __init__(self) -> None:
|
||||
self._connector = range_pull_connector
|
||||
|
||||
def load(self) -> Generator[list[Document], None, None]:
|
||||
# adding some buffer to make sure we get all documents
|
||||
return self._connector.load(0, time.time() + _NUM_SECONDS_IN_DAY)
|
||||
|
||||
return _Connector()
|
||||
|
@@ -1,7 +1,6 @@
|
||||
import abc
|
||||
from collections.abc import Generator
|
||||
from typing import Any
|
||||
from typing import List
|
||||
|
||||
from danswer.connectors.models import Document
|
||||
|
||||
@@ -12,7 +11,7 @@ SecondsSinceUnixEpoch = float
|
||||
# TODO (chris): rename from Loader -> Connector
|
||||
class PullLoader:
|
||||
@abc.abstractmethod
|
||||
def load(self) -> Generator[List[Document], None, None]:
|
||||
def load(self) -> Generator[list[Document], None, None]:
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
@@ -20,11 +19,11 @@ class RangePullLoader:
|
||||
@abc.abstractmethod
|
||||
def load(
|
||||
self, start: SecondsSinceUnixEpoch, end: SecondsSinceUnixEpoch
|
||||
) -> Generator[List[Document], None, None]:
|
||||
) -> Generator[list[Document], None, None]:
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
class PushLoader:
|
||||
@abc.abstractmethod
|
||||
def load(self, event: Any) -> Generator[List[Document], None, None]:
|
||||
def load(self, event: Any) -> Generator[list[Document], None, None]:
|
||||
raise NotImplementedError
|
||||
|
Reference in New Issue
Block a user