Refactor and Optimize Cross-Platform Support, Error Handling, and UI Enhancements

This commit is contained in:
Zohar Babin 2024-08-10 22:36:12 -04:00
parent 8d3072d906
commit 6b0cc74957
12 changed files with 453 additions and 387 deletions

1
.python-version Normal file
View File

@ -0,0 +1 @@
3.10.14

View File

@ -1,20 +1,38 @@
from typing import Any
from typing import Any, Optional
import cv2
def get_video_frame(video_path: str, frame_number: int = 0) -> Any:
def get_video_frame(video_path: str, frame_number: int = 0) -> Optional[Any]:
"""Retrieve a specific frame from a video."""
capture = cv2.VideoCapture(video_path)
frame_total = capture.get(cv2.CAP_PROP_FRAME_COUNT)
capture.set(cv2.CAP_PROP_POS_FRAMES, min(frame_total, frame_number - 1))
if not capture.isOpened():
print(f"Error: Cannot open video file {video_path}")
return None
frame_total = int(capture.get(cv2.CAP_PROP_FRAME_COUNT))
# Ensure frame_number is within the valid range
frame_number = max(0, min(frame_number, frame_total - 1))
capture.set(cv2.CAP_PROP_POS_FRAMES, frame_number)
has_frame, frame = capture.read()
capture.release()
if has_frame:
return frame
return None
if not has_frame:
print(f"Error: Cannot read frame {frame_number} from {video_path}")
return None
return frame
def get_video_frame_total(video_path: str) -> int:
"""Get the total number of frames in a video."""
capture = cv2.VideoCapture(video_path)
video_frame_total = int(capture.get(cv2.CAP_PROP_FRAME_COUNT))
if not capture.isOpened():
print(f"Error: Cannot open video file {video_path}")
return 0
frame_total = int(capture.get(cv2.CAP_PROP_FRAME_COUNT))
capture.release()
return video_frame_total
return frame_total

View File

