|
|
|
|
|
|
|
|
import argparse
|
|
|
import os
|
|
|
import cv2
|
|
|
import numpy as np
|
|
|
from ultralytics import YOLO
|
|
|
from scenedetect import open_video, SceneManager, ContentDetector
|
|
|
import torch
|
|
|
|
|
|
def parse_arguments():
    """Build the command-line interface and parse ``sys.argv``.

    Options:
        --input-dir / -I   (required) directory holding the input videos.
        --output-dir / -O  (required) directory that receives the clips.
        --min-width / -w   int, default 200: minimum face-box width.
        --min-height / -m  int, default 200: minimum face-box height.

    Returns:
        The ``argparse.Namespace`` produced by ``parse_args()``.
    """
    cli = argparse.ArgumentParser(
        description="Detect full faces in videos and capture 15-second video clips on scene changes.",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )

    # (flags, keyword arguments) pairs, registered in this order so that the
    # generated --help output lists the options in the same sequence.
    option_specs = (
        (
            ("--input-dir", "-I"),
            {"required": True, "help": "Directory containing input video files."},
        ),
        (
            ("--output-dir", "-O"),
            {"required": True, "help": "Directory to save video clip outputs."},
        ),
        (
            ("--min-width", "-w"),
            {
                "type": int,
                "default": 200,
                "help": "Minimum width of face bounding box to trigger capture.",
            },
        ),
        (
            ("--min-height", "-m"),
            {
                "type": int,
                "default": 200,
                "help": "Minimum height of face bounding box to trigger capture.",
            },
        ),
    )
    for flags, settings in option_specs:
        cli.add_argument(*flags, **settings)

    return cli.parse_args()
|
|
|
|
|
|
def ensure_directory(directory):
    """Create *directory*, including missing parents, unless it already exists.

    Args:
        directory: Path of the directory to create.

    Raises:
        FileExistsError: If the path exists but is not a directory.
        OSError: If the directory cannot be created (e.g. permissions).
    """
    # exist_ok=True replaces the separate "exists?" check, which could race
    # with another process creating the directory between check and creation.
    os.makedirs(directory, exist_ok=True)
|
|
|
|
|
|
def check_cuda():
    """Pick the torch device to run on and print what was chosen.

    Returns:
        ``torch.device("cuda")`` when ``torch.cuda.is_available()`` is true,
        otherwise ``torch.device("cpu")``.
    """
    if not torch.cuda.is_available():
        print("CUDA is not available. Falling back to CPU.")
        return torch.device("cpu")

    selected = torch.device("cuda")
    # Report the first GPU's name, the CUDA version torch exposes, and the
    # number of visible GPUs.
    print(f"CUDA is available! Using GPU: {torch.cuda.get_device_name(0)}")
    print(f"CUDA version: {torch.version.cuda}")
    print(f"Number of GPUs: {torch.cuda.device_count()}")
    return selected
|
|
|
|
|
|
def is_full_face(box, frame_shape, min_width, min_height, min_proportion=0.1):
    """Decide whether a bounding box lies fully inside the frame and is large enough.

    Args:
        box: Four values unpacked as ``(x1, y1, x2, y2)``.
        frame_shape: Sequence whose first two entries are frame height and width.
        min_width: Smallest accepted box width.
        min_height: Smallest accepted box height.
        min_proportion: Smallest accepted box width/height as a fraction of the
            frame width/height.

    Returns:
        ``True`` when the box touches no frame edge and meets both the absolute
        and the proportional size limits, ``False`` otherwise.
    """
    left, top, right, bottom = box
    frame_h, frame_w = frame_shape[:2]

    # A box that reaches (or crosses) any frame edge is rejected.
    touches_edge = left <= 0 or top <= 0 or right >= frame_w or bottom >= frame_h
    if touches_edge:
        return False

    box_w = right - left
    box_h = bottom - top

    # Absolute limits are evaluated before the frame-relative ones.
    too_small = (
        box_w < min_width
        or box_h < min_height
        or box_w < frame_w * min_proportion
        or box_h < frame_h * min_proportion
    )
    return not too_small
|
|
|
|
|
|
def _frame_has_full_face(frame, model, device, min_width, min_height):
    """Return True if the model reports a class-0 box in *frame* that passes is_full_face."""
    # NOTE(review): class 0 is presumably the face class of the loaded weights;
    # confirm against the model that main() loads.
    detections = model.predict(frame, classes=[0], conf=0.75, device=device)
    for detection in detections:
        boxes = detection.boxes.xyxy.cpu().numpy()
        labels = detection.boxes.cls.cpu().numpy()
        for box, label in zip(boxes, labels):
            if label == 0 and is_full_face(box, frame.shape, min_width, min_height):
                return True
    return False


