mirror of
https://github.com/danswer-ai/danswer.git
synced 2025-08-02 21:22:51 +02:00
* Add basic foundation for teams checkpointing classes * Fix slack connector main entrypoint * Saving changes * Finish teams checkpointing impl * Remove commented out code * Remove more unused code * Move code around * Add threadpool to process requests in parallel * Fix mypy errors / warnings * Move test import to main function only * Address nits on PR * Remove unnecessary check prior to entering while-loop * Remove print statement * Change exception message * Address more nits * Use indexing instead of destructuring * Add back invocation of `run_with_timeout` instead of a direct call * Revert slack testing code * Move early return to before second API call * Pull fetch to team outside of loop * Address nits on PR * Add back client-side filtering * Updated connector to return after a team's indexing is finished * Add type ignore * Implement proper datetime range fetching * Address comment on PR * Rename function * Change exception type when no team with the given id was found * Address nit on PR * Add comment on why `page_loaded` is needed to be specified explicitly * Remove duplicated calls to fetching channels * Use helper function for thread-based yielding instead of manual logic * Move datetime filtering to message-level instead * Address more comments on PR * Add new utility function for yielding sections * Add additional utility function * Add teams tests * Edit error message * Address nits on PR * Promote url-prefix to be a class level constant * Fix mypy error * Remove start/end parameters from function that doesn't use them anymore; move around comments * Address more nits on PR * Add comment
92 lines
3.1 KiB
Python
92 lines
3.1 KiB
Python
from collections.abc import Iterator
|
|
from typing import cast
|
|
from typing import TypeVar
|
|
|
|
from onyx.connectors.connector_runner import CheckpointOutputWrapper
|
|
from onyx.connectors.interfaces import CheckpointedConnector
|
|
from onyx.connectors.interfaces import SecondsSinceUnixEpoch
|
|
from onyx.connectors.models import ConnectorCheckpoint
|
|
from onyx.connectors.models import ConnectorFailure
|
|
from onyx.connectors.models import Document
|
|
from onyx.connectors.models import ImageSection
|
|
from onyx.connectors.models import TextSection
|
|
|
|
# Safety cap on the number of checkpoint passes the loaders below will run
# before raising RuntimeError, so a checkpoint that never clears `has_more`
# cannot spin forever.
_ITERATION_LIMIT = 100_000
|
|
|
|
# Checkpoint type variable (bounded by ConnectorCheckpoint) used to
# parameterize CheckpointedConnector and CheckpointOutputWrapper below.
CT = TypeVar("CT", bound=ConnectorCheckpoint)
|
|
|
|
|
|
def load_all_docs_from_checkpoint_connector(
    connector: CheckpointedConnector[CT],
    start: SecondsSinceUnixEpoch,
    end: SecondsSinceUnixEpoch,
) -> list[Document]:
    """Drive a checkpointed connector to completion and collect its documents.

    Starting from ``connector.build_dummy_checkpoint()``, repeatedly calls
    ``connector.load_from_checkpoint(start, end, checkpoint)`` (wrapped in a
    ``CheckpointOutputWrapper``) until the current checkpoint's ``has_more``
    is falsy.

    Args:
        connector: The checkpointed connector to run.
        start: Start of the time range, passed through to the connector.
        end: End of the time range, passed through to the connector.

    Returns:
        Every non-None document produced, in the order it was yielded.

    Raises:
        RuntimeError: On the first failure yielded by the connector, or when
            more than ``_ITERATION_LIMIT`` passes have been completed.
    """
    collected: list[Document] = []
    current = cast(CT, connector.build_dummy_checkpoint())
    passes = 0

    while current.has_more:
        wrapper = CheckpointOutputWrapper[CT]()
        wrapped_outputs = wrapper(
            connector.load_from_checkpoint(start, end, current)
        )

        for doc, failure, upcoming in wrapped_outputs:
            # Any failure aborts the whole load immediately.
            if failure is not None:
                raise RuntimeError(f"Failed to load documents: {failure}")

            if doc is not None:
                collected.append(doc)

            # The checkpoint used for the next pass is the last non-None one seen.
            if upcoming is not None:
                current = upcoming

        # Guard against a connector whose checkpoint never finishes.
        passes += 1
        if passes > _ITERATION_LIMIT:
            raise RuntimeError("Too many iterations. Infinite loop?")

    return collected
|
|
|
|
|
|
def load_everything_from_checkpoint_connector(
    connector: CheckpointedConnector[CT],
    start: SecondsSinceUnixEpoch,
    end: SecondsSinceUnixEpoch,
) -> list[Document | ConnectorFailure]:
    """Drive a checkpointed connector to completion, keeping failures too.

    Same checkpoint loop as ``load_all_docs_from_checkpoint_connector``, except
    that failures are appended to the result list instead of being raised.

    Args:
        connector: The checkpointed connector to run.
        start: Start of the time range, passed through to the connector.
        end: End of the time range, passed through to the connector.

    Returns:
        Every non-None document and failure produced, in yield order. When a
        single yielded triple carries both, the failure comes first.

    Raises:
        RuntimeError: When more than ``_ITERATION_LIMIT`` passes have been
            completed.
    """
    results: list[Document | ConnectorFailure] = []
    current = connector.build_dummy_checkpoint()
    passes = 0

    while current.has_more:
        wrapper = CheckpointOutputWrapper[CT]()
        wrapped_outputs = wrapper(
            connector.load_from_checkpoint(start, end, current)
        )

        for doc, failure, upcoming in wrapped_outputs:
            # Record the failure ahead of the document from the same triple.
            results.extend(
                item for item in (failure, doc) if item is not None
            )

            # The checkpoint used for the next pass is the last non-None one seen.
            if upcoming is not None:
                current = upcoming

        # Guard against a connector whose checkpoint never finishes.
        passes += 1
        if passes > _ITERATION_LIMIT:
            raise RuntimeError("Too many iterations. Infinite loop?")

    return results
|
|
|
|
|
|
def to_sections(
    iterator: Iterator[Document | ConnectorFailure],
) -> Iterator[TextSection | ImageSection]:
    """Flatten a stream of connector outputs into their sections.

    Yields each entry of ``sections`` for every Document, in order.

    Raises:
        RuntimeError: As soon as an item that is not a Document is reached;
            the offending item is passed as the exception argument.
    """
    for item in iterator:
        if isinstance(item, Document):
            for part in item.sections:
                yield part
        else:
            # Anything other than a Document is treated as a failure.
            raise RuntimeError(item)
|
|
|
|
|
|
def to_text_sections(iterator: Iterator[TextSection | ImageSection]) -> Iterator[str]:
    """Yield the ``text`` of each TextSection in ``iterator``, skipping other items."""
    for item in iterator:
        if not isinstance(item, TextSection):
            continue
        yield item.text
|