@ -1,16 +1,17 @@
import os
import sys
# single thread doubles cuda performance - needs to be set before torch import
if any(arg.startswith('--execution-provider') for arg in sys.argv):
os.environ['OMP_NUM_THREADS'] = '1'
# reduce tensorflow log level
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
import warnings
from typing import List
import platform
import signal
import shutil
import argparse
from typing import List
# Set environment variables for CUDA performance and TensorFlow logging
if any(arg.startswith('--execution-provider') for arg in sys.argv):
os.environ['OMP_NUM_THREADS'] = '1'
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
import torch
import onnxruntime
import tensorflow
@ -19,34 +20,56 @@ import modules.globals
import modules.metadata
import modules.ui as ui
from modules.processors.frame.core import get_frame_processors_modules
from modules.utilities import has_image_extension, is_image, is_video, detect_fps, create_video, extract_frames, get_temp_frame_paths, restore_audio, create_temp, move_temp, clean_temp, normalize_output_path
if 'ROCMExecutionProvider' in modules.globals.execution_providers:
del torch
from modules.utilities import (
has_image_extension,
is_image,
is_video,
detect_fps,
create_video,
extract_frames,
get_temp_frame_paths,
restore_audio,
create_temp,
move_temp,
clean_temp,
normalize_output_path
)
# Filter warnings
warnings.filterwarnings('ignore', category=FutureWarning, module='insightface')
warnings.filterwarnings('ignore', category=UserWarning, module='torchvision')
# Cross-platform resource management
if platform.system() == 'Darwin' and 'ROCMExecutionProvider' in modules.globals.execution_providers:
del torch
def parse_args() -> None:
signal.signal(signal.SIGINT, lambda signal_number, frame: destroy())
program = argparse.ArgumentParser()
program.add_argument('-s', '--source', help='select an source image', dest='source_path')
program.add_argument('-t', '--target', help='select an target image or video', dest='target_path')
program.add_argument('-o', '--output', help='select output file or directory', dest='output_path')
program.add_argument('--frame-processor', help='pipeline of frame processors', dest='frame_processor', default=['face_swapper'], choices=['face_swapper', 'face_enhancer'], nargs='+')
program.add_argument('--keep-fps', help='keep original fps', dest='keep_fps', action='store_true', default=False)
program.add_argument('--keep-audio', help='keep original audio', dest='keep_audio', action='store_true', default=True)
program.add_argument('--keep-frames', help='keep temporary frames', dest='keep_frames', action='store_true', default=False)
program.add_argument('--many-faces', help='process every face', dest='many_faces', action='store_true', default=False)
program.add_argument('--video-encoder', help='adjust output video encoder', dest='video_encoder', default='libx264', choices=['libx264', 'libx265', 'libvpx-vp9'])
program.add_argument('--video-quality', help='adjust output video quality', dest='video_quality', type=int, default=18, choices=range(52), metavar='[0-51]')
program.add_argument('--max-memory', help='maximum amount of RAM in GB', dest='max_memory', type=int, default=suggest_max_memory())
program.add_argument('--execution-provider', help='execution provider', dest='execution_provider', default=['cpu'], choices=suggest_execution_providers(), nargs='+')
program.add_argument('--execution-threads', help='number of execution threads', dest='execution_threads', type=int, default=suggest_execution_threads())
program.add_argument('-v', '--version', action='version', version=f'{modules.metadata.name} {modules.metadata.version}')
program.add_argument('-s', '--source', help='Select a source image', dest='source_path')
program.add_argument('-t', '--target', help='Select a target image or video', dest='target_path')
program.add_argument('-o', '--output', help='Select output file or directory', dest='output_path')
program.add_argument('--frame-processor', help='Pipeline of frame processors', dest='frame_processor',
default=['face_swapper'], choices=['face_swapper', 'face_enhancer'], nargs='+')
program.add_argument('--keep-fps', help='Keep original fps', dest='keep_fps', action='store_true', default=False)
program.add_argument('--keep-audio', help='Keep original audio', dest='keep_audio', action='store_true', default=True)
program.add_argument('--keep-frames', help='Keep temporary frames', dest='keep_frames', action='store_true', default=False)
program.add_argument('--many-faces', help='Process every face', dest='many_faces', action='store_true', default=False)
program.add_argument('--video-encoder', help='Adjust output video encoder', dest='video_encoder', default='libx264',
choices=['libx264', 'libx265', 'libvpx-vp9'])
program.add_argument('--video-quality', help='Adjust output video quality', dest='video_quality', type=int, default=18,
choices=range(52), metavar='[0-51]')
program.add_argument('--max-memory', help='Maximum amount of RAM in GB', dest='max_memory', type=int,
default=suggest_max_memory())
program.add_argument('--execution-provider', help='Execution provider', dest='execution_provider', default=['cpu'],
choices=suggest_execution_providers(), nargs='+')
program.add_argument('--execution-threads', help='Number of execution threads', dest='execution_threads', type=int,
default=suggest_execution_threads())
program.add_argument('-v', '--version', action='version',
version=f'{modules.metadata.name} {modules.metadata.version}')
# register deprecated args
# Register deprecated args
program.add_argument('-f', '--face', help=argparse.SUPPRESS, dest='source_path_deprecated')
program.add_argument('--cpu-cores', help=argparse.SUPPRESS, dest='cpu_cores_deprecated', type=int)
program.add_argument('--gpu-vendor', help=argparse.SUPPRESS, dest='gpu_vendor_deprecated')
@ -56,7 +79,8 @@ def parse_args() -> None:
modules.globals.source_path = args.source_path
modules.globals.target_path = args.target_path
modules.globals.output_path = normalize_output_path(modules.globals.source_path, modules.globals.target_path, args.output_path)
modules.globals.output_path = normalize_output_path(modules.globals.source_path, modules.globals.target_path,
args.output_path)
modules.globals.frame_processors = args.frame_processor
modules.globals.headless = args.source_path or args.target_path or args.output_path
modules.globals.keep_fps = args.keep_fps
@ -69,19 +93,22 @@ def parse_args() -> None:
modules.globals.execution_providers = decode_execution_providers(args.execution_provider)
modules.globals.execution_threads = args.execution_threads
#for ENHANCER tumbler:
if 'face_enhancer' in args.frame_processor:
modules.globals.fp_ui['face_enhancer'] = True
else:
modules.globals.fp_ui['face_enhancer'] = False
# Handle face enhancer tumbler
modules.globals.fp_ui['face_enhancer'] = 'face_enhancer' in args.frame_processor
modules.globals.nsfw = False
# translate deprecated args
# Handle deprecated arguments
handle_deprecated_args(args)
def handle_deprecated_args(args) -> None:
"""Handle deprecated arguments by translating them to the new format."""
if args.source_path_deprecated:
print('\033[33mArgument -f and --face are deprecated. Use -s and --source instead.\033[0m')
modules.globals.source_path = args.source_path_deprecated
modules.globals.output_path = normalize_output_path(args.source_path_deprecated, modules.globals.target_path, args.output_path)
modules.globals.output_path = normalize_output_path(args.source_path_deprecated, modules.globals.target_path,
args.output_path)
if args.cpu_cores_deprecated:
print('\033[33mArgument --cpu-cores is deprecated. Use --execution-threads instead.\033[0m')
modules.globals.execution_threads = args.cpu_cores_deprecated
@ -92,7 +119,7 @@ def parse_args() -> None:
print('\033[33mArgument --gpu-vendor nvidia is deprecated. Use --execution-provider cuda instead.\033[0m')
modules.globals.execution_providers = decode_execution_providers(['cuda'])
if args.gpu_vendor_deprecated == 'amd':
print('\033[33mArgument --gpu-vendor amd is deprecated. Use --execution-provider cuda instead.\033[0m')
print('\033[33mArgument --gpu-vendor amd is deprecated. Use --execution-provider rocm instead.\033[0m')
modules.globals.execution_providers = decode_execution_providers(['rocm'])
if args.gpu_threads_deprecated:
print('\033[33mArgument --gpu-threads is deprecated. Use --execution-threads instead.\033[0m')
@ -100,18 +127,22 @@ def parse_args() -> None:
def encode_execution_providers(execution_providers: List[str]) -> List[str]:
return [execution_provider.replace('ExecutionProvider', '').lower() for execution_provider in execution_providers]
return [provider.replace('ExecutionProvider', '').lower() for provider in execution_providers]
def decode_execution_providers(execution_providers: List[str]) -> List[str]:
return [provider for provider, encoded_execution_provider in zip(onnxruntime.get_available_providers(), encode_execution_providers(onnxruntime.get_available_providers()))
if any(execution_provider in encoded_execution_provider for execution_provider in execution_providers)]
available_providers = onnxruntime.get_available_providers()
encoded_providers = encode_execution_providers(available_providers)
selected_providers = [available_providers[encoded_providers.index(req)] for req in execution_providers
if req in encoded_providers]
# Default to CPU if no suitable providers are found
return selected_providers if selected_providers else ['CPUExecutionProvider']
def suggest_max_memory() -> int:
if platform.system().lower() == 'darwin':
return 4
return 16
return 4 if platform.system().lower() == 'darwin' else 16
def suggest_execution_providers() -> List[str]:
@ -119,34 +150,41 @@ def suggest_execution_providers() -> List[str]:
def suggest_execution_threads() -> int:
if 'DmlExecutionProvider' in modules.globals.execution_providers:
if 'dml' in modules.globals.execution_providers:
return 1
if 'ROCMExecutionProvider' in modules.globals.execution_providers:
if 'rocm' in modules.globals.execution_providers:
return 1
return 8
def limit_resources() -> None:
# prevent tensorflow memory leak
# Prevent TensorFlow memory leak
gpus = tensorflow.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
tensorflow.config.experimental.set_memory_growth(gpu, True)
# limit memory usage
# Limit memory usage
if modules.globals.max_memory:
memory = modules.globals.max_memory * 1024 ** 3
if platform.system().lower() == 'darwin':
memory = modules.globals.max_memory * 1024 ** 6
if platform.system().lower() == 'windows':
memory = modules.globals.max_memory * 1024 ** 3
elif platform.system().lower() == 'windows':
import ctypes
kernel32 = ctypes.windll.kernel32
kernel32.SetProcessWorkingSetSize(-1, ctypes.c_size_t(memory), ctypes.c_size_t(memory))
else:
import resource
resource.setrlimit(resource.RLIMIT_DATA, (memory, memory))
try:
soft, hard = resource.getrlimit(resource.RLIMIT_DATA)
if memory > hard:
print(f"Warning: Requested memory limit {memory / (1024 ** 3)} GB exceeds system's hard limit. Setting to maximum allowed {hard / (1024 ** 3)} GB.")
memory = hard
resource.setrlimit(resource.RLIMIT_DATA, (memory, memory))
except ValueError as e:
print(f"Warning: Could not set memory limit: {e}. Continuing with default limits.")
def release_resources() -> None:
if 'CUDAExecutionProvider' in modules.globals.execution_providers:
if 'cuda' in modules.globals.execution_providers:
torch.cuda.empty_cache()
@ -157,12 +195,15 @@ def pre_check() -> bool:
if not shutil.which('ffmpeg'):
update_status('ffmpeg is not installed.')
return False
if 'cuda' in modules.globals.execution_providers and not torch.cuda.is_available():
update_status('CUDA is not available. Please check your GPU or CUDA installation.')
return False
return True
def update_status(message: str, scope: str = 'DLC.CORE') -> None:
print(f'[{scope}] {message}')
if not modules.globals.headless:
if not modules.globals.headless and ui.status_label:
ui.update_status(message)
@ -170,37 +211,61 @@ def start() -> None:
for frame_processor in get_frame_processors_modules(modules.globals.frame_processors):
if not frame_processor.pre_start():
return
# process image to image
# Process image to image
if has_image_extension(modules.globals.target_path):
if modules.globals.nsfw == False:
from modules.predicter import predict_image
if predict_image(modules.globals.target_path):
destroy()
shutil.copy2(modules.globals.target_path, modules.globals.output_path)
for frame_processor in get_frame_processors_modules(modules.globals.frame_processors):
update_status('Progressing...', frame_processor.NAME)
frame_processor.process_image(modules.globals.source_path, modules.globals.output_path, modules.globals.output_path)
release_resources()
if is_image(modules.globals.target_path):
update_status('Processing to image succeed!')
else:
update_status('Processing to image failed!')
process_image_to_image()
return
# process image to videos
if modules.globals.nsfw == False:
# Process image to video
process_image_to_video()
def process_image_to_image() -> None:
if not modules.globals.nsfw:
from modules.predicter import predict_image
if predict_image(modules.globals.target_path):
destroy()
shutil.copy2(modules.globals.target_path, modules.globals.output_path)
for frame_processor in get_frame_processors_modules(modules.globals.frame_processors):
update_status('Processing...', frame_processor.NAME)
frame_processor.process_image(modules.globals.source_path, modules.globals.output_path, modules.globals.output_path)
release_resources()
if is_image(modules.globals.target_path):
update_status('Processing to image succeeded!')
else:
update_status('Processing to image failed!')
def process_image_to_video() -> None:
if not modules.globals.nsfw:
from modules.predicter import predict_video
if predict_video(modules.globals.target_path):
destroy()
update_status('Creating temp resources...')
update_status('Creating temporary resources...')
create_temp(modules.globals.target_path)
update_status('Extracting frames...')
extract_frames(modules.globals.target_path)
temp_frame_paths = get_temp_frame_paths(modules.globals.target_path)
for frame_processor in get_frame_processors_modules(modules.globals.frame_processors):
update_status('Progressing...', frame_processor.NAME)
update_status('Processing...', frame_processor.NAME)
frame_processor.process_video(modules.globals.source_path, temp_frame_paths)
release_resources()
# handles fps
handle_video_fps()
handle_video_audio()
clean_temp(modules.globals.target_path)
if is_video(modules.globals.target_path):
update_status('Processing to video succeeded!')
else:
update_status('Processing to video failed!')
def handle_video_fps() -> None:
if modules.globals.keep_fps:
update_status('Detecting fps...')
fps = detect_fps(modules.globals.target_path)
@ -209,7 +274,9 @@ def start() -> None:
else:
update_status('Creating video with 30.0 fps...')
create_video(modules.globals.target_path)
# handle audio
def handle_video_audio() -> None:
if modules.globals.keep_audio:
if modules.globals.keep_fps:
update_status('Restoring audio...')
@ -218,12 +285,6 @@ def start() -> None:
restore_audio(modules.globals.target_path, modules.globals.output_path)
else:
move_temp(modules.globals.target_path, modules.globals.output_path)
# clean and validate
clean_temp(modules.globals.target_path)
if is_video(modules.globals.target_path):
update_status('Processing to video succeed!')
else:
update_status('Processing to video failed!')
def destroy() -> None:
@ -233,15 +294,20 @@ def destroy() -> None:
def run() -> None:
parse_args()
if not pre_check():
return
for frame_processor in get_frame_processors_modules(modules.globals.frame_processors):
if not frame_processor.pre_check():
try:
parse_args()
if not pre_check():
return
limit_resources()
if modules.globals.headless:
start()
else:
window = ui.init(start, destroy)
window.mainloop()
for frame_processor in get_frame_processors_modules(modules.globals.frame_processors):
if not frame_processor.pre_check():
return
limit_resources()
if modules.globals.headless:
start()
else:
window = ui.init(start, destroy)
window.mainloop()
except Exception as e:
print(f"UI initialization failed: {str(e)}")
update_status(f"UI initialization failed: {str(e)}")
destroy() # Ensure any resources are cleaned up on failure