def process_video(video_path, output_dir, min_width, min_height, model, device):
    """Save one clip of up to 15 seconds per scene, starting at the first full-face frame.

    Scene boundaries are computed with PySceneDetect's ``ContentDetector``
    (threshold 30.0). The video is then decoded frame by frame with OpenCV.
    Within each scene, the first frame for which the model reports a box that
    passes ``is_full_face`` starts a clip. The clip is resized to a height of
    480 pixels (width scaled by the same factor) and written with the
    ``mp4v`` fourcc to ``<output_dir>/<video name>_face_<NNNN>.mp4``.

    Initialisation, scene-detection and per-frame detection/writing errors are
    reported with ``print``; the video (or the current frame) is then skipped.

    Args:
        video_path: Path of the video file to process.
        output_dir: Directory that receives the clips.
        min_width: Minimum box width forwarded to ``is_full_face``.
        min_height: Minimum box height forwarded to ``is_full_face``.
        model: Object exposing ``predict(frame, classes=..., conf=..., device=...)``.
        device: Device value forwarded to ``model.predict``.

    Returns:
        None.
    """
    try:
        video = open_video(video_path)
        scene_manager = SceneManager()
        scene_manager.add_detector(ContentDetector(threshold=30.0))
    except Exception as e:
        print(f"Error initializing video for scene detection in {video_path}: {e}")
        return

    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            print(f"Error opening video file {video_path}")
            return

        fps = cap.get(cv2.CAP_PROP_FPS)
        if fps <= 0:
            print(f"Invalid FPS for {video_path}. Skipping.")
            return

        # Number of frames that make up a 15-second clip at the source frame rate.
        num_frames = int(fps * 15)

        frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        if frame_height <= 0:
            print(f"Invalid frame height for {video_path}. Skipping.")
            return

        # Output clips are 480 pixels high; the width uses the same scale factor.
        scale = 480 / frame_height
        new_width = int(frame_width * scale)
        new_height = 480
        clip_size = (new_width, new_height)

        try:
            scene_manager.detect_scenes(video=video)
            scene_list = scene_manager.get_scene_list()
            scene_starts = [scene[0].get_frames() for scene in scene_list]
        except Exception as e:
            print(f"Error detecting scenes in {video_path}: {e}")
            return

        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        video_name = os.path.splitext(os.path.basename(video_path))[0]

        scene_index = 0
        face_detected_in_scene = False
        output_count = 0
        # Index of the frame the next cap.read() will return. It is advanced on
        # every successful read so it always matches the decode position.
        frame_idx = 0

        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
            current_idx = frame_idx
            frame_idx += 1

            # Consume every boundary at or before this frame. Several may have
            # been crossed while a clip was being written; handling them all
            # at once re-arms detection exactly once for the current scene.
            entered_new_scene = False
            while scene_index < len(scene_starts) and current_idx >= scene_starts[scene_index]:
                scene_index += 1
                entered_new_scene = True
            if entered_new_scene:
                face_detected_in_scene = False
                print(f"New scene detected at frame {current_idx}")

            # Only one clip per scene: skip detection until the next boundary.
            if face_detected_in_scene:
                continue

            try:
                if not _frame_has_full_face(frame, model, device, min_width, min_height):
                    continue

                output_path = os.path.join(output_dir, f"{video_name}_face_{output_count:04d}.mp4")
                out = cv2.VideoWriter(output_path, fourcc, fps, clip_size)
                try:
                    if not out.isOpened():
                        print(f"Error initializing VideoWriter for {output_path}")
                        continue

                    # The triggering frame is already in memory, so it is
                    # written first and the following frames are read
                    # sequentially; no seek on the capture is required.
                    frames_captured = 0
                    while True:
                        scaled_frame = cv2.resize(frame, clip_size, interpolation=cv2.INTER_AREA)
                        out.write(scaled_frame)
                        frames_captured += 1
                        if frames_captured >= num_frames:
                            break

                        ret, frame = cap.read()
                        if not ret:
                            print(f"Warning: Clip at frame {current_idx} in {video_path} is shorter than 15 seconds ({frames_captured/fps:.2f} seconds)")
                            break
                        frame_idx += 1
                finally:
                    # Always close the writer, even if resize/write raised.
                    out.release()

                print(f"Saved video clip: {output_path} ({frames_captured/fps:.2f} seconds)")
                output_count += 1
                face_detected_in_scene = True
            except Exception as e:
                # Best-effort per frame: report and move on to the next frame.
                print(f"Error during face detection in {video_path}: {e}")

        print(f"Processed {video_path}: {output_count} video clips saved.")
    finally:
        # Release the capture on every exit path, including early returns.
        cap.release()
|
|
|
|
|
|
def main():
    """Run face-clip extraction over every video file in the input directory.

    Steps: parse the command line, verify that the input directory exists,
    create the output directory, pick the torch device, load the YOLO weights
    onto it, then call ``process_video`` for each directory entry whose name
    ends in .mp4, .avi, .mov or .mkv (case-insensitive). A missing input
    directory or a model-loading error is printed and ends the run early.
    """
    args = parse_arguments()

    if not os.path.isdir(args.input_dir):
        print(f"Error: Input directory '{args.input_dir}' does not exist.")
        return

    ensure_directory(args.output_dir)
    device = check_cuda()

    # NOTE(review): confirm that "yolov11l.pt" is the intended weights file
    # name/location; it is passed to YOLO() as-is.
    try:
        model = YOLO("yolov11l.pt")
        model.to(device)
        print(f"YOLO model loaded on device: {device}")
    except Exception as e:
        print(f"Error loading YOLO model: {e}")
        return

    video_extensions = ('.mp4', '.avi', '.mov', '.mkv')
    video_files = [
        entry
        for entry in os.listdir(args.input_dir)
        if entry.lower().endswith(video_extensions)
    ]

    for entry in video_files:
        video_path = os.path.join(args.input_dir, entry)
        print(f"Processing video: {video_path}")
        process_video(video_path, args.output_dir, args.min_width, args.min_height, model, device)


# Run the pipeline only when this file is executed as a script.
if __name__ == "__main__":
    main()