Cloud improvements (#3099)
* add improved cloud configuration
* fix typing
* finalize slackbot improvements
* minor update
* finalized keda
* moderate slackbot switch
* update some configs
* revert
* include reset engine!
parent d68f8d6fbc
commit facf1d55a0
@@ -12,6 +12,7 @@ from danswer.db.engine import get_all_tenant_ids
 from danswer.db.engine import SqlEngine
 from danswer.utils.logger import setup_logger
 from danswer.utils.variable_functionality import fetch_versioned_implementation
+from shared_configs.configs import IGNORED_SYNCING_TENANT_LIST
 from shared_configs.configs import MULTI_TENANT

 logger = setup_logger(__name__)
@@ -72,6 +73,15 @@ class DynamicTenantScheduler(PersistentScheduler):
         logger.info(f"Found {len(existing_tenants)} existing tenants in schedule")

         for tenant_id in tenant_ids:
+            if (
+                IGNORED_SYNCING_TENANT_LIST
+                and tenant_id in IGNORED_SYNCING_TENANT_LIST
+            ):
+                logger.info(
+                    f"Skipping tenant {tenant_id} as it is in the ignored syncing list"
+                )
+                continue
+
             if tenant_id not in existing_tenants:
                 logger.info(f"Processing new tenant: {tenant_id}")
@@ -6,6 +6,7 @@ from celery import signals
 from celery import Task
 from celery.signals import celeryd_init
 from celery.signals import worker_init
+from celery.signals import worker_process_init
 from celery.signals import worker_ready
 from celery.signals import worker_shutdown

@@ -81,6 +82,11 @@ def on_worker_shutdown(sender: Any, **kwargs: Any) -> None:
     app_base.on_worker_shutdown(sender, **kwargs)


+@worker_process_init.connect
+def init_worker(**kwargs: Any) -> None:
+    SqlEngine.reset_engine()
+
+
 @signals.setup_logging.connect
 def on_setup_logging(
     loglevel: Any, logfile: Any, format: Any, colorize: Any, **kwargs: Any
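The @worker_process_init.connect hook added above is Celery's standard place to make each prefork child fork-safe. A minimal standalone sketch of the same idea, assuming an illustrative broker URL, database URL, and a plain module-level SQLAlchemy engine rather than Danswer's SqlEngine wrapper:

# Sketch only: the broker and database URLs are placeholders, not Danswer's configuration.
from typing import Any

from celery import Celery
from celery.signals import worker_process_init
from sqlalchemy import create_engine

app = Celery("example", broker="redis://localhost:6379/0")
engine = create_engine("postgresql+psycopg2://user:pass@localhost/db")


@worker_process_init.connect
def init_worker(**kwargs: Any) -> None:
    # Drop any connections inherited from the parent at fork time so the
    # child process opens its own sockets to the database.
    engine.dispose(close=False)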
@@ -1,96 +0,0 @@
-from datetime import timedelta
-from typing import Any
-
-from celery.beat import PersistentScheduler  # type: ignore
-from celery.utils.log import get_task_logger
-
-from danswer.db.engine import get_all_tenant_ids
-from danswer.utils.variable_functionality import fetch_versioned_implementation
-
-logger = get_task_logger(__name__)
-
-
-class DynamicTenantScheduler(PersistentScheduler):
-    def __init__(self, *args: Any, **kwargs: Any) -> None:
-        super().__init__(*args, **kwargs)
-        self._reload_interval = timedelta(minutes=1)
-        self._last_reload = self.app.now() - self._reload_interval
-
-    def setup_schedule(self) -> None:
-        super().setup_schedule()
-
-    def tick(self) -> float:
-        retval = super().tick()
-        now = self.app.now()
-        if (
-            self._last_reload is None
-            or (now - self._last_reload) > self._reload_interval
-        ):
-            logger.info("Reloading schedule to check for new tenants...")
-            self._update_tenant_tasks()
-            self._last_reload = now
-        return retval
-
-    def _update_tenant_tasks(self) -> None:
-        logger.info("Checking for tenant task updates...")
-        try:
-            tenant_ids = get_all_tenant_ids()
-            tasks_to_schedule = fetch_versioned_implementation(
-                "danswer.background.celery.tasks.beat_schedule", "get_tasks_to_schedule"
-            )
-
-            new_beat_schedule: dict[str, dict[str, Any]] = {}
-
-            current_schedule = getattr(self, "_store", {"entries": {}}).get(
-                "entries", {}
-            )
-
-            existing_tenants = set()
-            for task_name in current_schedule.keys():
-                if "-" in task_name:
-                    existing_tenants.add(task_name.split("-")[-1])
-
-            for tenant_id in tenant_ids:
-                if tenant_id not in existing_tenants:
-                    logger.info(f"Found new tenant: {tenant_id}")
-
-                for task in tasks_to_schedule():
-                    task_name = f"{task['name']}-{tenant_id}"
-                    new_task = {
-                        "task": task["task"],
-                        "schedule": task["schedule"],
-                        "kwargs": {"tenant_id": tenant_id},
-                    }
-                    if options := task.get("options"):
-                        new_task["options"] = options
-                    new_beat_schedule[task_name] = new_task
-
-            if self._should_update_schedule(current_schedule, new_beat_schedule):
-                logger.info(
-                    "Updating schedule",
-                    extra={
-                        "new_tasks": len(new_beat_schedule),
-                        "current_tasks": len(current_schedule),
-                    },
-                )
-                if not hasattr(self, "_store"):
-                    self._store: dict[str, dict] = {"entries": {}}
-                self.update_from_dict(new_beat_schedule)
-                logger.info(f"New schedule: {new_beat_schedule}")
-
-                logger.info("Tenant tasks updated successfully")
-            else:
-                logger.debug("No schedule updates needed")
-
-        except (AttributeError, KeyError):
-            logger.exception("Failed to process task configuration")
-        except Exception:
-            logger.exception("Unexpected error updating tenant tasks")
-
-    def _should_update_schedule(
-        self, current_schedule: dict, new_schedule: dict
-    ) -> bool:
-        """Compare schedules to determine if an update is needed."""
-        current_tasks = set(current_schedule.keys())
-        new_tasks = set(new_schedule.keys())
-        return current_tasks != new_tasks
@@ -8,7 +8,7 @@ tasks_to_schedule = [
     {
         "name": "check-for-vespa-sync",
         "task": "check_for_vespa_sync_task",
-        "schedule": timedelta(seconds=5),
+        "schedule": timedelta(seconds=20),
         "options": {"priority": DanswerCeleryPriority.HIGH},
     },
     {
@@ -20,13 +20,13 @@ tasks_to_schedule = [
     {
         "name": "check-for-indexing",
         "task": "check_for_indexing",
-        "schedule": timedelta(seconds=10),
+        "schedule": timedelta(seconds=15),
        "options": {"priority": DanswerCeleryPriority.HIGH},
     },
     {
         "name": "check-for-prune",
         "task": "check_for_pruning",
-        "schedule": timedelta(seconds=10),
+        "schedule": timedelta(seconds=15),
         "options": {"priority": DanswerCeleryPriority.HIGH},
     },
     {
@@ -29,18 +29,26 @@ JobStatusType = (
 def _initializer(
     func: Callable, args: list | tuple, kwargs: dict[str, Any] | None = None
 ) -> Any:
-    """Ensure the parent proc's database connections are not touched
-    in the new connection pool
+    """Initialize the child process with a fresh SQLAlchemy Engine.

-    Based on the recommended approach in the SQLAlchemy docs found:
+    Based on SQLAlchemy's recommendations to handle multiprocessing:
     https://docs.sqlalchemy.org/en/20/core/pooling.html#using-connection-pools-with-multiprocessing-or-os-fork
     """
     if kwargs is None:
         kwargs = {}

     logger.info("Initializing spawned worker child process.")

+    # Reset the engine in the child process
+    SqlEngine.reset_engine()
+
+    # Optionally set a custom app name for database logging purposes
     SqlEngine.set_app_name(POSTGRES_CELERY_WORKER_INDEXING_CHILD_APP_NAME)
+
+    # Initialize a new engine with desired parameters
     SqlEngine.init_engine(pool_size=4, max_overflow=12, pool_recycle=60)
+
+    # Proceed with executing the target function
     return func(*args, **kwargs)

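The rewritten _initializer docstring points at SQLAlchemy's fork-safety guidance. A self-contained sketch of that guidance, assuming a throwaway SQLite engine and a plain multiprocessing.Pool rather than Danswer's SqlEngine and job machinery:

# Sketch only: the engine URL, pool size, and query are placeholders.
import multiprocessing

from sqlalchemy import create_engine, text

engine = create_engine("sqlite:///example.db")


def _child_initializer() -> None:
    # Per the SQLAlchemy docs linked above, discard connections inherited
    # across the fork so each child builds its own connection pool.
    engine.dispose(close=False)


def _query(_: int) -> int:
    with engine.connect() as conn:
        return conn.execute(text("SELECT 1")).scalar_one()


if __name__ == "__main__":
    with multiprocessing.Pool(processes=2, initializer=_child_initializer) as pool:
        print(pool.map(_query, range(4)))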
@@ -55,11 +55,11 @@ def validate_channel_names(
 # Scaling configurations for multi-tenant Slack bot handling
 TENANT_LOCK_EXPIRATION = 1800  # How long a pod can hold exclusive access to a tenant before other pods can acquire it
 TENANT_HEARTBEAT_INTERVAL = (
-    60  # How often pods send heartbeats to indicate they are still processing a tenant
+    15  # How often pods send heartbeats to indicate they are still processing a tenant
 )
-TENANT_HEARTBEAT_EXPIRATION = 180  # How long before a tenant's heartbeat expires, allowing other pods to take over
-TENANT_ACQUISITION_INTERVAL = (
-    60  # How often pods attempt to acquire unprocessed tenants
+TENANT_HEARTBEAT_EXPIRATION = (
+    30  # How long before a tenant's heartbeat expires, allowing other pods to take over
 )
+TENANT_ACQUISITION_INTERVAL = 60  # How often pods attempt to acquire unprocessed tenants and checks for new tokens

 MAX_TENANTS_PER_POD = int(os.getenv("MAX_TENANTS_PER_POD", 50))
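These constants drive a lock-plus-heartbeat scheme: a pod claims a tenant with an expiring Redis lock, then keeps a short-lived heartbeat fresh so peers can detect a dead owner. A minimal sketch of that scheme using redis-py, with illustrative key names and a local Redis URL (Danswer's real lock names live in its own Redis constants):

# Sketch only: key names, pod_id handling, and the Redis URL are placeholders.
import time

import redis

TENANT_LOCK_EXPIRATION = 1800  # seconds a pod may hold a tenant exclusively
TENANT_HEARTBEAT_INTERVAL = 15  # how often the owner refreshes its heartbeat
TENANT_HEARTBEAT_EXPIRATION = 30  # heartbeat TTL; expiry signals a dead owner

r = redis.Redis.from_url("redis://localhost:6379/0")


def try_acquire_tenant(tenant_id: str, pod_id: str) -> bool:
    # NX + EX: only one pod wins the claim, and it self-expires if the pod dies.
    return bool(
        r.set(f"slack_bot_lock:{tenant_id}", pod_id, nx=True, ex=TENANT_LOCK_EXPIRATION)
    )


def send_heartbeat(tenant_id: str, pod_id: str) -> None:
    # Refresh the heartbeat every TENANT_HEARTBEAT_INTERVAL seconds.
    r.set(
        f"slack_bot_heartbeat:{tenant_id}:{pod_id}",
        int(time.time()),
        ex=TENANT_HEARTBEAT_EXPIRATION,
    )


def release_tenant(tenant_id: str) -> None:
    # Mirrors the shutdown path later in the diff, which deletes the lock key.
    r.delete(f"slack_bot_lock:{tenant_id}")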
@@ -75,6 +75,7 @@ from danswer.search.retrieval.search_runner import download_nltk_data
 from danswer.server.manage.models import SlackBotTokens
 from danswer.utils.logger import setup_logger
 from danswer.utils.variable_functionality import set_is_ee_based_on_env_variable
+from shared_configs.configs import DISALLOWED_SLACK_BOT_TENANT_LIST
 from shared_configs.configs import MODEL_SERVER_HOST
 from shared_configs.configs import MODEL_SERVER_PORT
 from shared_configs.configs import POSTGRES_DEFAULT_SCHEMA
@@ -164,9 +165,15 @@ class SlackbotHandler:

     def acquire_tenants(self) -> None:
         tenant_ids = get_all_tenant_ids()
-        logger.debug(f"Found {len(tenant_ids)} total tenants in Postgres")

         for tenant_id in tenant_ids:
+            if (
+                DISALLOWED_SLACK_BOT_TENANT_LIST is not None
+                and tenant_id in DISALLOWED_SLACK_BOT_TENANT_LIST
+            ):
+                logger.debug(f"Tenant {tenant_id} is in the disallowed list, skipping")
+                continue
+
             if tenant_id in self.tenant_ids:
                 logger.debug(f"Tenant {tenant_id} already in self.tenant_ids")
                 continue
@@ -190,6 +197,9 @@ class SlackbotHandler:
                 continue

             logger.debug(f"Acquired lock for tenant {tenant_id}")
+            self.tenant_ids.add(tenant_id)
+
+        for tenant_id in self.tenant_ids:
             token = CURRENT_TENANT_ID_CONTEXTVAR.set(
                 tenant_id or POSTGRES_DEFAULT_SCHEMA
             )
@@ -236,14 +246,14 @@ class SlackbotHandler:

                 self.slack_bot_tokens[tenant_id] = slack_bot_tokens

-                if tenant_id in self.socket_clients:
+                if self.socket_clients.get(tenant_id):
                     asyncio.run(self.socket_clients[tenant_id].close())

                 self.start_socket_client(tenant_id, slack_bot_tokens)

             except KvKeyNotFoundError:
                 logger.debug(f"Missing Slack Bot tokens for tenant {tenant_id}")
-                if tenant_id in self.socket_clients:
+                if self.socket_clients.get(tenant_id):
                     asyncio.run(self.socket_clients[tenant_id].close())
                     del self.socket_clients[tenant_id]
                     del self.slack_bot_tokens[tenant_id]
|
|||||||
logger.info(f"Connecting socket client for tenant {tenant_id}")
|
logger.info(f"Connecting socket client for tenant {tenant_id}")
|
||||||
socket_client.connect()
|
socket_client.connect()
|
||||||
self.socket_clients[tenant_id] = socket_client
|
self.socket_clients[tenant_id] = socket_client
|
||||||
self.tenant_ids.add(tenant_id)
|
|
||||||
logger.info(f"Started SocketModeClient for tenant {tenant_id}")
|
logger.info(f"Started SocketModeClient for tenant {tenant_id}")
|
||||||
|
|
||||||
def stop_socket_clients(self) -> None:
|
def stop_socket_clients(self) -> None:
|
||||||
logger.info(f"Stopping {len(self.socket_clients)} socket clients")
|
logger.info(f"Stopping {len(self.socket_clients)} socket clients")
|
||||||
for tenant_id, client in self.socket_clients.items():
|
for tenant_id, client in self.socket_clients.items():
|
||||||
asyncio.run(client.close())
|
if client:
|
||||||
logger.info(f"Stopped SocketModeClient for tenant {tenant_id}")
|
asyncio.run(client.close())
|
||||||
|
logger.info(f"Stopped SocketModeClient for tenant {tenant_id}")
|
||||||
|
|
||||||
def shutdown(self, signum: int | None, frame: FrameType | None) -> None:
|
def shutdown(self, signum: int | None, frame: FrameType | None) -> None:
|
||||||
if not self.running:
|
if not self.running:
|
||||||
@ -298,6 +308,16 @@ class SlackbotHandler:
|
|||||||
logger.info(f"Stopping {len(self.socket_clients)} socket clients")
|
logger.info(f"Stopping {len(self.socket_clients)} socket clients")
|
||||||
self.stop_socket_clients()
|
self.stop_socket_clients()
|
||||||
|
|
||||||
|
# Release locks for all tenants
|
||||||
|
logger.info(f"Releasing locks for {len(self.tenant_ids)} tenants")
|
||||||
|
for tenant_id in self.tenant_ids:
|
||||||
|
try:
|
||||||
|
redis_client = get_redis_client(tenant_id=tenant_id)
|
||||||
|
redis_client.delete(DanswerRedisLocks.SLACK_BOT_LOCK)
|
||||||
|
logger.info(f"Released lock for tenant {tenant_id}")
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error releasing lock for tenant {tenant_id}: {e}")
|
||||||
|
|
||||||
# Wait for background threads to finish (with timeout)
|
# Wait for background threads to finish (with timeout)
|
||||||
logger.info("Waiting for background threads to finish...")
|
logger.info("Waiting for background threads to finish...")
|
||||||
self.acquire_thread.join(timeout=5)
|
self.acquire_thread.join(timeout=5)
|
||||||
|
@@ -189,6 +189,13 @@ class SqlEngine:
             return ""
         return cls._app_name

+    @classmethod
+    def reset_engine(cls) -> None:
+        with cls._lock:
+            if cls._engine:
+                cls._engine.dispose()
+                cls._engine = None
+

 def get_all_tenant_ids() -> list[str] | list[None]:
     if not MULTI_TENANT:
@@ -63,6 +63,7 @@ class RedisConnectorCredentialPair(RedisObjectHelper):
         stmt = construct_document_select_for_connector_credential_pair_by_needs_sync(
             cc_pair.connector_id, cc_pair.credential_id
         )
+
         for doc in db_session.scalars(stmt).yield_per(1):
             current_time = time.monotonic()
             if current_time - last_lock_time >= (
@@ -142,6 +142,20 @@ async def async_return_default_schema(*args: Any, **kwargs: Any) -> str:
 # Prefix used for all tenant ids
 TENANT_ID_PREFIX = "tenant_"

+ALLOWED_SLACK_BOT_TENANT_IDS = os.environ.get("ALLOWED_SLACK_BOT_TENANT_IDS")
+DISALLOWED_SLACK_BOT_TENANT_LIST = (
+    [tenant.strip() for tenant in ALLOWED_SLACK_BOT_TENANT_IDS.split(",")]
+    if ALLOWED_SLACK_BOT_TENANT_IDS
+    else None
+)
+
+IGNORED_SYNCING_TENANT_IDS = os.environ.get("IGNORED_SYNCING_TENANT_ID")
+IGNORED_SYNCING_TENANT_LIST = (
+    [tenant.strip() for tenant in IGNORED_SYNCING_TENANT_IDS.split(",")]
+    if IGNORED_SYNCING_TENANT_IDS
+    else None
+)
+
 SUPPORTED_EMBEDDING_MODELS = [
     # Cloud-based models
     SupportedEmbeddingModel(
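Both new settings read a comma-separated environment variable and normalize it into a stripped list, or None when the variable is unset. An illustrative check of that behavior, reusing the same parsing expression (the tenant ids here are made up):

# Sketch only: the tenant ids are placeholders.
import os

os.environ["IGNORED_SYNCING_TENANT_ID"] = "tenant_abc, tenant_def"

IGNORED_SYNCING_TENANT_IDS = os.environ.get("IGNORED_SYNCING_TENANT_ID")
IGNORED_SYNCING_TENANT_LIST = (
    [tenant.strip() for tenant in IGNORED_SYNCING_TENANT_IDS.split(",")]
    if IGNORED_SYNCING_TENANT_IDS
    else None
)

assert IGNORED_SYNCING_TENANT_LIST == ["tenant_abc", "tenant_def"]

# The beat scheduler change earlier in the diff skips a tenant when the list
# is set and contains that tenant's id:
tenant_id = "tenant_abc"
if IGNORED_SYNCING_TENANT_LIST and tenant_id in IGNORED_SYNCING_TENANT_LIST:
    print(f"Skipping tenant {tenant_id} as it is in the ignored syncing list")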
@@ -9,12 +9,11 @@ spec:
   scaleTargetRef:
     name: celery-worker-indexing
   minReplicaCount: 1
-  maxReplicaCount: 10
+  maxReplicaCount: 30
   triggers:
     - type: redis
       metadata:
         sslEnabled: "true"
-        host: "{host}"
         port: "6379"
         enableTLS: "true"
         listName: connector_indexing
@@ -22,10 +21,10 @@ spec:
         databaseIndex: "15"
       authenticationRef:
         name: celery-worker-auth

     - type: redis
       metadata:
         sslEnabled: "true"
-        host: "{host}"
         port: "6379"
         enableTLS: "true"
         listName: connector_indexing:2
@@ -36,7 +35,6 @@ spec:
     - type: redis
       metadata:
         sslEnabled: "true"
-        host: "{host}"
         port: "6379"
         enableTLS: "true"
         listName: connector_indexing:3
@@ -44,3 +42,12 @@ spec:
         databaseIndex: "15"
       authenticationRef:
         name: celery-worker-auth
+    - type: cpu
+      metadata:
+        type: Utilization
+        value: "70"
+
+    - type: memory
+      metadata:
+        type: Utilization
+        value: "70"
@@ -8,12 +8,11 @@ metadata:
 spec:
   scaleTargetRef:
     name: celery-worker-light
-  minReplicaCount: 1
+  minReplicaCount: 5
   maxReplicaCount: 20
   triggers:
     - type: redis
       metadata:
-        host: "{host}"
         port: "6379"
         enableTLS: "true"
         listName: vespa_metadata_sync
@@ -23,7 +22,6 @@ spec:
         name: celery-worker-auth
     - type: redis
       metadata:
-        host: "{host}"
         port: "6379"
         enableTLS: "true"
         listName: vespa_metadata_sync:2
@@ -33,7 +31,6 @@ spec:
         name: celery-worker-auth
     - type: redis
       metadata:
-        host: "{host}"
         port: "6379"
         enableTLS: "true"
         listName: vespa_metadata_sync:3
@@ -43,7 +40,6 @@ spec:
         name: celery-worker-auth
     - type: redis
       metadata:
-        host: "{host}"
         port: "6379"
         enableTLS: "true"
         listName: connector_deletion
@@ -53,7 +49,6 @@ spec:
         name: celery-worker-auth
     - type: redis
       metadata:
-        host: "{host}"
         port: "6379"
         enableTLS: "true"
         listName: connector_deletion:2
@@ -15,7 +15,6 @@ spec:
   triggers:
     - type: redis
       metadata:
-        host: "{host}"
         port: "6379"
         enableTLS: "true"
         listName: celery
@@ -26,7 +25,6 @@ spec:

     - type: redis
       metadata:
-        host: "{host}"
         port: "6379"
         enableTLS: "true"
         listName: celery:1
@@ -36,7 +34,6 @@ spec:
         name: celery-worker-auth
     - type: redis
       metadata:
-        host: "{host}"
         port: "6379"
         enableTLS: "true"
         listName: celery:2
@@ -46,7 +43,6 @@ spec:
         name: celery-worker-auth
     - type: redis
       metadata:
-        host: "{host}"
         port: "6379"
         enableTLS: "true"
         listName: celery:3
@@ -56,7 +52,6 @@ spec:
         name: celery-worker-auth
     - type: redis
       metadata:
-        host: "{host}"
         port: "6379"
         enableTLS: "true"
         listName: periodic_tasks
@@ -66,7 +61,6 @@ spec:
         name: celery-worker-auth
     - type: redis
       metadata:
-        host: "{host}"
         port: "6379"
         enableTLS: "true"
         listName: periodic_tasks:2
@@ -0,0 +1,19 @@
+apiVersion: keda.sh/v1alpha1
+kind: ScaledObject
+metadata:
+  name: indexing-model-server-scaledobject
+  namespace: danswer
+  labels:
+    app: indexing-model-server
+spec:
+  scaleTargetRef:
+    name: indexing-model-server-deployment
+  pollingInterval: 15 # Check every 15 seconds
+  cooldownPeriod: 30 # Wait 30 seconds before scaling down
+  minReplicaCount: 1
+  maxReplicaCount: 14
+  triggers:
+    - type: cpu
+      metadata:
+        type: Utilization
+        value: "70"
@@ -5,5 +5,5 @@ metadata:
   namespace: danswer
 type: Opaque
 data:
-  host: { { base64-encoded-hostname } }
-  password: { { base64-encoded-password } }
+  host: { base64 encoded host here }
+  password: { base64 encoded password here }
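The placeholder values in the Secret still need to be base64-encoded before the manifest is applied. A small helper that produces them, equivalent to `echo -n "<value>" | base64` on the command line (the host and password strings below are examples only):

# Sketch only: the host and password values are placeholders.
import base64


def encode_secret_value(raw: str) -> str:
    # Kubernetes Secret `data` fields hold base64-encoded bytes.
    return base64.b64encode(raw.encode("utf-8")).decode("ascii")


print(encode_secret_value("redis.example.internal"))  # value for `host:`
print(encode_secret_value("example-redis-password"))  # value for `password:`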
@@ -14,8 +14,8 @@ spec:
     spec:
       containers:
         - name: celery-beat
-          image: danswer/danswer-backend-cloud:v0.12.0-cloud.beta.2
-          imagePullPolicy: Always
+          image: danswer/danswer-backend-cloud:v0.12.0-cloud.beta.10
+          imagePullPolicy: IfNotPresent
           command:
             [
               "celery",
@@ -14,8 +14,8 @@ spec:
     spec:
       containers:
         - name: celery-worker-heavy
-          image: danswer/danswer-backend-cloud:v0.12.0-cloud.beta.2
-          imagePullPolicy: Always
+          image: danswer/danswer-backend-cloud:v0.12.0-cloud.beta.10
+          imagePullPolicy: IfNotPresent
           command:
             [
               "celery",
@@ -14,8 +14,8 @@ spec:
     spec:
       containers:
         - name: celery-worker-indexing
-          image: danswer/danswer-backend-cloud:v0.12.0-cloud.beta.2
-          imagePullPolicy: Always
+          image: danswer/danswer-backend-cloud:v0.12.0-cloud.beta.10
+          imagePullPolicy: IfNotPresent
           command:
             [
               "celery",
@@ -47,10 +47,10 @@ spec:
           resources:
             requests:
               cpu: "500m"
-              memory: "1Gi"
+              memory: "4Gi"
             limits:
               cpu: "1000m"
-              memory: "2Gi"
+              memory: "8Gi"
       volumes:
         - name: vespa-certificates
           secret:
|
@ -14,8 +14,8 @@ spec:
|
|||||||
spec:
|
spec:
|
||||||
containers:
|
containers:
|
||||||
- name: celery-worker-light
|
- name: celery-worker-light
|
||||||
image: danswer/danswer-backend-cloud:v0.12.0-cloud.beta.2
|
image: danswer/danswer-backend-cloud:v0.12.0-cloud.beta.10
|
||||||
imagePullPolicy: Always
|
imagePullPolicy: IfNotPresent
|
||||||
command:
|
command:
|
||||||
[
|
[
|
||||||
"celery",
|
"celery",
|
||||||
|
@ -14,8 +14,8 @@ spec:
|
|||||||
spec:
|
spec:
|
||||||
containers:
|
containers:
|
||||||
- name: celery-worker-primary
|
- name: celery-worker-primary
|
||||||
image: danswer/danswer-backend-cloud:v0.12.0-cloud.beta.2
|
image: danswer/danswer-backend-cloud:v0.12.0-cloud.beta.10
|
||||||
imagePullPolicy: Always
|
imagePullPolicy: IfNotPresent
|
||||||
command:
|
command:
|
||||||
[
|
[
|
||||||
"celery",
|
"celery",
|
||||||
|