Fix ResourceLogger blocking main thread

This commit is contained in:
Weves
2023-11-20 16:41:51 -08:00
committed by Chris Weaver
parent eeb844e35e
commit 6e9f31d1e9

View File

@@ -1,4 +1,4 @@
import time import asyncio
import psutil import psutil
from dask.distributed import WorkerPlugin from dask.distributed import WorkerPlugin
@@ -18,8 +18,11 @@ class ResourceLogger(WorkerPlugin):
self.worker = worker self.worker = worker
worker.loop.add_callback(self.log_resources) worker.loop.add_callback(self.log_resources)
def log_resources(self) -> None: async def log_resources(self) -> None:
"""Periodically log CPU and memory usage.""" """Periodically log CPU and memory usage.
NOTE: this must be a coroutine. Dask runs on Tornado's async event loop, so a
blocking loop here (e.g. one that calls time.sleep) would stall the worker indefinitely."""
while True: while True:
cpu_percent = psutil.cpu_percent(interval=None) cpu_percent = psutil.cpu_percent(interval=None)
memory_available_gb = psutil.virtual_memory().available / (1024.0**3) memory_available_gb = psutil.virtual_memory().available / (1024.0**3)
@@ -27,4 +30,4 @@ class ResourceLogger(WorkerPlugin):
logger.debug( logger.debug(
f"Worker {self.worker.address}: CPU usage {cpu_percent}%, Memory available {memory_available_gb}GB" f"Worker {self.worker.address}: CPU usage {cpu_percent}%, Memory available {memory_available_gb}GB"
) )
time.sleep(self.log_interval) await asyncio.sleep(self.log_interval)