diff --git a/backend/Dockerfile.background b/backend/Dockerfile.background index 05e45e6e7e6..1c46f6094f2 100644 --- a/backend/Dockerfile.background +++ b/backend/Dockerfile.background @@ -6,6 +6,7 @@ RUN apt-get update \ COPY ./requirements/default.txt /tmp/requirements.txt RUN pip install --no-cache-dir --upgrade -r /tmp/requirements.txt +RUN playwright install WORKDIR /app COPY ./danswer /app/danswer diff --git a/backend/alembic.ini b/backend/alembic.ini new file mode 100644 index 00000000000..10ae5cfdd27 --- /dev/null +++ b/backend/alembic.ini @@ -0,0 +1,108 @@ +# A generic, single database configuration. + +[alembic] +# path to migration scripts +script_location = alembic + +# template used to generate migration file names; The default value is %%(rev)s_%%(slug)s +# Uncomment the line below if you want the files to be prepended with date and time +# file_template = %%(year)d_%%(month).2d_%%(day).2d_%%(hour).2d%%(minute).2d-%%(rev)s_%%(slug)s + +# sys.path path, will be prepended to sys.path if present. +# defaults to the current working directory. +prepend_sys_path = . + +# timezone to use when rendering the date within the migration file +# as well as the filename. +# If specified, requires the python-dateutil library that can be +# installed by adding `alembic[tz]` to the pip requirements +# string value is passed to dateutil.tz.gettz() +# leave blank for localtime +# timezone = + +# max length of characters to apply to the +# "slug" field +# truncate_slug_length = 40 + +# set to 'true' to run the environment during +# the 'revision' command, regardless of autogenerate +# revision_environment = false + +# set to 'true' to allow .pyc and .pyo files without +# a source .py file to be detected as revisions in the +# versions/ directory +# sourceless = false + +# version location specification; This defaults +# to alembic/versions. When using multiple version +# directories, initial revisions must be specified with --version-path. +# The path separator used here should be the separator specified by "version_path_separator" below. +# version_locations = %(here)s/bar:%(here)s/bat:alembic/versions + +# version path separator; As mentioned above, this is the character used to split +# version_locations. The default within new alembic.ini files is "os", which uses os.pathsep. +# If this key is omitted entirely, it falls back to the legacy behavior of splitting on spaces and/or commas. +# Valid values for version_path_separator are: +# +# version_path_separator = : +# version_path_separator = ; +# version_path_separator = space +version_path_separator = os # Use os.pathsep. Default configuration used for new projects. + +# set to 'true' to search source files recursively +# in each "version_locations" directory +# new in Alembic version 1.10 +# recursive_version_locations = false + +# the output encoding used when revision files +# are written from script.py.mako +# output_encoding = utf-8 + +# sqlalchemy.url = driver://user:pass@localhost/dbname + + +[post_write_hooks] +# post_write_hooks defines scripts or Python functions that are run +# on newly generated revision scripts. 
See the documentation for further
+# detail and examples

+# format using "black" - use the console_scripts runner, against the "black" entrypoint
+hooks = black
+black.type = console_scripts
+black.entrypoint = black
+black.options = -l 79 REVISION_SCRIPT_FILENAME
+
+# Logging configuration
+[loggers]
+keys = root,sqlalchemy,alembic
+
+[handlers]
+keys = console
+
+[formatters]
+keys = generic
+
+[logger_root]
+level = WARN
+handlers = console
+qualname =
+
+[logger_sqlalchemy]
+level = WARN
+handlers =
+qualname = sqlalchemy.engine
+
+[logger_alembic]
+level = INFO
+handlers =
+qualname = alembic
+
+[handler_console]
+class = StreamHandler
+args = (sys.stderr,)
+level = NOTSET
+formatter = generic
+
+[formatter_generic]
+format = %(levelname)-5.5s [%(name)s] %(message)s
+datefmt = %H:%M:%S
diff --git a/backend/alembic/README.md b/backend/alembic/README.md
new file mode 100644
index 00000000000..22cbffa1648
--- /dev/null
+++ b/backend/alembic/README.md
@@ -0,0 +1,15 @@
+Generic single-database configuration with an async dbapi.
+
+## To generate new migrations:
+`alembic revision --autogenerate -m <description of changes>`
+
+More info can be found here: https://alembic.sqlalchemy.org/en/latest/autogenerate.html
+
+## Running migrations
+
+To run all un-applied migrations:
+`alembic upgrade head`
+
+To undo migrations:
+`alembic downgrade -X`
+where X is the number of migrations you want to undo from the current state
diff --git a/backend/alembic/env.py b/backend/alembic/env.py
new file mode 100644
index 00000000000..98ab55034ce
--- /dev/null
+++ b/backend/alembic/env.py
@@ -0,0 +1,89 @@
+import asyncio
+from logging.config import fileConfig
+
+from alembic import context
+from danswer.db.engine import build_connection_string
+from danswer.db.models import Base
+from sqlalchemy import pool
+from sqlalchemy.engine import Connection
+from sqlalchemy.ext.asyncio import create_async_engine
+
+# this is the Alembic Config object, which provides
+# access to the values within the .ini file in use.
+config = context.config
+
+# Interpret the config file for Python logging.
+# This line sets up loggers basically.
+if config.config_file_name is not None:
+    fileConfig(config.config_file_name)
+
+# add your model's MetaData object here
+# for 'autogenerate' support
+# from myapp import mymodel
+# target_metadata = mymodel.Base.metadata
+target_metadata = Base.metadata
+
+# other values from the config, defined by the needs of env.py,
+# can be acquired:
+# my_important_option = config.get_main_option("my_important_option")
+# ... etc.
+
+
+def run_migrations_offline() -> None:
+    """Run migrations in 'offline' mode.
+
+    This configures the context with just a URL
+    and not an Engine, though an Engine is acceptable
+    here as well. By skipping the Engine creation
+    we don't even need a DBAPI to be available.
+
+    Calls to context.execute() here emit the given string to the
+    script output.
+
+    """
+    url = build_connection_string()
+    context.configure(
+        url=url,
+        target_metadata=target_metadata,
+        literal_binds=True,
+        dialect_opts={"paramstyle": "named"},
+    )
+
+    with context.begin_transaction():
+        context.run_migrations()
+
+
+def do_run_migrations(connection: Connection) -> None:
+    context.configure(connection=connection, target_metadata=target_metadata)
+
+    with context.begin_transaction():
+        context.run_migrations()
+
+
+async def run_async_migrations() -> None:
+    """In this scenario we need to create an Engine
+    and associate a connection with the context.
+ + """ + + connectable = create_async_engine( + build_connection_string(), + poolclass=pool.NullPool, + ) + + async with connectable.connect() as connection: + await connection.run_sync(do_run_migrations) + + await connectable.dispose() + + +def run_migrations_online() -> None: + """Run migrations in 'online' mode.""" + + asyncio.run(run_async_migrations()) + + +if context.is_offline_mode(): + run_migrations_offline() +else: + run_migrations_online() diff --git a/backend/alembic/script.py.mako b/backend/alembic/script.py.mako new file mode 100644 index 00000000000..55df2863d20 --- /dev/null +++ b/backend/alembic/script.py.mako @@ -0,0 +1,24 @@ +"""${message} + +Revision ID: ${up_revision} +Revises: ${down_revision | comma,n} +Create Date: ${create_date} + +""" +from alembic import op +import sqlalchemy as sa +${imports if imports else ""} + +# revision identifiers, used by Alembic. +revision = ${repr(up_revision)} +down_revision = ${repr(down_revision)} +branch_labels = ${repr(branch_labels)} +depends_on = ${repr(depends_on)} + + +def upgrade() -> None: + ${upgrades if upgrades else "pass"} + + +def downgrade() -> None: + ${downgrades if downgrades else "pass"} diff --git a/backend/alembic/versions/47433d30de82_create_indexattempt_table.py b/backend/alembic/versions/47433d30de82_create_indexattempt_table.py new file mode 100644 index 00000000000..449b35b7d5c --- /dev/null +++ b/backend/alembic/versions/47433d30de82_create_indexattempt_table.py @@ -0,0 +1,72 @@ +"""Create IndexAttempt table + +Revision ID: 47433d30de82 +Revises: +Create Date: 2023-05-04 00:55:32.971991 + +""" +import sqlalchemy as sa +from alembic import op +from sqlalchemy.dialects import postgresql + +# revision identifiers, used by Alembic. +revision = "47433d30de82" +down_revision = None +branch_labels = None +depends_on = None + + +def upgrade() -> None: + op.create_table( + "index_attempt", + sa.Column("id", sa.Integer(), nullable=False), + # String type since python enum will change often + sa.Column( + "source", + sa.String(), + nullable=False, + ), + # String type to easily accomodate new ways of pulling + # in documents + sa.Column( + "input_type", + sa.String(), + nullable=False, + ), + sa.Column( + "connector_specific_config", + postgresql.JSONB(), + nullable=False, + ), + sa.Column( + "time_created", + sa.DateTime(timezone=True), + server_default=sa.text("now()"), + nullable=True, + ), + sa.Column( + "time_updated", + sa.DateTime(timezone=True), + server_onupdate=sa.text("now()"), # type: ignore + nullable=True, + ), + sa.Column( + "status", + sa.Enum( + "NOT_STARTED", + "IN_PROGRESS", + "SUCCESS", + "FAILED", + name="indexingstatus", + ), + nullable=False, + ), + sa.Column("document_ids", postgresql.ARRAY(sa.String()), nullable=True), + sa.Column("error_msg", sa.String(), nullable=True), + sa.PrimaryKeyConstraint("id"), + ) + + +def downgrade() -> None: + op.drop_table("index_attempt") + sa.Enum(name="indexingstatus").drop(op.get_bind(), checkfirst=False) diff --git a/backend/danswer/background/update.py b/backend/danswer/background/update.py index 644aff00422..f275f9f8696 100755 --- a/backend/danswer/background/update.py +++ b/backend/danswer/background/update.py @@ -1,8 +1,14 @@ +import asyncio import time from typing import cast +from danswer.configs.constants import DocumentSource from danswer.connectors.slack.config import get_pull_frequency from danswer.connectors.slack.pull import SlackPullLoader +from danswer.connectors.web.batch import BatchWebLoader +from danswer.db.index_attempt import 
fetch_index_attempts +from danswer.db.index_attempt import update_index_attempt +from danswer.db.models import IndexingStatus from danswer.dynamic_configs import get_dynamic_config_store from danswer.dynamic_configs.interface import ConfigNotFoundError from danswer.utils.indexing_pipeline import build_indexing_pipeline @@ -17,7 +23,7 @@ def _check_should_run(current_time: int, last_pull: int, pull_frequency: int) -> return current_time - last_pull > pull_frequency * 60 -def run_update(): +async def run_update(): logger.info("Running update") # TODO (chris): implement a more generic way to run updates # so we don't need to edit this file for future connectors @@ -45,16 +51,61 @@ def run_update(): indexing_pipeline(doc_batch) dynamic_config_store.store(last_slack_pull_key, current_time) + # Web + # TODO (chris): make this more efficient / in a single transaction to + # prevent race conditions across multiple background jobs. For now, + # this assumes we only ever run a single background job at a time + # TODO (chris): make this generic for all pull connectors (not just web) + not_started_index_attempts = await fetch_index_attempts( + sources=[DocumentSource.WEB], statuses=[IndexingStatus.NOT_STARTED] + ) + for not_started_index_attempt in not_started_index_attempts: + logger.info( + "Attempting to index website with IndexAttempt id: " + f"{not_started_index_attempt.id}, source: " + f"{not_started_index_attempt.source}, input_type: " + f"{not_started_index_attempt.input_type}, and connector_specific_config: " + f"{not_started_index_attempt.connector_specific_config}" + ) + await update_index_attempt( + index_attempt_id=not_started_index_attempt.id, + new_status=IndexingStatus.IN_PROGRESS, + ) -if __name__ == "__main__": - DELAY = 60 # 60 seconds + error_msg = None + base_url = not_started_index_attempt.connector_specific_config["url"] + try: + # TODO (chris): make all connectors async + spawn processes to + # parallelize / take advantage of multiple cores + implement retries + document_ids: list[str] = [] + async for doc_batch in BatchWebLoader(base_url=base_url).async_load(): + chunks = indexing_pipeline(doc_batch) + document_ids.extend([chunk.source_document.id for chunk in chunks]) + except Exception as e: + logger.exception( + "Failed to index website with url %s due to: %s", base_url, e + ) + error_msg = str(e) + await update_index_attempt( + index_attempt_id=not_started_index_attempt.id, + new_status=IndexingStatus.FAILED if error_msg else IndexingStatus.SUCCESS, + document_ids=document_ids if not error_msg else None, + error_msg=error_msg, + ) + + +async def update_loop(delay: int = 60): while True: start = time.time() try: - run_update() + await run_update() except Exception: logger.exception("Failed to run update") - sleep_time = DELAY - (time.time() - start) + sleep_time = delay - (time.time() - start) if sleep_time > 0: time.sleep(sleep_time) + + +if __name__ == "__main__": + asyncio.run(update_loop()) diff --git a/backend/danswer/configs/constants.py b/backend/danswer/configs/constants.py index 8570f205b64..cf2b52049dc 100644 --- a/backend/danswer/configs/constants.py +++ b/backend/danswer/configs/constants.py @@ -11,13 +11,7 @@ ALLOWED_USERS = "allowed_users" ALLOWED_GROUPS = "allowed_groups" -class DocumentSource(Enum): - Slack = 1 - Web = 2 - GoogleDrive = 3 - - def __str__(self): - return self.name - - def __int__(self): - return self.value +class DocumentSource(str, Enum): + SLACK = "slack" + WEB = "web" + GOOGLE_DRIVE = "google_drive" diff --git 
a/backend/danswer/connectors/google_drive/batch.py b/backend/danswer/connectors/google_drive/batch.py index 6c510358dc2..6b8a84df688 100644 --- a/backend/danswer/connectors/google_drive/batch.py +++ b/backend/danswer/connectors/google_drive/batch.py @@ -128,7 +128,7 @@ class BatchGoogleDriveLoader(BatchLoader): Document( id=file["webViewLink"], sections=[Section(link=file["webViewLink"], text=full_context)], - source=DocumentSource.GoogleDrive, + source=DocumentSource.GOOGLE_DRIVE, metadata={}, ) ) diff --git a/backend/danswer/connectors/models.py b/backend/danswer/connectors/models.py index 1b1d562dacb..e3021572993 100644 --- a/backend/danswer/connectors/models.py +++ b/backend/danswer/connectors/models.py @@ -1,7 +1,9 @@ from dataclasses import dataclass +from enum import Enum from typing import Any from danswer.configs.constants import DocumentSource +from pydantic import BaseModel @dataclass @@ -20,3 +22,21 @@ class Document: def get_raw_document_text(document: Document) -> str: return "\n\n".join([section.text for section in document.sections]) + + +class InputType(str, Enum): + PULL = "pull" # e.g. calling slack API to get all messages in the last hour + LOAD_STATE = "load_state" # e.g. loading the state of a slack workspace from a file + EVENT = "event" # e.g. registered an endpoint as a listener, and processing slack events + + +class ConnectorDescriptor(BaseModel): + source: DocumentSource + # how the raw data being indexed is procured + input_type: InputType + # what is passed into the __init__ of the connector described by `source` + # and `input_type` + connector_specific_config: dict[str, Any] + + class Config: + arbitrary_types_allowed = True diff --git a/backend/danswer/connectors/slack/batch.py b/backend/danswer/connectors/slack/batch.py index 4043ffae6f9..b5c9b5947cb 100644 --- a/backend/danswer/connectors/slack/batch.py +++ b/backend/danswer/connectors/slack/batch.py @@ -46,7 +46,7 @@ def _process_batch_event( text=event["text"], ) ], - source=DocumentSource.Slack, + source=DocumentSource.SLACK, metadata={}, ) diff --git a/backend/danswer/connectors/slack/pull.py b/backend/danswer/connectors/slack/pull.py index 7b3af9d3e56..f69e0ad439b 100644 --- a/backend/danswer/connectors/slack/pull.py +++ b/backend/danswer/connectors/slack/pull.py @@ -174,7 +174,7 @@ def thread_to_doc(channel_id: str, thread: ThreadType) -> Document: ) for m in thread ], - source=DocumentSource.Slack, + source=DocumentSource.SLACK, metadata={}, ) diff --git a/backend/danswer/connectors/type_aliases.py b/backend/danswer/connectors/type_aliases.py index f86d8fea277..e73b16afdea 100644 --- a/backend/danswer/connectors/type_aliases.py +++ b/backend/danswer/connectors/type_aliases.py @@ -17,6 +17,8 @@ ProcessDocumentFunc = Callable[..., Document] BuildListenerFunc = Callable[[ConnectorConfig], ProcessDocumentFunc] +# TODO (chris) refactor definition of a connector to match `InputType` +# + make them all async-based class BatchLoader: @abc.abstractmethod def load(self) -> Generator[List[Document], None, None]: diff --git a/backend/danswer/connectors/web/batch.py b/backend/danswer/connectors/web/batch.py index e1596fc2067..f38cb14d2d9 100644 --- a/backend/danswer/connectors/web/batch.py +++ b/backend/danswer/connectors/web/batch.py @@ -1,3 +1,4 @@ +from collections.abc import AsyncGenerator from collections.abc import Generator from typing import Any from typing import cast @@ -7,11 +8,11 @@ from urllib.parse import urlparse from bs4 import BeautifulSoup from danswer.configs.app_configs import 
INDEX_BATCH_SIZE from danswer.configs.constants import DocumentSource -from danswer.configs.constants import SOURCE_TYPE from danswer.connectors.models import Document from danswer.connectors.models import Section from danswer.connectors.type_aliases import BatchLoader from danswer.utils.logging import setup_logger +from playwright.async_api import async_playwright from playwright.sync_api import sync_playwright logger = setup_logger() @@ -55,6 +56,69 @@ class BatchWebLoader(BatchLoader): self.base_url = base_url self.batch_size = batch_size + async def async_load(self) -> AsyncGenerator[list[Document], None]: + """NOTE: TEMPORARY UNTIL ALL COMPONENTS ARE CONVERTED TO ASYNC + At that point, this will take over from the regular `load` func. + """ + visited_links: set[str] = set() + to_visit: list[str] = [self.base_url] + doc_batch: list[Document] = [] + + async with async_playwright() as playwright: + browser = await playwright.chromium.launch(headless=True) + context = await browser.new_context() + + while to_visit: + current_url = to_visit.pop() + if current_url in visited_links: + continue + visited_links.add(current_url) + + try: + page = await context.new_page() + await page.goto(current_url) + content = await page.content() + soup = BeautifulSoup(content, "html.parser") + + # Heuristics based cleaning + for undesired_tag in ["nav", "header", "footer", "meta"]: + [tag.extract() for tag in soup.find_all(undesired_tag)] + for undesired_div in ["sidebar", "header", "footer"]: + [ + tag.extract() + for tag in soup.find_all("div", {"class": undesired_div}) + ] + + page_text = soup.get_text(TAG_SEPARATOR) + + doc_batch.append( + Document( + id=current_url, + sections=[Section(link=current_url, text=page_text)], + source=DocumentSource.WEB, + metadata={}, + ) + ) + + internal_links = get_internal_links( + self.base_url, current_url, soup + ) + for link in internal_links: + if link not in visited_links: + to_visit.append(link) + + await page.close() + except Exception as e: + logger.error(f"Failed to fetch '{current_url}': {e}") + continue + + if len(doc_batch) >= self.batch_size: + yield doc_batch + doc_batch = [] + + if doc_batch: + yield doc_batch + def load(self) -> Generator[list[Document], None, None]: """Traverses through all pages found on the website and converts them into documents""" @@ -93,7 +157,7 @@ class BatchWebLoader(BatchLoader): Document( id=current_url, sections=[Section(link=current_url, text=page_text)], - source=DocumentSource.Web, + source=DocumentSource.WEB, metadata={}, ) ) diff --git a/backend/danswer/db/engine.py b/backend/danswer/db/engine.py new file mode 100644 index 00000000000..251970a96b2 --- /dev/null +++ b/backend/danswer/db/engine.py @@ -0,0 +1,31 @@ +import os + +from sqlalchemy.ext.asyncio import AsyncEngine +from sqlalchemy.ext.asyncio import create_async_engine + + +ASYNC_DB_API = "asyncpg" +# below are intended to match the env variables names used by the official +# postgres docker image https://hub.docker.com/_/postgres +DEFAULT_USER = os.environ.get("POSTGRES_USER", "postgres") +DEFAULT_PASSWORD = os.environ.get("POSTGRES_PASSWORD", "password") +DEFAULT_HOST = os.environ.get("POSTGRES_HOST", "localhost") +DEFULT_PORT = os.environ.get("POSTGRES_PORT", "5432") +DEFAULT_DB = os.environ.get("POSTGRES_DB", "postgres") + + +def build_connection_string( + *, + db_api: str = ASYNC_DB_API, + user: str = DEFAULT_USER, + password: str = DEFAULT_PASSWORD, + host: str = DEFAULT_HOST, + port: str = DEFULT_PORT, + db: str = DEFAULT_DB, +) -> str: + return 
f"postgresql+{db_api}://{user}:{password}@{host}:{port}/{db}" + + +def build_async_engine() -> AsyncEngine: + connection_string = build_connection_string() + return create_async_engine(connection_string) diff --git a/backend/danswer/db/index_attempt.py b/backend/danswer/db/index_attempt.py new file mode 100644 index 00000000000..1841563c173 --- /dev/null +++ b/backend/danswer/db/index_attempt.py @@ -0,0 +1,55 @@ +from danswer.configs.constants import DocumentSource +from danswer.db.engine import build_async_engine +from danswer.db.models import IndexAttempt +from danswer.db.models import IndexingStatus +from danswer.utils.logging import setup_logger +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +logger = setup_logger() + + +async def insert_index_attempt(index_attempt: IndexAttempt) -> None: + logger.info(f"Inserting {index_attempt}") + async with AsyncSession(build_async_engine()) as asession: + asession.add(index_attempt) + await asession.commit() + + +async def fetch_index_attempts( + *, + sources: list[DocumentSource] | None = None, + statuses: list[IndexingStatus] | None = None, +) -> list[IndexAttempt]: + async with AsyncSession( + build_async_engine(), future=True, expire_on_commit=False + ) as asession: + stmt = select(IndexAttempt) + if sources: + stmt = stmt.where(IndexAttempt.source.in_(sources)) + if statuses: + stmt = stmt.where(IndexAttempt.status.in_(statuses)) + results = await asession.scalars(stmt) + return list(results.all()) + + +async def update_index_attempt( + *, + index_attempt_id: int, + new_status: IndexingStatus, + document_ids: list[str] | None = None, + error_msg: str | None = None, +) -> bool: + """Returns `True` if successfully updated, `False` if cannot find matching ID""" + async with AsyncSession( + build_async_engine(), future=True, expire_on_commit=False + ) as asession: + stmt = select(IndexAttempt).where(IndexAttempt.id == index_attempt_id) + result = await asession.scalar(stmt) + if result: + result.status = new_status + result.document_ids = document_ids + result.error_msg = error_msg + await asession.commit() + return True + return False diff --git a/backend/danswer/db/models.py b/backend/danswer/db/models.py new file mode 100644 index 00000000000..d679981b6fa --- /dev/null +++ b/backend/danswer/db/models.py @@ -0,0 +1,75 @@ +import datetime +from enum import Enum as PyEnum +from typing import Any + +from danswer.configs.constants import DocumentSource +from danswer.connectors.models import InputType +from fastapi.encoders import jsonable_encoder +from sqlalchemy import DateTime +from sqlalchemy import Enum +from sqlalchemy import func +from sqlalchemy import String +from sqlalchemy.dialects import postgresql +from sqlalchemy.orm import DeclarativeBase +from sqlalchemy.orm import Mapped +from sqlalchemy.orm import mapped_column + + +class Base(DeclarativeBase): + pass + + +class IndexingStatus(str, PyEnum): + NOT_STARTED = "not_started" + IN_PROGRESS = "in_progress" + SUCCESS = "success" + FAILED = "failed" + + +class IndexAttempt(Base): + """ + Represents an attempt to index a group of 1 or more documents from a + source. For example, a single pull from Google Drive, a single event from + slack event API, or a single website crawl. 
+    """
+
+    __tablename__ = "index_attempt"
+
+    id: Mapped[int] = mapped_column(primary_key=True)
+    # would like this to be a single JSONB column with structure described by
+    # `ConnectorDescriptor`, but this is not easily supported and requires
+    # some difficult to understand magic
+    source: Mapped[DocumentSource] = mapped_column(
+        Enum(DocumentSource, native_enum=False)
+    )
+    input_type: Mapped[InputType] = mapped_column(Enum(InputType, native_enum=False))
+    connector_specific_config: Mapped[dict[str, Any]] = mapped_column(
+        postgresql.JSONB(), nullable=False
+    )
+    # TODO (chris): potentially add metadata for the chunker, embedder, and datastore
+    time_created: Mapped[datetime.datetime] = mapped_column(
+        DateTime(timezone=True), server_default=func.now()
+    )
+    time_updated: Mapped[datetime.datetime] = mapped_column(
+        DateTime(timezone=True), onupdate=func.now()
+    )
+    status: Mapped[IndexingStatus] = mapped_column(Enum(IndexingStatus))
+    document_ids: Mapped[list[str] | None] = mapped_column(
+        postgresql.ARRAY(String()), default=None
+    )  # only filled if status = "success"
+    error_msg: Mapped[str | None] = mapped_column(
+        String(), default=None
+    )  # only filled if status = "failed"
+
+    def __repr__(self):
+        return (
+            f"<IndexAttempt(id={self.id!r}, "
+            f"source={self.source!r}, "
+            f"input_type={self.input_type!r}, "
+            f"status={self.status!r}, "
+            f"error_msg={self.error_msg!r})>"
+        )
diff --git a/backend/danswer/server/admin.py b/backend/danswer/server/admin.py
index 3f8ca482f3a..3ec3c7b3885 100644
--- a/backend/danswer/server/admin.py
+++ b/backend/danswer/server/admin.py
@@ -1,9 +1,19 @@
+from datetime import datetime
+
+from danswer.configs.constants import DocumentSource
+from danswer.connectors.models import ConnectorDescriptor
+from danswer.connectors.models import InputType
 from danswer.connectors.slack.config import get_slack_config
 from danswer.connectors.slack.config import SlackConfig
 from danswer.connectors.slack.config import update_slack_config
+from danswer.db.index_attempt import fetch_index_attempts
+from danswer.db.index_attempt import insert_index_attempt
+from danswer.db.models import IndexAttempt
+from danswer.db.models import IndexingStatus
 from danswer.dynamic_configs.interface import ConfigNotFoundError
 from danswer.utils.logging import setup_logger
 from fastapi import APIRouter
+from pydantic import BaseModel
 
 router = APIRouter(prefix="/admin")
 
@@ -21,3 +31,43 @@ def fetch_slack_config():
 @router.post("/slack_connector_config")
 def modify_slack_config(slack_config: SlackConfig):
     update_slack_config(slack_config)
+
+
+class WebIndexAttemptRequest(BaseModel):
+    url: str
+
+
+@router.post("/website_index", status_code=201)
+async def index_website(web_index_attempt_request: WebIndexAttemptRequest):
+    index_request = IndexAttempt(
+        source=DocumentSource.WEB,
+        input_type=InputType.PULL,
+        connector_specific_config={"url": web_index_attempt_request.url},
+        status=IndexingStatus.NOT_STARTED,
+    )
+    await insert_index_attempt(index_request)
+
+
+class IndexAttemptSnapshot(BaseModel):
+    url: str
+    status: IndexingStatus
+    time_created: datetime
+
+
+class ListWebsiteIndexAttemptsResponse(BaseModel):
+    index_attempts: list[IndexAttemptSnapshot]
+
+
+@router.get("/website_index")
+async def list_website_index_attempts() -> ListWebsiteIndexAttemptsResponse:
+    index_attempts = await fetch_index_attempts(sources=[DocumentSource.WEB])
+    return ListWebsiteIndexAttemptsResponse(
+        index_attempts=[
+            IndexAttemptSnapshot(
+                url=index_attempt.connector_specific_config["url"],
+                status=index_attempt.status,
+                time_created=index_attempt.time_created,
+            )
+            for index_attempt in index_attempts
+        ]
+    )
diff --git 
a/backend/danswer/utils/indexing_pipeline.py b/backend/danswer/utils/indexing_pipeline.py index 0b864d18dc7..db00c9dc40f 100644 --- a/backend/danswer/utils/indexing_pipeline.py +++ b/backend/danswer/utils/indexing_pipeline.py @@ -4,6 +4,7 @@ from itertools import chain from danswer.chunking.chunk import Chunker from danswer.chunking.chunk import DefaultChunker +from danswer.chunking.models import EmbeddedIndexChunk from danswer.connectors.models import Document from danswer.datastores.interfaces import Datastore from danswer.datastores.qdrant.store import QdrantDatastore @@ -16,10 +17,13 @@ def _indexing_pipeline( embedder: Embedder, datastore: Datastore, documents: list[Document], -) -> None: +) -> list[EmbeddedIndexChunk]: + # TODO: make entire indexing pipeline async to not block the entire process + # when running on async endpoints chunks = list(chain(*[chunker.chunk(document) for document in documents])) chunks_with_embeddings = embedder.embed(chunks) datastore.index(chunks_with_embeddings) + return chunks_with_embeddings def build_indexing_pipeline( @@ -27,7 +31,7 @@ def build_indexing_pipeline( chunker: Chunker | None = None, embedder: Embedder | None = None, datastore: Datastore | None = None, -) -> Callable[[list[Document]], None]: +) -> Callable[[list[Document]], list[EmbeddedIndexChunk]]: """Builds a pipline which takes in a list of docs and indexes them. Default uses _ chunker, _ embedder, and qdrant for the datastore""" diff --git a/backend/deployment/docker-compose.yml b/backend/deployment/docker-compose.yml index 9aa4868d9d6..dddb2316efa 100644 --- a/backend/deployment/docker-compose.yml +++ b/backend/deployment/docker-compose.yml @@ -5,21 +5,39 @@ services: build: context: .. dockerfile: Dockerfile + depends_on: + - db # just for local testing ports: - "8080:8080" env_file: - .env + environment: + - POSTGRES_HOST=db volumes: - local_dynamic_storage:/home/storage background: build: context: .. dockerfile: Dockerfile.background + depends_on: + - db env_file: - .env + environment: + - POSTGRES_HOST=db volumes: - local_dynamic_storage:/home/storage + db: + image: postgres:15.2-alpine + restart: always + # POSTGRES_USER and POSTGRES_PASSWORD should be set in .env file + env_file: + - .env + ports: + - "5432:5432" + volumes: + - db:/var/lib/postgresql/data nginx: image: nginx:1.23.4-alpine ports: @@ -39,4 +57,5 @@ services: - ./data/certbot/www:/var/www/certbot entrypoint: "/bin/sh -c 'trap exit TERM; while :; do certbot renew; sleep 12h & wait $${!}; done;'" volumes: - local_dynamic_storage: \ No newline at end of file + local_dynamic_storage: + db: diff --git a/backend/requirements/default.txt b/backend/requirements/default.txt index b6b1f9425a7..538cd207e1c 100644 --- a/backend/requirements/default.txt +++ b/backend/requirements/default.txt @@ -1,9 +1,12 @@ +alembic==1.10.4 +asyncpg==0.27.0 beautifulsoup4==4.12.0 fastapi==0.95.0 filelock==3.12.0 google-api-python-client==2.86.0 google-auth-httplib2==0.1.0 google-auth-oauthlib==1.0.0 +Mako==1.2.4 openai==0.27.6 playwright==1.32.1 pydantic==1.10.7 @@ -13,6 +16,7 @@ qdrant-client==1.1.0 requests==2.28.2 sentence-transformers==2.2.2 slack-sdk==3.20.2 +SQLAlchemy[mypy]==2.0.12 transformers==4.27.3 types-beautifulsoup4==4.12.0.3 types-html5lib==1.1.11.13 diff --git a/backend/setup.cfg b/backend/setup.cfg new file mode 100644 index 00000000000..1f858c6cec5 --- /dev/null +++ b/backend/setup.cfg @@ -0,0 +1,2 @@ +[mypy] +plugins = sqlalchemy.ext.mypy.plugin \ No newline at end of file
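
A few usage sketches for the pieces this patch introduces follow. First, the `index_attempt` table has to exist before the background job or the admin endpoints can touch it; besides the `alembic upgrade head` command documented in the new README, the same upgrade can be driven from Python through Alembic's command API. A minimal sketch, assuming it is executed from the `backend/` directory (so `alembic.ini` and the `alembic/` scripts resolve) and that the `POSTGRES_*` environment variables read by `danswer.db.engine` point at the Postgres service added to `docker-compose.yml`:

```python
# Sketch: apply all pending migrations programmatically, equivalent to running
# `alembic upgrade head` from backend/. The working directory and env vars are assumptions.
from alembic import command
from alembic.config import Config

alembic_cfg = Config("alembic.ini")  # picks up script_location = alembic
command.upgrade(alembic_cfg, "head")  # env.py builds the asyncpg URL from the POSTGRES_* vars
```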
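With the schema in place, the new `/admin/website_index` routes can be exercised end to end once the api, background, and db containers are up. A minimal sketch using `requests` (already in `default.txt`); the base URL, any extra router prefix, and the crawled site are assumptions to adjust for the actual deployment:

```python
# Sketch: enqueue a website crawl and list existing attempts via the new admin endpoints.
# BASE_URL and the target URL are placeholders, not fixed values.
import requests

BASE_URL = "http://localhost:8080"  # assumed api port from docker-compose.yml

# POST creates a NOT_STARTED IndexAttempt row; the background loop picks it up later.
resp = requests.post(
    f"{BASE_URL}/admin/website_index",
    json={"url": "https://docs.example.com"},  # hypothetical site to crawl
)
resp.raise_for_status()  # expect 201

# GET returns url / status / time_created snapshots for WEB attempts.
for attempt in requests.get(f"{BASE_URL}/admin/website_index").json()["index_attempts"]:
    print(attempt["url"], attempt["status"], attempt["time_created"])
```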
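Code that needs the persistence layer directly (rather than through HTTP) can use the helpers in `danswer.db.index_attempt`, which cover the whole attempt lifecycle. A minimal sketch of the same insert → fetch → update sequence the admin route and `run_update()` perform, assuming migrations have been applied and the `POSTGRES_*` env vars are set; the URL is a placeholder:

```python
# Sketch: drive the IndexAttempt lifecycle with the new async DB helpers.
import asyncio

from danswer.configs.constants import DocumentSource
from danswer.connectors.models import InputType
from danswer.db.index_attempt import fetch_index_attempts
from danswer.db.index_attempt import insert_index_attempt
from danswer.db.index_attempt import update_index_attempt
from danswer.db.models import IndexAttempt
from danswer.db.models import IndexingStatus


async def main() -> None:
    # Queue a crawl, exactly as the /admin/website_index endpoint does.
    await insert_index_attempt(
        IndexAttempt(
            source=DocumentSource.WEB,
            input_type=InputType.PULL,
            connector_specific_config={"url": "https://docs.example.com"},  # placeholder
            status=IndexingStatus.NOT_STARTED,
        )
    )

    # Everything the background loop would pick up on its next pass.
    pending = await fetch_index_attempts(
        sources=[DocumentSource.WEB], statuses=[IndexingStatus.NOT_STARTED]
    )

    # Claim the first attempt; update_index_attempt returns False if the id no longer exists.
    if pending:
        await update_index_attempt(
            index_attempt_id=pending[0].id, new_status=IndexingStatus.IN_PROGRESS
        )


if __name__ == "__main__":
    asyncio.run(main())
```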
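Finally, the async crawl path itself can be run standalone for debugging. A minimal sketch of the same `BatchWebLoader.async_load()` → `indexing_pipeline` flow that `run_update()` uses, assuming the default embedder and Qdrant datastore behind `build_indexing_pipeline()` are reachable and Playwright's Chromium is installed (the `playwright install` step added to `Dockerfile.background`); the URL is a placeholder:

```python
# Sketch: crawl a site with the new async loader and push batches through the
# indexing pipeline, collecting the indexed document ids the way run_update() does.
import asyncio

from danswer.connectors.web.batch import BatchWebLoader
from danswer.utils.indexing_pipeline import build_indexing_pipeline


async def main() -> None:
    indexing_pipeline = build_indexing_pipeline()
    document_ids: list[str] = []
    async for doc_batch in BatchWebLoader(base_url="https://docs.example.com").async_load():
        chunks = indexing_pipeline(doc_batch)  # now returns the embedded chunks
        document_ids.extend(chunk.source_document.id for chunk in chunks)
    print(f"Indexed {len(document_ids)} documents")


if __name__ == "__main__":
    asyncio.run(main())
```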