View File

@ -1,31 +1,27 @@
from typing import Any
from typing import Any, Optional
import insightface
import modules.globals
from modules.typing import Frame
FACE_ANALYSER = None
FACE_ANALYSER: Optional[insightface.app.FaceAnalysis] = None
def get_face_analyser() -> Any:
def get_face_analyser() -> insightface.app.FaceAnalysis:
global FACE_ANALYSER
if FACE_ANALYSER is None:
FACE_ANALYSER = insightface.app.FaceAnalysis(name='buffalo_l', providers=modules.globals.execution_providers)
FACE_ANALYSER = insightface.app.FaceAnalysis(
name='buffalo_l',
providers=modules.globals.execution_providers
)
FACE_ANALYSER.prepare(ctx_id=0, det_size=(640, 640))
return FACE_ANALYSER
def get_one_face(frame: Frame) -> Optional[Any]:
faces = get_face_analyser().get(frame)
return min(faces, key=lambda x: x.bbox[0], default=None)
def get_one_face(frame: Frame) -> Any:
face = get_face_analyser().get(frame)
try:
return min(face, key=lambda x: x.bbox[0])
except ValueError:
return None
def get_many_faces(frame: Frame) -> Any:
try:
return get_face_analyser().get(frame)
except IndexError:
return None
def get_many_faces(frame: Frame) -> Optional[Any]:
faces = get_face_analyser().get(frame)
return faces if faces else None

View File

@ -1,25 +1,24 @@
import numpy
import numpy as np
import opennsfw2
from PIL import Image
from modules.typing import Frame
MAX_PROBABILITY = 0.85
# Preload the model once for efficiency
model = opennsfw2.make_open_nsfw_model()
def predict_frame(target_frame: Frame) -> bool:
image = Image.fromarray(target_frame)
image = opennsfw2.preprocess_image(image, opennsfw2.Preprocessing.YAHOO)
model = opennsfw2.make_open_nsfw_model()
views = numpy.expand_dims(image, axis=0)
views = np.expand_dims(image, axis=0)
_, probability = model.predict(views)[0]
return probability > MAX_PROBABILITY
def predict_image(target_path: str) -> bool:
return opennsfw2.predict_image(target_path) > MAX_PROBABILITY
probability = opennsfw2.predict_image(target_path, model=model)
return probability > MAX_PROBABILITY
def predict_video(target_path: str) -> bool:
_, probabilities = opennsfw2.predict_video_frames(video_path=target_path, frame_interval=100)
_, probabilities = opennsfw2.predict_video_frames(video_path=target_path, frame_interval=100, model=model)
return any(probability > MAX_PROBABILITY for probability in probabilities)

View File

@ -17,57 +17,56 @@ FRAME_PROCESSORS_INTERFACE = [
'process_video'
]
def load_frame_processor_module(frame_processor: str) -> Any:
def load_frame_processor_module(frame_processor: str) -> ModuleType:
try:
frame_processor_module = importlib.import_module(f'modules.processors.frame.{frame_processor}')
# Ensure all required methods are present
for method_name in FRAME_PROCESSORS_INTERFACE:
if not hasattr(frame_processor_module, method_name):
sys.exit()
raise AttributeError(f"Missing required method {method_name} in {frame_processor} module.")
except ImportError:
print(f"Frame processor {frame_processor} not found")
sys.exit()
print(f"Error: Frame processor '{frame_processor}' not found.")
sys.exit(1)
except AttributeError as e:
print(e)
sys.exit(1)
return frame_processor_module
def get_frame_processors_modules(frame_processors: List[str]) -> List[ModuleType]:
global FRAME_PROCESSORS_MODULES
if not FRAME_PROCESSORS_MODULES:
for frame_processor in frame_processors:
frame_processor_module = load_frame_processor_module(frame_processor)
FRAME_PROCESSORS_MODULES.append(frame_processor_module)
FRAME_PROCESSORS_MODULES = [load_frame_processor_module(fp) for fp in frame_processors]
set_frame_processors_modules_from_ui(frame_processors)
return FRAME_PROCESSORS_MODULES
def set_frame_processors_modules_from_ui(frame_processors: List[str]) -> None:
global FRAME_PROCESSORS_MODULES
for frame_processor, state in modules.globals.fp_ui.items():
if state == True and frame_processor not in frame_processors:
frame_processor_module = load_frame_processor_module(frame_processor)
FRAME_PROCESSORS_MODULES.append(frame_processor_module)
if state and frame_processor not in frame_processors:
module = load_frame_processor_module(frame_processor)
FRAME_PROCESSORS_MODULES.append(module)
modules.globals.frame_processors.append(frame_processor)
if state == False:
try:
frame_processor_module = load_frame_processor_module(frame_processor)
FRAME_PROCESSORS_MODULES.remove(frame_processor_module)
modules.globals.frame_processors.remove(frame_processor)
except:
pass
elif not state and frame_processor in frame_processors:
module = load_frame_processor_module(frame_processor)
FRAME_PROCESSORS_MODULES.remove(module)
modules.globals.frame_processors.remove(frame_processor)
def multi_process_frame(source_path: str, temp_frame_paths: List[str], process_frames: Callable[[str, List[str], Any], None], progress: Any = None) -> None:
with ThreadPoolExecutor(max_workers=modules.globals.execution_threads) as executor:
futures = []
for path in temp_frame_paths:
future = executor.submit(process_frames, source_path, [path], progress)
futures.append(future)
futures = [executor.submit(process_frames, source_path, [path], progress) for path in temp_frame_paths]
for future in futures:
future.result()
def process_video(source_path: str, frame_paths: list[str], process_frames: Callable[[str, List[str], Any], None]) -> None:
def process_video(source_path: str, frame_paths: List[str], process_frames: Callable[[str, List[str], Any], None]) -> None:
progress_bar_format = '{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}{postfix}]'
total = len(frame_paths)
with tqdm(total=total, desc='Processing', unit='frame', dynamic_ncols=True, bar_format=progress_bar_format) as progress:
progress.set_postfix({'execution_providers': modules.globals.execution_providers, 'execution_threads': modules.globals.execution_threads, 'max_memory': modules.globals.max_memory})
progress.set_postfix({
'execution_providers': modules.globals.execution_providers,
'execution_threads': modules.globals.execution_threads,
'max_memory': modules.globals.max_memory
})
multi_process_frame(source_path, frame_paths, process_frames, progress)

