mirror of
https://github.com/danswer-ai/danswer.git
synced 2025-06-12 17:10:54 +02:00
Add process-level memory monitoring (#4294)
* Add process-level memory monitoring * Switch to every 5 minutes
This commit is contained in:
parent
ba514aaaa2
commit
293d1a4476
@ -194,6 +194,16 @@ if not MULTI_TENANT:
|
|||||||
"queue": OnyxCeleryQueues.MONITORING,
|
"queue": OnyxCeleryQueues.MONITORING,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
"name": "monitor-process-memory",
|
||||||
|
"task": OnyxCeleryTask.MONITOR_PROCESS_MEMORY,
|
||||||
|
"schedule": timedelta(minutes=5),
|
||||||
|
"options": {
|
||||||
|
"priority": OnyxCeleryPriority.LOW,
|
||||||
|
"expires": BEAT_EXPIRES_DEFAULT,
|
||||||
|
"queue": OnyxCeleryQueues.MONITORING,
|
||||||
|
},
|
||||||
|
},
|
||||||
]
|
]
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -6,6 +6,7 @@ from itertools import islice
|
|||||||
from typing import Any
|
from typing import Any
|
||||||
from typing import Literal
|
from typing import Literal
|
||||||
|
|
||||||
|
import psutil
|
||||||
from celery import shared_task
|
from celery import shared_task
|
||||||
from celery import Task
|
from celery import Task
|
||||||
from celery.exceptions import SoftTimeLimitExceeded
|
from celery.exceptions import SoftTimeLimitExceeded
|
||||||
@ -19,6 +20,7 @@ from sqlalchemy.orm import Session
|
|||||||
from onyx.background.celery.apps.app_base import task_logger
|
from onyx.background.celery.apps.app_base import task_logger
|
||||||
from onyx.background.celery.celery_redis import celery_get_queue_length
|
from onyx.background.celery.celery_redis import celery_get_queue_length
|
||||||
from onyx.background.celery.celery_redis import celery_get_unacked_task_ids
|
from onyx.background.celery.celery_redis import celery_get_unacked_task_ids
|
||||||
|
from onyx.background.celery.memory_monitoring import emit_process_memory
|
||||||
from onyx.configs.constants import CELERY_GENERIC_BEAT_LOCK_TIMEOUT
|
from onyx.configs.constants import CELERY_GENERIC_BEAT_LOCK_TIMEOUT
|
||||||
from onyx.configs.constants import ONYX_CLOUD_TENANT_ID
|
from onyx.configs.constants import ONYX_CLOUD_TENANT_ID
|
||||||
from onyx.configs.constants import OnyxCeleryQueues
|
from onyx.configs.constants import OnyxCeleryQueues
|
||||||
@ -39,8 +41,10 @@ from onyx.db.models import UserGroup
|
|||||||
from onyx.db.search_settings import get_active_search_settings_list
|
from onyx.db.search_settings import get_active_search_settings_list
|
||||||
from onyx.redis.redis_pool import get_redis_client
|
from onyx.redis.redis_pool import get_redis_client
|
||||||
from onyx.redis.redis_pool import redis_lock_dump
|
from onyx.redis.redis_pool import redis_lock_dump
|
||||||
|
from onyx.utils.logger import is_running_in_container
|
||||||
from onyx.utils.telemetry import optional_telemetry
|
from onyx.utils.telemetry import optional_telemetry
|
||||||
from onyx.utils.telemetry import RecordType
|
from onyx.utils.telemetry import RecordType
|
||||||
|
from shared_configs.configs import MULTI_TENANT
|
||||||
from shared_configs.contextvars import CURRENT_TENANT_ID_CONTEXTVAR
|
from shared_configs.contextvars import CURRENT_TENANT_ID_CONTEXTVAR
|
||||||
|
|
||||||
_MONITORING_SOFT_TIME_LIMIT = 60 * 5 # 5 minutes
|
_MONITORING_SOFT_TIME_LIMIT = 60 * 5 # 5 minutes
|
||||||
@ -904,3 +908,93 @@ def monitor_celery_queues_helper(
|
|||||||
f"external_group_sync={n_external_group_sync} "
|
f"external_group_sync={n_external_group_sync} "
|
||||||
f"permissions_upsert={n_permissions_upsert} "
|
f"permissions_upsert={n_permissions_upsert} "
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
"""Memory monitoring"""
|
||||||
|
|
||||||
|
|
||||||
|
def _get_cmdline_for_process(process: psutil.Process) -> str | None:
|
||||||
|
try:
|
||||||
|
return " ".join(process.cmdline())
|
||||||
|
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
@shared_task(
|
||||||
|
name=OnyxCeleryTask.MONITOR_PROCESS_MEMORY,
|
||||||
|
ignore_result=True,
|
||||||
|
soft_time_limit=_MONITORING_SOFT_TIME_LIMIT,
|
||||||
|
time_limit=_MONITORING_TIME_LIMIT,
|
||||||
|
queue=OnyxCeleryQueues.MONITORING,
|
||||||
|
bind=True,
|
||||||
|
)
|
||||||
|
def monitor_process_memory(self: Task, *, tenant_id: str) -> None:
|
||||||
|
"""
|
||||||
|
Task to monitor memory usage of supervisor-managed processes.
|
||||||
|
This periodically checks the memory usage of processes and logs information
|
||||||
|
in a standardized format.
|
||||||
|
|
||||||
|
The task looks for processes managed by supervisor and logs their
|
||||||
|
memory usage statistics. This is useful for monitoring memory consumption
|
||||||
|
over time and identifying potential memory leaks.
|
||||||
|
"""
|
||||||
|
# don't run this task in multi-tenant mode, have other, better means of monitoring
|
||||||
|
if MULTI_TENANT:
|
||||||
|
return
|
||||||
|
|
||||||
|
# Skip memory monitoring if not in container
|
||||||
|
if not is_running_in_container():
|
||||||
|
return
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Get all supervisor-managed processes
|
||||||
|
supervisor_processes: dict[int, str] = {}
|
||||||
|
|
||||||
|
# Map cmd line elements to more readable process names
|
||||||
|
process_type_mapping = {
|
||||||
|
"--hostname=primary": "primary",
|
||||||
|
"--hostname=light": "light",
|
||||||
|
"--hostname=heavy": "heavy",
|
||||||
|
"--hostname=indexing": "indexing",
|
||||||
|
"--hostname=monitoring": "monitoring",
|
||||||
|
"beat": "beat",
|
||||||
|
"slack/listener.py": "slack",
|
||||||
|
}
|
||||||
|
|
||||||
|
# Find all python processes that are likely celery workers
|
||||||
|
for proc in psutil.process_iter():
|
||||||
|
cmdline = _get_cmdline_for_process(proc)
|
||||||
|
if not cmdline:
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Match supervisor-managed processes
|
||||||
|
for process_name, process_type in process_type_mapping.items():
|
||||||
|
if process_name in cmdline:
|
||||||
|
if process_type in supervisor_processes.values():
|
||||||
|
task_logger.error(
|
||||||
|
f"Duplicate process type for type {process_type} "
|
||||||
|
f"with cmd {cmdline} with pid={proc.pid}."
|
||||||
|
)
|
||||||
|
continue
|
||||||
|
|
||||||
|
supervisor_processes[proc.pid] = process_type
|
||||||
|
break
|
||||||
|
|
||||||
|
if len(supervisor_processes) != len(process_type_mapping):
|
||||||
|
task_logger.error(
|
||||||
|
"Missing processes: "
|
||||||
|
f"{set(process_type_mapping.keys()).symmetric_difference(supervisor_processes.values())}"
|
||||||
|
)
|
||||||
|
|
||||||
|
# Log memory usage for each process
|
||||||
|
for pid, process_type in supervisor_processes.items():
|
||||||
|
try:
|
||||||
|
emit_process_memory(pid, process_type, {})
|
||||||
|
except psutil.NoSuchProcess:
|
||||||
|
# Process may have terminated since we obtained the list
|
||||||
|
continue
|
||||||
|
except Exception as e:
|
||||||
|
task_logger.exception(f"Error monitoring process {pid}: {str(e)}")
|
||||||
|
|
||||||
|
except Exception:
|
||||||
|
task_logger.exception("Error in monitor_process_memory task")
|
||||||
|
@ -403,6 +403,7 @@ class OnyxCeleryTask:
|
|||||||
|
|
||||||
MONITOR_BACKGROUND_PROCESSES = "monitor_background_processes"
|
MONITOR_BACKGROUND_PROCESSES = "monitor_background_processes"
|
||||||
MONITOR_CELERY_QUEUES = "monitor_celery_queues"
|
MONITOR_CELERY_QUEUES = "monitor_celery_queues"
|
||||||
|
MONITOR_PROCESS_MEMORY = "monitor_process_memory"
|
||||||
|
|
||||||
# Tenant pre-provisioning
|
# Tenant pre-provisioning
|
||||||
PRE_PROVISION_TENANT = "pre_provision_tenant"
|
PRE_PROVISION_TENANT = "pre_provision_tenant"
|
||||||
|
Loading…
x
Reference in New Issue
Block a user