mirror of https://github.com/danswer-ai/danswer.git
synced 2025-07-13 06:32:57 +02:00
Setup Postgres to docker compose + add web indexing APIs + update background runner to look for web indices to run (#13)
* Adding Postgres to docker compose
* Model / migrations for indexing logs
@@ -6,6 +6,7 @@ RUN apt-get update \

COPY ./requirements/default.txt /tmp/requirements.txt
RUN pip install --no-cache-dir --upgrade -r /tmp/requirements.txt
RUN playwright install

WORKDIR /app
COPY ./danswer /app/danswer
backend/alembic.ini (new file)
@@ -0,0 +1,108 @@
# A generic, single database configuration.

[alembic]
# path to migration scripts
script_location = alembic

# template used to generate migration file names; The default value is %%(rev)s_%%(slug)s
# Uncomment the line below if you want the files to be prepended with date and time
# file_template = %%(year)d_%%(month).2d_%%(day).2d_%%(hour).2d%%(minute).2d-%%(rev)s_%%(slug)s

# sys.path path, will be prepended to sys.path if present.
# defaults to the current working directory.
prepend_sys_path = .

# timezone to use when rendering the date within the migration file
# as well as the filename.
# If specified, requires the python-dateutil library that can be
# installed by adding `alembic[tz]` to the pip requirements
# string value is passed to dateutil.tz.gettz()
# leave blank for localtime
# timezone =

# max length of characters to apply to the
# "slug" field
# truncate_slug_length = 40

# set to 'true' to run the environment during
# the 'revision' command, regardless of autogenerate
# revision_environment = false

# set to 'true' to allow .pyc and .pyo files without
# a source .py file to be detected as revisions in the
# versions/ directory
# sourceless = false

# version location specification; This defaults
# to alembic/versions. When using multiple version
# directories, initial revisions must be specified with --version-path.
# The path separator used here should be the separator specified by "version_path_separator" below.
# version_locations = %(here)s/bar:%(here)s/bat:alembic/versions

# version path separator; As mentioned above, this is the character used to split
# version_locations. The default within new alembic.ini files is "os", which uses os.pathsep.
# If this key is omitted entirely, it falls back to the legacy behavior of splitting on spaces and/or commas.
# Valid values for version_path_separator are:
#
# version_path_separator = :
# version_path_separator = ;
# version_path_separator = space
version_path_separator = os  # Use os.pathsep. Default configuration used for new projects.

# set to 'true' to search source files recursively
# in each "version_locations" directory
# new in Alembic version 1.10
# recursive_version_locations = false

# the output encoding used when revision files
# are written from script.py.mako
# output_encoding = utf-8

# sqlalchemy.url = driver://user:pass@localhost/dbname


[post_write_hooks]
# post_write_hooks defines scripts or Python functions that are run
# on newly generated revision scripts. See the documentation for further
# detail and examples

# format using "black" - use the console_scripts runner, against the "black" entrypoint
hooks = black
black.type = console_scripts
black.entrypoint = black
black.options = -l 79 REVISION_SCRIPT_FILENAME

# Logging configuration
[loggers]
keys = root,sqlalchemy,alembic

[handlers]
keys = console

[formatters]
keys = generic

[logger_root]
level = WARN
handlers = console
qualname =

[logger_sqlalchemy]
level = WARN
handlers =
qualname = sqlalchemy.engine

[logger_alembic]
level = INFO
handlers =
qualname = alembic

[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic

[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S
backend/alembic/README.md (new file)
@@ -0,0 +1,15 @@
Generic single-database configuration with an async dbapi.

## To generate new migrations:
`alembic revision --autogenerate -m <DESCRIPTION_OF_MIGRATION>`

More info can be found here: https://alembic.sqlalchemy.org/en/latest/autogenerate.html

## Running migrations

To run all un-applied migrations:
`alembic upgrade head`

To undo migrations:
`alembic downgrade -X`
where X is the number of migrations you want to undo from the current state
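The same migration workflow can also be driven from Python through Alembic's command API; a minimal sketch, assuming it is run from the backend/ directory where alembic.ini lives:

# Sketch: programmatic equivalents of the CLI commands in the README above.
# Assumes the current working directory is backend/, where alembic.ini sits.
from alembic import command
from alembic.config import Config

alembic_cfg = Config("alembic.ini")

# same as `alembic upgrade head`
command.upgrade(alembic_cfg, "head")

# same as `alembic downgrade -1` (undo the most recent migration)
# command.downgrade(alembic_cfg, "-1")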
backend/alembic/env.py (new file)
@@ -0,0 +1,89 @@
import asyncio
from logging.config import fileConfig

from alembic import context
from danswer.db.engine import build_connection_string
from danswer.db.models import Base
from sqlalchemy import pool
from sqlalchemy.engine import Connection
from sqlalchemy.ext.asyncio import create_async_engine

# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config

# Interpret the config file for Python logging.
# This line sets up loggers basically.
if config.config_file_name is not None:
    fileConfig(config.config_file_name)

# add your model's MetaData object here
# for 'autogenerate' support
# from myapp import mymodel
# target_metadata = mymodel.Base.metadata
target_metadata = Base.metadata

# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.


def run_migrations_offline() -> None:
    """Run migrations in 'offline' mode.

    This configures the context with just a URL
    and not an Engine, though an Engine is acceptable
    here as well. By skipping the Engine creation
    we don't even need a DBAPI to be available.

    Calls to context.execute() here emit the given string to the
    script output.

    """
    url = build_connection_string()
    context.configure(
        url=url,
        target_metadata=target_metadata,
        literal_binds=True,
        dialect_opts={"paramstyle": "named"},
    )

    with context.begin_transaction():
        context.run_migrations()


def do_run_migrations(connection: Connection) -> None:
    context.configure(connection=connection, target_metadata=target_metadata)

    with context.begin_transaction():
        context.run_migrations()


async def run_async_migrations() -> None:
    """In this scenario we need to create an Engine
    and associate a connection with the context.

    """

    connectable = create_async_engine(
        build_connection_string(),
        poolclass=pool.NullPool,
    )

    async with connectable.connect() as connection:
        await connection.run_sync(do_run_migrations)

    await connectable.dispose()


def run_migrations_online() -> None:
    """Run migrations in 'online' mode."""

    asyncio.run(run_async_migrations())


if context.is_offline_mode():
    run_migrations_offline()
else:
    run_migrations_online()
backend/alembic/script.py.mako (new file)
@@ -0,0 +1,24 @@
"""${message}

Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}

"""
from alembic import op
import sqlalchemy as sa
${imports if imports else ""}

# revision identifiers, used by Alembic.
revision = ${repr(up_revision)}
down_revision = ${repr(down_revision)}
branch_labels = ${repr(branch_labels)}
depends_on = ${repr(depends_on)}


def upgrade() -> None:
    ${upgrades if upgrades else "pass"}


def downgrade() -> None:
    ${downgrades if downgrades else "pass"}
@@ -0,0 +1,72 @@
"""Create IndexAttempt table

Revision ID: 47433d30de82
Revises:
Create Date: 2023-05-04 00:55:32.971991

"""
import sqlalchemy as sa
from alembic import op
from sqlalchemy.dialects import postgresql

# revision identifiers, used by Alembic.
revision = "47433d30de82"
down_revision = None
branch_labels = None
depends_on = None


def upgrade() -> None:
    op.create_table(
        "index_attempt",
        sa.Column("id", sa.Integer(), nullable=False),
        # String type since python enum will change often
        sa.Column(
            "source",
            sa.String(),
            nullable=False,
        ),
        # String type to easily accommodate new ways of pulling
        # in documents
        sa.Column(
            "input_type",
            sa.String(),
            nullable=False,
        ),
        sa.Column(
            "connector_specific_config",
            postgresql.JSONB(),
            nullable=False,
        ),
        sa.Column(
            "time_created",
            sa.DateTime(timezone=True),
            server_default=sa.text("now()"),
            nullable=True,
        ),
        sa.Column(
            "time_updated",
            sa.DateTime(timezone=True),
            server_onupdate=sa.text("now()"),  # type: ignore
            nullable=True,
        ),
        sa.Column(
            "status",
            sa.Enum(
                "NOT_STARTED",
                "IN_PROGRESS",
                "SUCCESS",
                "FAILED",
                name="indexingstatus",
            ),
            nullable=False,
        ),
        sa.Column("document_ids", postgresql.ARRAY(sa.String()), nullable=True),
        sa.Column("error_msg", sa.String(), nullable=True),
        sa.PrimaryKeyConstraint("id"),
    )


def downgrade() -> None:
    op.drop_table("index_attempt")
    sa.Enum(name="indexingstatus").drop(op.get_bind(), checkfirst=False)
@@ -1,8 +1,14 @@
import asyncio
import time
from typing import cast

from danswer.configs.constants import DocumentSource
from danswer.connectors.slack.config import get_pull_frequency
from danswer.connectors.slack.pull import SlackPullLoader
from danswer.connectors.web.batch import BatchWebLoader
from danswer.db.index_attempt import fetch_index_attempts
from danswer.db.index_attempt import update_index_attempt
from danswer.db.models import IndexingStatus
from danswer.dynamic_configs import get_dynamic_config_store
from danswer.dynamic_configs.interface import ConfigNotFoundError
from danswer.utils.indexing_pipeline import build_indexing_pipeline
@@ -17,7 +23,7 @@ def _check_should_run(current_time: int, last_pull: int, pull_frequency: int) ->
    return current_time - last_pull > pull_frequency * 60


def run_update():
async def run_update():
    logger.info("Running update")
    # TODO (chris): implement a more generic way to run updates
    # so we don't need to edit this file for future connectors
@@ -45,16 +51,61 @@ def run_update():
            indexing_pipeline(doc_batch)
        dynamic_config_store.store(last_slack_pull_key, current_time)

    # Web
    # TODO (chris): make this more efficient / in a single transaction to
    # prevent race conditions across multiple background jobs. For now,
    # this assumes we only ever run a single background job at a time
    # TODO (chris): make this generic for all pull connectors (not just web)
    not_started_index_attempts = await fetch_index_attempts(
        sources=[DocumentSource.WEB], statuses=[IndexingStatus.NOT_STARTED]
    )
    for not_started_index_attempt in not_started_index_attempts:
        logger.info(
            "Attempting to index website with IndexAttempt id: "
            f"{not_started_index_attempt.id}, source: "
            f"{not_started_index_attempt.source}, input_type: "
            f"{not_started_index_attempt.input_type}, and connector_specific_config: "
            f"{not_started_index_attempt.connector_specific_config}"
        )
        await update_index_attempt(
            index_attempt_id=not_started_index_attempt.id,
            new_status=IndexingStatus.IN_PROGRESS,
        )

if __name__ == "__main__":
    DELAY = 60  # 60 seconds
        error_msg = None
        base_url = not_started_index_attempt.connector_specific_config["url"]
        try:
            # TODO (chris): make all connectors async + spawn processes to
            # parallelize / take advantage of multiple cores + implement retries
            document_ids: list[str] = []
            async for doc_batch in BatchWebLoader(base_url=base_url).async_load():
                chunks = indexing_pipeline(doc_batch)
                document_ids.extend([chunk.source_document.id for chunk in chunks])
        except Exception as e:
            logger.exception(
                "Failed to index website with url %s due to: %s", base_url, e
            )
            error_msg = str(e)

        await update_index_attempt(
            index_attempt_id=not_started_index_attempt.id,
            new_status=IndexingStatus.FAILED if error_msg else IndexingStatus.SUCCESS,
            document_ids=document_ids if not error_msg else None,
            error_msg=error_msg,
        )


async def update_loop(delay: int = 60):
    while True:
        start = time.time()
        try:
            run_update()
            await run_update()
        except Exception:
            logger.exception("Failed to run update")
        sleep_time = DELAY - (time.time() - start)
        sleep_time = delay - (time.time() - start)
        if sleep_time > 0:
            time.sleep(sleep_time)


if __name__ == "__main__":
    asyncio.run(update_loop())
@@ -11,13 +11,7 @@ ALLOWED_USERS = "allowed_users"
ALLOWED_GROUPS = "allowed_groups"


class DocumentSource(Enum):
    Slack = 1
    Web = 2
    GoogleDrive = 3

    def __str__(self):
        return self.name

    def __int__(self):
        return self.value
class DocumentSource(str, Enum):
    SLACK = "slack"
    WEB = "web"
    GOOGLE_DRIVE = "google_drive"
@@ -128,7 +128,7 @@ class BatchGoogleDriveLoader(BatchLoader):
                Document(
                    id=file["webViewLink"],
                    sections=[Section(link=file["webViewLink"], text=full_context)],
                    source=DocumentSource.GoogleDrive,
                    source=DocumentSource.GOOGLE_DRIVE,
                    metadata={},
                )
            )
@@ -1,7 +1,9 @@
from dataclasses import dataclass
from enum import Enum
from typing import Any

from danswer.configs.constants import DocumentSource
from pydantic import BaseModel


@dataclass
@@ -20,3 +22,21 @@ class Document:

def get_raw_document_text(document: Document) -> str:
    return "\n\n".join([section.text for section in document.sections])


class InputType(str, Enum):
    PULL = "pull"  # e.g. calling slack API to get all messages in the last hour
    LOAD_STATE = "load_state"  # e.g. loading the state of a slack workspace from a file
    EVENT = "event"  # e.g. registered an endpoint as a listener, and processing slack events


class ConnectorDescriptor(BaseModel):
    source: DocumentSource
    # how the raw data being indexed is procured
    input_type: InputType
    # what is passed into the __init__ of the connector described by `source`
    # and `input_type`
    connector_specific_config: dict[str, Any]

    class Config:
        arbitrary_types_allowed = True
@@ -46,7 +46,7 @@ def _process_batch_event(
                    text=event["text"],
                )
            ],
            source=DocumentSource.Slack,
            source=DocumentSource.SLACK,
            metadata={},
        )
@@ -174,7 +174,7 @@ def thread_to_doc(channel_id: str, thread: ThreadType) -> Document:
            )
            for m in thread
        ],
        source=DocumentSource.Slack,
        source=DocumentSource.SLACK,
        metadata={},
    )
@@ -17,6 +17,8 @@ ProcessDocumentFunc = Callable[..., Document]
BuildListenerFunc = Callable[[ConnectorConfig], ProcessDocumentFunc]


# TODO (chris) refactor definition of a connector to match `InputType`
# + make them all async-based
class BatchLoader:
    @abc.abstractmethod
    def load(self) -> Generator[List[Document], None, None]:
@@ -1,3 +1,4 @@
from collections.abc import AsyncGenerator
from collections.abc import Generator
from typing import Any
from typing import cast
@@ -7,11 +8,11 @@ from urllib.parse import urlparse
from bs4 import BeautifulSoup
from danswer.configs.app_configs import INDEX_BATCH_SIZE
from danswer.configs.constants import DocumentSource
from danswer.configs.constants import SOURCE_TYPE
from danswer.connectors.models import Document
from danswer.connectors.models import Section
from danswer.connectors.type_aliases import BatchLoader
from danswer.utils.logging import setup_logger
from playwright.async_api import async_playwright
from playwright.sync_api import sync_playwright

logger = setup_logger()
@@ -55,6 +56,69 @@ class BatchWebLoader(BatchLoader):
        self.base_url = base_url
        self.batch_size = batch_size

    async def async_load(self) -> AsyncGenerator[list[Document], None]:
        """NOTE: TEMPORARY UNTIL ALL COMPONENTS ARE CONVERTED TO ASYNC
        At that point, this will take over from the regular `load` func.
        """
        visited_links: set[str] = set()
        to_visit: list[str] = [self.base_url]
        doc_batch: list[Document] = []

        async with async_playwright() as playwright:
            browser = await playwright.chromium.launch(headless=True)
            context = await browser.new_context()

            while to_visit:
                current_url = to_visit.pop()
                if current_url in visited_links:
                    continue
                visited_links.add(current_url)

                try:
                    page = await context.new_page()
                    await page.goto(current_url)
                    content = await page.content()
                    soup = BeautifulSoup(content, "html.parser")

                    # Heuristics based cleaning
                    for undesired_tag in ["nav", "header", "footer", "meta"]:
                        [tag.extract() for tag in soup.find_all(undesired_tag)]
                    for undesired_div in ["sidebar", "header", "footer"]:
                        [
                            tag.extract()
                            for tag in soup.find_all("div", {"class": undesired_div})
                        ]

                    page_text = soup.get_text(TAG_SEPARATOR)

                    doc_batch.append(
                        Document(
                            id=current_url,
                            sections=[Section(link=current_url, text=page_text)],
                            source=DocumentSource.WEB,
                            metadata={},
                        )
                    )

                    internal_links = get_internal_links(
                        self.base_url, current_url, soup
                    )
                    for link in internal_links:
                        if link not in visited_links:
                            to_visit.append(link)

                    await page.close()
                except Exception as e:
                    logger.error(f"Failed to fetch '{current_url}': {e}")
                    continue

                if len(doc_batch) >= self.batch_size:
                    yield doc_batch
                    doc_batch = []

            if doc_batch:
                yield doc_batch

    def load(self) -> Generator[list[Document], None, None]:
        """Traverses through all pages found on the website
        and converts them into documents"""
@@ -93,7 +157,7 @@ class BatchWebLoader(BatchLoader):
                    Document(
                        id=current_url,
                        sections=[Section(link=current_url, text=page_text)],
                        source=DocumentSource.Web,
                        source=DocumentSource.WEB,
                        metadata={},
                    )
                )
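Outside of the background runner shown earlier, the new async loader could be exercised on its own roughly as follows; a minimal sketch, assuming the placeholder URL https://docs.example.com and that Playwright's browsers have been installed (`playwright install`, as in the Dockerfile above):

# Sketch: consuming BatchWebLoader.async_load() directly.
# "https://docs.example.com" is a placeholder URL; `playwright install`
# must have been run so a chromium browser is available.
import asyncio

from danswer.connectors.web.batch import BatchWebLoader


async def crawl(base_url: str) -> None:
    async for doc_batch in BatchWebLoader(base_url=base_url).async_load():
        # each batch is a list[Document]; print the crawled page URLs (Document.id)
        for doc in doc_batch:
            print(doc.id)


asyncio.run(crawl("https://docs.example.com"))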
backend/danswer/db/engine.py (new file)
@@ -0,0 +1,31 @@
import os

from sqlalchemy.ext.asyncio import AsyncEngine
from sqlalchemy.ext.asyncio import create_async_engine


ASYNC_DB_API = "asyncpg"
# below are intended to match the env variables names used by the official
# postgres docker image https://hub.docker.com/_/postgres
DEFAULT_USER = os.environ.get("POSTGRES_USER", "postgres")
DEFAULT_PASSWORD = os.environ.get("POSTGRES_PASSWORD", "password")
DEFAULT_HOST = os.environ.get("POSTGRES_HOST", "localhost")
DEFULT_PORT = os.environ.get("POSTGRES_PORT", "5432")
DEFAULT_DB = os.environ.get("POSTGRES_DB", "postgres")


def build_connection_string(
    *,
    db_api: str = ASYNC_DB_API,
    user: str = DEFAULT_USER,
    password: str = DEFAULT_PASSWORD,
    host: str = DEFAULT_HOST,
    port: str = DEFULT_PORT,
    db: str = DEFAULT_DB,
) -> str:
    return f"postgresql+{db_api}://{user}:{password}@{host}:{port}/{db}"


def build_async_engine() -> AsyncEngine:
    connection_string = build_connection_string()
    return create_async_engine(connection_string)
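With none of the POSTGRES_* variables set, build_connection_string() falls back to the defaults above and yields an asyncpg DSN; a quick illustration:

# Sketch: the DSN produced by the defaults in danswer/db/engine.py.
from danswer.db.engine import build_connection_string

print(build_connection_string())
# with no POSTGRES_* env vars set, this prints:
# postgresql+asyncpg://postgres:password@localhost:5432/postgres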
backend/danswer/db/index_attempt.py (new file)
@@ -0,0 +1,55 @@
from danswer.configs.constants import DocumentSource
from danswer.db.engine import build_async_engine
from danswer.db.models import IndexAttempt
from danswer.db.models import IndexingStatus
from danswer.utils.logging import setup_logger
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

logger = setup_logger()


async def insert_index_attempt(index_attempt: IndexAttempt) -> None:
    logger.info(f"Inserting {index_attempt}")
    async with AsyncSession(build_async_engine()) as asession:
        asession.add(index_attempt)
        await asession.commit()


async def fetch_index_attempts(
    *,
    sources: list[DocumentSource] | None = None,
    statuses: list[IndexingStatus] | None = None,
) -> list[IndexAttempt]:
    async with AsyncSession(
        build_async_engine(), future=True, expire_on_commit=False
    ) as asession:
        stmt = select(IndexAttempt)
        if sources:
            stmt = stmt.where(IndexAttempt.source.in_(sources))
        if statuses:
            stmt = stmt.where(IndexAttempt.status.in_(statuses))
        results = await asession.scalars(stmt)
        return list(results.all())


async def update_index_attempt(
    *,
    index_attempt_id: int,
    new_status: IndexingStatus,
    document_ids: list[str] | None = None,
    error_msg: str | None = None,
) -> bool:
    """Returns `True` if successfully updated, `False` if cannot find matching ID"""
    async with AsyncSession(
        build_async_engine(), future=True, expire_on_commit=False
    ) as asession:
        stmt = select(IndexAttempt).where(IndexAttempt.id == index_attempt_id)
        result = await asession.scalar(stmt)
        if result:
            result.status = new_status
            result.document_ids = document_ids
            result.error_msg = error_msg
            await asession.commit()
            return True
        return False
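Taken together, these helpers form the small async CRUD surface used by the admin API and the background runner. A minimal sketch of the intended round trip, assuming Postgres is reachable with the defaults from danswer.db.engine and using a placeholder URL:

# Sketch: insert an attempt, fetch pending web attempts, mark one in progress.
# Assumes a reachable Postgres (defaults from danswer/db/engine.py);
# "https://docs.example.com" is a placeholder.
import asyncio

from danswer.configs.constants import DocumentSource
from danswer.connectors.models import InputType
from danswer.db.index_attempt import fetch_index_attempts
from danswer.db.index_attempt import insert_index_attempt
from danswer.db.index_attempt import update_index_attempt
from danswer.db.models import IndexAttempt
from danswer.db.models import IndexingStatus


async def main() -> None:
    await insert_index_attempt(
        IndexAttempt(
            source=DocumentSource.WEB,
            input_type=InputType.PULL,
            connector_specific_config={"url": "https://docs.example.com"},
            status=IndexingStatus.NOT_STARTED,
        )
    )
    pending = await fetch_index_attempts(
        sources=[DocumentSource.WEB], statuses=[IndexingStatus.NOT_STARTED]
    )
    for attempt in pending:
        await update_index_attempt(
            index_attempt_id=attempt.id, new_status=IndexingStatus.IN_PROGRESS
        )


asyncio.run(main())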
backend/danswer/db/models.py (new file)
@@ -0,0 +1,75 @@
import datetime
from enum import Enum as PyEnum
from typing import Any

from danswer.configs.constants import DocumentSource
from danswer.connectors.models import InputType
from fastapi.encoders import jsonable_encoder
from sqlalchemy import DateTime
from sqlalchemy import Enum
from sqlalchemy import func
from sqlalchemy import String
from sqlalchemy.dialects import postgresql
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.orm import Mapped
from sqlalchemy.orm import mapped_column


class Base(DeclarativeBase):
    pass


class IndexingStatus(str, PyEnum):
    NOT_STARTED = "not_started"
    IN_PROGRESS = "in_progress"
    SUCCESS = "success"
    FAILED = "failed"


class IndexAttempt(Base):
    """
    Represents an attempt to index a group of 1 or more documents from a
    source. For example, a single pull from Google Drive, a single event from
    slack event API, or a single website crawl.
    """

    __tablename__ = "index_attempt"

    id: Mapped[int] = mapped_column(primary_key=True)
    # would like this to be a single JSONB column with structure described by
    # `ConnectorDescriptor`, but this is not easily supported and requires
    # some difficult to understand magic
    source: Mapped[DocumentSource] = mapped_column(
        Enum(DocumentSource, native_enum=False)
    )
    input_type: Mapped[InputType] = mapped_column(Enum(InputType, native_enum=False))
    connector_specific_config: Mapped[dict[str, Any]] = mapped_column(
        postgresql.JSONB(), nullable=False
    )
    # TODO (chris): potentially add metadata for the chunker, embedder, and datastore
    time_created: Mapped[datetime.datetime] = mapped_column(
        DateTime(timezone=True), server_default=func.now()
    )
    time_updated: Mapped[datetime.datetime] = mapped_column(
        DateTime(timezone=True), onupdate=func.now()
    )
    status: Mapped[IndexingStatus] = mapped_column(Enum(IndexingStatus))
    document_ids: Mapped[list[str] | None] = mapped_column(
        postgresql.ARRAY(String()), default=None
    )  # only filled if status = "complete"
    error_msg: Mapped[str | None] = mapped_column(
        String(), default=None
    )  # only filled if status = "failed"

    def __repr__(self):
        return (
            f"<IndexAttempt(id={self.id!r}, "
            f"source={self.source!r}, "
            f"input_type={self.input_type!r}, "
            f"connector_specific_config={self.connector_specific_config!r}, "
            f"time_created={self.time_created!r}, "
            f"time_updated={self.time_updated!r}, "
            f"status={self.status!r}, "
            f"document_ids={self.document_ids!r}, "
            f"error_msg={self.error_msg!r})>"
        )
@@ -1,9 +1,19 @@
from datetime import datetime

from danswer.configs.constants import DocumentSource
from danswer.connectors.models import ConnectorDescriptor
from danswer.connectors.models import InputType
from danswer.connectors.slack.config import get_slack_config
from danswer.connectors.slack.config import SlackConfig
from danswer.connectors.slack.config import update_slack_config
from danswer.db.index_attempt import fetch_index_attempts
from danswer.db.index_attempt import insert_index_attempt
from danswer.db.models import IndexAttempt
from danswer.db.models import IndexingStatus
from danswer.dynamic_configs.interface import ConfigNotFoundError
from danswer.utils.logging import setup_logger
from fastapi import APIRouter
from pydantic import BaseModel

router = APIRouter(prefix="/admin")

@@ -21,3 +31,43 @@ def fetch_slack_config():
@router.post("/slack_connector_config")
def modify_slack_config(slack_config: SlackConfig):
    update_slack_config(slack_config)


class WebIndexAttemptRequest(BaseModel):
    url: str


@router.post("/website_index", status_code=201)
async def index_website(web_index_attempt_request: WebIndexAttemptRequest):
    index_request = IndexAttempt(
        source=DocumentSource.WEB,
        input_type=InputType.PULL,
        connector_specific_config={"url": web_index_attempt_request.url},
        status=IndexingStatus.NOT_STARTED,
    )
    await insert_index_attempt(index_request)


class IndexAttemptSnapshot(BaseModel):
    url: str
    status: IndexingStatus
    time_created: datetime


class ListWebsiteIndexAttemptsResponse(BaseModel):
    index_attempts: list[IndexAttemptSnapshot]


@router.get("/website_index")
async def list_website_index_attempts() -> ListWebsiteIndexAttemptsResponse:
    index_attempts = await fetch_index_attempts(sources=[DocumentSource.WEB])
    return ListWebsiteIndexAttemptsResponse(
        index_attempts=[
            IndexAttemptSnapshot(
                url=index_attempt.connector_specific_config["url"],
                status=index_attempt.status,
                time_created=index_attempt.time_created,
            )
            for index_attempt in index_attempts
        ]
    )
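Once the stack is up, the two new endpoints can be exercised with any HTTP client; a minimal sketch using the requests library (already in requirements.txt), assuming the API server is reachable directly on localhost:8080 as in the docker compose file below and using a placeholder URL:

# Sketch: calling the new web-indexing admin endpoints.
# Assumes the API server is reachable at http://localhost:8080 (see the
# docker compose port mapping below); "https://docs.example.com" is a placeholder.
import requests

BASE_URL = "http://localhost:8080/admin"

# queue a new website index attempt (the endpoint returns HTTP 201)
resp = requests.post(
    f"{BASE_URL}/website_index",
    json={"url": "https://docs.example.com"},
)
resp.raise_for_status()

# list all website index attempts and their statuses
for attempt in requests.get(f"{BASE_URL}/website_index").json()["index_attempts"]:
    print(attempt["url"], attempt["status"], attempt["time_created"])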
@@ -4,6 +4,7 @@ from itertools import chain

from danswer.chunking.chunk import Chunker
from danswer.chunking.chunk import DefaultChunker
from danswer.chunking.models import EmbeddedIndexChunk
from danswer.connectors.models import Document
from danswer.datastores.interfaces import Datastore
from danswer.datastores.qdrant.store import QdrantDatastore
@@ -16,10 +17,13 @@ def _indexing_pipeline(
    embedder: Embedder,
    datastore: Datastore,
    documents: list[Document],
) -> None:
) -> list[EmbeddedIndexChunk]:
    # TODO: make entire indexing pipeline async to not block the entire process
    # when running on async endpoints
    chunks = list(chain(*[chunker.chunk(document) for document in documents]))
    chunks_with_embeddings = embedder.embed(chunks)
    datastore.index(chunks_with_embeddings)
    return chunks_with_embeddings


def build_indexing_pipeline(
@@ -27,7 +31,7 @@ def build_indexing_pipeline(
    chunker: Chunker | None = None,
    embedder: Embedder | None = None,
    datastore: Datastore | None = None,
) -> Callable[[list[Document]], None]:
) -> Callable[[list[Document]], list[EmbeddedIndexChunk]]:
    """Builds a pipeline which takes in a list of docs and indexes them.

    Default uses _ chunker, _ embedder, and qdrant for the datastore"""
@@ -5,21 +5,39 @@ services:
    build:
      context: ..
      dockerfile: Dockerfile
    depends_on:
      - db
    # just for local testing
    ports:
      - "8080:8080"
    env_file:
      - .env
    environment:
      - POSTGRES_HOST=db
    volumes:
      - local_dynamic_storage:/home/storage
  background:
    build:
      context: ..
      dockerfile: Dockerfile.background
    depends_on:
      - db
    env_file:
      - .env
    environment:
      - POSTGRES_HOST=db
    volumes:
      - local_dynamic_storage:/home/storage
  db:
    image: postgres:15.2-alpine
    restart: always
    # POSTGRES_USER and POSTGRES_PASSWORD should be set in .env file
    env_file:
      - .env
    ports:
      - "5432:5432"
    volumes:
      - db:/var/lib/postgresql/data
  nginx:
    image: nginx:1.23.4-alpine
    ports:
@@ -39,4 +57,5 @@ services:
      - ./data/certbot/www:/var/www/certbot
    entrypoint: "/bin/sh -c 'trap exit TERM; while :; do certbot renew; sleep 12h & wait $${!}; done;'"
volumes:
  local_dynamic_storage:
  local_dynamic_storage:
  db:
@@ -1,9 +1,12 @@
alembic==1.10.4
asyncpg==0.27.0
beautifulsoup4==4.12.0
fastapi==0.95.0
filelock==3.12.0
google-api-python-client==2.86.0
google-auth-httplib2==0.1.0
google-auth-oauthlib==1.0.0
Mako==1.2.4
openai==0.27.6
playwright==1.32.1
pydantic==1.10.7
@@ -13,6 +16,7 @@ qdrant-client==1.1.0
requests==2.28.2
sentence-transformers==2.2.2
slack-sdk==3.20.2
SQLAlchemy[mypy]==2.0.12
transformers==4.27.3
types-beautifulsoup4==4.12.0.3
types-html5lib==1.1.11.13
backend/setup.cfg (new file)
@@ -0,0 +1,2 @@
[mypy]
plugins = sqlalchemy.ext.mypy.plugin