robustify

This commit is contained in:
pablodanswer 2024-10-23 17:22:00 -07:00
parent e5a96cb115
commit 41b8472a31
6 changed files with 18 additions and 12 deletions

View File

@ -231,10 +231,22 @@ def on_worker_shutdown(sender: Any, **kwargs: Any) -> None:
return
for tenant_id, lock in sender.primary_worker_locks.items():
if lock and lock.owned():
logger.debug(f"Releasing lock for tenant {tenant_id}")
lock.release()
sender.primary_worker_locks[tenant_id] = None
try:
if lock and lock.owned():
logger.debug(f"Attempting to release lock for tenant {tenant_id}")
try:
lock.release()
logger.debug(f"Successfully released lock for tenant {tenant_id}")
except Exception as e:
logger.error(
f"Failed to release lock for tenant {tenant_id}. Error: {str(e)}"
)
finally:
sender.primary_worker_locks[tenant_id] = None
except Exception as e:
logger.error(
f"Error checking lock status for tenant {tenant_id}. Error: {str(e)}"
)
def on_setup_logging(

View File

@ -269,8 +269,8 @@ class HubPeriodicTask(bootsteps.StartStopStep):
f"Primary worker lock for tenant {tenant_id} could not be acquired!"
)
except Exception as e:
task_logger.error(f"Error in periodic task: {e}")
except Exception:
task_logger.exception("Periodic task failed.")
def stop(self, worker: Any) -> None:
# Cancel the scheduled task when the worker stops

View File

@ -68,7 +68,6 @@ def check_for_indexing(self: Task, *, tenant_id: str | None) -> int | None:
try:
# these tasks should never overlap
if not lock_beat.acquire(blocking=False):
task_logger.info(f"Lock acquired for tenant (Y): {tenant_id}")
return None
cc_pair_ids: list[int] = []

View File

@ -60,7 +60,6 @@ from danswer.utils.variable_functionality import set_is_ee_based_on_env_variable
from shared_configs.configs import CURRENT_TENANT_ID_CONTEXTVAR
from shared_configs.configs import MODEL_SERVER_HOST
from shared_configs.configs import MODEL_SERVER_PORT
from shared_configs.configs import POSTGRES_DEFAULT_SCHEMA
from shared_configs.configs import SLACK_CHANNEL_ID
logger = setup_logger()

View File

@ -8,7 +8,6 @@ from danswer.utils.logger import setup_logger
from ee.danswer.configs.app_configs import STRIPE_PRICE_ID
from ee.danswer.configs.app_configs import STRIPE_SECRET_KEY
from ee.danswer.server.tenants.access import generate_data_plane_token
from shared_configs.configs import CURRENT_TENANT_ID_CONTEXTVAR
stripe.api_key = STRIPE_SECRET_KEY
@ -50,7 +49,6 @@ def register_tenant_users(tenant_id: str, number_of_users: int) -> stripe.Subscr
if not STRIPE_PRICE_ID:
raise Exception("STRIPE_PRICE_ID is not set")
tenant_id = CURRENT_TENANT_ID_CONTEXTVAR.get()
response = fetch_tenant_stripe_information(tenant_id)
stripe_subscription_id = cast(str, response.get("stripe_subscription_id"))

View File

@ -132,9 +132,7 @@ else:
POSTGRES_DEFAULT_SCHEMA = os.environ.get("POSTGRES_DEFAULT_SCHEMA") or "public"
CURRENT_TENANT_ID_CONTEXTVAR = contextvars.ContextVar(
"current_tenant_id", default=POSTGRES_DEFAULT_SCHEMA
)