diff --git a/backend/danswer/db/document.py b/backend/danswer/db/document.py index b1620fb60f7..da37f3d8aa8 100644 --- a/backend/danswer/db/document.py +++ b/backend/danswer/db/document.py @@ -321,3 +321,36 @@ def prepare_to_modify_documents(db_session: Session, document_ids: list[str]) -> f"Failed to acquire locks after {_NUM_LOCK_ATTEMPTS} attempts " f"for documents: {document_ids}" ) + + +def get_ingestion_documents( + db_session: Session, +) -> list[DbDocument]: + # TODO add the option to filter by DocumentSource + stmt = select(DbDocument).where(DbDocument.from_ingestion_api.is_(True)) + documents = db_session.execute(stmt).scalars().all() + return list(documents) + + +def get_documents_by_cc_pair( + cc_pair_id: int, + db_session: Session, +) -> list[DbDocument]: + return ( + db_session.query(DbDocument) + .join( + DocumentByConnectorCredentialPair, + DbDocument.id == DocumentByConnectorCredentialPair.id, + ) + .join( + ConnectorCredentialPair, + and_( + DocumentByConnectorCredentialPair.connector_id + == ConnectorCredentialPair.connector_id, + DocumentByConnectorCredentialPair.credential_id + == ConnectorCredentialPair.credential_id, + ), + ) + .filter(ConnectorCredentialPair.id == cc_pair_id) + .all() + ) diff --git a/backend/danswer/server/danswer_api/ingestion.py b/backend/danswer/server/danswer_api/ingestion.py index 6e7b349f9e4..33a5b4f52f8 100644 --- a/backend/danswer/server/danswer_api/ingestion.py +++ b/backend/danswer/server/danswer_api/ingestion.py @@ -9,10 +9,9 @@ from sqlalchemy.orm import Session from danswer.configs.constants import DocumentSource from danswer.connectors.models import Document from danswer.connectors.models import IndexAttemptMetadata -from danswer.db.connector import fetch_connector_by_id -from danswer.db.connector import fetch_ingestion_connector_by_name -from danswer.db.connector_credential_pair import get_connector_credential_pair -from danswer.db.credentials import fetch_credential_by_id +from danswer.db.connector_credential_pair import get_connector_credential_pair_from_id +from danswer.db.document import get_documents_by_cc_pair +from danswer.db.document import get_ingestion_documents from danswer.db.embedding_model import get_current_db_embedding_model from danswer.db.embedding_model import get_secondary_db_embedding_model from danswer.db.engine import get_session @@ -22,6 +21,7 @@ from danswer.dynamic_configs.factory import get_dynamic_config_store from danswer.dynamic_configs.interface import ConfigNotFoundError from danswer.indexing.embedder import DefaultIndexingEmbedder from danswer.indexing.indexing_pipeline import build_indexing_pipeline +from danswer.server.danswer_api.models import DocMinimalInfo from danswer.server.danswer_api.models import IngestionDocument from danswer.server.danswer_api.models import IngestionResult from danswer.utils.logger import setup_logger @@ -67,55 +67,45 @@ def api_key_dep(authorization: str = Header(...)) -> str: return token -@router.post("/doc-ingestion") -def document_ingestion( +@router.get("/connector-docs/{cc_pair_id}") +def get_docs_by_connector_credential_pair( + cc_pair_id: int, + _: str = Depends(api_key_dep), + db_session: Session = Depends(get_session), +) -> list[DocMinimalInfo]: + db_docs = get_documents_by_cc_pair(cc_pair_id=cc_pair_id, db_session=db_session) + return [ + DocMinimalInfo( + document_id=doc.id, + semantic_id=doc.semantic_id, + link=doc.link, + ) + for doc in db_docs + ] + + +@router.get("/ingestion") +def get_ingestion_docs( + _: str = Depends(api_key_dep), + db_session: Session = Depends(get_session), +) -> list[DocMinimalInfo]: + db_docs = get_ingestion_documents(db_session) + return [ + DocMinimalInfo( + document_id=doc.id, + semantic_id=doc.semantic_id, + link=doc.link, + ) + for doc in db_docs + ] + + +@router.post("/ingestion") +def upsert_ingestion_doc( doc_info: IngestionDocument, _: str = Depends(api_key_dep), db_session: Session = Depends(get_session), ) -> IngestionResult: - """Currently only attaches docs to existing connectors (cc-pairs). - Or to the default ingestion connector that is accessible to all users - - Things to note: - - The document id if not provided is automatically generated from the semantic identifier - so if the document source type etc is updated, it won't create a duplicate - """ - if doc_info.credential_id: - credential_id = doc_info.credential_id - credential = fetch_credential_by_id( - credential_id=credential_id, - user=None, - db_session=db_session, - assume_admin=True, - ) - if credential is None: - raise ValueError("Invalid Credential for doc, does not exist.") - else: - credential_id = 0 - - connector_id = doc_info.connector_id - # If user provides id and name, id takes precedence - if connector_id is not None: - connector = fetch_connector_by_id(connector_id, db_session) - if connector is None: - raise ValueError("Invalid Connector for doc, id does not exist.") - elif doc_info.connector_name: - connector = fetch_ingestion_connector_by_name( - doc_info.connector_name, db_session - ) - if connector is None: - raise ValueError("Invalid Connector for doc, name does not exist.") - connector_id = connector.id - else: - connector_id = 0 - - cc_pair = get_connector_credential_pair( - connector_id=connector_id, credential_id=credential_id, db_session=db_session - ) - if cc_pair is None: - raise ValueError("Connector and Credential not associated.") - - # Disregard whatever value is passed, this must be True doc_info.document.from_ingestion_api = True document = Document.from_base(doc_info.document) @@ -124,6 +114,14 @@ def document_ingestion( if document.source == DocumentSource.INGESTION_API: document.source = DocumentSource.FILE + cc_pair = get_connector_credential_pair_from_id( + cc_pair_id=doc_info.cc_pair_id or 0, db_session=db_session + ) + if cc_pair is None: + raise HTTPException( + status_code=400, detail="Connector-Credential Pair specified does not exist" + ) + # Need to index for both the primary and secondary index if possible curr_ind_name, sec_ind_name = get_both_index_names(db_session) curr_doc_index = get_default_document_index( @@ -149,8 +147,8 @@ def document_ingestion( new_doc, chunks = indexing_pipeline( documents=[document], index_attempt_metadata=IndexAttemptMetadata( - connector_id=connector_id, - credential_id=credential_id, + connector_id=cc_pair.connector_id, + credential_id=cc_pair.credential_id, ), ) @@ -185,8 +183,8 @@ def document_ingestion( sec_ind_pipeline( documents=[document], index_attempt_metadata=IndexAttemptMetadata( - connector_id=connector_id, - credential_id=credential_id, + connector_id=cc_pair.connector_id, + credential_id=cc_pair.credential_id, ), ) diff --git a/backend/danswer/server/danswer_api/models.py b/backend/danswer/server/danswer_api/models.py index 9b10bef7621..8a534c3e31b 100644 --- a/backend/danswer/server/danswer_api/models.py +++ b/backend/danswer/server/danswer_api/models.py @@ -5,13 +5,15 @@ from danswer.connectors.models import DocumentBase class IngestionDocument(BaseModel): document: DocumentBase - connector_id: int | None = None # Takes precedence over the name - connector_name: str | None = None - credential_id: int | None = None - create_connector: bool = False # Currently not allowed - public_doc: bool = True # To attach to the cc_pair, currently unused + cc_pair_id: int | None class IngestionResult(BaseModel): document_id: str already_existed: bool + + +class DocMinimalInfo(BaseModel): + document_id: str + semantic_id: str + link: str | None