diff --git a/backend/danswer/background/celery/celery_redis.py b/backend/danswer/background/celery/celery_redis.py
index b3132c59a..271e9a76a 100644
--- a/backend/danswer/background/celery/celery_redis.py
+++ b/backend/danswer/background/celery/celery_redis.py
@@ -134,7 +134,7 @@ class RedisDocumentSet(RedisObjectHelper):
                 last_lock_time = current_time
 
             # celery's default task id format is "dd32ded3-00aa-4884-8b21-42f8332e7fac"
-            # the actual redis key is "celery-task-meta-dd32ded3-00aa-4884-8b21-42f8332e7fac"
+            # the key for the result is "celery-task-meta-dd32ded3-00aa-4884-8b21-42f8332e7fac"
             # we prefix the task id so it's easier to keep track of who created the task
             # aka "documentset_1_6dd32ded3-00aa-4884-8b21-42f8332e7fac"
             custom_task_id = f"{self.task_id_prefix}_{uuid4()}"
@@ -189,7 +189,7 @@ class RedisUserGroup(RedisObjectHelper):
                 last_lock_time = current_time
 
             # celery's default task id format is "dd32ded3-00aa-4884-8b21-42f8332e7fac"
-            # the actual redis key is "celery-task-meta-dd32ded3-00aa-4884-8b21-42f8332e7fac"
+            # the key for the result is "celery-task-meta-dd32ded3-00aa-4884-8b21-42f8332e7fac"
             # we prefix the task id so it's easier to keep track of who created the task
             # aka "documentset_1_6dd32ded3-00aa-4884-8b21-42f8332e7fac"
             custom_task_id = f"{self.task_id_prefix}_{uuid4()}"
@@ -256,7 +256,7 @@ class RedisConnectorCredentialPair(RedisObjectHelper):
                 last_lock_time = current_time
 
             # celery's default task id format is "dd32ded3-00aa-4884-8b21-42f8332e7fac"
-            # the actual redis key is "celery-task-meta-dd32ded3-00aa-4884-8b21-42f8332e7fac"
+            # the key for the result is "celery-task-meta-dd32ded3-00aa-4884-8b21-42f8332e7fac"
             # we prefix the task id so it's easier to keep track of who created the task
             # aka "documentset_1_6dd32ded3-00aa-4884-8b21-42f8332e7fac"
             custom_task_id = f"{self.task_id_prefix}_{uuid4()}"
diff --git a/backend/danswer/background/celery/celeryconfig.py b/backend/danswer/background/celery/celeryconfig.py
index 898cfd4b9..d0314adf8 100644
--- a/backend/danswer/background/celery/celeryconfig.py
+++ b/backend/danswer/background/celery/celeryconfig.py
@@ -1,5 +1,7 @@
 # docs: https://docs.celeryq.dev/en/stable/userguide/configuration.html
+from danswer.configs.app_configs import CELERY_RESULT_EXPIRES
 from danswer.configs.app_configs import REDIS_DB_NUMBER_CELERY
+from danswer.configs.app_configs import REDIS_DB_NUMBER_CELERY_RESULT_BACKEND
 from danswer.configs.app_configs import REDIS_HOST
 from danswer.configs.app_configs import REDIS_PASSWORD
 from danswer.configs.app_configs import REDIS_PORT
@@ -27,7 +29,7 @@ if REDIS_SSL:
 
 # example celery_broker_url: "redis://:password@localhost:6379/15"
 broker_url = f"{REDIS_SCHEME}://{CELERY_PASSWORD_PART}{REDIS_HOST}:{REDIS_PORT}/{REDIS_DB_NUMBER_CELERY}{SSL_QUERY_PARAMS}"
-result_backend = f"{REDIS_SCHEME}://{CELERY_PASSWORD_PART}{REDIS_HOST}:{REDIS_PORT}/{REDIS_DB_NUMBER_CELERY}{SSL_QUERY_PARAMS}"
+result_backend = f"{REDIS_SCHEME}://{CELERY_PASSWORD_PART}{REDIS_HOST}:{REDIS_PORT}/{REDIS_DB_NUMBER_CELERY_RESULT_BACKEND}{SSL_QUERY_PARAMS}"
 
 # NOTE: prefetch 4 is significantly faster than prefetch 1 for small tasks
 # however, prefetching is bad when tasks are lengthy as those tasks
@@ -42,3 +44,33 @@ broker_transport_options = {
 
 task_default_priority = DanswerCeleryPriority.MEDIUM
 task_acks_late = True
+
+# It's possible we don't even need celery's result backend, in which case all of the optimization below
+# might be irrelevant
+result_expires = CELERY_RESULT_EXPIRES  # 86400 seconds is the default
+
+# Option 0: Defaults (json serializer, no compression)
+# about 1.5 KB per queued task. 1KB in queue, 400B for result, 100B as a child entry in generator result
+
+# Option 1: Reduces generator task result sizes by roughly 20%
+# task_compression = "bzip2"
+# task_serializer = "pickle"
+# result_compression = "bzip2"
+# result_serializer = "pickle"
+# accept_content=["pickle"]
+
+# Option 2: this significantly reduces the size of the result for generator tasks since the list of children
+# can be large. small tasks change very little
+# def pickle_bz2_encoder(data):
+#     return bz2.compress(pickle.dumps(data))
+
+# def pickle_bz2_decoder(data):
+#     return pickle.loads(bz2.decompress(data))
+
+# from kombu import serialization  # to register custom serialization with Celery/Kombu
+
+# serialization.register('pickle-bzip2', pickle_bz2_encoder, pickle_bz2_decoder, 'application/x-pickle-bz2', 'binary')
+
+# task_serializer = "pickle-bzip2"
+# result_serializer = "pickle-bzip2"
+# accept_content=["pickle", "pickle-bzip2"]
diff --git a/backend/danswer/configs/app_configs.py b/backend/danswer/configs/app_configs.py
index 4b5109b5e..c97bd535d 100644
--- a/backend/danswer/configs/app_configs.py
+++ b/backend/danswer/configs/app_configs.py
@@ -159,11 +159,16 @@ REDIS_PASSWORD = os.environ.get("REDIS_PASSWORD") or ""
 REDIS_DB_NUMBER = int(os.environ.get("REDIS_DB_NUMBER", 0))
 
 # Used by celery as broker and backend
-REDIS_DB_NUMBER_CELERY = int(os.environ.get("REDIS_DB_NUMBER_CELERY", 15))
+REDIS_DB_NUMBER_CELERY_RESULT_BACKEND = int(
+    os.environ.get("REDIS_DB_NUMBER_CELERY_RESULT_BACKEND", 14)
+)
+REDIS_DB_NUMBER_CELERY = int(os.environ.get("REDIS_DB_NUMBER_CELERY", 15))  # broker
 
 REDIS_SSL_CERT_REQS = os.getenv("REDIS_SSL_CERT_REQS", "CERT_NONE")
 REDIS_SSL_CA_CERTS = os.getenv("REDIS_SSL_CA_CERTS", "")
 
+CELERY_RESULT_EXPIRES = int(os.environ.get("CELERY_RESULT_EXPIRES", 86400))  # seconds
+
 #####
 # Connector Configs
 #####
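The comment fix in celery_redis.py reflects how Celery's Redis result backend actually stores task state: each result lives under the key "celery-task-meta-<task_id>", so handing Celery a prefixed id makes those keys traceable back to their creator. A minimal sketch of the dispatch side, assuming a hypothetical task name and prefix (the diff only shows the id construction, not where it is sent):

from uuid import uuid4

from celery import Celery

celery_app = Celery(
    "danswer",
    broker="redis://localhost:6379/15",
    backend="redis://localhost:6379/14",
)

# hypothetical prefix; in the diff it comes from self.task_id_prefix
custom_task_id = f"documentset_1_{uuid4()}"

# passing task_id overrides Celery's auto-generated UUID, so the backend
# stores the outcome under "celery-task-meta-documentset_1_<uuid>"
celery_app.send_task("sync_document_set", task_id=custom_task_id)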
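Moving the result backend to its own Redis logical database (14 by default, with the broker staying on 15) keeps result keys out of the broker's keyspace, so they can be monitored or flushed independently, and result_expires caps how long they survive. With the new defaults, and assuming a local Redis with no password and no SSL, the config resolves to:

# assuming REDIS_HOST=localhost, REDIS_PORT=6379, no password, no SSL
broker_url = "redis://localhost:6379/15"  # REDIS_DB_NUMBER_CELERY
result_backend = "redis://localhost:6379/14"  # REDIS_DB_NUMBER_CELERY_RESULT_BACKEND
result_expires = 86400  # CELERY_RESULT_EXPIRES default: one day, same as celery's own default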
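The commented-out "Option 2" block omits the imports it depends on. A self-contained sketch of that approach, under the assumption that pickled payloads are acceptable (pickle deserialization runs arbitrary code, which is why Celery does not accept it by default):

import bz2
import pickle

from kombu import serialization


def pickle_bz2_encoder(data):
    # pickle first, then compress; large generator results (long lists of
    # children) shrink the most, small task payloads barely change
    return bz2.compress(pickle.dumps(data))


def pickle_bz2_decoder(data):
    return pickle.loads(bz2.decompress(data))


# register the codec with kombu so Celery can refer to it by name
serialization.register(
    "pickle-bzip2",
    pickle_bz2_encoder,
    pickle_bz2_decoder,
    content_type="application/x-pickle-bz2",
    content_encoding="binary",
)

# celeryconfig.py settings that would switch it on
task_serializer = "pickle-bzip2"
result_serializer = "pickle-bzip2"
accept_content = ["pickle", "pickle-bzip2"]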