ishanprogs's picture
Update app.py
7d8bb7f verified
raw
history blame
20.2 kB
# app.py
import gradio as gr
import torch
import clip
from PIL import Image
import numpy as np
import os
import cv2
import gc # Garbage collector
import logging
import random # For annotator colors
import time # For timing checks
import traceback # For detailed error printing
# --- YOLOv8 Imports ---
from ultralytics import YOLO
from ultralytics.utils.plotting import Annotator # For drawing YOLO results
# --- Setup Logging ---
logging.getLogger("ultralytics").setLevel(logging.WARNING) # Reduce YOLO logging noise
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# --- Constants ---
DAMAGE_CLASSES = ['Cracked', 'Scratch', 'Flaking', 'Broken part', 'Corrosion', 'Dent', 'Paint chip', 'Missing part']
NUM_DAMAGE_CLASSES = len(DAMAGE_CLASSES)
CAR_PART_CLASSES = [
"Quarter-panel", "Front-wheel", "Back-window", "Trunk", "Front-door",
"Rocker-panel", "Grille", "Windshield", "Front-window", "Back-door",
"Headlight", "Back-wheel", "Back-windshield", "Hood", "Fender",
"Tail-light", "License-plate", "Front-bumper", "Back-bumper", "Mirror", "Roof"
]
NUM_CAR_PART_CLASSES = len(CAR_PART_CLASSES)
CLIP_TEXT_FEATURES_PATH = "./clip_text_features.pt"
DAMAGE_MODEL_WEIGHTS_PATH = "./best.pt"
PART_MODEL_WEIGHTS_PATH = "./partdetection_yolobest.pt"
DEFAULT_DAMAGE_PRED_THRESHOLD = 0.4
DEFAULT_PART_PRED_THRESHOLD = 0.3
# --- Device Setup ---
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
logger.info(f"Using device: {DEVICE}")
# --- MODEL LOADING ---
print("--- Initializing Models ---")
clip_model, clip_preprocess, clip_text_features = None, None, None
damage_model, part_model = None, None
clip_load_error_msg, damage_load_error_msg, part_load_error_msg = None, None, None
try:
logger.info("Loading CLIP model (ViT-B/16)...")
clip_model, clip_preprocess = clip.load("ViT-B/16", device=DEVICE, jit=False)
clip_model.eval()
if not os.path.exists(CLIP_TEXT_FEATURES_PATH):
raise FileNotFoundError(f"CLIP text features not found: {CLIP_TEXT_FEATURES_PATH}.")
clip_text_features = torch.load(CLIP_TEXT_FEATURES_PATH, map_location=DEVICE)
logger.info(f"CLIP loaded (Text Features dtype: {clip_text_features.dtype}).")
except Exception as e:
clip_load_error_msg = f"CLIP load error: {e}"
logger.error(clip_load_error_msg, exc_info=True)
try:
logger.info(f"Loading Damage YOLOv8 model from {DAMAGE_MODEL_WEIGHTS_PATH}...")
if not os.path.exists(DAMAGE_MODEL_WEIGHTS_PATH):
raise FileNotFoundError(f"Damage model weights not found: {DAMAGE_MODEL_WEIGHTS_PATH}.")
damage_model = YOLO(DAMAGE_MODEL_WEIGHTS_PATH)
damage_model.to(DEVICE)
logger.info(f"Damage model task: {damage_model.task}")
if damage_model.task != 'segment':
damage_load_error_msg = f"CRITICAL ERROR: Damage model task is {damage_model.task}, not 'segment'. This model won't produce masks!"
logger.error(damage_load_error_msg)
damage_model = None # Invalidate model
else:
loaded_damage_names = list(damage_model.names.values())
if loaded_damage_names != DAMAGE_CLASSES:
logger.warning(f"Mismatch: Defined DAMAGE_CLASSES vs names in {DAMAGE_MODEL_WEIGHTS_PATH}")
DAMAGE_CLASSES = loaded_damage_names
logger.warning(f"Updated DAMAGE_CLASSES to: {DAMAGE_CLASSES}")
logger.info("Damage YOLOv8 model loaded.")
except Exception as e:
damage_load_error_msg = f"Damage YOLO load error: {e}"
logger.error(damage_load_error_msg, exc_info=True)
damage_model = None
try:
logger.info(f"Loading Part YOLOv8 model from {PART_MODEL_WEIGHTS_PATH}...")
if not os.path.exists(PART_MODEL_WEIGHTS_PATH):
raise FileNotFoundError(f"Part model weights not found: {PART_MODEL_WEIGHTS_PATH}.")
part_model = YOLO(PART_MODEL_WEIGHTS_PATH)
part_model.to(DEVICE)
logger.info(f"Part model task: {part_model.task}")
if part_model.task != 'segment':
part_load_error_msg = f"CRITICAL ERROR: Part model task is {part_model.task}, not 'segment'. This model won't produce masks!"
logger.error(part_load_error_msg)
part_model = None # Invalidate model
else:
loaded_part_names = list(part_model.names.values())
if loaded_part_names != CAR_PART_CLASSES:
logger.warning(f"Mismatch: Defined CAR_PART_CLASSES vs names in {PART_MODEL_WEIGHTS_PATH}")
CAR_PART_CLASSES = loaded_part_names
logger.warning(f"Updated CAR_PART_CLASSES to: {CAR_PART_CLASSES}")
logger.info("Part YOLOv8 model loaded.")
except Exception as e:
part_load_error_msg = f"Part YOLO load error: {e}"
logger.error(part_load_error_msg, exc_info=True)
part_model = None
print("--- Model loading process finished. ---")
print(f"CLIP STATUS: {clip_load_error_msg}" if clip_load_error_msg else "CLIP STATUS: Loaded OK.")
print(f"DAMAGE MODEL STATUS: {damage_load_error_msg}" if damage_load_error_msg else "DAMAGE MODEL STATUS: Loaded OK.")
print(f"PART MODEL STATUS: {part_load_error_msg}" if part_load_error_msg else "PART MODEL STATUS: Loaded OK.")
# --- Prediction Functions ---
def classify_image_clip(image_pil):
if clip_model is None:
return "Error: CLIP Model Not Loaded", {"Error": 1.0}
try:
if image_pil.mode != "RGB":
image_pil = image_pil.convert("RGB")
image_input = clip_preprocess(image_pil).unsqueeze(0).to(DEVICE)
with torch.no_grad():
image_features = clip_model.encode_image(image_input)
image_features /= image_features.norm(dim=-1, keepdim=True)
text_features_matched = clip_text_features
if image_features.dtype != clip_text_features.dtype:
text_features_matched = clip_text_features.to(image_features.dtype)
similarity = (image_features @ text_features_matched.T) * clip_model.logit_scale.exp()
probs = similarity.softmax(dim=-1).squeeze().cpu()
return ("Car" if probs[0] > probs[1] else "Not Car"), {"Car": f"{probs[0]:.3f}", "Not Car": f"{probs[1]:.3f}"}
except Exception as e:
logger.error(f"CLIP Error: {e}", exc_info=True)
return "Error: CLIP", {"Error": 1.0}
def process_car_image(image_np_bgr, damage_threshold, part_threshold):
if damage_model is None:
return cv2.cvtColor(image_np_bgr, cv2.COLOR_BGR2RGB), f"Error: Damage model failed to load ({damage_load_error_msg})"
if part_model is None:
return cv2.cvtColor(image_np_bgr, cv2.COLOR_BGR2RGB), f"Error: Part model failed to load ({part_load_error_msg})"
if damage_model.task != 'segment':
return cv2.cvtColor(image_np_bgr, cv2.COLOR_BGR2RGB), "Error: Damage model is not a segmentation model."
if part_model.task != 'segment':
return cv2.cvtColor(image_np_bgr, cv2.COLOR_BGR2RGB), "Error: Part model is not a segmentation model."
final_assignments = []
annotated_image_bgr = image_np_bgr.copy()
img_h, img_w = image_np_bgr.shape[:2]
logger.info("Starting combined YOLO processing...")
im_tensor_gpu_for_annotator = None
try:
# --- Prepare Image Tensor for Annotator ---
logger.info("Preparing image tensor for annotator...")
try:
if image_np_bgr.dtype != np.uint8:
logger.warning(f"Converting input image from {image_np_bgr.dtype} to uint8 for tensor creation.")
image_np_uint8 = image_np_bgr.astype(np.uint8)
else:
image_np_uint8 = image_np_bgr
# Create tensor in HWC format on the correct device
im_tensor_gpu_for_annotator = torch.from_numpy(image_np_uint8).to(DEVICE)
logger.info(f"Image tensor for annotator: shape={im_tensor_gpu_for_annotator.shape}, dtype={im_tensor_gpu_for_annotator.dtype}, device={im_tensor_gpu_for_annotator.device}")
except Exception as e_tensor:
logger.error(f"Could not create image tensor: {e_tensor}. Mask visualization will fail.", exc_info=True)
im_tensor_gpu_for_annotator = None # Set to None if conversion fails
# --- 1. Predict Damages ---
logger.info(f"Running Damage Segmentation (Threshold: {damage_threshold})...")
damage_results = damage_model.predict(image_np_bgr, verbose=False, device=DEVICE, conf=damage_threshold)
damage_result = damage_results[0]
logger.info(f"Found {len(damage_result.boxes)} potential damages.")
damage_masks_raw = damage_result.masks.data if damage_result.masks is not None else torch.empty((0,0,0), device=DEVICE)
if damage_result.masks is None:
logger.warning("No damage masks in result! Check if damage model is segmentation type.")
else:
logger.info(f"Damage masks available: shape={damage_masks_raw.shape if damage_masks_raw.numel() > 0 else 'Empty'}")
damage_classes_ids_cpu = damage_result.boxes.cls.cpu().numpy().astype(int) if damage_result.boxes is not None else np.array([])
damage_boxes_xyxy_cpu = damage_result.boxes.xyxy.cpu() if damage_result.boxes is not None else torch.empty((0,4))
# --- 2. Predict Parts ---
logger.info(f"Running Part Segmentation (Threshold: {part_threshold})...")
part_results = part_model.predict(image_np_bgr, verbose=False, device=DEVICE, conf=part_threshold)
part_result = part_results[0]
logger.info(f"Found {len(part_result.boxes)} potential parts.")
part_masks_raw = part_result.masks.data if part_result.masks is not None else torch.empty((0,0,0), device=DEVICE)
if part_result.masks is None:
logger.warning("No part masks in result! Check if part model is segmentation type.")
else:
logger.info(f"Part masks available: shape={part_masks_raw.shape if part_masks_raw.numel() > 0 else 'Empty'}")
part_classes_ids_cpu = part_result.boxes.cls.cpu().numpy().astype(int) if part_result.boxes is not None else np.array([])
part_boxes_xyxy_cpu = part_result.boxes.xyxy.cpu() if part_result.boxes is not None else torch.empty((0,4))
# --- 3. Resize Masks ---
def resize_masks(masks_tensor, target_h, target_w):
masks_np_bool = masks_tensor.cpu().numpy().astype(bool)
if masks_np_bool.shape[0] == 0 or (masks_np_bool.shape[1] == target_h and masks_np_bool.shape[2] == target_w):
return masks_np_bool
resized_masks_list = []
for i in range(masks_np_bool.shape[0]):
mask = masks_np_bool[i]
mask_resized = cv2.resize(mask.astype(np.uint8), (target_w, target_h), interpolation=cv2.INTER_NEAREST)
resized_masks_list.append(mask_resized.astype(bool))
return np.array(resized_masks_list)
damage_masks_np = resize_masks(damage_masks_raw, img_h, img_w)
part_masks_np = resize_masks(part_masks_raw, img_h, img_w)
# --- 4. Calculate Overlap ---
logger.info("Calculating overlap...")
if damage_masks_np.shape[0] > 0 and part_masks_np.shape[0] > 0:
overlap_threshold = 0.4
for i in range(len(damage_masks_np)):
damage_mask = damage_masks_np[i]
damage_class_id = damage_classes_ids_cpu[i]
try:
damage_name = DAMAGE_CLASSES[damage_class_id]
except IndexError:
logger.warning(f"Invalid damage ID {damage_class_id}")
continue
damage_area = np.sum(damage_mask)
if damage_area < 10:
continue
max_overlap = 0
assigned_part_name = "Unknown / Outside Parts"
for j in range(len(part_masks_np)):
part_mask = part_masks_np[j]
part_class_id = part_classes_ids_cpu[j]
try:
part_name = CAR_PART_CLASSES[part_class_id]
except IndexError:
logger.warning(f"Invalid part ID {part_class_id}")
continue
intersection = np.logical_and(damage_mask, part_mask)
overlap_ratio = np.sum(intersection) / damage_area if damage_area > 0 else 0
if overlap_ratio > max_overlap:
max_overlap = overlap_ratio
if max_overlap >= overlap_threshold:
assigned_part_name = part_name
assignment_desc = f"{damage_name} in {assigned_part_name}"
if assigned_part_name == "Unknown / Outside Parts":
assignment_desc += f" (Overlap < {overlap_threshold*100:.0f}%)"
final_assignments.append(assignment_desc)
elif damage_masks_np.shape[0] > 0:
final_assignments.append(f"{len(damage_masks_np)} damages found, but no parts detected/matched above threshold {part_threshold}.")
elif part_masks_np.shape[0] > 0:
final_assignments.append(f"No damages detected above threshold {damage_threshold}.")
else:
final_assignments.append(f"No damages or parts detected above thresholds.")
logger.info(f" Assignment results: {final_assignments}")
# --- 5. Visualization using YOLO Annotator ---
logger.info("Visualizing results...")
annotator = Annotator(annotated_image_bgr, line_width=2, example=CAR_PART_CLASSES)
# Draw PART masks
if part_masks_raw.numel() > 0 and im_tensor_gpu_for_annotator is not None:
try:
logger.info("Attempting to draw part masks...")
colors_part = [(0, random.randint(100, 200), 0) for _ in part_classes_ids_cpu]
mask_data_part = part_masks_raw
if mask_data_part.device != im_tensor_gpu_for_annotator.device:
mask_data_part = mask_data_part.to(im_tensor_gpu_for_annotator.device)
annotator.masks(mask_data_part, colors=colors_part, im_gpu=im_tensor_gpu_for_annotator, alpha=0.3)
logger.info("Successfully drew part masks.")
for box, cls_id in zip(part_boxes_xyxy_cpu, part_classes_ids_cpu):
try:
label = f"{CAR_PART_CLASSES[cls_id]}"
annotator.box_label(box, label=label, color=(0, 200, 0))
except IndexError:
logger.warning(f"Invalid part ID {cls_id} during drawing")
except Exception as e_part_vis:
logger.error(f"Error drawing part masks/boxes: {e_part_vis}", exc_info=True)
traceback.print_exc()
elif part_masks_raw.numel() > 0:
logger.warning("Part masks exist but image tensor for annotator is None. Skipping part mask drawing.")
# Draw DAMAGE masks
if damage_masks_raw.numel() > 0 and im_tensor_gpu_for_annotator is not None:
try:
logger.info("Attempting to draw damage masks...")
colors_dmg = [(random.randint(100, 200), 0, 0) for _ in damage_classes_ids_cpu]
mask_data_dmg = damage_masks_raw
logger.info(f"Damage mask data: shape={mask_data_dmg.shape}, dtype={mask_data_dmg.dtype}, device={mask_data_dmg.device}")
if mask_data_dmg.device != im_tensor_gpu_for_annotator.device:
logger.warning(f"Moving damage masks to match image tensor device ({im_tensor_gpu_for_annotator.device})")
mask_data_dmg = mask_data_dmg.to(im_tensor_gpu_for_annotator.device)
annotator.masks(mask_data_dmg, colors=colors_dmg, im_gpu=im_tensor_gpu_for_annotator, alpha=0.4)
logger.info("Successfully drew damage masks.")
for box, cls_id in zip(damage_boxes_xyxy_cpu, damage_classes_ids_cpu):
try:
label = f"{DAMAGE_CLASSES[cls_id]}"
annotator.box_label(box, label=label, color=(200, 0, 0))
except IndexError:
logger.warning(f"Invalid damage ID {cls_id} during drawing")
except Exception as e_dmg_vis:
logger.error(f"Error drawing damage masks/boxes: {e_dmg_vis}", exc_info=True)
traceback.print_exc()
elif damage_masks_raw.numel() > 0:
logger.warning("Damage masks exist but image tensor for annotator is None. Skipping damage mask drawing.")
annotated_image_bgr = annotator.result()
except Exception as e:
logger.error(f"Error during combined processing: {e}", exc_info=True)
traceback.print_exc()
final_assignments.append("Error during segmentation/processing.")
assignment_text = "\n".join(final_assignments) if final_assignments else "No damage assignments generated."
final_output_image_rgb = cv2.cvtColor(annotated_image_bgr, cv2.COLOR_BGR2RGB)
return final_output_image_rgb, assignment_text
# --- Main Gradio Function ---
def predict_pipeline(image_np_input, damage_thresh, part_thresh):
if image_np_input is None:
return "Please upload an image.", {}, None, "N/A"
logger.info(f"--- New Request (Damage Thr: {damage_thresh:.2f}, Part Thr: {part_thresh:.2f}) ---")
start_time = time.time()
image_np_bgr = cv2.cvtColor(image_np_input, cv2.COLOR_RGB2BGR)
image_pil = Image.fromarray(image_np_input)
final_output_image, assignment_text, classification_result, probabilities = None, "Processing...", "Error", {}
try:
classification_result, probabilities = classify_image_clip(image_pil)
except Exception as e:
logger.error(f"CLIP Error: {e}", exc_info=True)
assignment_text = f"CLIP Error: {e}"
final_output_image = cv2.cvtColor(image_np_bgr, cv2.COLOR_BGR2RGB)
if classification_result == "Car":
try:
final_output_image, assignment_text = process_car_image(image_np_bgr, damage_thresh, part_thresh)
except Exception as e:
logger.error(f"Seg/Assign Error: {e}", exc_info=True)
assignment_text = f"Seg Error: {e}"
final_output_image = cv2.cvtColor(image_np_bgr, cv2.COLOR_BGR2RGB)
elif classification_result == "Not Car":
final_output_image = cv2.cvtColor(image_np_bgr, cv2.COLOR_BGR2RGB)
assignment_text = "Image classified as Not Car."
elif final_output_image is None:
final_output_image = cv2.cvtColor(image_np_bgr, cv2.COLOR_BGR2RGB)
assignment_text = "Error during classification."
gc.collect()
if torch.cuda.is_available():
torch.cuda.empty_cache()
logger.info(f"Total processing time: {time.time() - start_time:.2f}s.")
return classification_result, probabilities, final_output_image, assignment_text
# --- Gradio Interface ---
logger.info("Setting up Gradio interface...")
title = "🚗 Car Damage Detection"
description = "1. Upload... 2. Classify... 3. Segment... 4. Assign... 5. Output..." # Shortened
input_image = gr.Image(type="numpy", label="Upload Car Image")
damage_threshold_slider = gr.Slider(minimum=0.05, maximum=0.95, step=0.05, value=DEFAULT_DAMAGE_PRED_THRESHOLD, label="Damage Confidence Threshold")
part_threshold_slider = gr.Slider(minimum=0.05, maximum=0.95, step=0.05, value=DEFAULT_PART_PRED_THRESHOLD, label="Part Confidence Threshold")
output_classification = gr.Textbox(label="1. Classification Result")
output_probabilities = gr.Label(label="Classification Probabilities")
output_image_display = gr.Image(type="numpy", label="3. Segmentation Visualization")
output_assignment = gr.Textbox(label="2. Damage Assignments", lines=5, interactive=False)
iface = gr.Interface(
fn=predict_pipeline,
inputs=[input_image, damage_threshold_slider, part_threshold_slider],
outputs=[output_classification, output_probabilities, output_image_display, output_assignment],
title=title,
description=description,
allow_flagging="never"
)
if __name__ == "__main__":
logger.info("Launching Gradio app...")
iface.launch()