facesaver / clipsaver.py
quarterturn's picture
Upload 4 files
e862c01 verified
#!/usr/bin/env python3
import argparse
import os
import cv2
import numpy as np
from ultralytics import YOLO
from scenedetect import open_video, SceneManager, ContentDetector
import torch
def parse_arguments(argv=None):
    """Parse command-line arguments.

    Args:
        argv: Optional list of argument strings to parse. When ``None``
            (the default) argparse reads ``sys.argv[1:]``, which is the
            behaviour callers that pass no argument already rely on.

    Returns:
        argparse.Namespace with ``input_dir``, ``output_dir``, ``min_width``
        and ``min_height`` attributes.
    """
    parser = argparse.ArgumentParser(
        description="Detect full faces in videos and capture 15-second video clips on scene changes.",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    parser.add_argument(
        "--input-dir", "-I",
        required=True,
        help="Directory containing input video files.",
    )
    parser.add_argument(
        "--output-dir", "-O",
        required=True,
        help="Directory to save video clip outputs.",
    )
    parser.add_argument(
        "--min-width", "-w",
        type=int,
        default=200,
        help="Minimum width of face bounding box to trigger capture.",
    )
    parser.add_argument(
        "--min-height", "-m",
        type=int,
        default=200,
        help="Minimum height of face bounding box to trigger capture.",
    )
    # Passing argv through lets tests and other scripts supply arguments
    # without mutating sys.argv.
    return parser.parse_args(argv)
def ensure_directory(directory):
    """Create ``directory`` (including missing parents) if it doesn't exist.

    Args:
        directory: Path of the directory that must exist afterwards.

    Raises:
        FileExistsError: If ``directory`` exists but is not a directory.
        OSError: If the directory cannot be created.
    """
    # exist_ok=True removes the check-then-create race of a separate
    # os.path.exists() test and still succeeds when the directory is
    # already there.
    os.makedirs(directory, exist_ok=True)
def check_cuda():
    """Report CUDA availability on stdout and return the torch device to use.

    Returns:
        ``torch.device("cuda")`` when CUDA is available, otherwise
        ``torch.device("cpu")``.
    """
    if not torch.cuda.is_available():
        print("CUDA is not available. Falling back to CPU.")
        return torch.device("cpu")

    # GPU path: print a short summary of the CUDA environment.
    print(f"CUDA is available! Using GPU: {torch.cuda.get_device_name(0)}")
    print(f"CUDA version: {torch.version.cuda}")
    print(f"Number of GPUs: {torch.cuda.device_count()}")
    return torch.device("cuda")
def is_full_face(box, frame_shape, min_width, min_height, min_proportion=0.1):
    """Decide whether a bounding box qualifies as a full face inside the frame.

    Args:
        box: Four values ``(x1, y1, x2, y2)`` giving the box corners.
        frame_shape: Frame shape whose first two entries are height and width.
        min_width: Smallest accepted box width.
        min_height: Smallest accepted box height.
        min_proportion: Smallest accepted box width/height as a fraction of
            the frame width/height.

    Returns:
        True only if the box lies strictly inside the frame and satisfies
        both the absolute and the relative size limits; False otherwise.
    """
    left, top, right, bottom = box
    frame_h, frame_w = frame_shape[:2]

    # A box that reaches any frame border is treated as a cut-off detection.
    touches_edge = left <= 0 or top <= 0 or right >= frame_w or bottom >= frame_h
    if touches_edge:
        return False

    box_w, box_h = right - left, bottom - top
    below_absolute_min = box_w < min_width or box_h < min_height
    below_relative_min = (
        box_w < frame_w * min_proportion or box_h < frame_h * min_proportion
    )
    return not (below_absolute_min or below_relative_min)
class _CountingReader:
    """Wrap a cv2.VideoCapture and count the frames successfully read."""

    def __init__(self, cap):
        self._cap = cap
        self.frames_read = 0

    def read(self):
        """Read the next frame; bump the counter only when a frame was returned."""
        ret, frame = self._cap.read()
        if ret:
            self.frames_read += 1
        return ret, frame


def _has_full_face(results, frame_shape, min_width, min_height):
    """Return True if any class-0 detection in ``results`` passes is_full_face."""
    for result in results:
        boxes = result.boxes.xyxy.cpu().numpy()
        classes = result.boxes.cls.cpu().numpy()
        for box, cls in zip(boxes, classes):
            # Class 0 is 'person' in COCO, used as proxy for face
            if cls == 0 and is_full_face(box, frame_shape, min_width, min_height):
                return True
    return False


def _save_clip(reader, first_frame, output_path, fps, size, max_frames):
    """Write ``first_frame`` and the frames after it to ``output_path``.

    Returns the number of frames written, or None if the writer could not be
    opened. The writer is always released, even when encoding raises.
    """
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    writer = cv2.VideoWriter(output_path, fourcc, fps, size)
    try:
        if not writer.isOpened():
            return None
        written = 0
        frame = first_frame
        while True:
            writer.write(cv2.resize(frame, size, interpolation=cv2.INTER_AREA))
            written += 1
            if written >= max_frames:
                return written
            ret, frame = reader.read()
            if not ret:
                return written
    finally:
        writer.release()


def process_video(video_path, output_dir, min_width, min_height, model, device):
    """Process a single video for face detection and capture 15-second video clips.

    Scene boundaries are found with PySceneDetect. Within each scene, frames
    are passed to ``model.predict`` until one contains a detection accepted by
    ``is_full_face``; a clip of up to 15 seconds starting at that frame is then
    written to ``output_dir``, scaled to a height of 480 pixels. Detection
    resumes at the first frame after the clip that belongs to a later scene.

    Args:
        video_path: Path of the video file to process.
        output_dir: Directory that receives ``<video name>_face_NNNN.mp4`` clips.
        min_width: Minimum bounding-box width forwarded to ``is_full_face``.
        min_height: Minimum bounding-box height forwarded to ``is_full_face``.
        model: Detector exposing ``predict(frame, classes=..., conf=..., device=...)``.
        device: Device passed through to ``model.predict``.

    Returns:
        None. Problems are reported on stdout and the video is skipped.
    """
    # Initialize PySceneDetect for scene detection
    try:
        video = open_video(video_path)
        scene_manager = SceneManager()
        scene_manager.add_detector(ContentDetector(threshold=30.0))
    except Exception as e:
        print(f"Error initializing video for scene detection in {video_path}: {e}")
        return

    # Get video capture for OpenCV
    cap = cv2.VideoCapture(video_path)
    output_count = 0
    try:
        if not cap.isOpened():
            print(f"Error opening video file {video_path}")
            return

        fps = cap.get(cv2.CAP_PROP_FPS)
        if fps <= 0:
            print(f"Invalid FPS for {video_path}. Skipping.")
            return

        # Frames in a 15-second clip; at least one so a clip is never empty.
        num_frames = max(1, int(fps * 15))

        frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        if frame_height <= 0:
            print(f"Invalid frame height for {video_path}. Skipping.")
            return
        if frame_width <= 0:
            print(f"Invalid frame width for {video_path}. Skipping.")
            return

        # Scaled dimensions (height=480, maintain aspect ratio); width is
        # clamped so cv2.resize never receives a zero-sized target.
        scale = 480 / frame_height
        clip_size = (max(1, int(frame_width * scale)), 480)

        # Find scenes
        try:
            scene_manager.detect_scenes(video=video)
            scene_list = scene_manager.get_scene_list()
            scene_starts = [scene[0].get_frames() for scene in scene_list]
        except Exception as e:
            print(f"Error detecting scenes in {video_path}: {e}")
            return

        reader = _CountingReader(cap)
        scene_index = 0
        face_detected_in_scene = False
        video_name = os.path.splitext(os.path.basename(video_path))[0]

        while True:
            ret, frame = reader.read()
            if not ret:
                break
            # Derive the index from the number of frames actually read so it
            # can never drift from the decoder position, including after a
            # clip has consumed many frames.
            frame_idx = reader.frames_read - 1

            # Consume every scene boundary at or before this frame in one go,
            # so boundaries skipped while writing a clip reset the flag once
            # instead of on several consecutive frames.
            entered_new_scene = False
            while scene_index < len(scene_starts) and frame_idx >= scene_starts[scene_index]:
                scene_index += 1
                entered_new_scene = True
            if entered_new_scene:
                face_detected_in_scene = False  # Reset face detection for new scene
                print(f"New scene detected at frame {frame_idx}")

            if face_detected_in_scene:
                continue

            try:
                results = model.predict(frame, classes=[0], conf=0.75, device=device)
                if not _has_full_face(results, frame.shape, min_width, min_height):
                    continue

                output_path = os.path.join(output_dir, f"{video_name}_face_{output_count:04d}.mp4")
                # The clip starts with the frame already in hand, which avoids
                # seeking the capture backwards.
                frames_captured = _save_clip(
                    reader, frame, output_path, fps, clip_size, num_frames
                )
                if frames_captured is None:
                    print(f"Error initializing VideoWriter for {output_path}")
                    continue
                if frames_captured < num_frames:
                    print(f"Warning: Clip at frame {frame_idx} in {video_path} is shorter than 15 seconds ({frames_captured/fps:.2f} seconds)")
                print(f"Saved video clip: {output_path} ({frames_captured/fps:.2f} seconds)")
                output_count += 1
                face_detected_in_scene = True
            except Exception as e:
                # Best effort: a failure on one frame must not abort the video.
                print(f"Error during face detection in {video_path}: {e}")
    finally:
        cap.release()
    print(f"Processed {video_path}: {output_count} video clips saved.")
def main():
    """Process every supported video file found in the input directory.

    Parses the command line, checks that the input directory exists, creates
    the output directory, selects the torch device, loads the YOLO model once
    and then runs ``process_video`` on each video file in file-name order.
    Errors are reported on stdout and end the run early.
    """
    args = parse_arguments()

    # Validate input directory
    if not os.path.isdir(args.input_dir):
        print(f"Error: Input directory '{args.input_dir}' does not exist.")
        return

    # Ensure output directory exists
    ensure_directory(args.output_dir)

    # Check CUDA and set device once
    device = check_cuda()

    # Load YOLO model once
    # NOTE(review): Ultralytics appears to publish its YOLO11 weights as
    # "yolo11l.pt" (no "v") - confirm that "yolov11l.pt" resolves to a local
    # weights file before relying on this name.
    try:
        model = YOLO("yolov11l.pt")
        model.to(device)
        print(f"YOLO model loaded on device: {device}")
    except Exception as e:
        print(f"Error loading YOLO model: {e}")
        return

    # Supported video extensions
    video_extensions = ('.mp4', '.avi', '.mov', '.mkv')

    # os.listdir returns entries in arbitrary order; sorting makes runs
    # reproducible.
    for filename in sorted(os.listdir(args.input_dir)):
        if not filename.lower().endswith(video_extensions):
            continue
        video_path = os.path.join(args.input_dir, filename)
        # Skip directories (or other non-files) that merely look like videos.
        if not os.path.isfile(video_path):
            continue
        print(f"Processing video: {video_path}")
        process_video(video_path, args.output_dir, args.min_width, args.min_height, model, device)
# Script entry point: run main() only when this file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    main()