mirror of
https://github.com/danswer-ai/danswer.git
synced 2025-07-03 19:20:53 +02:00
harden connections to redis (#2677)
* set broker_connection_retry_on_startup to silence deprecation warning (we're OK with retrying on startup) * env var for CELERY_BROKER_POOL_LIMIT * add redis retry on timeout and health check interval * set socket_keepalive = True * remove shadow declaration of REDIS_HEALTH_CHECK_INTERVAL, add socket_keepalive_options where possible * fix mypy complaint * pass through vars in docker compose * remove extra '=' * wrap in a try
This commit is contained in:
@ -1,7 +1,9 @@
|
|||||||
# docs: https://docs.celeryq.dev/en/stable/userguide/configuration.html
|
# docs: https://docs.celeryq.dev/en/stable/userguide/configuration.html
|
||||||
|
from danswer.configs.app_configs import CELERY_BROKER_POOL_LIMIT
|
||||||
from danswer.configs.app_configs import CELERY_RESULT_EXPIRES
|
from danswer.configs.app_configs import CELERY_RESULT_EXPIRES
|
||||||
from danswer.configs.app_configs import REDIS_DB_NUMBER_CELERY
|
from danswer.configs.app_configs import REDIS_DB_NUMBER_CELERY
|
||||||
from danswer.configs.app_configs import REDIS_DB_NUMBER_CELERY_RESULT_BACKEND
|
from danswer.configs.app_configs import REDIS_DB_NUMBER_CELERY_RESULT_BACKEND
|
||||||
|
from danswer.configs.app_configs import REDIS_HEALTH_CHECK_INTERVAL
|
||||||
from danswer.configs.app_configs import REDIS_HOST
|
from danswer.configs.app_configs import REDIS_HOST
|
||||||
from danswer.configs.app_configs import REDIS_PASSWORD
|
from danswer.configs.app_configs import REDIS_PASSWORD
|
||||||
from danswer.configs.app_configs import REDIS_PORT
|
from danswer.configs.app_configs import REDIS_PORT
|
||||||
@ -9,6 +11,7 @@ from danswer.configs.app_configs import REDIS_SSL
|
|||||||
from danswer.configs.app_configs import REDIS_SSL_CA_CERTS
|
from danswer.configs.app_configs import REDIS_SSL_CA_CERTS
|
||||||
from danswer.configs.app_configs import REDIS_SSL_CERT_REQS
|
from danswer.configs.app_configs import REDIS_SSL_CERT_REQS
|
||||||
from danswer.configs.constants import DanswerCeleryPriority
|
from danswer.configs.constants import DanswerCeleryPriority
|
||||||
|
from danswer.configs.constants import REDIS_SOCKET_KEEPALIVE_OPTIONS
|
||||||
|
|
||||||
CELERY_SEPARATOR = ":"
|
CELERY_SEPARATOR = ":"
|
||||||
|
|
||||||
@ -36,12 +39,30 @@ result_backend = f"{REDIS_SCHEME}://{CELERY_PASSWORD_PART}{REDIS_HOST}:{REDIS_PO
|
|||||||
# can stall other tasks.
|
# can stall other tasks.
|
||||||
worker_prefetch_multiplier = 4
|
worker_prefetch_multiplier = 4
|
||||||
|
|
||||||
|
broker_connection_retry_on_startup = True
|
||||||
|
broker_pool_limit = CELERY_BROKER_POOL_LIMIT
|
||||||
|
|
||||||
|
# redis broker settings
|
||||||
|
# https://docs.celeryq.dev/projects/kombu/en/stable/reference/kombu.transport.redis.html
|
||||||
broker_transport_options = {
|
broker_transport_options = {
|
||||||
"priority_steps": list(range(len(DanswerCeleryPriority))),
|
"priority_steps": list(range(len(DanswerCeleryPriority))),
|
||||||
"sep": CELERY_SEPARATOR,
|
"sep": CELERY_SEPARATOR,
|
||||||
"queue_order_strategy": "priority",
|
"queue_order_strategy": "priority",
|
||||||
|
"retry_on_timeout": True,
|
||||||
|
"health_check_interval": REDIS_HEALTH_CHECK_INTERVAL,
|
||||||
|
"socket_keepalive": True,
|
||||||
|
"socket_keepalive_options": REDIS_SOCKET_KEEPALIVE_OPTIONS,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
# redis backend settings
|
||||||
|
# https://docs.celeryq.dev/en/stable/userguide/configuration.html#redis-backend-settings
|
||||||
|
|
||||||
|
# there doesn't appear to be a way to set socket_keepalive_options on the redis result backend
|
||||||
|
redis_socket_keepalive = True
|
||||||
|
redis_retry_on_timeout = True
|
||||||
|
redis_backend_health_check_interval = REDIS_HEALTH_CHECK_INTERVAL
|
||||||
|
|
||||||
|
|
||||||
task_default_priority = DanswerCeleryPriority.MEDIUM
|
task_default_priority = DanswerCeleryPriority.MEDIUM
|
||||||
task_acks_late = True
|
task_acks_late = True
|
||||||
|
|
||||||
|
@ -164,6 +164,12 @@ REDIS_DB_NUMBER_CELERY_RESULT_BACKEND = int(
|
|||||||
)
|
)
|
||||||
REDIS_DB_NUMBER_CELERY = int(os.environ.get("REDIS_DB_NUMBER_CELERY", 15)) # broker
|
REDIS_DB_NUMBER_CELERY = int(os.environ.get("REDIS_DB_NUMBER_CELERY", 15)) # broker
|
||||||
|
|
||||||
|
# will propagate to both our redis client as well as celery's redis client
|
||||||
|
REDIS_HEALTH_CHECK_INTERVAL = int(os.environ.get("REDIS_HEALTH_CHECK_INTERVAL", 60))
|
||||||
|
|
||||||
|
# our redis client only, not celery's
|
||||||
|
REDIS_POOL_MAX_CONNECTIONS = int(os.environ.get("REDIS_POOL_MAX_CONNECTIONS", 128))
|
||||||
|
|
||||||
# https://docs.celeryq.dev/en/stable/userguide/configuration.html#redis-backend-settings
|
# https://docs.celeryq.dev/en/stable/userguide/configuration.html#redis-backend-settings
|
||||||
# should be one of "required", "optional", or "none"
|
# should be one of "required", "optional", or "none"
|
||||||
REDIS_SSL_CERT_REQS = os.getenv("REDIS_SSL_CERT_REQS", "none")
|
REDIS_SSL_CERT_REQS = os.getenv("REDIS_SSL_CERT_REQS", "none")
|
||||||
@ -171,6 +177,16 @@ REDIS_SSL_CA_CERTS = os.getenv("REDIS_SSL_CA_CERTS", None)
|
|||||||
|
|
||||||
CELERY_RESULT_EXPIRES = int(os.environ.get("CELERY_RESULT_EXPIRES", 86400)) # seconds
|
CELERY_RESULT_EXPIRES = int(os.environ.get("CELERY_RESULT_EXPIRES", 86400)) # seconds
|
||||||
|
|
||||||
|
# https://docs.celeryq.dev/en/stable/userguide/configuration.html#broker-pool-limit
|
||||||
|
# Setting to None may help when there is a proxy in the way closing idle connections
|
||||||
|
CELERY_BROKER_POOL_LIMIT_DEFAULT = 10
|
||||||
|
try:
|
||||||
|
CELERY_BROKER_POOL_LIMIT = int(
|
||||||
|
os.environ.get("CELERY_BROKER_POOL_LIMIT", CELERY_BROKER_POOL_LIMIT_DEFAULT)
|
||||||
|
)
|
||||||
|
except ValueError:
|
||||||
|
CELERY_BROKER_POOL_LIMIT = CELERY_BROKER_POOL_LIMIT_DEFAULT
|
||||||
|
|
||||||
#####
|
#####
|
||||||
# Connector Configs
|
# Connector Configs
|
||||||
#####
|
#####
|
||||||
|
@ -1,3 +1,5 @@
|
|||||||
|
import platform
|
||||||
|
import socket
|
||||||
from enum import auto
|
from enum import auto
|
||||||
from enum import Enum
|
from enum import Enum
|
||||||
|
|
||||||
@ -204,3 +206,13 @@ class DanswerCeleryPriority(int, Enum):
|
|||||||
MEDIUM = auto()
|
MEDIUM = auto()
|
||||||
LOW = auto()
|
LOW = auto()
|
||||||
LOWEST = auto()
|
LOWEST = auto()
|
||||||
|
|
||||||
|
|
||||||
|
REDIS_SOCKET_KEEPALIVE_OPTIONS = {}
|
||||||
|
REDIS_SOCKET_KEEPALIVE_OPTIONS[socket.TCP_KEEPINTVL] = 15
|
||||||
|
REDIS_SOCKET_KEEPALIVE_OPTIONS[socket.TCP_KEEPCNT] = 3
|
||||||
|
|
||||||
|
if platform.system() == "Darwin":
|
||||||
|
REDIS_SOCKET_KEEPALIVE_OPTIONS[socket.TCP_KEEPALIVE] = 60 # type: ignore
|
||||||
|
else:
|
||||||
|
REDIS_SOCKET_KEEPALIVE_OPTIONS[socket.TCP_KEEPIDLE] = 60 # type: ignore
|
||||||
|
@ -5,14 +5,15 @@ import redis
|
|||||||
from redis.client import Redis
|
from redis.client import Redis
|
||||||
|
|
||||||
from danswer.configs.app_configs import REDIS_DB_NUMBER
|
from danswer.configs.app_configs import REDIS_DB_NUMBER
|
||||||
|
from danswer.configs.app_configs import REDIS_HEALTH_CHECK_INTERVAL
|
||||||
from danswer.configs.app_configs import REDIS_HOST
|
from danswer.configs.app_configs import REDIS_HOST
|
||||||
from danswer.configs.app_configs import REDIS_PASSWORD
|
from danswer.configs.app_configs import REDIS_PASSWORD
|
||||||
|
from danswer.configs.app_configs import REDIS_POOL_MAX_CONNECTIONS
|
||||||
from danswer.configs.app_configs import REDIS_PORT
|
from danswer.configs.app_configs import REDIS_PORT
|
||||||
from danswer.configs.app_configs import REDIS_SSL
|
from danswer.configs.app_configs import REDIS_SSL
|
||||||
from danswer.configs.app_configs import REDIS_SSL_CA_CERTS
|
from danswer.configs.app_configs import REDIS_SSL_CA_CERTS
|
||||||
from danswer.configs.app_configs import REDIS_SSL_CERT_REQS
|
from danswer.configs.app_configs import REDIS_SSL_CERT_REQS
|
||||||
|
from danswer.configs.constants import REDIS_SOCKET_KEEPALIVE_OPTIONS
|
||||||
REDIS_POOL_MAX_CONNECTIONS = 128
|
|
||||||
|
|
||||||
|
|
||||||
class RedisPool:
|
class RedisPool:
|
||||||
@ -59,6 +60,9 @@ class RedisPool:
|
|||||||
password=password,
|
password=password,
|
||||||
max_connections=max_connections,
|
max_connections=max_connections,
|
||||||
timeout=None,
|
timeout=None,
|
||||||
|
health_check_interval=REDIS_HEALTH_CHECK_INTERVAL,
|
||||||
|
socket_keepalive=True,
|
||||||
|
socket_keepalive_options=REDIS_SOCKET_KEEPALIVE_OPTIONS,
|
||||||
connection_class=redis.SSLConnection,
|
connection_class=redis.SSLConnection,
|
||||||
ssl_ca_certs=ssl_ca_certs,
|
ssl_ca_certs=ssl_ca_certs,
|
||||||
ssl_cert_reqs=ssl_cert_reqs,
|
ssl_cert_reqs=ssl_cert_reqs,
|
||||||
@ -71,6 +75,9 @@ class RedisPool:
|
|||||||
password=password,
|
password=password,
|
||||||
max_connections=max_connections,
|
max_connections=max_connections,
|
||||||
timeout=None,
|
timeout=None,
|
||||||
|
health_check_interval=REDIS_HEALTH_CHECK_INTERVAL,
|
||||||
|
socket_keepalive=True,
|
||||||
|
socket_keepalive_options=REDIS_SOCKET_KEEPALIVE_OPTIONS,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@ -87,6 +87,7 @@ services:
|
|||||||
- LOG_ENDPOINT_LATENCY=${LOG_ENDPOINT_LATENCY:-}
|
- LOG_ENDPOINT_LATENCY=${LOG_ENDPOINT_LATENCY:-}
|
||||||
- LOG_POSTGRES_LATENCY=${LOG_POSTGRES_LATENCY:-}
|
- LOG_POSTGRES_LATENCY=${LOG_POSTGRES_LATENCY:-}
|
||||||
- LOG_POSTGRES_CONN_COUNTS=${LOG_POSTGRES_CONN_COUNTS:-}
|
- LOG_POSTGRES_CONN_COUNTS=${LOG_POSTGRES_CONN_COUNTS:-}
|
||||||
|
- CELERY_BROKER_POOL_LIMIT=${CELERY_BROKER_POOL_LIMIT:-}
|
||||||
|
|
||||||
# Chat Configs
|
# Chat Configs
|
||||||
- HARD_DELETE_CHATS=${HARD_DELETE_CHATS:-}
|
- HARD_DELETE_CHATS=${HARD_DELETE_CHATS:-}
|
||||||
|
@ -80,7 +80,8 @@ services:
|
|||||||
# If set to `true` will enable additional logs about Vespa query performance
|
# If set to `true` will enable additional logs about Vespa query performance
|
||||||
# (time spent on finding the right docs + time spent fetching summaries from disk)
|
# (time spent on finding the right docs + time spent fetching summaries from disk)
|
||||||
- LOG_VESPA_TIMING_INFORMATION=${LOG_VESPA_TIMING_INFORMATION:-}
|
- LOG_VESPA_TIMING_INFORMATION=${LOG_VESPA_TIMING_INFORMATION:-}
|
||||||
|
- CELERY_BROKER_POOL_LIMIT=${CELERY_BROKER_POOL_LIMIT:-}
|
||||||
|
|
||||||
# Chat Configs
|
# Chat Configs
|
||||||
- HARD_DELETE_CHATS=${HARD_DELETE_CHATS:-}
|
- HARD_DELETE_CHATS=${HARD_DELETE_CHATS:-}
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user