Ingestion API Additions (#1424)

This commit is contained in:
Yuhong Sun
2024-05-06 21:53:35 -07:00
committed by GitHub
parent 01476a37c3
commit 45d5d7af4a
3 changed files with 91 additions and 58 deletions

View File

@@ -321,3 +321,36 @@ def prepare_to_modify_documents(db_session: Session, document_ids: list[str]) ->
f"Failed to acquire locks after {_NUM_LOCK_ATTEMPTS} attempts "
f"for documents: {document_ids}"
)
def get_ingestion_documents(
db_session: Session,
) -> list[DbDocument]:
# TODO add the option to filter by DocumentSource
stmt = select(DbDocument).where(DbDocument.from_ingestion_api.is_(True))
documents = db_session.execute(stmt).scalars().all()
return list(documents)
def get_documents_by_cc_pair(
cc_pair_id: int,
db_session: Session,
) -> list[DbDocument]:
return (
db_session.query(DbDocument)
.join(
DocumentByConnectorCredentialPair,
DbDocument.id == DocumentByConnectorCredentialPair.id,
)
.join(
ConnectorCredentialPair,
and_(
DocumentByConnectorCredentialPair.connector_id
== ConnectorCredentialPair.connector_id,
DocumentByConnectorCredentialPair.credential_id
== ConnectorCredentialPair.credential_id,
),
)
.filter(ConnectorCredentialPair.id == cc_pair_id)
.all()
)

View File

@@ -9,10 +9,9 @@ from sqlalchemy.orm import Session
from danswer.configs.constants import DocumentSource
from danswer.connectors.models import Document
from danswer.connectors.models import IndexAttemptMetadata
from danswer.db.connector import fetch_connector_by_id
from danswer.db.connector import fetch_ingestion_connector_by_name
from danswer.db.connector_credential_pair import get_connector_credential_pair
from danswer.db.credentials import fetch_credential_by_id
from danswer.db.connector_credential_pair import get_connector_credential_pair_from_id
from danswer.db.document import get_documents_by_cc_pair
from danswer.db.document import get_ingestion_documents
from danswer.db.embedding_model import get_current_db_embedding_model
from danswer.db.embedding_model import get_secondary_db_embedding_model
from danswer.db.engine import get_session
@@ -22,6 +21,7 @@ from danswer.dynamic_configs.factory import get_dynamic_config_store
from danswer.dynamic_configs.interface import ConfigNotFoundError
from danswer.indexing.embedder import DefaultIndexingEmbedder
from danswer.indexing.indexing_pipeline import build_indexing_pipeline
from danswer.server.danswer_api.models import DocMinimalInfo
from danswer.server.danswer_api.models import IngestionDocument
from danswer.server.danswer_api.models import IngestionResult
from danswer.utils.logger import setup_logger
@@ -67,55 +67,45 @@ def api_key_dep(authorization: str = Header(...)) -> str:
return token
@router.post("/doc-ingestion")
def document_ingestion(
@router.get("/connector-docs/{cc_pair_id}")
def get_docs_by_connector_credential_pair(
cc_pair_id: int,
_: str = Depends(api_key_dep),
db_session: Session = Depends(get_session),
) -> list[DocMinimalInfo]:
db_docs = get_documents_by_cc_pair(cc_pair_id=cc_pair_id, db_session=db_session)
return [
DocMinimalInfo(
document_id=doc.id,
semantic_id=doc.semantic_id,
link=doc.link,
)
for doc in db_docs
]
@router.get("/ingestion")
def get_ingestion_docs(
_: str = Depends(api_key_dep),
db_session: Session = Depends(get_session),
) -> list[DocMinimalInfo]:
db_docs = get_ingestion_documents(db_session)
return [
DocMinimalInfo(
document_id=doc.id,
semantic_id=doc.semantic_id,
link=doc.link,
)
for doc in db_docs
]
@router.post("/ingestion")
def upsert_ingestion_doc(
doc_info: IngestionDocument,
_: str = Depends(api_key_dep),
db_session: Session = Depends(get_session),
) -> IngestionResult:
"""Currently only attaches docs to existing connectors (cc-pairs).
Or to the default ingestion connector that is accessible to all users
Things to note:
- The document id if not provided is automatically generated from the semantic identifier
so if the document source type etc is updated, it won't create a duplicate
"""
if doc_info.credential_id:
credential_id = doc_info.credential_id
credential = fetch_credential_by_id(
credential_id=credential_id,
user=None,
db_session=db_session,
assume_admin=True,
)
if credential is None:
raise ValueError("Invalid Credential for doc, does not exist.")
else:
credential_id = 0
connector_id = doc_info.connector_id
# If user provides id and name, id takes precedence
if connector_id is not None:
connector = fetch_connector_by_id(connector_id, db_session)
if connector is None:
raise ValueError("Invalid Connector for doc, id does not exist.")
elif doc_info.connector_name:
connector = fetch_ingestion_connector_by_name(
doc_info.connector_name, db_session
)
if connector is None:
raise ValueError("Invalid Connector for doc, name does not exist.")
connector_id = connector.id
else:
connector_id = 0
cc_pair = get_connector_credential_pair(
connector_id=connector_id, credential_id=credential_id, db_session=db_session
)
if cc_pair is None:
raise ValueError("Connector and Credential not associated.")
# Disregard whatever value is passed, this must be True
doc_info.document.from_ingestion_api = True
document = Document.from_base(doc_info.document)
@@ -124,6 +114,14 @@ def document_ingestion(
if document.source == DocumentSource.INGESTION_API:
document.source = DocumentSource.FILE
cc_pair = get_connector_credential_pair_from_id(
cc_pair_id=doc_info.cc_pair_id or 0, db_session=db_session
)
if cc_pair is None:
raise HTTPException(
status_code=400, detail="Connector-Credential Pair specified does not exist"
)
# Need to index for both the primary and secondary index if possible
curr_ind_name, sec_ind_name = get_both_index_names(db_session)
curr_doc_index = get_default_document_index(
@@ -149,8 +147,8 @@ def document_ingestion(
new_doc, chunks = indexing_pipeline(
documents=[document],
index_attempt_metadata=IndexAttemptMetadata(
connector_id=connector_id,
credential_id=credential_id,
connector_id=cc_pair.connector_id,
credential_id=cc_pair.credential_id,
),
)
@@ -185,8 +183,8 @@ def document_ingestion(
sec_ind_pipeline(
documents=[document],
index_attempt_metadata=IndexAttemptMetadata(
connector_id=connector_id,
credential_id=credential_id,
connector_id=cc_pair.connector_id,
credential_id=cc_pair.credential_id,
),
)

View File

@@ -5,13 +5,15 @@ from danswer.connectors.models import DocumentBase
class IngestionDocument(BaseModel):
document: DocumentBase
connector_id: int | None = None # Takes precedence over the name
connector_name: str | None = None
credential_id: int | None = None
create_connector: bool = False # Currently not allowed
public_doc: bool = True # To attach to the cc_pair, currently unused
cc_pair_id: int | None
class IngestionResult(BaseModel):
document_id: str
already_existed: bool
class DocMinimalInfo(BaseModel):
document_id: str
semantic_id: str
link: str | None