# SmartVision_AI: scripts/02_resnet50.py
# ============================================================
# SMARTVISION AI - MODEL 2 (v2): ResNet50 (STRONG BASELINE)
# with manual label smoothing (Keras 3 compatible)
# ============================================================
import os
import time
import json
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from sklearn.metrics import (
precision_recall_fscore_support,
confusion_matrix,
classification_report,
)
print("TensorFlow version:", tf.__version__)
# ------------------------------------------------------------
# 1. CONFIGURATION
# ------------------------------------------------------------
BASE_DIR = "smartvision_dataset"
CLASS_DIR = os.path.join(BASE_DIR, "classification")
TRAIN_DIR = os.path.join(CLASS_DIR, "train")
VAL_DIR = os.path.join(CLASS_DIR, "val")
TEST_DIR = os.path.join(CLASS_DIR, "test")
IMG_SIZE = (224, 224)
BATCH_SIZE = 32
NUM_CLASSES = 25
MODELS_DIR = "saved_models"
METRICS_DIR = "smartvision_metrics"
os.makedirs(MODELS_DIR, exist_ok=True)
os.makedirs(METRICS_DIR, exist_ok=True)
print("Train dir:", TRAIN_DIR)
print("Val dir :", VAL_DIR)
print("Test dir :", TEST_DIR)
# ------------------------------------------------------------
# 2. LOAD DATASETS
# ------------------------------------------------------------
train_ds = tf.keras.utils.image_dataset_from_directory(
TRAIN_DIR,
image_size=IMG_SIZE,
batch_size=BATCH_SIZE,
shuffle=True,
)
val_ds = tf.keras.utils.image_dataset_from_directory(
VAL_DIR,
image_size=IMG_SIZE,
batch_size=BATCH_SIZE,
shuffle=False,
)
test_ds = tf.keras.utils.image_dataset_from_directory(
TEST_DIR,
image_size=IMG_SIZE,
batch_size=BATCH_SIZE,
shuffle=False,
)
class_names = train_ds.class_names
print("Detected classes:", class_names)
print("Number of classes:", len(class_names))
AUTOTUNE = tf.data.AUTOTUNE
train_ds = train_ds.prefetch(AUTOTUNE)
val_ds = val_ds.prefetch(AUTOTUNE)
test_ds = test_ds.prefetch(AUTOTUNE)
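# Optional sanity check: image_dataset_from_directory derives labels from the
# subfolder names (sorted alphabetically), so the detected class count should
# match NUM_CLASSES before we build a 25-way classification head.
assert len(class_names) == NUM_CLASSES, (
    f"Expected {NUM_CLASSES} classes, found {len(class_names)}"
)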
# ------------------------------------------------------------
# 3. DATA AUGMENTATION
# ------------------------------------------------------------
class RandomPhotometric(layers.Layer):
    """Brightness/saturation jitter that runs only when training=True.

    A Lambda layer would apply these random ops at inference time as well,
    because Lambda ignores the `training` flag. Inputs here are float32 in
    [0, 255], so the additive brightness delta is scaled by 255 to get a
    real ~15% shift (a bare max_delta=0.15 would be a near no-op at this
    pixel scale).
    """

    def call(self, x, training=None):
        if training:
            x = tf.image.random_brightness(x, max_delta=0.15 * 255.0)
            x = tf.image.random_saturation(x, 0.85, 1.15)
        return x


data_augmentation = keras.Sequential(
    [
        layers.RandomFlip("horizontal"),
        layers.RandomRotation(0.04),  # ~±15°
        layers.RandomZoom(0.1),
        layers.RandomContrast(0.15),
        RandomPhotometric(name="random_photometric"),
    ],
    name="data_augmentation",
)
# NOTE: We will use ResNet50's preprocess_input, so we do NOT rescale by 1/255 here.
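# For reference, resnet50.preprocess_input expects pixels in [0, 255]: it
# converts RGB -> BGR and subtracts the ImageNet channel means (Caffe-style).
# A quick illustration (values are approximate):
#   demo = np.array([[[[255.0, 0.0, 0.0]]]], dtype="float32")  # one red pixel
#   keras.applications.resnet50.preprocess_input(demo)
#   # -> [[[[-103.94, -116.78, 131.32]]]]  (B, G, R after mean subtraction)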
# ------------------------------------------------------------
# 4. BUILD RESNET50 MODEL
# ------------------------------------------------------------
def build_resnet50_model_v2():
"""
Returns:
model : full ResNet50 classification model
base_model : the ResNet50 backbone for fine-tuning
"""
inputs = keras.Input(shape=(*IMG_SIZE, 3), name="input_layer")
# Augmentation (train-time only)
x = data_augmentation(inputs)
# ResNet50-specific preprocessing
x = layers.Lambda(
keras.applications.resnet50.preprocess_input,
name="resnet50_preprocess",
)(x)
# Pretrained ResNet50 backbone
base_model = keras.applications.ResNet50(
include_top=False,
weights="imagenet",
input_shape=(*IMG_SIZE, 3),
)
x = base_model(x)
# Custom classification head
x = layers.GlobalAveragePooling2D(name="global_average_pooling2d")(x)
x = layers.BatchNormalization(name="head_batchnorm")(x)
x = layers.Dropout(0.4, name="head_dropout")(x)
x = layers.Dense(
256,
activation="relu",
name="head_dense",
)(x)
x = layers.BatchNormalization(name="head_batchnorm_2")(x)
x = layers.Dropout(0.5, name="head_dropout_2")(x)
outputs = layers.Dense(
NUM_CLASSES,
activation="softmax",
name="predictions",
)(x)
model = keras.Model(
inputs=inputs,
outputs=outputs,
name="ResNet50_smartvision_v2",
)
return model, base_model
resnet_model, resnet_base = build_resnet50_model_v2()
resnet_model.summary()
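# In the summary above, the whole pretrained backbone shows up as a single
# functional layer; call resnet_base.summary() to inspect its internal layers.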
# ------------------------------------------------------------
# 5. CUSTOM LOSS WITH LABEL SMOOTHING
# ------------------------------------------------------------
def make_sparse_ce_with_label_smoothing(num_classes, label_smoothing=0.1):
"""
Implements sparse categorical crossentropy with manual label smoothing.
Works even if Keras' SparseCategoricalCrossentropy doesn't have label_smoothing arg.
"""
ls = float(label_smoothing)
nc = int(num_classes)
def loss_fn(y_true, y_pred):
# y_true: integer labels, shape (batch,)
y_true = tf.cast(y_true, tf.int32)
y_true_oh = tf.one_hot(y_true, depth=nc)
if ls > 0.0:
smooth = ls
y_true_oh = (1.0 - smooth) * y_true_oh + smooth / tf.cast(
nc, tf.float32
)
# y_pred is softmax probabilities
return tf.keras.losses.categorical_crossentropy(
y_true_oh, y_pred, from_logits=False
)
return loss_fn
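# Worked example of the smoothing above: with num_classes=25 and
# label_smoothing=0.1, a one-hot target [1, 0, ..., 0] becomes
# [0.9 + 0.1/25, 0.1/25, ...] = [0.904, 0.004, ..., 0.004].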
# ------------------------------------------------------------
# 6. TRAINING UTILITY
# ------------------------------------------------------------
def compile_and_train(
model,
model_name: str,
train_ds,
val_ds,
epochs: int,
lr: float,
model_tag: str,
patience_es: int = 5,
patience_rlr: int = 2,
):
"""
Compile and train model, saving best weights by val_accuracy.
model_name: e.g. 'resnet50_v2'
model_tag : e.g. 'stage1', 'stage2'
"""
print(f"\n===== {model_tag}: Training {model_name} =====")
optimizer = keras.optimizers.Adam(learning_rate=lr)
# Use custom loss with label smoothing
loss_fn = make_sparse_ce_with_label_smoothing(
num_classes=NUM_CLASSES,
label_smoothing=0.1,
)
model.compile(
optimizer=optimizer,
loss=loss_fn,
metrics=["accuracy"],
)
# Keras 3: when save_weights_only=True, must end with ".weights.h5"
best_weights_path = os.path.join(
MODELS_DIR, f"{model_name}_{model_tag}_best.weights.h5"
)
callbacks = [
keras.callbacks.ModelCheckpoint(
filepath=best_weights_path,
monitor="val_accuracy",
save_best_only=True,
save_weights_only=True, # ✅ weights-only: avoids architecture issues
mode="max",
verbose=1,
),
keras.callbacks.EarlyStopping(
monitor="val_accuracy",
patience=patience_es,
restore_best_weights=True,
verbose=1,
),
keras.callbacks.ReduceLROnPlateau(
monitor="val_loss",
factor=0.5,
patience=patience_rlr,
min_lr=1e-6,
verbose=1,
),
]
history = model.fit(
train_ds,
validation_data=val_ds,
epochs=epochs,
callbacks=callbacks,
)
return history, best_weights_path
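# Note: compile() runs fresh for each stage. Recompiling is required for
# changes to layer.trainable to take effect, and it also resets the Adam
# optimizer state between stages.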
# ------------------------------------------------------------
# 7. STAGE 1: TRAIN HEAD WITH FROZEN RESNET BASE
# ------------------------------------------------------------
print("\n===== STAGE 1: Training head with frozen ResNet50 base =====")
# Freeze entire backbone for Stage 1
resnet_base.trainable = False
epochs_stage1 = 15
lr_stage1 = 1e-3
history_stage1, resnet_stage1_best = compile_and_train(
resnet_model,
model_name="resnet50_v2",
train_ds=train_ds,
val_ds=val_ds,
epochs=epochs_stage1,
lr=lr_stage1,
model_tag="stage1",
patience_es=5,
patience_rlr=2,
)
print("Stage 1 best weights saved at:", resnet_stage1_best)
# ------------------------------------------------------------
# 8. STAGE 2: DEEPER FINE-TUNING OF RESNET BASE
# ------------------------------------------------------------
print("\n===== STAGE 2: Fine-tuning last layers of ResNet50 base =====")
# Load Stage 1 best weights before fine-tuning
resnet_model.load_weights(resnet_stage1_best)
# Enable deeper fine-tuning on the backbone
resnet_base.trainable = True
print("Base model name:", resnet_base.name)
print("Base model has", len(resnet_base.layers), "layers.")
# Unfreeze last N layers of the backbone
num_unfreeze = 40 # you can tune 30–50
for layer in resnet_base.layers[:-num_unfreeze]:
layer.trainable = False
# Keep BatchNorm layers frozen for stability
for layer in resnet_base.layers[-num_unfreeze:]:
if isinstance(layer, layers.BatchNormalization):
layer.trainable = False
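# Optional audit: list which backbone layers will actually update in Stage 2
# (the unfrozen tail minus its BatchNorm layers).
for layer in resnet_base.layers[-num_unfreeze:]:
    if layer.trainable:
        print("  will fine-tune:", layer.name)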
# Count trainable layers inside the backbone. (Counting resnet_model.layers
# would be misleading: the full model nests the backbone as a single layer.)
trainable_count = int(np.sum([layer.trainable for layer in resnet_base.layers]))
print("Trainable layers in ResNet50 base after unfreezing:", trainable_count)
epochs_stage2 = 30
lr_stage2 = 5e-6 # small LR for safe fine-tuning
history_stage2, resnet_stage2_best = compile_and_train(
resnet_model,
model_name="resnet50_v2",
train_ds=train_ds,
val_ds=val_ds,
epochs=epochs_stage2,
lr=lr_stage2,
model_tag="stage2",
patience_es=8,
patience_rlr=3,
)
print("Stage 2 best weights saved at:", resnet_stage2_best)
# ------------------------------------------------------------
# 9. EVALUATION + SAVE METRICS & CONFUSION MATRIX
# ------------------------------------------------------------
def evaluate_and_save(model, save_name, best_weights_path, test_ds, class_names):
"""
save_name: e.g. 'resnet50_v2_stage1', 'resnet50_v2_stage2'
"""
print(f"\n===== EVALUATING {save_name.upper()} ON TEST SET =====")
# Load best weights
model.load_weights(best_weights_path)
print(f"Loaded best weights from {best_weights_path}")
y_true = []
y_pred = []
all_probs = []
total_time = 0.0
total_images = 0
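    # NOTE: the first predict() call also pays tf.function tracing cost, which
    # slightly inflates the measured average latency; for stricter numbers,
    # run one warm-up batch before this loop.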
for images, labels in test_ds:
images_np = images.numpy()
bs = images_np.shape[0]
start = time.perf_counter()
probs = model.predict(images_np, verbose=0)
end = time.perf_counter()
total_time += (end - start)
total_images += bs
preds = np.argmax(probs, axis=1)
y_true.extend(labels.numpy())
y_pred.extend(preds)
all_probs.append(probs)
y_true = np.array(y_true)
y_pred = np.array(y_pred)
all_probs = np.concatenate(all_probs, axis=0)
# Basic metrics
accuracy = float((y_true == y_pred).mean())
precision, recall, f1, _ = precision_recall_fscore_support(
y_true, y_pred, average="weighted", zero_division=0
)
# Top-5 accuracy
top5_correct = 0
for i, label in enumerate(y_true):
if label in np.argsort(all_probs[i])[-5:]:
top5_correct += 1
top5_acc = top5_correct / len(y_true)
# Inference time
time_per_image = total_time / total_images
images_per_second = 1.0 / time_per_image if time_per_image > 0 else 0.0
# Model size (weights only)
temp_w = os.path.join(MODELS_DIR, f"{save_name}_temp_for_size.weights.h5")
model.save_weights(temp_w)
size_mb = os.path.getsize(temp_w) / (1024 * 1024)
os.remove(temp_w)
# Confusion matrix
cm = confusion_matrix(y_true, y_pred)
print("\nClassification Report:")
print(
classification_report(
y_true,
y_pred,
target_names=class_names,
zero_division=0,
)
)
print(f"Test Accuracy : {accuracy:.4f}")
print(f"Weighted Precision : {precision:.4f}")
print(f"Weighted Recall : {recall:.4f}")
print(f"Weighted F1-score : {f1:.4f}")
print(f"Top-5 Accuracy : {top5_acc:.4f}")
print(f"Avg time per image : {time_per_image*1000:.2f} ms")
print(f"Images per second : {images_per_second:.2f}")
print(f"Model size (weights) : {size_mb:.2f} MB")
print(f"Num parameters : {model.count_params()}")
# Save metrics + confusion matrix
save_dir = os.path.join(METRICS_DIR, save_name)
os.makedirs(save_dir, exist_ok=True)
metrics = {
"model_name": save_name,
"accuracy": accuracy,
"precision_weighted": float(precision),
"recall_weighted": float(recall),
"f1_weighted": float(f1),
"top5_accuracy": float(top5_acc),
"avg_inference_time_sec": float(time_per_image),
"images_per_second": float(images_per_second),
"model_size_mb": float(size_mb),
"num_parameters": int(model.count_params()),
}
metrics_path = os.path.join(save_dir, "metrics.json")
cm_path = os.path.join(save_dir, "confusion_matrix.npy")
with open(metrics_path, "w") as f:
json.dump(metrics, f, indent=2)
np.save(cm_path, cm)
print(f"\nSaved metrics to : {metrics_path}")
print(f"Saved confusion matrix to: {cm_path}")
return metrics, cm
# ---- Evaluate Stage 1 ----
resnet_stage1_metrics, resnet_stage1_cm = evaluate_and_save(
resnet_model,
save_name="resnet50_v2_stage1",
best_weights_path=resnet_stage1_best,
test_ds=test_ds,
class_names=class_names,
)
# ---- Evaluate Stage 2 ----
resnet_stage2_metrics, resnet_stage2_cm = evaluate_and_save(
resnet_model,
save_name="resnet50_v2_stage2",
best_weights_path=resnet_stage2_best,
test_ds=test_ds,
class_names=class_names,
)
# ------------------------------------------------------------
# 10. SUMMARY
# ------------------------------------------------------------
print("\n===== SUMMARY: RESNET50 v2 STAGES COMPARISON =====")
print("Stage 1 Test Accuracy:", resnet_stage1_metrics["accuracy"])
print("Stage 2 Test Accuracy:", resnet_stage2_metrics["accuracy"])
print("✅ RESNET50 v2 pipeline complete.")