# ============================================================
# SMARTVISION AI - MODEL 2 (v2): ResNet50 (STRONG BASELINE)
# with manual label smoothing (Keras 3 compatible)
# ============================================================
import os
import time
import json

import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from sklearn.metrics import (
    precision_recall_fscore_support,
    confusion_matrix,
    classification_report,
)

print("TensorFlow version:", tf.__version__)

# ------------------------------------------------------------
# 1. CONFIGURATION
# ------------------------------------------------------------
BASE_DIR = "smartvision_dataset"
CLASS_DIR = os.path.join(BASE_DIR, "classification")
TRAIN_DIR = os.path.join(CLASS_DIR, "train")
VAL_DIR = os.path.join(CLASS_DIR, "val")
TEST_DIR = os.path.join(CLASS_DIR, "test")

IMG_SIZE = (224, 224)
BATCH_SIZE = 32
NUM_CLASSES = 25

MODELS_DIR = "saved_models"
METRICS_DIR = "smartvision_metrics"
os.makedirs(MODELS_DIR, exist_ok=True)
os.makedirs(METRICS_DIR, exist_ok=True)

print("Train dir:", TRAIN_DIR)
print("Val dir  :", VAL_DIR)
print("Test dir :", TEST_DIR)

# ------------------------------------------------------------
# 2. LOAD DATASETS
# ------------------------------------------------------------
train_ds = tf.keras.utils.image_dataset_from_directory(
    TRAIN_DIR,
    image_size=IMG_SIZE,
    batch_size=BATCH_SIZE,
    shuffle=True,
)
val_ds = tf.keras.utils.image_dataset_from_directory(
    VAL_DIR,
    image_size=IMG_SIZE,
    batch_size=BATCH_SIZE,
    shuffle=False,
)
test_ds = tf.keras.utils.image_dataset_from_directory(
    TEST_DIR,
    image_size=IMG_SIZE,
    batch_size=BATCH_SIZE,
    shuffle=False,
)

class_names = train_ds.class_names
print("Detected classes:", class_names)
print("Number of classes:", len(class_names))

AUTOTUNE = tf.data.AUTOTUNE
train_ds = train_ds.prefetch(AUTOTUNE)
val_ds = val_ds.prefetch(AUTOTUNE)
test_ds = test_ds.prefetch(AUTOTUNE)

# ------------------------------------------------------------
# 3. DATA AUGMENTATION
# ------------------------------------------------------------
# NOTE: image_dataset_from_directory yields float32 pixels in [0, 255],
# so brightness jitter must be expressed on that scale. RandomBrightness
# handles this via its default value_range=(0, 255) and, like the other
# Random* layers, is active only in training mode. The Lambda-wrapped
# saturation op below runs at inference time as well; the perturbation
# is mild, but keep this in mind when interpreting test metrics.
data_augmentation = keras.Sequential(
    [
        layers.RandomFlip("horizontal"),
        layers.RandomRotation(0.04),  # fraction of a full turn, ~±15°
        layers.RandomZoom(0.1),
        layers.RandomContrast(0.15),
        layers.RandomBrightness(0.15),  # ±15% of the [0, 255] range
        layers.Lambda(
            lambda x: tf.image.random_saturation(x, 0.85, 1.15)
        ),
    ],
    name="data_augmentation",
)

# NOTE: We will use ResNet50's preprocess_input, so we do NOT rescale 1./255 here.
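# ------------------------------------------------------------
# Optional sanity check (illustrative sketch, not part of the original
# pipeline): pull one batch and confirm the augmentation stack preserves
# shape and the expected [0, 255] value range before preprocess_input.
for sample_images, _ in train_ds.take(1):
    augmented = data_augmentation(sample_images, training=True)
    print("Augmented batch shape:", augmented.shape)
    print(
        "Augmented value range:",
        float(tf.reduce_min(augmented)),
        "to",
        float(tf.reduce_max(augmented)),
    )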
# ------------------------------------------------------------
# 4. BUILD RESNET50 MODEL
# ------------------------------------------------------------
def build_resnet50_model_v2():
    """
    Returns:
        model      : full ResNet50 classification model
        base_model : the ResNet50 backbone for fine-tuning
    """
    inputs = keras.Input(shape=(*IMG_SIZE, 3), name="input_layer")

    # Augmentation (Random* layers apply only in training mode)
    x = data_augmentation(inputs)

    # ResNet50-specific preprocessing
    x = layers.Lambda(
        keras.applications.resnet50.preprocess_input,
        name="resnet50_preprocess",
    )(x)

    # Pretrained ResNet50 backbone
    base_model = keras.applications.ResNet50(
        include_top=False,
        weights="imagenet",
        input_shape=(*IMG_SIZE, 3),
    )
    x = base_model(x)

    # Custom classification head
    x = layers.GlobalAveragePooling2D(name="global_average_pooling2d")(x)
    x = layers.BatchNormalization(name="head_batchnorm")(x)
    x = layers.Dropout(0.4, name="head_dropout")(x)
    x = layers.Dense(256, activation="relu", name="head_dense")(x)
    x = layers.BatchNormalization(name="head_batchnorm_2")(x)
    x = layers.Dropout(0.5, name="head_dropout_2")(x)
    outputs = layers.Dense(
        NUM_CLASSES,
        activation="softmax",
        name="predictions",
    )(x)

    model = keras.Model(
        inputs=inputs,
        outputs=outputs,
        name="ResNet50_smartvision_v2",
    )
    return model, base_model


resnet_model, resnet_base = build_resnet50_model_v2()
resnet_model.summary()

# ------------------------------------------------------------
# 5. CUSTOM LOSS WITH LABEL SMOOTHING
# ------------------------------------------------------------
def make_sparse_ce_with_label_smoothing(num_classes, label_smoothing=0.1):
    """
    Sparse categorical crossentropy with manual label smoothing.
    Works even if Keras' SparseCategoricalCrossentropy doesn't have a
    label_smoothing argument.
    """
    ls = float(label_smoothing)
    nc = int(num_classes)

    def loss_fn(y_true, y_pred):
        # y_true: integer labels, shape (batch,)
        y_true = tf.cast(y_true, tf.int32)
        y_true_oh = tf.one_hot(y_true, depth=nc)
        if ls > 0.0:
            # (1 - ls) on the true class, ls / nc spread uniformly
            y_true_oh = (1.0 - ls) * y_true_oh + ls / tf.cast(nc, tf.float32)
        # y_pred is softmax probabilities
        return tf.keras.losses.categorical_crossentropy(
            y_true_oh, y_pred, from_logits=False
        )

    return loss_fn
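# ------------------------------------------------------------
# Optional spot check (illustrative sketch, not part of the original
# pipeline): the manual smoothing should match Keras' built-in
# CategoricalCrossentropy(label_smoothing=...) applied to one-hot targets.
_check_loss = make_sparse_ce_with_label_smoothing(NUM_CLASSES, 0.1)
_y_true = tf.constant([3, 7])
_y_pred = tf.nn.softmax(tf.random.normal((2, NUM_CLASSES)))
_manual = tf.reduce_mean(_check_loss(_y_true, _y_pred))
_builtin = keras.losses.CategoricalCrossentropy(label_smoothing=0.1)(
    tf.one_hot(_y_true, NUM_CLASSES), _y_pred
)
print("Manual smoothed CE  :", float(_manual))
print("Built-in smoothed CE:", float(_builtin))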
# ------------------------------------------------------------
# 6. TRAINING UTILITY
# ------------------------------------------------------------
def compile_and_train(
    model,
    model_name: str,
    train_ds,
    val_ds,
    epochs: int,
    lr: float,
    model_tag: str,
    patience_es: int = 5,
    patience_rlr: int = 2,
):
    """
    Compile and train the model, saving the best weights by val_accuracy.

    model_name: e.g. 'resnet50_v2'
    model_tag : e.g. 'stage1', 'stage2'
    """
    print(f"\n===== {model_tag}: Training {model_name} =====")

    optimizer = keras.optimizers.Adam(learning_rate=lr)

    # Use custom loss with label smoothing
    loss_fn = make_sparse_ce_with_label_smoothing(
        num_classes=NUM_CLASSES,
        label_smoothing=0.1,
    )

    model.compile(
        optimizer=optimizer,
        loss=loss_fn,
        metrics=["accuracy"],
    )

    # Keras 3: when save_weights_only=True, the path must end with ".weights.h5"
    best_weights_path = os.path.join(
        MODELS_DIR, f"{model_name}_{model_tag}_best.weights.h5"
    )

    callbacks = [
        keras.callbacks.ModelCheckpoint(
            filepath=best_weights_path,
            monitor="val_accuracy",
            save_best_only=True,
            save_weights_only=True,  # ✅ weights-only: avoids architecture issues
            mode="max",
            verbose=1,
        ),
        keras.callbacks.EarlyStopping(
            monitor="val_accuracy",
            patience=patience_es,
            restore_best_weights=True,
            verbose=1,
        ),
        keras.callbacks.ReduceLROnPlateau(
            monitor="val_loss",
            factor=0.5,
            patience=patience_rlr,
            min_lr=1e-6,
            verbose=1,
        ),
    ]

    history = model.fit(
        train_ds,
        validation_data=val_ds,
        epochs=epochs,
        callbacks=callbacks,
    )
    return history, best_weights_path


# ------------------------------------------------------------
# 7. STAGE 1: TRAIN HEAD WITH FROZEN RESNET BASE
# ------------------------------------------------------------
print("\n===== STAGE 1: Training head with frozen ResNet50 base =====")

# Freeze the entire backbone for Stage 1
resnet_base.trainable = False

epochs_stage1 = 15
lr_stage1 = 1e-3

history_stage1, resnet_stage1_best = compile_and_train(
    resnet_model,
    model_name="resnet50_v2",
    train_ds=train_ds,
    val_ds=val_ds,
    epochs=epochs_stage1,
    lr=lr_stage1,
    model_tag="stage1",
    patience_es=5,
    patience_rlr=2,
)
print("Stage 1 best weights saved at:", resnet_stage1_best)

# ------------------------------------------------------------
# 8. STAGE 2: DEEPER FINE-TUNING OF RESNET BASE
# ------------------------------------------------------------
print("\n===== STAGE 2: Fine-tuning last layers of ResNet50 base =====")

# Load Stage 1 best weights before fine-tuning
resnet_model.load_weights(resnet_stage1_best)

# Enable deeper fine-tuning on the backbone
resnet_base.trainable = True
print("Base model name:", resnet_base.name)
print("Base model has", len(resnet_base.layers), "layers.")

# Unfreeze only the last N layers of the backbone
num_unfreeze = 40  # tunable; roughly 30-50 is a reasonable range
for layer in resnet_base.layers[:-num_unfreeze]:
    layer.trainable = False

# Keep BatchNorm layers frozen for stability
for layer in resnet_base.layers[-num_unfreeze:]:
    if isinstance(layer, layers.BatchNormalization):
        layer.trainable = False

# Count trainable layers inside the backbone (counting resnet_model.layers
# would treat the whole backbone as a single layer and hide this detail)
trainable_count = sum(layer.trainable for layer in resnet_base.layers)
print("Trainable layers in ResNet50 backbone after unfreezing:", trainable_count)

epochs_stage2 = 30
lr_stage2 = 5e-6  # small LR for safe fine-tuning

history_stage2, resnet_stage2_best = compile_and_train(
    resnet_model,
    model_name="resnet50_v2",
    train_ds=train_ds,
    val_ds=val_ds,
    epochs=epochs_stage2,
    lr=lr_stage2,
    model_tag="stage2",
    patience_es=8,
    patience_rlr=3,
)
print("Stage 2 best weights saved at:", resnet_stage2_best)
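# ------------------------------------------------------------
# Optional: plot the Stage 1 / Stage 2 training curves side by side.
# Illustrative sketch; assumes matplotlib is available (it is not
# imported by the pipeline above).
import matplotlib.pyplot as plt

fig, axes = plt.subplots(1, 2, figsize=(12, 4))
for ax, history, title in [
    (axes[0], history_stage1, "Stage 1 (frozen backbone)"),
    (axes[1], history_stage2, "Stage 2 (fine-tuning)"),
]:
    ax.plot(history.history["accuracy"], label="train acc")
    ax.plot(history.history["val_accuracy"], label="val acc")
    ax.set_title(title)
    ax.set_xlabel("epoch")
    ax.set_ylabel("accuracy")
    ax.legend()
fig.tight_layout()
fig.savefig(os.path.join(METRICS_DIR, "resnet50_v2_training_curves.png"))
plt.close(fig)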
# ------------------------------------------------------------
# 9. EVALUATION + SAVE METRICS & CONFUSION MATRIX
# ------------------------------------------------------------
def evaluate_and_save(model, save_name, best_weights_path, test_ds, class_names):
    """
    save_name: e.g. 'resnet50_v2_stage1', 'resnet50_v2_stage2'
    """
    print(f"\n===== EVALUATING {save_name.upper()} ON TEST SET =====")

    # Load best weights
    model.load_weights(best_weights_path)
    print(f"Loaded best weights from {best_weights_path}")

    y_true = []
    y_pred = []
    all_probs = []
    total_time = 0.0
    total_images = 0

    for images, labels in test_ds:
        images_np = images.numpy()
        bs = images_np.shape[0]

        start = time.perf_counter()
        probs = model.predict(images_np, verbose=0)
        end = time.perf_counter()

        total_time += (end - start)
        total_images += bs

        preds = np.argmax(probs, axis=1)
        y_true.extend(labels.numpy())
        y_pred.extend(preds)
        all_probs.append(probs)

    y_true = np.array(y_true)
    y_pred = np.array(y_pred)
    all_probs = np.concatenate(all_probs, axis=0)

    # Basic metrics
    accuracy = float((y_true == y_pred).mean())
    precision, recall, f1, _ = precision_recall_fscore_support(
        y_true, y_pred, average="weighted", zero_division=0
    )

    # Top-5 accuracy
    top5_correct = 0
    for i, label in enumerate(y_true):
        if label in np.argsort(all_probs[i])[-5:]:
            top5_correct += 1
    top5_acc = top5_correct / len(y_true)

    # Inference time
    time_per_image = total_time / total_images
    images_per_second = 1.0 / time_per_image if time_per_image > 0 else 0.0

    # Model size (weights only)
    temp_w = os.path.join(MODELS_DIR, f"{save_name}_temp_for_size.weights.h5")
    model.save_weights(temp_w)
    size_mb = os.path.getsize(temp_w) / (1024 * 1024)
    os.remove(temp_w)

    # Confusion matrix
    cm = confusion_matrix(y_true, y_pred)

    print("\nClassification Report:")
    print(
        classification_report(
            y_true,
            y_pred,
            target_names=class_names,
            zero_division=0,
        )
    )
    print(f"Test Accuracy        : {accuracy:.4f}")
    print(f"Weighted Precision   : {precision:.4f}")
    print(f"Weighted Recall      : {recall:.4f}")
    print(f"Weighted F1-score    : {f1:.4f}")
    print(f"Top-5 Accuracy       : {top5_acc:.4f}")
    print(f"Avg time per image   : {time_per_image*1000:.2f} ms")
    print(f"Images per second    : {images_per_second:.2f}")
    print(f"Model size (weights) : {size_mb:.2f} MB")
    print(f"Num parameters       : {model.count_params()}")

    # Save metrics + confusion matrix
    save_dir = os.path.join(METRICS_DIR, save_name)
    os.makedirs(save_dir, exist_ok=True)

    metrics = {
        "model_name": save_name,
        "accuracy": accuracy,
        "precision_weighted": float(precision),
        "recall_weighted": float(recall),
        "f1_weighted": float(f1),
        "top5_accuracy": float(top5_acc),
        "avg_inference_time_sec": float(time_per_image),
        "images_per_second": float(images_per_second),
        "model_size_mb": float(size_mb),
        "num_parameters": int(model.count_params()),
    }

    metrics_path = os.path.join(save_dir, "metrics.json")
    cm_path = os.path.join(save_dir, "confusion_matrix.npy")

    with open(metrics_path, "w") as f:
        json.dump(metrics, f, indent=2)
    np.save(cm_path, cm)

    print(f"\nSaved metrics to         : {metrics_path}")
    print(f"Saved confusion matrix to: {cm_path}")
    return metrics, cm


# ---- Evaluate Stage 1 ----
resnet_stage1_metrics, resnet_stage1_cm = evaluate_and_save(
    resnet_model,
    save_name="resnet50_v2_stage1",
    best_weights_path=resnet_stage1_best,
    test_ds=test_ds,
    class_names=class_names,
)

# ---- Evaluate Stage 2 ----
resnet_stage2_metrics, resnet_stage2_cm = evaluate_and_save(
    resnet_model,
    save_name="resnet50_v2_stage2",
    best_weights_path=resnet_stage2_best,
    test_ds=test_ds,
    class_names=class_names,
)
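# ------------------------------------------------------------
# Optional: render the Stage 2 confusion matrix as a heatmap next to the
# saved .npy artifact. Illustrative sketch; assumes matplotlib is available.
import matplotlib.pyplot as plt

fig, ax = plt.subplots(figsize=(10, 10))
im = ax.imshow(resnet_stage2_cm, cmap="Blues")
ax.set_xticks(range(len(class_names)))
ax.set_yticks(range(len(class_names)))
ax.set_xticklabels(class_names, rotation=90, fontsize=7)
ax.set_yticklabels(class_names, fontsize=7)
ax.set_xlabel("Predicted label")
ax.set_ylabel("True label")
ax.set_title("ResNet50 v2 Stage 2 - Confusion Matrix")
fig.colorbar(im, ax=ax, fraction=0.046)
fig.tight_layout()
fig.savefig(
    os.path.join(METRICS_DIR, "resnet50_v2_stage2", "confusion_matrix.png")
)
plt.close(fig)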
# ------------------------------------------------------------
# 10. SUMMARY
# ------------------------------------------------------------
print("\n===== SUMMARY: RESNET50 v2 STAGES COMPARISON =====")
print("Stage 1 Test Accuracy:", resnet_stage1_metrics["accuracy"])
print("Stage 2 Test Accuracy:", resnet_stage2_metrics["accuracy"])
print("✅ RESNET50 v2 pipeline complete.")
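# ------------------------------------------------------------
# Optional: side-by-side comparison of a few key metrics. Illustrative
# sketch using the in-memory metrics dicts produced by evaluate_and_save.
_fields = [
    "accuracy",
    "f1_weighted",
    "top5_accuracy",
    "images_per_second",
    "model_size_mb",
]
print(f"\n{'metric':<20}{'stage1':>12}{'stage2':>12}")
for _key in _fields:
    print(
        f"{_key:<20}"
        f"{resnet_stage1_metrics[_key]:>12.4f}"
        f"{resnet_stage2_metrics[_key]:>12.4f}"
    )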