diff --git a/.python-version b/.python-version new file mode 100644 index 0000000..1445aee --- /dev/null +++ b/.python-version @@ -0,0 +1 @@ +3.10.14 diff --git a/modules/capturer.py b/modules/capturer.py index fd49d46..9843ba3 100644 --- a/modules/capturer.py +++ b/modules/capturer.py @@ -1,20 +1,38 @@ -from typing import Any +from typing import Any, Optional import cv2 - -def get_video_frame(video_path: str, frame_number: int = 0) -> Any: +def get_video_frame(video_path: str, frame_number: int = 0) -> Optional[Any]: + """Retrieve a specific frame from a video.""" capture = cv2.VideoCapture(video_path) - frame_total = capture.get(cv2.CAP_PROP_FRAME_COUNT) - capture.set(cv2.CAP_PROP_POS_FRAMES, min(frame_total, frame_number - 1)) + + if not capture.isOpened(): + print(f"Error: Cannot open video file {video_path}") + return None + + frame_total = int(capture.get(cv2.CAP_PROP_FRAME_COUNT)) + + # Ensure frame_number is within the valid range + frame_number = max(0, min(frame_number, frame_total - 1)) + + capture.set(cv2.CAP_PROP_POS_FRAMES, frame_number) has_frame, frame = capture.read() capture.release() - if has_frame: - return frame - return None + if not has_frame: + print(f"Error: Cannot read frame {frame_number} from {video_path}") + return None + + return frame def get_video_frame_total(video_path: str) -> int: + """Get the total number of frames in a video.""" capture = cv2.VideoCapture(video_path) - video_frame_total = int(capture.get(cv2.CAP_PROP_FRAME_COUNT)) + + if not capture.isOpened(): + print(f"Error: Cannot open video file {video_path}") + return 0 + + frame_total = int(capture.get(cv2.CAP_PROP_FRAME_COUNT)) capture.release() - return video_frame_total + + return frame_total diff --git a/modules/core.py b/modules/core.py index 3ec1617..6e2d646 100644 --- a/modules/core.py +++ b/modules/core.py @@ -1,16 +1,17 @@ import os import sys -# single thread doubles cuda performance - needs to be set before torch import -if any(arg.startswith('--execution-provider') for arg in sys.argv): - os.environ['OMP_NUM_THREADS'] = '1' -# reduce tensorflow log level -os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2' import warnings -from typing import List import platform import signal import shutil import argparse +from typing import List + +# Set environment variables for CUDA performance and TensorFlow logging +if any(arg.startswith('--execution-provider') for arg in sys.argv): + os.environ['OMP_NUM_THREADS'] = '1' +os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2' + import torch import onnxruntime import tensorflow @@ -19,34 +20,56 @@ import modules.globals import modules.metadata import modules.ui as ui from modules.processors.frame.core import get_frame_processors_modules -from modules.utilities import has_image_extension, is_image, is_video, detect_fps, create_video, extract_frames, get_temp_frame_paths, restore_audio, create_temp, move_temp, clean_temp, normalize_output_path - -if 'ROCMExecutionProvider' in modules.globals.execution_providers: - del torch +from modules.utilities import ( + has_image_extension, + is_image, + is_video, + detect_fps, + create_video, + extract_frames, + get_temp_frame_paths, + restore_audio, + create_temp, + move_temp, + clean_temp, + normalize_output_path +) +# Filter warnings warnings.filterwarnings('ignore', category=FutureWarning, module='insightface') warnings.filterwarnings('ignore', category=UserWarning, module='torchvision') +# Cross-platform resource management +if platform.system() == 'Darwin' and 'ROCMExecutionProvider' in modules.globals.execution_providers: + del torch + def parse_args() -> None: signal.signal(signal.SIGINT, lambda signal_number, frame: destroy()) program = argparse.ArgumentParser() - program.add_argument('-s', '--source', help='select an source image', dest='source_path') - program.add_argument('-t', '--target', help='select an target image or video', dest='target_path') - program.add_argument('-o', '--output', help='select output file or directory', dest='output_path') - program.add_argument('--frame-processor', help='pipeline of frame processors', dest='frame_processor', default=['face_swapper'], choices=['face_swapper', 'face_enhancer'], nargs='+') - program.add_argument('--keep-fps', help='keep original fps', dest='keep_fps', action='store_true', default=False) - program.add_argument('--keep-audio', help='keep original audio', dest='keep_audio', action='store_true', default=True) - program.add_argument('--keep-frames', help='keep temporary frames', dest='keep_frames', action='store_true', default=False) - program.add_argument('--many-faces', help='process every face', dest='many_faces', action='store_true', default=False) - program.add_argument('--video-encoder', help='adjust output video encoder', dest='video_encoder', default='libx264', choices=['libx264', 'libx265', 'libvpx-vp9']) - program.add_argument('--video-quality', help='adjust output video quality', dest='video_quality', type=int, default=18, choices=range(52), metavar='[0-51]') - program.add_argument('--max-memory', help='maximum amount of RAM in GB', dest='max_memory', type=int, default=suggest_max_memory()) - program.add_argument('--execution-provider', help='execution provider', dest='execution_provider', default=['cpu'], choices=suggest_execution_providers(), nargs='+') - program.add_argument('--execution-threads', help='number of execution threads', dest='execution_threads', type=int, default=suggest_execution_threads()) - program.add_argument('-v', '--version', action='version', version=f'{modules.metadata.name} {modules.metadata.version}') + program.add_argument('-s', '--source', help='Select a source image', dest='source_path') + program.add_argument('-t', '--target', help='Select a target image or video', dest='target_path') + program.add_argument('-o', '--output', help='Select output file or directory', dest='output_path') + program.add_argument('--frame-processor', help='Pipeline of frame processors', dest='frame_processor', + default=['face_swapper'], choices=['face_swapper', 'face_enhancer'], nargs='+') + program.add_argument('--keep-fps', help='Keep original fps', dest='keep_fps', action='store_true', default=False) + program.add_argument('--keep-audio', help='Keep original audio', dest='keep_audio', action='store_true', default=True) + program.add_argument('--keep-frames', help='Keep temporary frames', dest='keep_frames', action='store_true', default=False) + program.add_argument('--many-faces', help='Process every face', dest='many_faces', action='store_true', default=False) + program.add_argument('--video-encoder', help='Adjust output video encoder', dest='video_encoder', default='libx264', + choices=['libx264', 'libx265', 'libvpx-vp9']) + program.add_argument('--video-quality', help='Adjust output video quality', dest='video_quality', type=int, default=18, + choices=range(52), metavar='[0-51]') + program.add_argument('--max-memory', help='Maximum amount of RAM in GB', dest='max_memory', type=int, + default=suggest_max_memory()) + program.add_argument('--execution-provider', help='Execution provider', dest='execution_provider', default=['cpu'], + choices=suggest_execution_providers(), nargs='+') + program.add_argument('--execution-threads', help='Number of execution threads', dest='execution_threads', type=int, + default=suggest_execution_threads()) + program.add_argument('-v', '--version', action='version', + version=f'{modules.metadata.name} {modules.metadata.version}') - # register deprecated args + # Register deprecated args program.add_argument('-f', '--face', help=argparse.SUPPRESS, dest='source_path_deprecated') program.add_argument('--cpu-cores', help=argparse.SUPPRESS, dest='cpu_cores_deprecated', type=int) program.add_argument('--gpu-vendor', help=argparse.SUPPRESS, dest='gpu_vendor_deprecated') @@ -56,7 +79,8 @@ def parse_args() -> None: modules.globals.source_path = args.source_path modules.globals.target_path = args.target_path - modules.globals.output_path = normalize_output_path(modules.globals.source_path, modules.globals.target_path, args.output_path) + modules.globals.output_path = normalize_output_path(modules.globals.source_path, modules.globals.target_path, + args.output_path) modules.globals.frame_processors = args.frame_processor modules.globals.headless = args.source_path or args.target_path or args.output_path modules.globals.keep_fps = args.keep_fps @@ -69,19 +93,22 @@ def parse_args() -> None: modules.globals.execution_providers = decode_execution_providers(args.execution_provider) modules.globals.execution_threads = args.execution_threads - #for ENHANCER tumbler: - if 'face_enhancer' in args.frame_processor: - modules.globals.fp_ui['face_enhancer'] = True - else: - modules.globals.fp_ui['face_enhancer'] = False - + # Handle face enhancer tumbler + modules.globals.fp_ui['face_enhancer'] = 'face_enhancer' in args.frame_processor + modules.globals.nsfw = False - # translate deprecated args + # Handle deprecated arguments + handle_deprecated_args(args) + + +def handle_deprecated_args(args) -> None: + """Handle deprecated arguments by translating them to the new format.""" if args.source_path_deprecated: print('\033[33mArgument -f and --face are deprecated. Use -s and --source instead.\033[0m') modules.globals.source_path = args.source_path_deprecated - modules.globals.output_path = normalize_output_path(args.source_path_deprecated, modules.globals.target_path, args.output_path) + modules.globals.output_path = normalize_output_path(args.source_path_deprecated, modules.globals.target_path, + args.output_path) if args.cpu_cores_deprecated: print('\033[33mArgument --cpu-cores is deprecated. Use --execution-threads instead.\033[0m') modules.globals.execution_threads = args.cpu_cores_deprecated @@ -92,7 +119,7 @@ def parse_args() -> None: print('\033[33mArgument --gpu-vendor nvidia is deprecated. Use --execution-provider cuda instead.\033[0m') modules.globals.execution_providers = decode_execution_providers(['cuda']) if args.gpu_vendor_deprecated == 'amd': - print('\033[33mArgument --gpu-vendor amd is deprecated. Use --execution-provider cuda instead.\033[0m') + print('\033[33mArgument --gpu-vendor amd is deprecated. Use --execution-provider rocm instead.\033[0m') modules.globals.execution_providers = decode_execution_providers(['rocm']) if args.gpu_threads_deprecated: print('\033[33mArgument --gpu-threads is deprecated. Use --execution-threads instead.\033[0m') @@ -100,18 +127,22 @@ def parse_args() -> None: def encode_execution_providers(execution_providers: List[str]) -> List[str]: - return [execution_provider.replace('ExecutionProvider', '').lower() for execution_provider in execution_providers] + return [provider.replace('ExecutionProvider', '').lower() for provider in execution_providers] def decode_execution_providers(execution_providers: List[str]) -> List[str]: - return [provider for provider, encoded_execution_provider in zip(onnxruntime.get_available_providers(), encode_execution_providers(onnxruntime.get_available_providers())) - if any(execution_provider in encoded_execution_provider for execution_provider in execution_providers)] + available_providers = onnxruntime.get_available_providers() + encoded_providers = encode_execution_providers(available_providers) + + selected_providers = [available_providers[encoded_providers.index(req)] for req in execution_providers + if req in encoded_providers] + + # Default to CPU if no suitable providers are found + return selected_providers if selected_providers else ['CPUExecutionProvider'] def suggest_max_memory() -> int: - if platform.system().lower() == 'darwin': - return 4 - return 16 + return 4 if platform.system().lower() == 'darwin' else 16 def suggest_execution_providers() -> List[str]: @@ -119,34 +150,41 @@ def suggest_execution_providers() -> List[str]: def suggest_execution_threads() -> int: - if 'DmlExecutionProvider' in modules.globals.execution_providers: + if 'dml' in modules.globals.execution_providers: return 1 - if 'ROCMExecutionProvider' in modules.globals.execution_providers: + if 'rocm' in modules.globals.execution_providers: return 1 return 8 def limit_resources() -> None: - # prevent tensorflow memory leak + # Prevent TensorFlow memory leak gpus = tensorflow.config.experimental.list_physical_devices('GPU') for gpu in gpus: tensorflow.config.experimental.set_memory_growth(gpu, True) - # limit memory usage + + # Limit memory usage if modules.globals.max_memory: memory = modules.globals.max_memory * 1024 ** 3 if platform.system().lower() == 'darwin': - memory = modules.globals.max_memory * 1024 ** 6 - if platform.system().lower() == 'windows': + memory = modules.globals.max_memory * 1024 ** 3 + elif platform.system().lower() == 'windows': import ctypes kernel32 = ctypes.windll.kernel32 kernel32.SetProcessWorkingSetSize(-1, ctypes.c_size_t(memory), ctypes.c_size_t(memory)) else: import resource - resource.setrlimit(resource.RLIMIT_DATA, (memory, memory)) - + try: + soft, hard = resource.getrlimit(resource.RLIMIT_DATA) + if memory > hard: + print(f"Warning: Requested memory limit {memory / (1024 ** 3)} GB exceeds system's hard limit. Setting to maximum allowed {hard / (1024 ** 3)} GB.") + memory = hard + resource.setrlimit(resource.RLIMIT_DATA, (memory, memory)) + except ValueError as e: + print(f"Warning: Could not set memory limit: {e}. Continuing with default limits.") def release_resources() -> None: - if 'CUDAExecutionProvider' in modules.globals.execution_providers: + if 'cuda' in modules.globals.execution_providers: torch.cuda.empty_cache() @@ -157,12 +195,15 @@ def pre_check() -> bool: if not shutil.which('ffmpeg'): update_status('ffmpeg is not installed.') return False + if 'cuda' in modules.globals.execution_providers and not torch.cuda.is_available(): + update_status('CUDA is not available. Please check your GPU or CUDA installation.') + return False return True def update_status(message: str, scope: str = 'DLC.CORE') -> None: print(f'[{scope}] {message}') - if not modules.globals.headless: + if not modules.globals.headless and ui.status_label: ui.update_status(message) @@ -170,37 +211,61 @@ def start() -> None: for frame_processor in get_frame_processors_modules(modules.globals.frame_processors): if not frame_processor.pre_start(): return - # process image to image + + # Process image to image if has_image_extension(modules.globals.target_path): - if modules.globals.nsfw == False: - from modules.predicter import predict_image - if predict_image(modules.globals.target_path): - destroy() - shutil.copy2(modules.globals.target_path, modules.globals.output_path) - for frame_processor in get_frame_processors_modules(modules.globals.frame_processors): - update_status('Progressing...', frame_processor.NAME) - frame_processor.process_image(modules.globals.source_path, modules.globals.output_path, modules.globals.output_path) - release_resources() - if is_image(modules.globals.target_path): - update_status('Processing to image succeed!') - else: - update_status('Processing to image failed!') + process_image_to_image() return - # process image to videos - if modules.globals.nsfw == False: + + # Process image to video + process_image_to_video() + + +def process_image_to_image() -> None: + if not modules.globals.nsfw: + from modules.predicter import predict_image + if predict_image(modules.globals.target_path): + destroy() + + shutil.copy2(modules.globals.target_path, modules.globals.output_path) + for frame_processor in get_frame_processors_modules(modules.globals.frame_processors): + update_status('Processing...', frame_processor.NAME) + frame_processor.process_image(modules.globals.source_path, modules.globals.output_path, modules.globals.output_path) + release_resources() + + if is_image(modules.globals.target_path): + update_status('Processing to image succeeded!') + else: + update_status('Processing to image failed!') + + +def process_image_to_video() -> None: + if not modules.globals.nsfw: from modules.predicter import predict_video if predict_video(modules.globals.target_path): destroy() - update_status('Creating temp resources...') + + update_status('Creating temporary resources...') create_temp(modules.globals.target_path) update_status('Extracting frames...') extract_frames(modules.globals.target_path) temp_frame_paths = get_temp_frame_paths(modules.globals.target_path) for frame_processor in get_frame_processors_modules(modules.globals.frame_processors): - update_status('Progressing...', frame_processor.NAME) + update_status('Processing...', frame_processor.NAME) frame_processor.process_video(modules.globals.source_path, temp_frame_paths) release_resources() - # handles fps + + handle_video_fps() + handle_video_audio() + clean_temp(modules.globals.target_path) + + if is_video(modules.globals.target_path): + update_status('Processing to video succeeded!') + else: + update_status('Processing to video failed!') + + +def handle_video_fps() -> None: if modules.globals.keep_fps: update_status('Detecting fps...') fps = detect_fps(modules.globals.target_path) @@ -209,7 +274,9 @@ def start() -> None: else: update_status('Creating video with 30.0 fps...') create_video(modules.globals.target_path) - # handle audio + + +def handle_video_audio() -> None: if modules.globals.keep_audio: if modules.globals.keep_fps: update_status('Restoring audio...') @@ -218,12 +285,6 @@ def start() -> None: restore_audio(modules.globals.target_path, modules.globals.output_path) else: move_temp(modules.globals.target_path, modules.globals.output_path) - # clean and validate - clean_temp(modules.globals.target_path) - if is_video(modules.globals.target_path): - update_status('Processing to video succeed!') - else: - update_status('Processing to video failed!') def destroy() -> None: @@ -233,15 +294,20 @@ def destroy() -> None: def run() -> None: - parse_args() - if not pre_check(): - return - for frame_processor in get_frame_processors_modules(modules.globals.frame_processors): - if not frame_processor.pre_check(): + try: + parse_args() + if not pre_check(): return - limit_resources() - if modules.globals.headless: - start() - else: - window = ui.init(start, destroy) - window.mainloop() + for frame_processor in get_frame_processors_modules(modules.globals.frame_processors): + if not frame_processor.pre_check(): + return + limit_resources() + if modules.globals.headless: + start() + else: + window = ui.init(start, destroy) + window.mainloop() + except Exception as e: + print(f"UI initialization failed: {str(e)}") + update_status(f"UI initialization failed: {str(e)}") + destroy() # Ensure any resources are cleaned up on failure diff --git a/modules/face_analyser.py b/modules/face_analyser.py index f2d46bf..9b42b70 100644 --- a/modules/face_analyser.py +++ b/modules/face_analyser.py @@ -1,31 +1,27 @@ -from typing import Any +from typing import Any, Optional import insightface import modules.globals from modules.typing import Frame -FACE_ANALYSER = None +FACE_ANALYSER: Optional[insightface.app.FaceAnalysis] = None - -def get_face_analyser() -> Any: +def get_face_analyser() -> insightface.app.FaceAnalysis: global FACE_ANALYSER if FACE_ANALYSER is None: - FACE_ANALYSER = insightface.app.FaceAnalysis(name='buffalo_l', providers=modules.globals.execution_providers) + FACE_ANALYSER = insightface.app.FaceAnalysis( + name='buffalo_l', + providers=modules.globals.execution_providers + ) FACE_ANALYSER.prepare(ctx_id=0, det_size=(640, 640)) + return FACE_ANALYSER +def get_one_face(frame: Frame) -> Optional[Any]: + faces = get_face_analyser().get(frame) + return min(faces, key=lambda x: x.bbox[0], default=None) -def get_one_face(frame: Frame) -> Any: - face = get_face_analyser().get(frame) - try: - return min(face, key=lambda x: x.bbox[0]) - except ValueError: - return None - - -def get_many_faces(frame: Frame) -> Any: - try: - return get_face_analyser().get(frame) - except IndexError: - return None +def get_many_faces(frame: Frame) -> Optional[Any]: + faces = get_face_analyser().get(frame) + return faces if faces else None diff --git a/modules/predicter.py b/modules/predicter.py index bb51b0a..86c6f7d 100644 --- a/modules/predicter.py +++ b/modules/predicter.py @@ -1,25 +1,24 @@ -import numpy +import numpy as np import opennsfw2 from PIL import Image - from modules.typing import Frame MAX_PROBABILITY = 0.85 +# Preload the model once for efficiency +model = opennsfw2.make_open_nsfw_model() def predict_frame(target_frame: Frame) -> bool: image = Image.fromarray(target_frame) image = opennsfw2.preprocess_image(image, opennsfw2.Preprocessing.YAHOO) - model = opennsfw2.make_open_nsfw_model() - views = numpy.expand_dims(image, axis=0) + views = np.expand_dims(image, axis=0) _, probability = model.predict(views)[0] return probability > MAX_PROBABILITY - def predict_image(target_path: str) -> bool: - return opennsfw2.predict_image(target_path) > MAX_PROBABILITY - + probability = opennsfw2.predict_image(target_path, model=model) + return probability > MAX_PROBABILITY def predict_video(target_path: str) -> bool: - _, probabilities = opennsfw2.predict_video_frames(video_path=target_path, frame_interval=100) + _, probabilities = opennsfw2.predict_video_frames(video_path=target_path, frame_interval=100, model=model) return any(probability > MAX_PROBABILITY for probability in probabilities) diff --git a/modules/processors/frame/core.py b/modules/processors/frame/core.py index 7d76704..092c8c4 100644 --- a/modules/processors/frame/core.py +++ b/modules/processors/frame/core.py @@ -17,57 +17,56 @@ FRAME_PROCESSORS_INTERFACE = [ 'process_video' ] - -def load_frame_processor_module(frame_processor: str) -> Any: +def load_frame_processor_module(frame_processor: str) -> ModuleType: try: frame_processor_module = importlib.import_module(f'modules.processors.frame.{frame_processor}') + # Ensure all required methods are present for method_name in FRAME_PROCESSORS_INTERFACE: if not hasattr(frame_processor_module, method_name): - sys.exit() + raise AttributeError(f"Missing required method {method_name} in {frame_processor} module.") except ImportError: - print(f"Frame processor {frame_processor} not found") - sys.exit() + print(f"Error: Frame processor '{frame_processor}' not found.") + sys.exit(1) + except AttributeError as e: + print(e) + sys.exit(1) + return frame_processor_module - def get_frame_processors_modules(frame_processors: List[str]) -> List[ModuleType]: global FRAME_PROCESSORS_MODULES if not FRAME_PROCESSORS_MODULES: - for frame_processor in frame_processors: - frame_processor_module = load_frame_processor_module(frame_processor) - FRAME_PROCESSORS_MODULES.append(frame_processor_module) + FRAME_PROCESSORS_MODULES = [load_frame_processor_module(fp) for fp in frame_processors] + set_frame_processors_modules_from_ui(frame_processors) return FRAME_PROCESSORS_MODULES def set_frame_processors_modules_from_ui(frame_processors: List[str]) -> None: global FRAME_PROCESSORS_MODULES for frame_processor, state in modules.globals.fp_ui.items(): - if state == True and frame_processor not in frame_processors: - frame_processor_module = load_frame_processor_module(frame_processor) - FRAME_PROCESSORS_MODULES.append(frame_processor_module) + if state and frame_processor not in frame_processors: + module = load_frame_processor_module(frame_processor) + FRAME_PROCESSORS_MODULES.append(module) modules.globals.frame_processors.append(frame_processor) - if state == False: - try: - frame_processor_module = load_frame_processor_module(frame_processor) - FRAME_PROCESSORS_MODULES.remove(frame_processor_module) - modules.globals.frame_processors.remove(frame_processor) - except: - pass + elif not state and frame_processor in frame_processors: + module = load_frame_processor_module(frame_processor) + FRAME_PROCESSORS_MODULES.remove(module) + modules.globals.frame_processors.remove(frame_processor) def multi_process_frame(source_path: str, temp_frame_paths: List[str], process_frames: Callable[[str, List[str], Any], None], progress: Any = None) -> None: with ThreadPoolExecutor(max_workers=modules.globals.execution_threads) as executor: - futures = [] - for path in temp_frame_paths: - future = executor.submit(process_frames, source_path, [path], progress) - futures.append(future) + futures = [executor.submit(process_frames, source_path, [path], progress) for path in temp_frame_paths] for future in futures: future.result() - -def process_video(source_path: str, frame_paths: list[str], process_frames: Callable[[str, List[str], Any], None]) -> None: +def process_video(source_path: str, frame_paths: List[str], process_frames: Callable[[str, List[str], Any], None]) -> None: progress_bar_format = '{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}{postfix}]' total = len(frame_paths) with tqdm(total=total, desc='Processing', unit='frame', dynamic_ncols=True, bar_format=progress_bar_format) as progress: - progress.set_postfix({'execution_providers': modules.globals.execution_providers, 'execution_threads': modules.globals.execution_threads, 'max_memory': modules.globals.max_memory}) + progress.set_postfix({ + 'execution_providers': modules.globals.execution_providers, + 'execution_threads': modules.globals.execution_threads, + 'max_memory': modules.globals.max_memory + }) multi_process_frame(source_path, frame_paths, process_frames, progress) diff --git a/modules/processors/frame/face_enhancer.py b/modules/processors/frame/face_enhancer.py index 608071a..a75f5c0 100644 --- a/modules/processors/frame/face_enhancer.py +++ b/modules/processors/frame/face_enhancer.py @@ -8,7 +8,7 @@ import modules.globals import modules.processors.frame.core from modules.core import update_status from modules.face_analyser import get_one_face -from modules.typing import Frame, Face +from modules.typing import Frame, Face # Ensure these are imported from modules.utilities import conditional_download, resolve_relative_path, is_image, is_video FACE_ENHANCER = None @@ -16,34 +16,26 @@ THREAD_SEMAPHORE = threading.Semaphore() THREAD_LOCK = threading.Lock() NAME = 'DLC.FACE-ENHANCER' - def pre_check() -> bool: download_directory_path = resolve_relative_path('..\models') conditional_download(download_directory_path, ['https://github.com/TencentARC/GFPGAN/releases/download/v1.3.4/GFPGANv1.4.pth']) return True - def pre_start() -> bool: if not is_image(modules.globals.target_path) and not is_video(modules.globals.target_path): update_status('Select an image or video for target path.', NAME) return False return True - def get_face_enhancer() -> Any: global FACE_ENHANCER with THREAD_LOCK: if FACE_ENHANCER is None: - if os.name == 'nt': - model_path = resolve_relative_path('..\models\GFPGANv1.4.pth') - # todo: set models path https://github.com/TencentARC/GFPGAN/issues/399 - else: - model_path = resolve_relative_path('../models/GFPGANv1.4.pth') - FACE_ENHANCER = gfpgan.GFPGANer(model_path=model_path, upscale=1) # type: ignore[attr-defined] + model_path = resolve_relative_path('../models/GFPGANv1.4.pth') + FACE_ENHANCER = gfpgan.GFPGANer(model_path=model_path, upscale=1) # type: ignore[attr-defined] return FACE_ENHANCER - def enhance_face(temp_frame: Frame) -> Frame: with THREAD_SEMAPHORE: _, _, temp_frame = get_face_enhancer().enhance( @@ -52,14 +44,12 @@ def enhance_face(temp_frame: Frame) -> Frame: ) return temp_frame - def process_frame(source_face: Face, temp_frame: Frame) -> Frame: target_face = get_one_face(temp_frame) if target_face: temp_frame = enhance_face(temp_frame) return temp_frame - def process_frames(source_path: str, temp_frame_paths: List[str], progress: Any = None) -> None: for temp_frame_path in temp_frame_paths: temp_frame = cv2.imread(temp_frame_path) @@ -68,12 +58,10 @@ def process_frames(source_path: str, temp_frame_paths: List[str], progress: Any if progress: progress.update(1) - def process_image(source_path: str, target_path: str, output_path: str) -> None: target_frame = cv2.imread(target_path) result = process_frame(None, target_frame) cv2.imwrite(output_path, result) - def process_video(source_path: str, temp_frame_paths: List[str]) -> None: modules.processors.frame.core.process_video(None, temp_frame_paths, process_frames) diff --git a/modules/processors/frame/face_swapper.py b/modules/processors/frame/face_swapper.py index 4b4a222..7bf5550 100644 --- a/modules/processors/frame/face_swapper.py +++ b/modules/processors/frame/face_swapper.py @@ -2,6 +2,7 @@ from typing import Any, List import cv2 import insightface import threading +import os import modules.globals import modules.processors.frame.core @@ -14,26 +15,25 @@ FACE_SWAPPER = None THREAD_LOCK = threading.Lock() NAME = 'DLC.FACE-SWAPPER' - def pre_check() -> bool: download_directory_path = resolve_relative_path('../models') - conditional_download(download_directory_path, ['https://huggingface.co/hacksider/deep-live-cam/blob/main/inswapper_128_fp16.onnx']) + conditional_download(download_directory_path, [ + 'https://huggingface.co/hacksider/deep-live-cam/blob/main/inswapper_128_fp16.onnx' + ]) return True - def pre_start() -> bool: if not is_image(modules.globals.source_path): update_status('Select an image for source path.', NAME) return False elif not get_one_face(cv2.imread(modules.globals.source_path)): - update_status('No face in source path detected.', NAME) + update_status('No face detected in the source path.', NAME) return False if not is_image(modules.globals.target_path) and not is_video(modules.globals.target_path): update_status('Select an image or video for target path.', NAME) return False return True - def get_face_swapper() -> Any: global FACE_SWAPPER @@ -43,11 +43,9 @@ def get_face_swapper() -> Any: FACE_SWAPPER = insightface.model_zoo.get_model(model_path, providers=modules.globals.execution_providers) return FACE_SWAPPER - def swap_face(source_face: Face, target_face: Face, temp_frame: Frame) -> Frame: return get_face_swapper().get(temp_frame, target_face, source_face, paste_back=True) - def process_frame(source_face: Face, temp_frame: Frame) -> Frame: if modules.globals.many_faces: many_faces = get_many_faces(temp_frame) @@ -60,7 +58,6 @@ def process_frame(source_face: Face, temp_frame: Frame) -> Frame: temp_frame = swap_face(source_face, target_face, temp_frame) return temp_frame - def process_frames(source_path: str, temp_frame_paths: List[str], progress: Any = None) -> None: source_face = get_one_face(cv2.imread(source_path)) for temp_frame_path in temp_frame_paths: @@ -69,18 +66,15 @@ def process_frames(source_path: str, temp_frame_paths: List[str], progress: Any result = process_frame(source_face, temp_frame) cv2.imwrite(temp_frame_path, result) except Exception as exception: - print(exception) - pass + print(f"Error processing frame {temp_frame_path}: {exception}") if progress: progress.update(1) - def process_image(source_path: str, target_path: str, output_path: str) -> None: source_face = get_one_face(cv2.imread(source_path)) target_frame = cv2.imread(target_path) result = process_frame(source_face, target_frame) cv2.imwrite(output_path, result) - def process_video(source_path: str, temp_frame_paths: List[str]) -> None: modules.processors.frame.core.process_video(source_path, temp_frame_paths, process_frames) diff --git a/modules/ui.json b/modules/ui.json index 0954578..1d1db99 100644 --- a/modules/ui.json +++ b/modules/ui.json @@ -1,76 +1,57 @@ { "CTk": { - "fg_color": ["gray95", "gray10"] + "fg_color": ["#FFFFFF", "#2D2D2D"] }, "CTkToplevel": { - "fg_color": ["gray95", "gray10"] + "fg_color": ["#FFFFFF", "#2D2D2D"] }, "CTkFrame": { "corner_radius": 0, "border_width": 0, - "fg_color": ["gray90", "gray13"], - "top_fg_color": ["gray85", "gray16"], - "border_color": ["gray65", "gray28"] + "fg_color": ["#F0F0F0", "#3C3C3C"], + "top_fg_color": ["#E0E0E0", "#4B4B4B"], + "border_color": ["#B0B0B0", "#5A5A5A"] }, "CTkButton": { "corner_radius": 0, "border_width": 0, - "fg_color": ["#2aa666", "#1f538d"], - "hover_color": ["#3cb666", "#14375e"], - "border_color": ["#3e4a40", "#949A9F"], - "text_color": ["#f3faf6", "#f3faf6"], + "fg_color": ["#007ACC", "#007ACC"], + "hover_color": ["#005EA3", "#005EA3"], + "border_color": ["#004C8A", "#004C8A"], + "text_color": ["#FFFFFF", "#FFFFFF"], "text_color_disabled": ["gray74", "gray60"] }, "CTkLabel": { "corner_radius": 0, "fg_color": "transparent", - "text_color": ["gray14", "gray84"] + "text_color": ["#000000", "#FFFFFF"] }, "CTkEntry": { "corner_radius": 0, "border_width": 2, - "fg_color": ["#F9F9FA", "#343638"], - "border_color": ["#979DA2", "#565B5E"], - "text_color": ["gray14", "gray84"], + "fg_color": ["#FFFFFF", "#333333"], + "border_color": ["#A0A0A0", "#5A5A5A"], + "text_color": ["#000000", "#FFFFFF"], "placeholder_text_color": ["gray52", "gray62"] }, - "CTkCheckbox": { - "corner_radius": 0, - "border_width": 3, - "fg_color": ["#2aa666", "#1f538d"], - "border_color": ["#3e4a40", "#949A9F"], - "hover_color": ["#3cb666", "#14375e"], - "checkmark_color": ["#f3faf6", "gray90"], - "text_color": ["gray14", "gray84"], - "text_color_disabled": ["gray60", "gray45"] - }, "CTkSwitch": { "corner_radius": 1000, "border_width": 3, "button_length": 0, "fg_color": ["#939BA2", "#4A4D50"], "progress_color": ["#2aa666", "#1f538d"], - "button_color": ["gray36", "#D5D9DE"], - "button_hover_color": ["gray20", "gray100"], - "text_color": ["gray14", "gray84"], + "button_color": ["#444444", "#D5D9DE"], + "button_hover_color": ["#333333", "#FFFFFF"], + "text_color": ["#000000", "#FFFFFF"], "text_color_disabled": ["gray60", "gray45"] }, - "CTkRadiobutton": { - "corner_radius": 1000, - "border_width_checked": 6, - "border_width_unchecked": 3, + "CTkOptionMenu": { + "corner_radius": 0, "fg_color": ["#2aa666", "#1f538d"], - "border_color": ["#3e4a40", "#949A9F"], - "hover_color": ["#3cb666", "#14375e"], - "text_color": ["gray14", "gray84"], - "text_color_disabled": ["gray60", "gray45"] - }, - "CTkProgressBar": { - "corner_radius": 1000, - "border_width": 0, - "fg_color": ["#939BA2", "#4A4D50"], - "progress_color": ["#2aa666", "#1f538d"], - "border_color": ["gray", "gray"] + "button_color": ["#3cb666", "#14375e"], + "button_hover_color": ["#234567", "#1e2c40"], + "text_color": ["#FFFFFF", "#FFFFFF"], + "text_color_disabled": ["gray74", "gray60"] }, "CTkSlider": { "corner_radius": 1000, @@ -82,59 +63,6 @@ "button_color": ["#2aa666", "#1f538d"], "button_hover_color": ["#3cb666", "#14375e"] }, - "CTkOptionMenu": { - "corner_radius": 0, - "fg_color": ["#2aa666", "#1f538d"], - "button_color": ["#3cb666", "#14375e"], - "button_hover_color": ["#234567", "#1e2c40"], - "text_color": ["#f3faf6", "#f3faf6"], - "text_color_disabled": ["gray74", "gray60"] - }, - "CTkComboBox": { - "corner_radius": 0, - "border_width": 2, - "fg_color": ["#F9F9FA", "#343638"], - "border_color": ["#979DA2", "#565B5E"], - "button_color": ["#979DA2", "#565B5E"], - "button_hover_color": ["#6E7174", "#7A848D"], - "text_color": ["gray14", "gray84"], - "text_color_disabled": ["gray50", "gray45"] - }, - "CTkScrollbar": { - "corner_radius": 1000, - "border_spacing": 4, - "fg_color": "transparent", - "button_color": ["gray55", "gray41"], - "button_hover_color": ["gray40", "gray53"] - }, - "CTkSegmentedButton": { - "corner_radius": 0, - "border_width": 2, - "fg_color": ["#979DA2", "gray29"], - "selected_color": ["#2aa666", "#1f538d"], - "selected_hover_color": ["#3cb666", "#14375e"], - "unselected_color": ["#979DA2", "gray29"], - "unselected_hover_color": ["gray70", "gray41"], - "text_color": ["#f3faf6", "#f3faf6"], - "text_color_disabled": ["gray74", "gray60"] - }, - "CTkTextbox": { - "corner_radius": 0, - "border_width": 0, - "fg_color": ["gray100", "gray20"], - "border_color": ["#979DA2", "#565B5E"], - "text_color": ["gray14", "gray84"], - "scrollbar_button_color": ["gray55", "gray41"], - "scrollbar_button_hover_color": ["gray40", "gray53"] - }, - "CTkScrollableFrame": { - "label_fg_color": ["gray80", "gray21"] - }, - "DropdownMenu": { - "fg_color": ["gray90", "gray20"], - "hover_color": ["gray75", "gray28"], - "text_color": ["gray14", "gray84"] - }, "CTkFont": { "macOS": { "family": "Avenir", @@ -152,7 +80,12 @@ "weight": "normal" } }, + "DropdownMenu": { + "fg_color": ["#FFFFFF", "#2D2D2D"], + "hover_color": ["#E0E0E0", "#4B4B4B"], + "text_color": ["#000000", "#FFFFFF"] + }, "URL": { - "text_color": ["gray74", "gray60"] + "text_color": ["#007ACC", "#1E90FF"] } } diff --git a/modules/ui.py b/modules/ui.py index 166b1ca..0a1bb39 100644 --- a/modules/ui.py +++ b/modules/ui.py @@ -1,10 +1,17 @@ import os +import platform import webbrowser import customtkinter as ctk from typing import Callable, Tuple import cv2 from PIL import Image, ImageOps +# Import OS-specific modules only when necessary +if platform.system() == 'Darwin': # macOS + import objc + from Foundation import NSObject + import AVFoundation + import modules.globals import modules.metadata from modules.face_analyser import get_one_face @@ -33,9 +40,47 @@ status_label = None img_ft, vid_ft = modules.globals.file_types +def check_camera_permissions(): + """Check and request camera access permission on macOS.""" + if platform.system() == 'Darwin': # macOS-specific + auth_status = AVFoundation.AVCaptureDevice.authorizationStatusForMediaType_(AVFoundation.AVMediaTypeVideo) + + if auth_status == AVFoundation.AVAuthorizationStatusNotDetermined: + # Request access to the camera + def completion_handler(granted): + if granted: + print("Access granted to the camera.") + else: + print("Access denied to the camera.") + + AVFoundation.AVCaptureDevice.requestAccessForMediaType_completionHandler_(AVFoundation.AVMediaTypeVideo, completion_handler) + elif auth_status == AVFoundation.AVAuthorizationStatusAuthorized: + print("Camera access already authorized.") + elif auth_status == AVFoundation.AVAuthorizationStatusDenied: + print("Camera access denied. Please enable it in System Preferences.") + elif auth_status == AVFoundation.AVAuthorizationStatusRestricted: + print("Camera access restricted. The app is not allowed to use the camera.") + + +def select_camera(camera_name: str): + """Select the appropriate camera based on its name (cross-platform).""" + if platform.system() == 'Darwin': # macOS-specific + devices = AVFoundation.AVCaptureDevice.devicesWithMediaType_(AVFoundation.AVMediaTypeVideo) + for device in devices: + if device.localizedName() == camera_name: + return device + elif platform.system() == 'Windows' or platform.system() == 'Linux': + # On Windows/Linux, simply return the camera name as OpenCV can handle it by index + return camera_name + return None + + def init(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.CTk: global ROOT, PREVIEW + if platform.system() == 'Darwin': # macOS-specific + check_camera_permissions() # Check camera permissions before initializing the UI + ROOT = create_root(start, destroy) PREVIEW = create_preview(ROOT) @@ -49,10 +94,11 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C ctk.set_appearance_mode('system') ctk.set_default_color_theme(resolve_relative_path('ui.json')) + print("Creating root window...") + root = ctk.CTk() root.minsize(ROOT_WIDTH, ROOT_HEIGHT) root.title(f'{modules.metadata.name} {modules.metadata.version} {modules.metadata.edition}') - root.configure() root.protocol('WM_DELETE_WINDOW', lambda: destroy()) source_label = ctk.CTkLabel(root, text=None) @@ -61,10 +107,10 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C target_label = ctk.CTkLabel(root, text=None) target_label.place(relx=0.6, rely=0.1, relwidth=0.3, relheight=0.25) - source_button = ctk.CTkButton(root, text='Select a face', cursor='hand2', command=lambda: select_source_path()) + source_button = ctk.CTkButton(root, text='Select a face', cursor='hand2', command=select_source_path) source_button.place(relx=0.1, rely=0.4, relwidth=0.3, relheight=0.1) - target_button = ctk.CTkButton(root, text='Select a target', cursor='hand2', command=lambda: select_target_path()) + target_button = ctk.CTkButton(root, text='Select a target', cursor='hand2', command=select_target_path) target_button.place(relx=0.6, rely=0.4, relwidth=0.3, relheight=0.1) keep_fps_value = ctk.BooleanVar(value=modules.globals.keep_fps) @@ -75,9 +121,8 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C keep_frames_switch = ctk.CTkSwitch(root, text='Keep frames', variable=keep_frames_value, cursor='hand2', command=lambda: setattr(modules.globals, 'keep_frames', keep_frames_value.get())) keep_frames_switch.place(relx=0.1, rely=0.65) - # for FRAME PROCESSOR ENHANCER tumbler: enhancer_value = ctk.BooleanVar(value=modules.globals.fp_ui['face_enhancer']) - enhancer_switch = ctk.CTkSwitch(root, text='Face Enhancer', variable=enhancer_value, cursor='hand2', command=lambda: update_tumbler('face_enhancer',enhancer_value.get())) + enhancer_switch = ctk.CTkSwitch(root, text='Face Enhancer', variable=enhancer_value, cursor='hand2', command=lambda: update_tumbler('face_enhancer', enhancer_value.get())) enhancer_switch.place(relx=0.1, rely=0.7) keep_audio_value = ctk.BooleanVar(value=modules.globals.keep_audio) @@ -93,57 +138,51 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C nsfw_switch.place(relx=0.6, rely=0.7) start_button = ctk.CTkButton(root, text='Start', cursor='hand2', command=lambda: select_output_path(start)) - start_button.place(relx=0.15, rely=0.80, relwidth=0.2, relheight=0.05) + start_button.place(relx=0.15, rely=0.8, relwidth=0.2, relheight=0.05) - stop_button = ctk.CTkButton(root, text='Destroy', cursor='hand2', command=lambda: destroy()) - stop_button.place(relx=0.4, rely=0.80, relwidth=0.2, relheight=0.05) + stop_button = ctk.CTkButton(root, text='Destroy', cursor='hand2', command=destroy) + stop_button.place(relx=0.4, rely=0.8, relwidth=0.2, relheight=0.05) - preview_button = ctk.CTkButton(root, text='Preview', cursor='hand2', command=lambda: toggle_preview()) - preview_button.place(relx=0.65, rely=0.80, relwidth=0.2, relheight=0.05) + preview_button = ctk.CTkButton(root, text='Preview', cursor='hand2', command=toggle_preview) + preview_button.place(relx=0.65, rely=0.8, relwidth=0.2, relheight=0.05) - # --- Camera Selection --- camera_label = ctk.CTkLabel(root, text="Select Camera:") camera_label.place(relx=0.4, rely=0.86, relwidth=0.2, relheight=0.05) available_cameras = get_available_cameras() - - # Convert camera indices to strings for CTkOptionMenu available_camera_strings = [str(cam) for cam in available_cameras] camera_variable = ctk.StringVar(value=available_camera_strings[0] if available_camera_strings else "No cameras found") - camera_optionmenu = ctk.CTkOptionMenu(root, variable=camera_variable, - values=available_camera_strings) + camera_optionmenu = ctk.CTkOptionMenu(root, variable=camera_variable, values=available_camera_strings) camera_optionmenu.place(relx=0.65, rely=0.86, relwidth=0.2, relheight=0.05) - live_button = ctk.CTkButton(root, text='Live', cursor='hand2', command=lambda: webcam_preview(int(camera_variable.get()))) + live_button = ctk.CTkButton(root, text='Live', cursor='hand2', command=lambda: webcam_preview(camera_variable.get())) live_button.place(relx=0.15, rely=0.86, relwidth=0.2, relheight=0.05) - # --- End Camera Selection --- status_label = ctk.CTkLabel(root, text=None, justify='center') - status_label.place(relx=0.1, rely=0.9, relwidth=0.8) + status_label.place(relx=0.1, relwidth=0.8, rely=0.9) donate_label = ctk.CTkLabel(root, text='Deep Live Cam', justify='center', cursor='hand2') donate_label.place(relx=0.1, rely=0.95, relwidth=0.8) donate_label.configure(text_color=ctk.ThemeManager.theme.get('URL').get('text_color')) - donate_label.bind('