Files
Raunak Bhagat 312e3b92bc perf: Implement checkpointing for Teams Connector. (#4601)
* Add basic foundation for teams checkpointing classes

* Fix slack connector main entrypoint

* Saving changes

* Finish teams checkpointing impl

* Remove commented out code

* Remove more unused code

* Move code around

* Add threadpool to process requests in parallel

* Fix mypy errors / warnings

* Move test import to main function only

* Address nits on PR

* Remove unnecessary check prior to entering while-loop

* Remove print statement

* Change exception message

* Address more nits

* Use indexing instead of destructuring

* Add back invocation of `run_with_timeout` instead of a direct call

* Revert slack testing code

* Move early return to before second API call

* Pull fetch to team outside of loop

* Address nits on PR

* Add back client-side filtering

* Updated connector to return after a team's indexing is finished

* Add type ignore

* Implement proper datetime range fetching

* Address comment on PR

* Rename function

* Change exception type when no team with the given id was found

* Address nit on PR

* Add comment on why `page_loaded` is needed to be specified explicitly

* Remove duplicated calls to fetching channels

* Use helper function for thread-based yielding instead of manual logic

* Move datetime filtering to message-level instead

* Address more comments on PR

* Add new utility function for yielding sections

* Add additional utility function

* Add teams tests

* Edit error message

* Address nits on PR

* Promote url-prefix to be a class level constant

* Fix mypy error

* Remove start/end parameters from function that doesn't use them anymore; move around comments

* Address more nits on PR

* Add comment
2025-05-14 04:30:57 +00:00

92 lines
3.1 KiB
Python

from collections.abc import Iterator
from typing import cast
from typing import TypeVar
from onyx.connectors.connector_runner import CheckpointOutputWrapper
from onyx.connectors.interfaces import CheckpointedConnector
from onyx.connectors.interfaces import SecondsSinceUnixEpoch
from onyx.connectors.models import ConnectorCheckpoint
from onyx.connectors.models import ConnectorFailure
from onyx.connectors.models import Document
from onyx.connectors.models import ImageSection
from onyx.connectors.models import TextSection
# Maximum number of checkpoint passes the loaders in this module will run
# before raising, so that a connector whose checkpoint never stops reporting
# `has_more` cannot spin forever.
_ITERATION_LIMIT = 100_000
# Checkpoint type variable (bounded by ConnectorCheckpoint); used below to
# parameterize CheckpointedConnector and CheckpointOutputWrapper.
CT = TypeVar("CT", bound=ConnectorCheckpoint)
def load_all_docs_from_checkpoint_connector(
    connector: CheckpointedConnector[CT],
    start: SecondsSinceUnixEpoch,
    end: SecondsSinceUnixEpoch,
) -> list[Document]:
    """Run a checkpointed connector until it is exhausted and return its documents.

    Starting from the connector's dummy checkpoint, repeatedly calls
    ``load_from_checkpoint(start, end, checkpoint)`` (wrapped in a
    ``CheckpointOutputWrapper``) for as long as the current checkpoint
    reports ``has_more``.

    Args:
        connector: The checkpointed connector to drive.
        start: Lower bound passed through to ``load_from_checkpoint``.
        end: Upper bound passed through to ``load_from_checkpoint``.

    Returns:
        Every document produced, in the order it was emitted.

    Raises:
        RuntimeError: As soon as the connector emits a failure, or when the
            number of checkpoint passes exceeds ``_ITERATION_LIMIT``.
    """
    collected: list[Document] = []
    current = cast(CT, connector.build_dummy_checkpoint())
    passes = 0

    while current.has_more:
        wrapper = CheckpointOutputWrapper[CT]()
        emitted = wrapper(connector.load_from_checkpoint(start, end, current))

        for doc, err, new_checkpoint in emitted:
            # A single failure aborts the whole load.
            if err is not None:
                raise RuntimeError(f"Failed to load documents: {err}")
            if doc is not None:
                collected.append(doc)
            # Remember the most recent checkpoint so the next pass resumes
            # from where this one stopped.
            if new_checkpoint is not None:
                current = new_checkpoint

        # Counted once per pass over the connector, not once per item.
        passes += 1
        if passes > _ITERATION_LIMIT:
            raise RuntimeError("Too many iterations. Infinite loop?")

    return collected
def load_everything_from_checkpoint_connector(
    connector: CheckpointedConnector[CT],
    start: SecondsSinceUnixEpoch,
    end: SecondsSinceUnixEpoch,
) -> list[Document | ConnectorFailure]:
    """Run a checkpointed connector until it is exhausted, keeping failures too.

    Behaves like ``load_all_docs_from_checkpoint_connector`` except that a
    failure emitted by the connector is appended to the result instead of
    aborting the load.

    Args:
        connector: The checkpointed connector to drive.
        start: Lower bound passed through to ``load_from_checkpoint``.
        end: Upper bound passed through to ``load_from_checkpoint``.

    Returns:
        Documents and failures in emission order. When one item carries both,
        the failure is recorded before the document.

    Raises:
        RuntimeError: When the number of checkpoint passes exceeds
            ``_ITERATION_LIMIT``.
    """
    results: list[Document | ConnectorFailure] = []
    current = connector.build_dummy_checkpoint()
    passes = 0

    while current.has_more:
        wrapper = CheckpointOutputWrapper[CT]()
        emitted = wrapper(connector.load_from_checkpoint(start, end, current))

        for doc, err, new_checkpoint in emitted:
            # Failure first, then document, then checkpoint bookkeeping.
            if err is not None:
                results.append(err)
            if doc is not None:
                results.append(doc)
            if new_checkpoint is not None:
                current = new_checkpoint

        # Counted once per pass over the connector, not once per item.
        passes += 1
        if passes > _ITERATION_LIMIT:
            raise RuntimeError("Too many iterations. Infinite loop?")

    return results
def to_sections(
    iterator: Iterator[Document | ConnectorFailure],
) -> Iterator[TextSection | ImageSection]:
    """Flatten a stream of connector outputs into the sections of its documents.

    Lazily yields every entry of ``sections`` for each ``Document`` in
    ``iterator``, preserving order.

    Raises:
        RuntimeError: When an item is not a ``Document``; the offending item
            is passed as the exception argument.
    """
    for item in iterator:
        if isinstance(item, Document):
            yield from item.sections
            continue
        # Anything that is not a Document is treated as a failure.
        raise RuntimeError(item)
def to_text_sections(iterator: Iterator[TextSection | ImageSection]) -> Iterator[str]:
    """Lazily yield the ``text`` of each ``TextSection``, skipping all other items."""
    yield from (
        part.text for part in iterator if isinstance(part, TextSection)
    )