mirror of
https://github.com/hacksider/Deep-Live-Cam.git
synced 2025-04-05 18:38:02 +02:00
95 lines
3.3 KiB
Python
95 lines
3.3 KiB
Python
import cv2
|
|
import numpy as np
|
|
from typing import Optional, Tuple, Callable
|
|
import platform
|
|
import threading
|
|
|
|
# Only import Windows-specific library if on Windows
|
|
if platform.system() == "Windows":
|
|
from pygrabber.dshow_graph import FilterGraph
|
|
|
|
|
|
class VideoCapturer:
|
|
def __init__(self, device_index: int):
|
|
self.device_index = device_index
|
|
self.frame_callback = None
|
|
self._current_frame = None
|
|
self._frame_ready = threading.Event()
|
|
self.is_running = False
|
|
self.cap = None
|
|
|
|
# Initialize Windows-specific components if on Windows
|
|
if platform.system() == "Windows":
|
|
self.graph = FilterGraph()
|
|
# Verify device exists
|
|
devices = self.graph.get_input_devices()
|
|
if self.device_index >= len(devices):
|
|
raise ValueError(
|
|
f"Invalid device index {device_index}. Available devices: {len(devices)}"
|
|
)
|
|
|
|
def start(self, width: int = 960, height: int = 540, fps: int = 60) -> bool:
|
|
"""Initialize and start video capture"""
|
|
try:
|
|
if platform.system() == "Windows":
|
|
# Windows-specific capture methods
|
|
capture_methods = [
|
|
(self.device_index, cv2.CAP_DSHOW), # Try DirectShow first
|
|
(self.device_index, cv2.CAP_ANY), # Then try default backend
|
|
(-1, cv2.CAP_ANY), # Try -1 as fallback
|
|
(0, cv2.CAP_ANY), # Finally try 0 without specific backend
|
|
]
|
|
|
|
for dev_id, backend in capture_methods:
|
|
try:
|
|
self.cap = cv2.VideoCapture(dev_id, backend)
|
|
if self.cap.isOpened():
|
|
break
|
|
self.cap.release()
|
|
except Exception:
|
|
continue
|
|
else:
|
|
# Unix-like systems (Linux/Mac) capture method
|
|
self.cap = cv2.VideoCapture(self.device_index)
|
|
|
|
if not self.cap or not self.cap.isOpened():
|
|
raise RuntimeError("Failed to open camera")
|
|
|
|
# Configure format
|
|
self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, width)
|
|
self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, height)
|
|
self.cap.set(cv2.CAP_PROP_FPS, fps)
|
|
|
|
self.is_running = True
|
|
return True
|
|
|
|
except Exception as e:
|
|
print(f"Failed to start capture: {str(e)}")
|
|
if self.cap:
|
|
self.cap.release()
|
|
return False
|
|
|
|
def read(self) -> Tuple[bool, Optional[np.ndarray]]:
|
|
"""Read a frame from the camera"""
|
|
if not self.is_running or self.cap is None:
|
|
return False, None
|
|
|
|
ret, frame = self.cap.read()
|
|
if ret:
|
|
self._current_frame = frame
|
|
if self.frame_callback:
|
|
self.frame_callback(frame)
|
|
return True, frame
|
|
return False, None
|
|
|
|
def release(self) -> None:
|
|
"""Stop capture and release resources"""
|
|
if self.is_running and self.cap is not None:
|
|
self.cap.release()
|
|
self.is_running = False
|
|
self.cap = None
|
|
|
|
def set_frame_callback(self, callback: Callable[[np.ndarray], None]) -> None:
|
|
"""Set callback for frame processing"""
|
|
self.frame_callback = callback
|