From 95701db1bdf60a5bdd6a9cc2c007bfd4adf0b316 Mon Sep 17 00:00:00 2001 From: Chris Weaver <25087905+Weves@users.noreply.github.com> Date: Wed, 29 Jan 2025 18:03:49 -0800 Subject: [PATCH] Add more sync records + fix small bug in monitoring task causing deletion metrics to never be emitted (#3837) Double check we don't double-emit + fix pruning metric Add log Fix comment rename --- .../tasks/doc_permission_syncing/tasks.py | 68 +++++++++ .../tasks/external_group_syncing/tasks.py | 28 ++++ .../celery/tasks/monitoring/tasks.py | 129 ++++++++++++++---- .../background/celery/tasks/pruning/tasks.py | 65 +++++++++ .../background/celery/tasks/vespa/tasks.py | 86 +----------- backend/onyx/db/enums.py | 3 + backend/onyx/redis/redis_connector_prune.py | 2 +- 7 files changed, 275 insertions(+), 106 deletions(-) diff --git a/backend/onyx/background/celery/tasks/doc_permission_syncing/tasks.py b/backend/onyx/background/celery/tasks/doc_permission_syncing/tasks.py index 48f7dc60d85a..81887dd9df5a 100644 --- a/backend/onyx/background/celery/tasks/doc_permission_syncing/tasks.py +++ b/backend/onyx/background/celery/tasks/doc_permission_syncing/tasks.py @@ -11,6 +11,7 @@ from celery import Task from celery.exceptions import SoftTimeLimitExceeded from redis import Redis from redis.lock import Lock as RedisLock +from sqlalchemy.orm import Session from ee.onyx.db.connector_credential_pair import get_all_auto_sync_cc_pairs from ee.onyx.db.document import upsert_document_external_perms @@ -31,12 +32,17 @@ from onyx.configs.constants import OnyxCeleryPriority from onyx.configs.constants import OnyxCeleryQueues from onyx.configs.constants import OnyxCeleryTask from onyx.configs.constants import OnyxRedisLocks +from onyx.db.connector import mark_cc_pair_as_permissions_synced from onyx.db.connector_credential_pair import get_connector_credential_pair_from_id from onyx.db.document import upsert_document_by_connector_credential_pair from onyx.db.engine import get_session_with_tenant from onyx.db.enums import AccessType from onyx.db.enums import ConnectorCredentialPairStatus +from onyx.db.enums import SyncStatus +from onyx.db.enums import SyncType from onyx.db.models import ConnectorCredentialPair +from onyx.db.sync_record import insert_sync_record +from onyx.db.sync_record import update_sync_record_status from onyx.db.users import batch_add_ext_perm_user_if_not_exists from onyx.redis.redis_connector import RedisConnector from onyx.redis.redis_connector_doc_perm_sync import ( @@ -57,6 +63,9 @@ LIGHT_SOFT_TIME_LIMIT = 105 LIGHT_TIME_LIMIT = LIGHT_SOFT_TIME_LIMIT + 15 +"""Jobs / utils for kicking off doc permissions sync tasks.""" + + def _is_external_doc_permissions_sync_due(cc_pair: ConnectorCredentialPair) -> bool: """Returns boolean indicating if external doc permissions sync is due.""" @@ -174,6 +183,15 @@ def try_creating_permissions_sync_task( custom_task_id = f"{redis_connector.permissions.generator_task_key}_{uuid4()}" + # create before setting fence to avoid race condition where the monitoring + # task updates the sync record before it is created + with get_session_with_tenant(tenant_id) as db_session: + insert_sync_record( + db_session=db_session, + entity_id=cc_pair_id, + sync_type=SyncType.EXTERNAL_PERMISSIONS, + ) + # set a basic fence to start payload = RedisConnectorPermissionSyncPayload(started=None, celery_task_id=None) redis_connector.permissions.set_fence(payload) @@ -400,3 +418,53 @@ def update_external_document_permissions_task( f"Error Syncing Document Permissions: connector_id={connector_id} doc_id={doc_id}" ) return False + + +"""Monitoring CCPair permissions utils, called in monitor_vespa_sync""" + + +def monitor_ccpair_permissions_taskset( + tenant_id: str | None, key_bytes: bytes, r: Redis, db_session: Session +) -> None: + fence_key = key_bytes.decode("utf-8") + cc_pair_id_str = RedisConnector.get_id_from_fence_key(fence_key) + if cc_pair_id_str is None: + task_logger.warning( + f"monitor_ccpair_permissions_taskset: could not parse cc_pair_id from {fence_key}" + ) + return + + cc_pair_id = int(cc_pair_id_str) + + redis_connector = RedisConnector(tenant_id, cc_pair_id) + if not redis_connector.permissions.fenced: + return + + initial = redis_connector.permissions.generator_complete + if initial is None: + return + + remaining = redis_connector.permissions.get_remaining() + task_logger.info( + f"Permissions sync progress: cc_pair={cc_pair_id} remaining={remaining} initial={initial}" + ) + if remaining > 0: + return + + payload: RedisConnectorPermissionSyncPayload | None = ( + redis_connector.permissions.payload + ) + start_time: datetime | None = payload.started if payload else None + + mark_cc_pair_as_permissions_synced(db_session, int(cc_pair_id), start_time) + task_logger.info(f"Successfully synced permissions for cc_pair={cc_pair_id}") + + update_sync_record_status( + db_session=db_session, + entity_id=cc_pair_id, + sync_type=SyncType.EXTERNAL_PERMISSIONS, + sync_status=SyncStatus.SUCCESS, + num_docs_synced=initial, + ) + + redis_connector.permissions.reset() diff --git a/backend/onyx/background/celery/tasks/external_group_syncing/tasks.py b/backend/onyx/background/celery/tasks/external_group_syncing/tasks.py index ce8544146611..42b108d13e94 100644 --- a/backend/onyx/background/celery/tasks/external_group_syncing/tasks.py +++ b/backend/onyx/background/celery/tasks/external_group_syncing/tasks.py @@ -33,7 +33,11 @@ from onyx.db.connector_credential_pair import get_connector_credential_pair_from from onyx.db.engine import get_session_with_tenant from onyx.db.enums import AccessType from onyx.db.enums import ConnectorCredentialPairStatus +from onyx.db.enums import SyncStatus +from onyx.db.enums import SyncType from onyx.db.models import ConnectorCredentialPair +from onyx.db.sync_record import insert_sync_record +from onyx.db.sync_record import update_sync_record_status from onyx.redis.redis_connector import RedisConnector from onyx.redis.redis_connector_ext_group_sync import ( RedisConnectorExternalGroupSyncPayload, @@ -200,6 +204,15 @@ def try_creating_external_group_sync_task( celery_task_id=result.id, ) + # create before setting fence to avoid race condition where the monitoring + # task updates the sync record before it is created + with get_session_with_tenant(tenant_id) as db_session: + insert_sync_record( + db_session=db_session, + entity_id=cc_pair_id, + sync_type=SyncType.EXTERNAL_GROUP, + ) + redis_connector.external_group_sync.set_fence(payload) except Exception: @@ -289,11 +302,26 @@ def connector_external_group_sync_generator_task( ) mark_cc_pair_as_external_group_synced(db_session, cc_pair.id) + + update_sync_record_status( + db_session=db_session, + entity_id=cc_pair_id, + sync_type=SyncType.EXTERNAL_GROUP, + sync_status=SyncStatus.SUCCESS, + ) except Exception as e: task_logger.exception( f"Failed to run external group sync: cc_pair={cc_pair_id}" ) + with get_session_with_tenant(tenant_id) as db_session: + update_sync_record_status( + db_session=db_session, + entity_id=cc_pair_id, + sync_type=SyncType.EXTERNAL_GROUP, + sync_status=SyncStatus.FAILED, + ) + redis_connector.external_group_sync.generator_clear() redis_connector.external_group_sync.taskset_clear() raise e diff --git a/backend/onyx/background/celery/tasks/monitoring/tasks.py b/backend/onyx/background/celery/tasks/monitoring/tasks.py index d1bc14d738b6..d2116c60cd4f 100644 --- a/backend/onyx/background/celery/tasks/monitoring/tasks.py +++ b/backend/onyx/background/celery/tasks/monitoring/tasks.py @@ -58,6 +58,11 @@ _SYNC_START_LATENCY_KEY_FMT = ( "sync_start_latency:{sync_type}:{entity_id}:{sync_record_id}" ) +_CONNECTOR_START_TIME_KEY_FMT = "connector_start_time:{cc_pair_id}:{index_attempt_id}" +_CONNECTOR_END_TIME_KEY_FMT = "connector_end_time:{cc_pair_id}:{index_attempt_id}" +_SYNC_START_TIME_KEY_FMT = "sync_start_time:{sync_type}:{entity_id}:{sync_record_id}" +_SYNC_END_TIME_KEY_FMT = "sync_end_time:{sync_type}:{entity_id}:{sync_record_id}" + def _mark_metric_as_emitted(redis_std: Redis, key: str) -> None: """Mark a metric as having been emitted by setting a Redis key with expiration""" @@ -303,8 +308,6 @@ def _build_connector_final_metrics( ) ) - _mark_metric_as_emitted(redis_std, metric_key) - return metrics @@ -344,6 +347,52 @@ def _collect_connector_metrics(db_session: Session, redis_std: Redis) -> list[Me if one_hour_ago > most_recent_attempt.time_created: continue + # Build a job_id for correlation + job_id = build_job_id( + "connector", str(cc_pair.id), str(most_recent_attempt.id) + ) + + # Add raw start time metric if available + if most_recent_attempt.time_started: + start_time_key = _CONNECTOR_START_TIME_KEY_FMT.format( + cc_pair_id=cc_pair.id, + index_attempt_id=most_recent_attempt.id, + ) + metrics.append( + Metric( + key=start_time_key, + name="connector_start_time", + value=most_recent_attempt.time_started.timestamp(), + tags={ + "job_id": job_id, + "connector_id": str(cc_pair.connector.id), + "source": str(cc_pair.connector.source), + }, + ) + ) + + # Add raw end time metric if available and in terminal state + if ( + most_recent_attempt.status.is_terminal() + and most_recent_attempt.time_updated + ): + end_time_key = _CONNECTOR_END_TIME_KEY_FMT.format( + cc_pair_id=cc_pair.id, + index_attempt_id=most_recent_attempt.id, + ) + metrics.append( + Metric( + key=end_time_key, + name="connector_end_time", + value=most_recent_attempt.time_updated.timestamp(), + tags={ + "job_id": job_id, + "connector_id": str(cc_pair.connector.id), + "source": str(cc_pair.connector.source), + }, + ) + ) + # Connector start latency start_latency_metric = _build_connector_start_latency_metric( cc_pair, most_recent_attempt, second_most_recent_attempt, redis_std @@ -365,9 +414,10 @@ def _collect_sync_metrics(db_session: Session, redis_std: Redis) -> list[Metric] """ Collect metrics for document set and group syncing: - Success/failure status - - Start latency (always) + - Start latency (for doc sets / user groups) - Duration & doc count (only if success) - Throughput (docs/min) (only if success) + - Raw start/end times for each sync """ one_hour_ago = get_db_current_time(db_session) - timedelta(hours=1) @@ -389,6 +439,43 @@ def _collect_sync_metrics(db_session: Session, redis_std: Redis) -> list[Metric] # Build a job_id for correlation job_id = build_job_id("sync_record", str(sync_record.id)) + # Add raw start time metric + start_time_key = _SYNC_START_TIME_KEY_FMT.format( + sync_type=sync_record.sync_type, + entity_id=sync_record.entity_id, + sync_record_id=sync_record.id, + ) + metrics.append( + Metric( + key=start_time_key, + name="sync_start_time", + value=sync_record.sync_start_time.timestamp(), + tags={ + "job_id": job_id, + "sync_type": str(sync_record.sync_type), + }, + ) + ) + + # Add raw end time metric if available + if sync_record.sync_end_time: + end_time_key = _SYNC_END_TIME_KEY_FMT.format( + sync_type=sync_record.sync_type, + entity_id=sync_record.entity_id, + sync_record_id=sync_record.id, + ) + metrics.append( + Metric( + key=end_time_key, + name="sync_end_time", + value=sync_record.sync_end_time.timestamp(), + tags={ + "job_id": job_id, + "sync_type": str(sync_record.sync_type), + }, + ) + ) + # Emit a SUCCESS/FAIL boolean metric # Use a single Redis key to avoid re-emitting final metrics final_metric_key = _FINAL_METRIC_KEY_FMT.format( @@ -439,7 +526,7 @@ def _collect_sync_metrics(db_session: Session, redis_std: Redis) -> list[Metric] if duration_seconds is not None: metrics.append( Metric( - key=None, + key=final_metric_key, name="sync_duration_seconds", value=duration_seconds, tags={ @@ -455,7 +542,7 @@ def _collect_sync_metrics(db_session: Session, redis_std: Redis) -> list[Metric] metrics.append( Metric( - key=None, + key=final_metric_key, name="sync_doc_count", value=doc_count, tags={ @@ -468,7 +555,7 @@ def _collect_sync_metrics(db_session: Session, redis_std: Redis) -> list[Metric] if sync_speed is not None: metrics.append( Metric( - key=None, + key=final_metric_key, name="sync_speed_docs_per_min", value=sync_speed, tags={ @@ -482,9 +569,6 @@ def _collect_sync_metrics(db_session: Session, redis_std: Redis) -> list[Metric] f"Invalid sync record {sync_record.id} with no duration" ) - # Mark final metrics as emitted so we don't re-emit - _mark_metric_as_emitted(redis_std, final_metric_key) - # Emit start latency start_latency_key = _SYNC_START_LATENCY_KEY_FMT.format( sync_type=sync_record.sync_type, @@ -502,22 +586,20 @@ def _collect_sync_metrics(db_session: Session, redis_std: Redis) -> list[Metric] entity = db_session.scalar( select(UserGroup).where(UserGroup.id == sync_record.entity_id) ) - else: - task_logger.info( - f"Skipping sync record {sync_record.id} of type {sync_record.sync_type}." - ) - continue if entity is None: task_logger.error( - f"Could not find entity for sync record {sync_record.id} " - f"(type={sync_record.sync_type}, id={sync_record.entity_id})." + f"Sync record of type {sync_record.sync_type} doesn't have an entity " + f"associated with it (id={sync_record.entity_id}). Skipping start latency metric." ) - continue # Calculate start latency in seconds: # (actual sync start) - (last modified time) - if entity.time_last_modified_by_user and sync_record.sync_start_time: + if ( + entity is not None + and entity.time_last_modified_by_user + and sync_record.sync_start_time + ): start_latency = ( sync_record.sync_start_time - entity.time_last_modified_by_user ).total_seconds() @@ -541,8 +623,6 @@ def _collect_sync_metrics(db_session: Session, redis_std: Redis) -> list[Metric] ) ) - _mark_metric_as_emitted(redis_std, start_latency_key) - return metrics @@ -607,9 +687,12 @@ def monitor_background_processes(self: Task, *, tenant_id: str | None) -> None: for metric_fn in metric_functions: metrics = metric_fn() for metric in metrics: - metric.log() - metric.emit(tenant_id) - if metric.key: + # double check to make sure we aren't double-emitting metrics + if metric.key is not None and not _has_metric_been_emitted( + redis_std, metric.key + ): + metric.log() + metric.emit(tenant_id) _mark_metric_as_emitted(redis_std, metric.key) task_logger.info("Successfully collected background metrics") diff --git a/backend/onyx/background/celery/tasks/pruning/tasks.py b/backend/onyx/background/celery/tasks/pruning/tasks.py index 3003af1960f0..c0e51e393b46 100644 --- a/backend/onyx/background/celery/tasks/pruning/tasks.py +++ b/backend/onyx/background/celery/tasks/pruning/tasks.py @@ -25,13 +25,18 @@ from onyx.configs.constants import OnyxCeleryTask from onyx.configs.constants import OnyxRedisLocks from onyx.connectors.factory import instantiate_connector from onyx.connectors.models import InputType +from onyx.db.connector import mark_ccpair_as_pruned from onyx.db.connector_credential_pair import get_connector_credential_pair from onyx.db.connector_credential_pair import get_connector_credential_pair_from_id from onyx.db.connector_credential_pair import get_connector_credential_pairs from onyx.db.document import get_documents_for_connector_credential_pair from onyx.db.engine import get_session_with_tenant from onyx.db.enums import ConnectorCredentialPairStatus +from onyx.db.enums import SyncStatus +from onyx.db.enums import SyncType from onyx.db.models import ConnectorCredentialPair +from onyx.db.sync_record import insert_sync_record +from onyx.db.sync_record import update_sync_record_status from onyx.redis.redis_connector import RedisConnector from onyx.redis.redis_pool import get_redis_client from onyx.utils.logger import pruning_ctx @@ -40,6 +45,9 @@ from onyx.utils.logger import setup_logger logger = setup_logger() +"""Jobs / utils for kicking off pruning tasks.""" + + def _is_pruning_due(cc_pair: ConnectorCredentialPair) -> bool: """Returns boolean indicating if pruning is due. @@ -204,6 +212,14 @@ def try_creating_prune_generator_task( priority=OnyxCeleryPriority.LOW, ) + # create before setting fence to avoid race condition where the monitoring + # task updates the sync record before it is created + insert_sync_record( + db_session=db_session, + entity_id=cc_pair.id, + sync_type=SyncType.PRUNING, + ) + # set this only after all tasks have been added redis_connector.prune.set_fence(True) except Exception: @@ -348,3 +364,52 @@ def connector_pruning_generator_task( lock.release() task_logger.info(f"Pruning generator finished: cc_pair={cc_pair_id}") + + +"""Monitoring pruning utils, called in monitor_vespa_sync""" + + +def monitor_ccpair_pruning_taskset( + tenant_id: str | None, key_bytes: bytes, r: Redis, db_session: Session +) -> None: + fence_key = key_bytes.decode("utf-8") + cc_pair_id_str = RedisConnector.get_id_from_fence_key(fence_key) + if cc_pair_id_str is None: + task_logger.warning( + f"monitor_ccpair_pruning_taskset: could not parse cc_pair_id from {fence_key}" + ) + return + + cc_pair_id = int(cc_pair_id_str) + + redis_connector = RedisConnector(tenant_id, cc_pair_id) + if not redis_connector.prune.fenced: + return + + initial = redis_connector.prune.generator_complete + if initial is None: + return + + remaining = redis_connector.prune.get_remaining() + task_logger.info( + f"Connector pruning progress: cc_pair={cc_pair_id} remaining={remaining} initial={initial}" + ) + if remaining > 0: + return + + mark_ccpair_as_pruned(int(cc_pair_id), db_session) + task_logger.info( + f"Successfully pruned connector credential pair. cc_pair={cc_pair_id}" + ) + + update_sync_record_status( + db_session=db_session, + entity_id=cc_pair_id, + sync_type=SyncType.PRUNING, + sync_status=SyncStatus.SUCCESS, + num_docs_synced=initial, + ) + + redis_connector.prune.taskset_clear() + redis_connector.prune.generator_clear() + redis_connector.prune.set_fence(False) diff --git a/backend/onyx/background/celery/tasks/vespa/tasks.py b/backend/onyx/background/celery/tasks/vespa/tasks.py index 66e9201286cf..7152903bf077 100644 --- a/backend/onyx/background/celery/tasks/vespa/tasks.py +++ b/backend/onyx/background/celery/tasks/vespa/tasks.py @@ -24,6 +24,10 @@ from onyx.access.access import get_access_for_document from onyx.background.celery.apps.app_base import task_logger from onyx.background.celery.celery_redis import celery_get_queue_length from onyx.background.celery.celery_redis import celery_get_unacked_task_ids +from onyx.background.celery.tasks.doc_permission_syncing.tasks import ( + monitor_ccpair_permissions_taskset, +) +from onyx.background.celery.tasks.pruning.tasks import monitor_ccpair_pruning_taskset from onyx.background.celery.tasks.shared.RetryDocumentIndex import RetryDocumentIndex from onyx.background.celery.tasks.shared.tasks import LIGHT_SOFT_TIME_LIMIT from onyx.background.celery.tasks.shared.tasks import LIGHT_TIME_LIMIT @@ -34,8 +38,6 @@ from onyx.configs.constants import OnyxCeleryQueues from onyx.configs.constants import OnyxCeleryTask from onyx.configs.constants import OnyxRedisLocks from onyx.db.connector import fetch_connector_by_id -from onyx.db.connector import mark_cc_pair_as_permissions_synced -from onyx.db.connector import mark_ccpair_as_pruned from onyx.db.connector_credential_pair import add_deletion_failure_message from onyx.db.connector_credential_pair import ( delete_connector_credential_pair__no_commit, @@ -72,9 +74,6 @@ from onyx.redis.redis_connector import RedisConnector from onyx.redis.redis_connector_credential_pair import RedisConnectorCredentialPair from onyx.redis.redis_connector_delete import RedisConnectorDelete from onyx.redis.redis_connector_doc_perm_sync import RedisConnectorPermissionSync -from onyx.redis.redis_connector_doc_perm_sync import ( - RedisConnectorPermissionSyncPayload, -) from onyx.redis.redis_connector_index import RedisConnectorIndex from onyx.redis.redis_connector_prune import RedisConnectorPrune from onyx.redis.redis_document_set import RedisDocumentSet @@ -653,83 +652,6 @@ def monitor_connector_deletion_taskset( redis_connector.delete.reset() -def monitor_ccpair_pruning_taskset( - tenant_id: str | None, key_bytes: bytes, r: Redis, db_session: Session -) -> None: - fence_key = key_bytes.decode("utf-8") - cc_pair_id_str = RedisConnector.get_id_from_fence_key(fence_key) - if cc_pair_id_str is None: - task_logger.warning( - f"monitor_ccpair_pruning_taskset: could not parse cc_pair_id from {fence_key}" - ) - return - - cc_pair_id = int(cc_pair_id_str) - - redis_connector = RedisConnector(tenant_id, cc_pair_id) - if not redis_connector.prune.fenced: - return - - initial = redis_connector.prune.generator_complete - if initial is None: - return - - remaining = redis_connector.prune.get_remaining() - task_logger.info( - f"Connector pruning progress: cc_pair={cc_pair_id} remaining={remaining} initial={initial}" - ) - if remaining > 0: - return - - mark_ccpair_as_pruned(int(cc_pair_id), db_session) - task_logger.info( - f"Successfully pruned connector credential pair. cc_pair={cc_pair_id}" - ) - - redis_connector.prune.taskset_clear() - redis_connector.prune.generator_clear() - redis_connector.prune.set_fence(False) - - -def monitor_ccpair_permissions_taskset( - tenant_id: str | None, key_bytes: bytes, r: Redis, db_session: Session -) -> None: - fence_key = key_bytes.decode("utf-8") - cc_pair_id_str = RedisConnector.get_id_from_fence_key(fence_key) - if cc_pair_id_str is None: - task_logger.warning( - f"monitor_ccpair_permissions_taskset: could not parse cc_pair_id from {fence_key}" - ) - return - - cc_pair_id = int(cc_pair_id_str) - - redis_connector = RedisConnector(tenant_id, cc_pair_id) - if not redis_connector.permissions.fenced: - return - - initial = redis_connector.permissions.generator_complete - if initial is None: - return - - remaining = redis_connector.permissions.get_remaining() - task_logger.info( - f"Permissions sync progress: cc_pair={cc_pair_id} remaining={remaining} initial={initial}" - ) - if remaining > 0: - return - - payload: RedisConnectorPermissionSyncPayload | None = ( - redis_connector.permissions.payload - ) - start_time: datetime | None = payload.started if payload else None - - mark_cc_pair_as_permissions_synced(db_session, int(cc_pair_id), start_time) - task_logger.info(f"Successfully synced permissions for cc_pair={cc_pair_id}") - - redis_connector.permissions.reset() - - def monitor_ccpair_indexing_taskset( tenant_id: str | None, key_bytes: bytes, r: Redis, db_session: Session ) -> None: diff --git a/backend/onyx/db/enums.py b/backend/onyx/db/enums.py index b32825298e39..7a7943a014c5 100644 --- a/backend/onyx/db/enums.py +++ b/backend/onyx/db/enums.py @@ -28,6 +28,9 @@ class SyncType(str, PyEnum): DOCUMENT_SET = "document_set" USER_GROUP = "user_group" CONNECTOR_DELETION = "connector_deletion" + PRUNING = "pruning" # not really a sync, but close enough + EXTERNAL_PERMISSIONS = "external_permissions" + EXTERNAL_GROUP = "external_group" def __str__(self) -> str: return self.value diff --git a/backend/onyx/redis/redis_connector_prune.py b/backend/onyx/redis/redis_connector_prune.py index 75279028f815..ea4a923eb6d8 100644 --- a/backend/onyx/redis/redis_connector_prune.py +++ b/backend/onyx/redis/redis_connector_prune.py @@ -92,7 +92,7 @@ class RedisConnectorPrune: if fence_bytes is None: return None - fence_int = cast(int, fence_bytes) + fence_int = int(cast(bytes, fence_bytes)) return fence_int @generator_complete.setter