View File

@ -8,7 +8,7 @@ import modules.globals
import modules.processors.frame.core
from modules.core import update_status
from modules.face_analyser import get_one_face
from modules.typing import Frame, Face
from modules.typing import Frame, Face # Ensure these are imported
from modules.utilities import conditional_download, resolve_relative_path, is_image, is_video
FACE_ENHANCER = None
@ -16,34 +16,26 @@ THREAD_SEMAPHORE = threading.Semaphore()
THREAD_LOCK = threading.Lock()
NAME = 'DLC.FACE-ENHANCER'
def pre_check() -> bool:
download_directory_path = resolve_relative_path('..\models')
conditional_download(download_directory_path, ['https://github.com/TencentARC/GFPGAN/releases/download/v1.3.4/GFPGANv1.4.pth'])
return True
def pre_start() -> bool:
if not is_image(modules.globals.target_path) and not is_video(modules.globals.target_path):
update_status('Select an image or video for target path.', NAME)
return False
return True
def get_face_enhancer() -> Any:
global FACE_ENHANCER
with THREAD_LOCK:
if FACE_ENHANCER is None:
if os.name == 'nt':
model_path = resolve_relative_path('..\models\GFPGANv1.4.pth')
# todo: set models path https://github.com/TencentARC/GFPGAN/issues/399
else:
model_path = resolve_relative_path('../models/GFPGANv1.4.pth')
FACE_ENHANCER = gfpgan.GFPGANer(model_path=model_path, upscale=1) # type: ignore[attr-defined]
model_path = resolve_relative_path('../models/GFPGANv1.4.pth')
FACE_ENHANCER = gfpgan.GFPGANer(model_path=model_path, upscale=1) # type: ignore[attr-defined]
return FACE_ENHANCER
def enhance_face(temp_frame: Frame) -> Frame:
with THREAD_SEMAPHORE:
_, _, temp_frame = get_face_enhancer().enhance(
@ -52,14 +44,12 @@ def enhance_face(temp_frame: Frame) -> Frame:
)
return temp_frame
def process_frame(source_face: Face, temp_frame: Frame) -> Frame:
target_face = get_one_face(temp_frame)
if target_face:
temp_frame = enhance_face(temp_frame)
return temp_frame
def process_frames(source_path: str, temp_frame_paths: List[str], progress: Any = None) -> None:
for temp_frame_path in temp_frame_paths:
temp_frame = cv2.imread(temp_frame_path)
@ -68,12 +58,10 @@ def process_frames(source_path: str, temp_frame_paths: List[str], progress: Any
if progress:
progress.update(1)
def process_image(source_path: str, target_path: str, output_path: str) -> None:
target_frame = cv2.imread(target_path)
result = process_frame(None, target_frame)
cv2.imwrite(output_path, result)
def process_video(source_path: str, temp_frame_paths: List[str]) -> None:
modules.processors.frame.core.process_video(None, temp_frame_paths, process_frames)

View File

@ -2,6 +2,7 @@ from typing import Any, List
import cv2
import insightface
import threading
import os
import modules.globals
import modules.processors.frame.core
@ -14,26 +15,25 @@ FACE_SWAPPER = None
THREAD_LOCK = threading.Lock()
NAME = 'DLC.FACE-SWAPPER'
def pre_check() -> bool:
download_directory_path = resolve_relative_path('../models')
conditional_download(download_directory_path, ['https://huggingface.co/hacksider/deep-live-cam/blob/main/inswapper_128_fp16.onnx'])
conditional_download(download_directory_path, [
'https://huggingface.co/hacksider/deep-live-cam/blob/main/inswapper_128_fp16.onnx'
])
return True
def pre_start() -> bool:
if not is_image(modules.globals.source_path):
update_status('Select an image for source path.', NAME)
return False
elif not get_one_face(cv2.imread(modules.globals.source_path)):
update_status('No face in source path detected.', NAME)
update_status('No face detected in the source path.', NAME)
return False
if not is_image(modules.globals.target_path) and not is_video(modules.globals.target_path):
update_status('Select an image or video for target path.', NAME)
return False
return True
def get_face_swapper() -> Any:
global FACE_SWAPPER
@ -43,11 +43,9 @@ def get_face_swapper() -> Any:
FACE_SWAPPER = insightface.model_zoo.get_model(model_path, providers=modules.globals.execution_providers)
return FACE_SWAPPER
def swap_face(source_face: Face, target_face: Face, temp_frame: Frame) -> Frame:
return get_face_swapper().get(temp_frame, target_face, source_face, paste_back=True)
def process_frame(source_face: Face, temp_frame: Frame) -> Frame:
if modules.globals.many_faces:
many_faces = get_many_faces(temp_frame)
@ -60,7 +58,6 @@ def process_frame(source_face: Face, temp_frame: Frame) -> Frame:
temp_frame = swap_face(source_face, target_face, temp_frame)
return temp_frame
def process_frames(source_path: str, temp_frame_paths: List[str], progress: Any = None) -> None:
source_face = get_one_face(cv2.imread(source_path))
for temp_frame_path in temp_frame_paths:
@ -69,18 +66,15 @@ def process_frames(source_path: str, temp_frame_paths: List[str], progress: Any
result = process_frame(source_face, temp_frame)
cv2.imwrite(temp_frame_path, result)
except Exception as exception:
print(exception)
pass
print(f"Error processing frame {temp_frame_path}: {exception}")
if progress:
progress.update(1)
def process_image(source_path: str, target_path: str, output_path: str) -> None:
source_face = get_one_face(cv2.imread(source_path))
target_frame = cv2.imread(target_path)
result = process_frame(source_face, target_frame)
cv2.imwrite(output_path, result)
def process_video(source_path: str, temp_frame_paths: List[str]) -> None:
modules.processors.frame.core.process_video(source_path, temp_frame_paths, process_frames)

View File

@ -1,76 +1,57 @@
{
"CTk": {
"fg_color": ["gray95", "gray10"]
"fg_color": ["#FFFFFF", "#2D2D2D"]
},
"CTkToplevel": {
"fg_color": ["gray95", "gray10"]
"fg_color": ["#FFFFFF", "#2D2D2D"]
},
"CTkFrame": {
"corner_radius": 0,
"border_width": 0,
"fg_color": ["gray90", "gray13"],
"top_fg_color": ["gray85", "gray16"],
"border_color": ["gray65", "gray28"]
"fg_color": ["#F0F0F0", "#3C3C3C"],
"top_fg_color": ["#E0E0E0", "#4B4B4B"],
"border_color": ["#B0B0B0", "#5A5A5A"]
},
"CTkButton": {
"corner_radius": 0,
"border_width": 0,
"fg_color": ["#2aa666", "#1f538d"],
"hover_color": ["#3cb666", "#14375e"],
"border_color": ["#3e4a40", "#949A9F"],
"text_color": ["#f3faf6", "#f3faf6"],
"fg_color": ["#007ACC", "#007ACC"],
"hover_color": ["#005EA3", "#005EA3"],
"border_color": ["#004C8A", "#004C8A"],
"text_color": ["#FFFFFF", "#FFFFFF"],
"text_color_disabled": ["gray74", "gray60"]
},
"CTkLabel": {
"corner_radius": 0,
"fg_color": "transparent",
"text_color": ["gray14", "gray84"]
"text_color": ["#000000", "#FFFFFF"]
},
"CTkEntry": {
"corner_radius": 0,
"border_width": 2,
"fg_color": ["#F9F9FA", "#343638"],
"border_color": ["#979DA2", "#565B5E"],
"text_color": ["gray14", "gray84"],
"fg_color": ["#FFFFFF", "#333333"],
"border_color": ["#A0A0A0", "#5A5A5A"],
"text_color": ["#000000", "#FFFFFF"],
"placeholder_text_color": ["gray52", "gray62"]
},
"CTkCheckbox": {
"corner_radius": 0,
"border_width": 3,
"fg_color": ["#2aa666", "#1f538d"],
"border_color": ["#3e4a40", "#949A9F"],
"hover_color": ["#3cb666", "#14375e"],
"checkmark_color": ["#f3faf6", "gray90"],
"text_color": ["gray14", "gray84"],
"text_color_disabled": ["gray60", "gray45"]
},
"CTkSwitch": {
"corner_radius": 1000,
"border_width": 3,
"button_length": 0,
"fg_color": ["#939BA2", "#4A4D50"],
"progress_color": ["#2aa666", "#1f538d"],
"button_color": ["gray36", "#D5D9DE"],
"button_hover_color": ["gray20", "gray100"],
"text_color": ["gray14", "gray84"],
"button_color": ["#444444", "#D5D9DE"],
"button_hover_color": ["#333333", "#FFFFFF"],
"text_color": ["#000000", "#FFFFFF"],
"text_color_disabled": ["gray60", "gray45"]
},
"CTkRadiobutton": {
"corner_radius": 1000,
"border_width_checked": 6,
"border_width_unchecked": 3,
"CTkOptionMenu": {
"corner_radius": 0,
"fg_color": ["#2aa666", "#1f538d"],
"border_color": ["#3e4a40", "#949A9F"],
"hover_color": ["#3cb666", "#14375e"],
"text_color": ["gray14", "gray84"],
"text_color_disabled": ["gray60", "gray45"]
},
"CTkProgressBar": {
"corner_radius": 1000,
"border_width": 0,
"fg_color": ["#939BA2", "#4A4D50"],
"progress_color": ["#2aa666", "#1f538d"],
"border_color": ["gray", "gray"]
"button_color": ["#3cb666", "#14375e"],
"button_hover_color": ["#234567", "#1e2c40"],
"text_color": ["#FFFFFF", "#FFFFFF"],
"text_color_disabled": ["gray74", "gray60"]
},
"CTkSlider": {
"corner_radius": 1000,
@ -82,59 +63,6 @@
"button_color": ["#2aa666", "#1f538d"],
"button_hover_color": ["#3cb666", "#14375e"]
},
"CTkOptionMenu": {
"corner_radius": 0,
"fg_color": ["#2aa666", "#1f538d"],
"button_color": ["#3cb666", "#14375e"],
"button_hover_color": ["#234567", "#1e2c40"],
"text_color": ["#f3faf6", "#f3faf6"],
"text_color_disabled": ["gray74", "gray60"]
},
"CTkComboBox": {
"corner_radius": 0,
"border_width": 2,
"fg_color": ["#F9F9FA", "#343638"],
"border_color": ["#979DA2", "#565B5E"],
"button_color": ["#979DA2", "#565B5E"],
"button_hover_color": ["#6E7174", "#7A848D"],
"text_color": ["gray14", "gray84"],
"text_color_disabled": ["gray50", "gray45"]
},
"CTkScrollbar": {
"corner_radius": 1000,
"border_spacing": 4,
"fg_color": "transparent",
"button_color": ["gray55", "gray41"],
"button_hover_color": ["gray40", "gray53"]
},
"CTkSegmentedButton": {
"corner_radius": 0,
"border_width": 2,
"fg_color": ["#979DA2", "gray29"],
"selected_color": ["#2aa666", "#1f538d"],
"selected_hover_color": ["#3cb666", "#14375e"],
"unselected_color": ["#979DA2", "gray29"],
"unselected_hover_color": ["gray70", "gray41"],
"text_color": ["#f3faf6", "#f3faf6"],
"text_color_disabled": ["gray74", "gray60"]
},
"CTkTextbox": {
"corner_radius": 0,
"border_width": 0,
"fg_color": ["gray100", "gray20"],
"border_color": ["#979DA2", "#565B5E"],
"text_color": ["gray14", "gray84"],
"scrollbar_button_color": ["gray55", "gray41"],
"scrollbar_button_hover_color": ["gray40", "gray53"]
},
"CTkScrollableFrame": {
"label_fg_color": ["gray80", "gray21"]
},
"DropdownMenu": {
"fg_color": ["gray90", "gray20"],
"hover_color": ["gray75", "gray28"],
"text_color": ["gray14", "gray84"]
},
"CTkFont": {
"macOS": {
"family": "Avenir",
@ -152,7 +80,12 @@
"weight": "normal"
}
},
"DropdownMenu": {
"fg_color": ["#FFFFFF", "#2D2D2D"],
"hover_color": ["#E0E0E0", "#4B4B4B"],
"text_color": ["#000000", "#FFFFFF"]
},
"URL": {
"text_color": ["gray74", "gray60"]
"text_color": ["#007ACC", "#1E90FF"]
}
}

View File

@ -1,10 +1,17 @@
import os
import platform
import webbrowser
import customtkinter as ctk
from typing import Callable, Tuple
import cv2
from PIL import Image, ImageOps
# Import OS-specific modules only when necessary
if platform.system() == 'Darwin': # macOS
import objc
from Foundation import NSObject
import AVFoundation
import modules.globals
import modules.metadata
from modules.face_analyser import get_one_face
@ -33,9 +40,47 @@ status_label = None
img_ft, vid_ft = modules.globals.file_types
def check_camera_permissions():
"""Check and request camera access permission on macOS."""
if platform.system() == 'Darwin': # macOS-specific
auth_status = AVFoundation.AVCaptureDevice.authorizationStatusForMediaType_(AVFoundation.AVMediaTypeVideo)
if auth_status == AVFoundation.AVAuthorizationStatusNotDetermined:
# Request access to the camera
def completion_handler(granted):
if granted:
print("Access granted to the camera.")
else:
print("Access denied to the camera.")
AVFoundation.AVCaptureDevice.requestAccessForMediaType_completionHandler_(AVFoundation.AVMediaTypeVideo, completion_handler)
elif auth_status == AVFoundation.AVAuthorizationStatusAuthorized:
print("Camera access already authorized.")
elif auth_status == AVFoundation.AVAuthorizationStatusDenied:
print("Camera access denied. Please enable it in System Preferences.")
elif auth_status == AVFoundation.AVAuthorizationStatusRestricted:
print("Camera access restricted. The app is not allowed to use the camera.")
def select_camera(camera_name: str):
"""Select the appropriate camera based on its name (cross-platform)."""
if platform.system() == 'Darwin': # macOS-specific
devices = AVFoundation.AVCaptureDevice.devicesWithMediaType_(AVFoundation.AVMediaTypeVideo)
for device in devices:
if device.localizedName() == camera_name:
return device
elif platform.system() == 'Windows' or platform.system() == 'Linux':
# On Windows/Linux, simply return the camera name as OpenCV can handle it by index
return camera_name
return None
def init(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.CTk:
global ROOT, PREVIEW
if platform.system() == 'Darwin': # macOS-specific
check_camera_permissions() # Check camera permissions before initializing the UI
ROOT = create_root(start, destroy)
PREVIEW = create_preview(ROOT)
@ -49,10 +94,11 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C
ctk.set_appearance_mode('system')
ctk.set_default_color_theme(resolve_relative_path('ui.json'))
print("Creating root window...")
root = ctk.CTk()
root.minsize(ROOT_WIDTH, ROOT_HEIGHT)
root.title(f'{modules.metadata.name} {modules.metadata.version} {modules.metadata.edition}')
root.configure()
root.protocol('WM_DELETE_WINDOW', lambda: destroy())
source_label = ctk.CTkLabel(root, text=None)
@ -61,10 +107,10 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C
target_label = ctk.CTkLabel(root, text=None)
target_label.place(relx=0.6, rely=0.1, relwidth=0.3, relheight=0.25)
source_button = ctk.CTkButton(root, text='Select a face', cursor='hand2', command=lambda: select_source_path())
source_button = ctk.CTkButton(root, text='Select a face', cursor='hand2', command=select_source_path)
source_button.place(relx=0.1, rely=0.4, relwidth=0.3, relheight=0.1)
target_button = ctk.CTkButton(root, text='Select a target', cursor='hand2', command=lambda: select_target_path())
target_button = ctk.CTkButton(root, text='Select a target', cursor='hand2', command=select_target_path)
target_button.place(relx=0.6, rely=0.4, relwidth=0.3, relheight=0.1)
keep_fps_value = ctk.BooleanVar(value=modules.globals.keep_fps)
@ -75,9 +121,8 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C
keep_frames_switch = ctk.CTkSwitch(root, text='Keep frames', variable=keep_frames_value, cursor='hand2', command=lambda: setattr(modules.globals, 'keep_frames', keep_frames_value.get()))
keep_frames_switch.place(relx=0.1, rely=0.65)
# for FRAME PROCESSOR ENHANCER tumbler:
enhancer_value = ctk.BooleanVar(value=modules.globals.fp_ui['face_enhancer'])
enhancer_switch = ctk.CTkSwitch(root, text='Face Enhancer', variable=enhancer_value, cursor='hand2', command=lambda: update_tumbler('face_enhancer',enhancer_value.get()))
enhancer_switch = ctk.CTkSwitch(root, text='Face Enhancer', variable=enhancer_value, cursor='hand2', command=lambda: update_tumbler('face_enhancer', enhancer_value.get()))
enhancer_switch.place(relx=0.1, rely=0.7)
keep_audio_value = ctk.BooleanVar(value=modules.globals.keep_audio)
@ -93,57 +138,51 @@ def create_root(start: Callable[[], None], destroy: Callable[[], None]) -> ctk.C
nsfw_switch.place(relx=0.6, rely=0.7)
start_button = ctk.CTkButton(root, text='Start', cursor='hand2', command=lambda: select_output_path(start))
start_button.place(relx=0.15, rely=0.80, relwidth=0.2, relheight=0.05)
start_button.place(relx=0.15, rely=0.8, relwidth=0.2, relheight=0.05)
stop_button = ctk.CTkButton(root, text='Destroy', cursor='hand2', command=lambda: destroy())
stop_button.place(relx=0.4, rely=0.80, relwidth=0.2, relheight=0.05)
stop_button = ctk.CTkButton(root, text='Destroy', cursor='hand2', command=destroy)
stop_button.place(relx=0.4, rely=0.8, relwidth=0.2, relheight=0.05)
preview_button = ctk.CTkButton(root, text='Preview', cursor='hand2', command=lambda: toggle_preview())
preview_button.place(relx=0.65, rely=0.80, relwidth=0.2, relheight=0.05)
preview_button = ctk.CTkButton(root, text='Preview', cursor='hand2', command=toggle_preview)
preview_button.place(relx=0.65, rely=0.8, relwidth=0.2, relheight=0.05)
# --- Camera Selection ---
camera_label = ctk.CTkLabel(root, text="Select Camera:")
camera_label.place(relx=0.4, rely=0.86, relwidth=0.2, relheight=0.05)
available_cameras = get_available_cameras()
# Convert camera indices to strings for CTkOptionMenu
available_camera_strings = [str(cam) for cam in available_cameras]
camera_variable = ctk.StringVar(value=available_camera_strings[0] if available_camera_strings else "No cameras found")
camera_optionmenu = ctk.CTkOptionMenu(root, variable=camera_variable,
values=available_camera_strings)
camera_optionmenu = ctk.CTkOptionMenu(root, variable=camera_variable, values=available_camera_strings)
camera_optionmenu.place(relx=0.65, rely=0.86, relwidth=0.2, relheight=0.05)
live_button = ctk.CTkButton(root, text='Live', cursor='hand2', command=lambda: webcam_preview(int(camera_variable.get())))
live_button = ctk.CTkButton(root, text='Live', cursor='hand2', command=lambda: webcam_preview(camera_variable.get()))
live_button.place(relx=0.15, rely=0.86, relwidth=0.2, relheight=0.05)
# --- End Camera Selection ---
status_label = ctk.CTkLabel(root, text=None, justify='center')
status_label.place(relx=0.1, rely=0.9, relwidth=0.8)
status_label.place(relx=0.1, relwidth=0.8, rely=0.9)
donate_label = ctk.CTkLabel(root, text='Deep Live Cam', justify='center', cursor='hand2')
donate_label.place(relx=0.1, rely=0.95, relwidth=0.8)
donate_label.configure(text_color=ctk.ThemeManager.theme.get('URL').get('text_color'))
donate_label.bind('<Button>', lambda event: webbrowser.open('https://paypal.me/hacksider'))
donate_label.bind('<Button-1>', lambda event: webbrowser.open('https://paypal.me/hacksider'))
return root
def create_preview(parent: ctk.CTkToplevel) -> ctk.CTkToplevel:
def create_preview(parent: ctk.CTk) -> ctk.CTkToplevel:
global preview_label, preview_slider
preview = ctk.CTkToplevel(parent)
preview.withdraw()
preview.title('Preview')
preview.configure()
preview.protocol('WM_DELETE_WINDOW', lambda: toggle_preview())
preview.protocol('WM_DELETE_WINDOW', toggle_preview)
preview.resizable(width=False, height=False)
preview_label = ctk.CTkLabel(preview, text=None)
preview_label.pack(fill='both', expand=True)
preview_slider = ctk.CTkSlider(preview, from_=0, to=0, command=lambda frame_value: update_preview(frame_value))
preview_slider = ctk.CTkSlider(preview, from_=0, to=0, command=update_preview)
return preview
@ -158,10 +197,10 @@ def update_tumbler(var: str, value: bool) -> None:
def select_source_path() -> None:
global RECENT_DIRECTORY_SOURCE, img_ft, vid_ft
global RECENT_DIRECTORY_SOURCE
PREVIEW.withdraw()
source_path = ctk.filedialog.askopenfilename(title='select an source image', initialdir=RECENT_DIRECTORY_SOURCE, filetypes=[img_ft])
source_path = ctk.filedialog.askopenfilename(title='Select a source image', initialdir=RECENT_DIRECTORY_SOURCE, filetypes=[img_ft])
if is_image(source_path):
modules.globals.source_path = source_path
RECENT_DIRECTORY_SOURCE = os.path.dirname(modules.globals.source_path)
@ -173,10 +212,10 @@ def select_source_path() -> None:
def select_target_path() -> None:
global RECENT_DIRECTORY_TARGET, img_ft, vid_ft
global RECENT_DIRECTORY_TARGET
PREVIEW.withdraw()
target_path = ctk.filedialog.askopenfilename(title='select an target image or video', initialdir=RECENT_DIRECTORY_TARGET, filetypes=[img_ft, vid_ft])
target_path = ctk.filedialog.askopenfilename(title='Select a target image or video', initialdir=RECENT_DIRECTORY_TARGET, filetypes=[img_ft, vid_ft])
if is_image(target_path):
modules.globals.target_path = target_path
RECENT_DIRECTORY_TARGET = os.path.dirname(modules.globals.target_path)
@ -193,12 +232,12 @@ def select_target_path() -> None:
def select_output_path(start: Callable[[], None]) -> None:
global RECENT_DIRECTORY_OUTPUT, img_ft, vid_ft
global RECENT_DIRECTORY_OUTPUT
if is_image(modules.globals.target_path):
output_path = ctk.filedialog.asksaveasfilename(title='save image output file', filetypes=[img_ft], defaultextension='.png', initialfile='output.png', initialdir=RECENT_DIRECTORY_OUTPUT)
output_path = ctk.filedialog.asksaveasfilename(title='Save image output file', filetypes=[img_ft], defaultextension='.png', initialfile='output.png', initialdir=RECENT_DIRECTORY_OUTPUT)
elif is_video(modules.globals.target_path):
output_path = ctk.filedialog.asksaveasfilename(title='save video output file', filetypes=[vid_ft], defaultextension='.mp4', initialfile='output.mp4', initialdir=RECENT_DIRECTORY_OUTPUT)
output_path = ctk.filedialog.asksaveasfilename(title='Save video output file', filetypes=[vid_ft], defaultextension='.mp4', initialfile='output.mp4', initialdir=RECENT_DIRECTORY_OUTPUT)
else:
output_path = None
if output_path:
@ -219,13 +258,13 @@ def render_video_preview(video_path: str, size: Tuple[int, int], frame_number: i
if frame_number:
capture.set(cv2.CAP_PROP_POS_FRAMES, frame_number)
has_frame, frame = capture.read()
capture.release()
if has_frame:
image = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
if size:
image = ImageOps.fit(image, size, Image.LANCZOS)
return ctk.CTkImage(image, size=image.size)
capture.release()
cv2.destroyAllWindows()
return None
def toggle_preview() -> None:
@ -240,7 +279,7 @@ def toggle_preview() -> None:
def init_preview() -> None:
if is_image(modules.globals.target_path):
preview_slider.pack_forget()
if is_video(modules.globals.target_path):
elif is_video(modules.globals.target_path):
video_frame_total = get_video_frame_total(modules.globals.target_path)
preview_slider.configure(to=video_frame_total)
preview_slider.pack(fill='x')
@ -250,7 +289,7 @@ def init_preview() -> None:
def update_preview(frame_number: int = 0) -> None:
if modules.globals.source_path and modules.globals.target_path:
temp_frame = get_video_frame(modules.globals.target_path, frame_number)
if modules.globals.nsfw == False:
if not modules.globals.nsfw:
from modules.predicter import predict_frame
if predict_frame(temp_frame):
quit()
@ -264,62 +303,98 @@ def update_preview(frame_number: int = 0) -> None:
image = ctk.CTkImage(image, size=image.size)
preview_label.configure(image=image)
def webcam_preview(camera_index: int):
def webcam_preview(camera_name: str):
if modules.globals.source_path is None:
# No image selected
return
global preview_label, PREVIEW
cap = cv2.VideoCapture(camera_index)
if not cap.isOpened():
update_status(f"Error: Could not open camera with index {camera_index}")
# Select the camera by its name
selected_camera = select_camera(camera_name)
if selected_camera is None:
update_status(f"No suitable camera found.")
return
cap.set(cv2.CAP_PROP_FRAME_WIDTH, 960) # Set the width of the resolution
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 540) # Set the height of the resolution
cap.set(cv2.CAP_PROP_FPS, 60) # Set the frame rate of the webcam
# Use OpenCV's camera index for cross-platform compatibility
camera_index = get_camera_index_by_name(camera_name)
cap = cv2.VideoCapture(camera_index)
if not cap.isOpened():
update_status(f"Error: Could not open camera {camera_name}")
return
cap.set(cv2.CAP_PROP_FRAME_WIDTH, 960)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 540)
cap.set(cv2.CAP_PROP_FPS, 60)
PREVIEW_MAX_WIDTH = 960
PREVIEW_MAX_HEIGHT = 540
preview_label.configure(image=None) # Reset the preview image before startup
PREVIEW.deiconify() # Open preview window
preview_label.configure(image=None)
PREVIEW.deiconify()
frame_processors = get_frame_processors_modules(modules.globals.frame_processors)
source_image = None # Initialize variable for the selected face image
source_image = get_one_face(cv2.imread(modules.globals.source_path))
while True:
ret, frame = cap.read()
if not ret:
update_status(f"Error: Frame not received from camera.")
break
# Select and save face image only once
if source_image is None and modules.globals.source_path:
source_image = get_one_face(cv2.imread(modules.globals.source_path))
temp_frame = frame.copy() #Create a copy of the frame
temp_frame = frame.copy()
for frame_processor in frame_processors:
temp_frame = frame_processor.process_frame(source_image, temp_frame)
image = cv2.cvtColor(temp_frame, cv2.COLOR_BGR2RGB) # Convert the image to RGB format to display it with Tkinter
image = Image.fromarray(image)
image = Image.fromarray(cv2.cvtColor(temp_frame, cv2.COLOR_BGR2RGB))
image = ImageOps.contain(image, (PREVIEW_MAX_WIDTH, PREVIEW_MAX_HEIGHT), Image.LANCZOS)
image = ctk.CTkImage(image, size=image.size)
preview_label.configure(image=image)
ROOT.update()
cap.release()
PREVIEW.withdraw() # Close preview window when loop is finished
PREVIEW.withdraw()
def get_camera_index_by_name(camera_name: str) -> int:
"""Map camera name to index for OpenCV."""
if platform.system() == 'Darwin': # macOS-specific
if "FaceTime" in camera_name:
return 0 # Assuming FaceTime is at index 0
elif "iPhone" in camera_name:
return 1 # Assuming iPhone camera is at index 1
elif platform.system() == 'Windows' or platform.system() == 'Linux':
# Map camera name to index dynamically (OpenCV on these platforms usually starts with 0)
return get_available_cameras().index(camera_name)
return -1
def get_available_cameras():
"""Returns a list of available camera indices."""
"""Get available camera names (cross-platform)."""
available_cameras = []
for index in range(10): # Check for cameras with index 0 to 9
cap = cv2.VideoCapture(index)
if cap.isOpened():
available_cameras.append(index)
if platform.system() == 'Darwin': # macOS-specific
devices = AVFoundation.AVCaptureDevice.devicesWithMediaType_(AVFoundation.AVMediaTypeVideo)
for device in devices:
if device.deviceType() == AVFoundation.AVCaptureDeviceTypeBuiltInWideAngleCamera:
print(f"Found Built-In Camera: {device.localizedName()}")
available_cameras.append(device.localizedName())
elif device.deviceType() == "AVCaptureDeviceTypeExternal":
print(f"Found External Camera: {device.localizedName()}")
available_cameras.append(device.localizedName())
elif device.deviceType() == "AVCaptureDeviceTypeContinuityCamera":
print(f"Skipping Continuity Camera: {device.localizedName()}")
elif platform.system() == 'Windows' or platform.system() == 'Linux':
# Use OpenCV to detect camera indexes
index = 0
while True:
cap = cv2.VideoCapture(index)
if not cap.isOpened():
break
available_cameras.append(f"Camera {index}")
cap.release()
return available_cameras
index += 1
return available_cameras

View File

@ -5,7 +5,7 @@ import platform
import shutil
import ssl
import subprocess
import urllib
import urllib.request
from pathlib import Path
from typing import List, Any
from tqdm import tqdm
@ -15,127 +15,123 @@ import modules.globals
TEMP_FILE = 'temp.mp4'
TEMP_DIRECTORY = 'temp'
# monkey patch ssl for mac
# Monkey patch SSL for macOS to handle issues with some HTTPS requests
if platform.system().lower() == 'darwin':
ssl._create_default_https_context = ssl._create_unverified_context
def run_ffmpeg(args: List[str]) -> bool:
commands = ['ffmpeg', '-hide_banner', '-hwaccel', 'auto', '-loglevel', modules.globals.log_level]
commands.extend(args)
try:
subprocess.check_output(commands, stderr=subprocess.STDOUT)
return True
except Exception:
pass
except subprocess.CalledProcessError as e:
print(f"FFmpeg error: {e.output.decode()}")
return False
def detect_fps(target_path: str) -> float:
command = ['ffprobe', '-v', 'error', '-select_streams', 'v:0', '-show_entries', 'stream=r_frame_rate', '-of', 'default=noprint_wrappers=1:nokey=1', target_path]
output = subprocess.check_output(command).decode().strip().split('/')
command = [
'ffprobe', '-v', 'error', '-select_streams', 'v:0',
'-show_entries', 'stream=r_frame_rate',
'-of', 'default=noprint_wrappers=1:nokey=1', target_path
]
try:
output = subprocess.check_output(command).decode().strip().split('/')
numerator, denominator = map(int, output)
return numerator / denominator
except Exception:
pass
except (subprocess.CalledProcessError, ValueError):
print("Failed to detect FPS, defaulting to 30.0 FPS.")
return 30.0
def extract_frames(target_path: str) -> None:
temp_directory_path = get_temp_directory_path(target_path)
create_temp(target_path)
run_ffmpeg(['-i', target_path, '-pix_fmt', 'rgb24', os.path.join(temp_directory_path, '%04d.png')])
def create_video(target_path: str, fps: float = 30.0) -> None:
temp_output_path = get_temp_output_path(target_path)
temp_directory_path = get_temp_directory_path(target_path)
run_ffmpeg(['-r', str(fps), '-i', os.path.join(temp_directory_path, '%04d.png'), '-c:v', modules.globals.video_encoder, '-crf', str(modules.globals.video_quality), '-pix_fmt', 'yuv420p', '-vf', 'colorspace=bt709:iall=bt601-6-625:fast=1', '-y', temp_output_path])
run_ffmpeg([
'-r', str(fps), '-i', os.path.join(temp_directory_path, '%04d.png'),
'-c:v', modules.globals.video_encoder,
'-crf', str(modules.globals.video_quality),
'-pix_fmt', 'yuv420p',
'-vf', 'colorspace=bt709:iall=bt601-6-625:fast=1',
'-y', temp_output_path
])
def restore_audio(target_path: str, output_path: str) -> None:
temp_output_path = get_temp_output_path(target_path)
done = run_ffmpeg(['-i', temp_output_path, '-i', target_path, '-c:v', 'copy', '-map', '0:v:0', '-map', '1:a:0', '-y', output_path])
done = run_ffmpeg([
'-i', temp_output_path, '-i', target_path,
'-c:v', 'copy', '-map', '0:v:0', '-map', '1:a:0', '-y', output_path
])
if not done:
move_temp(target_path, output_path)
def get_temp_frame_paths(target_path: str) -> List[str]:
temp_directory_path = get_temp_directory_path(target_path)
return glob.glob((os.path.join(glob.escape(temp_directory_path), '*.png')))
return glob.glob(os.path.join(glob.escape(temp_directory_path), '*.png'))
def get_temp_directory_path(target_path: str) -> str:
target_name, _ = os.path.splitext(os.path.basename(target_path))
target_directory_path = os.path.dirname(target_path)
return os.path.join(target_directory_path, TEMP_DIRECTORY, target_name)
target_name = Path(target_path).stem
target_directory_path = Path(target_path).parent
return str(target_directory_path / TEMP_DIRECTORY / target_name)
def get_temp_output_path(target_path: str) -> str:
temp_directory_path = get_temp_directory_path(target_path)
return os.path.join(temp_directory_path, TEMP_FILE)
return str(Path(temp_directory_path) / TEMP_FILE)
def normalize_output_path(source_path: str, target_path: str, output_path: str) -> Any:
if source_path and target_path:
source_name, _ = os.path.splitext(os.path.basename(source_path))
target_name, target_extension = os.path.splitext(os.path.basename(target_path))
if os.path.isdir(output_path):
return os.path.join(output_path, source_name + '-' + target_name + target_extension)
def normalize_output_path(source_path: str, target_path: str, output_path: str) -> str:
if source_path and target_path and os.path.isdir(output_path):
source_name = Path(source_path).stem
target_name = Path(target_path).stem
target_extension = Path(target_path).suffix
return str(Path(output_path) / f"{source_name}-{target_name}{target_extension}")
return output_path
def create_temp(target_path: str) -> None:
temp_directory_path = get_temp_directory_path(target_path)
Path(temp_directory_path).mkdir(parents=True, exist_ok=True)
def move_temp(target_path: str, output_path: str) -> None:
temp_output_path = get_temp_output_path(target_path)
if os.path.isfile(temp_output_path):
if os.path.isfile(output_path):
os.remove(output_path)
shutil.move(temp_output_path, output_path)
def clean_temp(target_path: str) -> None:
temp_directory_path = get_temp_directory_path(target_path)
parent_directory_path = os.path.dirname(temp_directory_path)
parent_directory_path = Path(temp_directory_path).parent
if not modules.globals.keep_frames and os.path.isdir(temp_directory_path):
shutil.rmtree(temp_directory_path)
if os.path.exists(parent_directory_path) and not os.listdir(parent_directory_path):
os.rmdir(parent_directory_path)
if parent_directory_path.exists() and not list(parent_directory_path.iterdir()):
parent_directory_path.rmdir()
def has_image_extension(image_path: str) -> bool:
return image_path.lower().endswith(('png', 'jpg', 'jpeg'))
def is_image(image_path: str) -> bool:
if image_path and os.path.isfile(image_path):
mimetype, _ = mimetypes.guess_type(image_path)
return bool(mimetype and mimetype.startswith('image/'))
return mimetype and mimetype.startswith('image/')
return False
def is_video(video_path: str) -> bool:
if video_path and os.path.isfile(video_path):
mimetype, _ = mimetypes.guess_type(video_path)
return bool(mimetype and mimetype.startswith('video/'))
return mimetype and mimetype.startswith('video/')
return False
def conditional_download(download_directory_path: str, urls: List[str]) -> None:
if not os.path.exists(download_directory_path):
os.makedirs(download_directory_path)
download_directory = Path(download_directory_path)
download_directory.mkdir(parents=True, exist_ok=True)
for url in urls:
download_file_path = os.path.join(download_directory_path, os.path.basename(url))
if not os.path.exists(download_file_path):
request = urllib.request.urlopen(url) # type: ignore[attr-defined]
total = int(request.headers.get('Content-Length', 0))
with tqdm(total=total, desc='Downloading', unit='B', unit_scale=True, unit_divisor=1024) as progress:
urllib.request.urlretrieve(url, download_file_path, reporthook=lambda count, block_size, total_size: progress.update(block_size)) # type: ignore[attr-defined]
download_file_path = download_directory / Path(url).name
if not download_file_path.exists():
with urllib.request.urlopen(url) as request:
total = int(request.headers.get('Content-Length', 0))
with tqdm(total=total, desc='Downloading', unit='B', unit_scale=True, unit_divisor=1024) as progress:
urllib.request.urlretrieve(url, download_file_path, reporthook=lambda count, block_size, total_size: progress.update(block_size))
def resolve_relative_path(path: str) -> str:
return os.path.abspath(os.path.join(os.path.dirname(__file__), path))
return str(Path(__file__).parent / path)

View File

@ -21,3 +21,4 @@ opennsfw2==0.10.2
protobuf==4.23.2
tqdm==4.66.4
gfpgan==1.3.8
pyobjc==9.1; sys_platform == 'darwin'