SpencerCPurdy's picture
Update app.py
69145b4 verified
"""
Multi-Agent AI Collaboration System for Document Classification
Author: Spencer Purdy
Description: A production-grade system that uses multiple specialized ML models
working together to classify and route documents. Each "agent" is a trained ML model
with specific expertise, and they collaborate through ensemble methods and voting.
Real-World Application: Automated document classification and routing system for
customer support, legal document processing, or content management.
Key Features:
- Multiple specialized ML models (agents) with different approaches
- Router agent for intelligent task distribution
- Ensemble coordinator for combining predictions
- Comprehensive evaluation and performance metrics
- Real data from 20 Newsgroups dataset (publicly available, properly licensed)
Limitations:
- Performance depends on training data quality and size
- May struggle with highly ambiguous or out-of-distribution documents
- Requires retraining for domain-specific applications
- Ensemble overhead increases inference time
Dependencies and Versions:
- scikit-learn==1.3.0
- numpy==1.24.3
- pandas==2.0.3
- torch==2.1.0
- transformers==4.35.0
- gradio==4.7.1
- sentence-transformers==2.2.2
- imbalanced-learn==0.11.0
- xgboost==2.0.1
- plotly==5.18.0
- seaborn==0.13.0
- nltk==3.8.1
"""
# Installation
# !pip install -q scikit-learn==1.3.0 numpy==1.24.3 pandas==2.0.3 torch==2.1.0 transformers==4.35.0 gradio==4.7.1 sentence-transformers==2.2.2 imbalanced-learn==0.11.0 xgboost==2.0.1 plotly==5.18.0 seaborn==0.13.0 nltk==3.8.1
import os
import json
import time
import pickle
import logging
import warnings
import random
from datetime import datetime
from typing import Dict, List, Tuple, Optional, Any
from dataclasses import dataclass, field, asdict
from collections import defaultdict, Counter
import traceback
# Set random seeds for reproducibility
# The same seed is applied, in turn, to Python's `random`, NumPy and PyTorch.
RANDOM_SEED = 42
random.seed(RANDOM_SEED)
import numpy as np
np.random.seed(RANDOM_SEED)
import torch
torch.manual_seed(RANDOM_SEED)
if torch.cuda.is_available():
    # Seed every CUDA device as well.
    torch.cuda.manual_seed_all(RANDOM_SEED)
    # NOTE(review): indentation was lost in this copy of the file; the two
    # cuDNN flags below are assumed to belong to the CUDA branch — confirm.
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
# Core libraries
import pandas as pd
import numpy as np
from datasets import load_dataset
from sklearn.model_selection import train_test_split, cross_val_score, StratifiedKFold
from sklearn.feature_extraction.text import TfidfVectorizer, CountVectorizer
from sklearn.preprocessing import LabelEncoder, StandardScaler
from sklearn.ensemble import RandomForestClassifier, VotingClassifier, StackingClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.naive_bayes import MultinomialNB
from sklearn.svm import LinearSVC
from sklearn.metrics import (
accuracy_score, precision_score, recall_score, f1_score,
classification_report, confusion_matrix, cohen_kappa_score
)
from sklearn.decomposition import TruncatedSVD
from imblearn.over_sampling import SMOTE
# Deep learning - Import with specific names to avoid conflicts
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset as TorchDataset
from torch.utils.data import DataLoader as TorchDataLoader
from torch.utils.data import TensorDataset
# NLP
from sentence_transformers import SentenceTransformer
import nltk
try:
nltk.data.find('tokenizers/punkt')
except LookupError:
nltk.download('punkt', quiet=True)
nltk.download('stopwords', quiet=True)
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
# XGBoost
import xgboost as xgb
# Visualization
import matplotlib.pyplot as plt
import seaborn as sns
import plotly.graph_objects as go
import plotly.express as px
from plotly.subplots import make_subplots
# UI
import gradio as gr
# Configure logging
# Suppress every Python warning for the whole process.
warnings.filterwarnings('ignore')
# Root logger at INFO; each record carries timestamp, logger name and level.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# Module-level logger used throughout this file.
logger = logging.getLogger(__name__)
# Configuration
@dataclass
class SystemConfig:
    """
    Central configuration for the document classification pipeline.

    Every field has a default, so ``SystemConfig()`` yields a usable
    configuration.

    NOTE(review): this docstring previously stated that all hyperparameters
    were selected through grid search validation; no such search is visible
    in this file, so treat the defaults as hand-picked until confirmed.
    """
    # Random seed for reproducibility
    # Defaults to the module-level RANDOM_SEED and is passed as `random_state`
    # to the data splits and to the scikit-learn / XGBoost estimators.
    random_seed: int = RANDOM_SEED
    # Data settings
    # test_size: fraction of the whole corpus held out as the test set.
    test_size: float = 0.2
    # validation_size: fraction of the remaining (non-test) rows used for
    # validation, i.e. it is applied after the test split.
    validation_size: float = 0.2
    # Feature engineering
    # Vocabulary cap and n-gram range handed to the TF-IDF vectorizer.
    tfidf_max_features: int = 5000
    tfidf_ngram_range: Tuple[int, int] = (1, 2)
    # Input width of the embedding agent's network.
    # presumably must equal the sentence-transformer output size — confirm.
    embedding_dim: int = 384
    # Model training
    # cv_folds is not referenced in the portion of the file reviewed here.
    cv_folds: int = 5
    # Iteration cap for the logistic-regression models (TF-IDF agent and
    # stacking meta-learner).
    max_iter: int = 1000
    # Neural network settings
    # hidden_dim is the first hidden layer width; the second layer uses
    # hidden_dim // 2.
    hidden_dim: int = 256
    dropout_rate: float = 0.3
    # Learning rate of the Adam optimizer.
    learning_rate: float = 0.001
    batch_size: int = 32
    # Maximum number of training epochs.
    epochs: int = 10
    # Consecutive epochs without validation-loss improvement before stopping.
    early_stopping_patience: int = 3
    # XGBoost settings
    xgb_n_estimators: int = 50
    xgb_max_depth: int = 4
    xgb_learning_rate: float = 0.1
    # Ensemble settings
    # Neither field is referenced in the portion of the file reviewed here.
    voting_strategy: str = 'soft'
    stacking_cv: int = 5
    # Performance thresholds
    # Not referenced in the portion of the file reviewed here.
    min_accuracy: float = 0.70
    min_f1_score: float = 0.65
    # Paths
    # Both directories are created when the module is imported.
    cache_dir: str = './model_cache'
    results_dir: str = './results'
# Module-wide configuration instance built from the SystemConfig defaults.
config = SystemConfig()
# Create directories
# exist_ok=True makes repeated imports/runs safe when the folders exist.
os.makedirs(config.cache_dir, exist_ok=True)
os.makedirs(config.results_dir, exist_ok=True)
logger.info(f"Configuration loaded. Random seed: {config.random_seed}")
# Data loading and preprocessing
class NewsGroupsDataLoader:
    """
    Fetches the 20 Newsgroups corpus and turns it into model-ready splits.

    Dataset Information:
    - Source: "SetFit/20_newsgroups" loaded through Hugging Face `datasets`
    - Task: Multi-class text classification over 20 categories

    Work done by `load_data`:
    1. Merge the published train and test splits into a single pool
    2. Normalize each document with `_clean_text`
    3. Derive length statistics from the cleaned text
    4. Stratified train/validation/test split
    """

    def __init__(self, config: SystemConfig):
        self.config = config
        self.label_encoder = LabelEncoder()
        # Filled in by load_data().
        self.categories = None

    def load_data(self) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
        """
        Download the corpus, clean it and split it.

        Returns:
            Tuple of (train_df, val_df, test_df); each frame has the columns
            text, label, category, text_cleaned, text_length, word_count and
            avg_word_length.
        """
        logger.info("Loading 20 Newsgroups dataset from Hugging Face...")
        dataset = load_dataset("SetFit/20_newsgroups")
        train_split, test_split = dataset['train'], dataset['test']

        # Pool both published splits; the project performs its own split.
        all_texts = [*train_split['text'], *test_split['text']]
        all_labels = [*train_split['label'], *test_split['label']]

        # Category names are hard-coded; integer labels index this list.
        self.categories = [
            'alt.atheism', 'comp.graphics', 'comp.os.ms-windows.misc',
            'comp.sys.ibm.pc.hardware', 'comp.sys.mac.hardware', 'comp.windows.x',
            'misc.forsale', 'rec.autos', 'rec.motorcycles', 'rec.sport.baseball',
            'rec.sport.hockey', 'sci.crypt', 'sci.electronics', 'sci.med',
            'sci.space', 'soc.religion.christian', 'talk.politics.guns',
            'talk.politics.mideast', 'talk.politics.misc', 'talk.religion.misc'
        ]
        logger.info(f"Total documents: {len(all_texts)}")
        logger.info(f"Number of categories: {len(self.categories)}")
        logger.info(f"Categories: {self.categories}")

        category_names = [self.categories[label] for label in all_labels]
        df = pd.DataFrame({
            'text': all_texts,
            'label': all_labels,
            'category': category_names
        })

        # Normalized text plus simple length statistics derived from it.
        cleaned = df['text'].apply(self._clean_text)
        df['text_cleaned'] = cleaned
        df['text_length'] = cleaned.apply(len)
        tokens = cleaned.apply(str.split)
        df['word_count'] = tokens.apply(len)
        df['avg_word_length'] = tokens.apply(
            lambda words: np.mean([len(word) for word in words]) if len(words) > 0 else 0
        )

        # Two-stage stratified split: carve out test, then validation.
        seed = self.config.random_seed
        train_val_df, test_df = train_test_split(
            df,
            test_size=self.config.test_size,
            random_state=seed,
            stratify=df['label']
        )
        train_df, val_df = train_test_split(
            train_val_df,
            test_size=self.config.validation_size,
            random_state=seed,
            stratify=train_val_df['label']
        )
        logger.info(f"Train set: {len(train_df)} samples")
        logger.info(f"Validation set: {len(val_df)} samples")
        logger.info(f"Test set: {len(test_df)} samples")

        # Log the most frequent training categories as a sanity check.
        train_dist = train_df['category'].value_counts()
        logger.info(f"Training set class distribution:\n{train_dist.head()}")
        return train_df, val_df, test_df

    def _clean_text(self, text: str) -> str:
        """
        Normalize one document.

        Lowercases the text, replaces every character that is neither
        alphanumeric nor whitespace with a space, then collapses whitespace
        runs. Non-string input yields an empty string.
        """
        if not isinstance(text, str):
            return ""
        kept = [
            ch if (ch.isalnum() or ch.isspace()) else ' '
            for ch in text.lower()
        ]
        return ' '.join(''.join(kept).split())
# Feature engineering
class FeatureEngineer:
    """
    Turns cleaned documents into the three feature views the agents consume.

    Feature views:
    1. 'tfidf': dense TF-IDF matrix
    2. 'embeddings': sentence-transformer vectors
    3. 'metadata': scaled text_length / word_count / avg_word_length

    `fit` only ever sees the frame it is given, so pass the training split
    to keep validation and test data out of the fitted statistics.
    """

    def __init__(self, config: SystemConfig):
        self.config = config
        # Both are created in fit().
        self.tfidf_vectorizer = None
        self.embedding_model = None
        self.scaler = StandardScaler()

    def fit(self, train_df: pd.DataFrame):
        """Fit the vectorizer and scaler, and load the embedding model."""
        logger.info("Fitting feature extractors...")

        tfidf_options = dict(
            max_features=self.config.tfidf_max_features,
            ngram_range=self.config.tfidf_ngram_range,
            min_df=2,
            max_df=0.8,
            sublinear_tf=True
        )
        self.tfidf_vectorizer = TfidfVectorizer(**tfidf_options)
        self.tfidf_vectorizer.fit(train_df['text_cleaned'])

        # The encoder is pre-trained; it is only loaded here, not fitted.
        logger.info("Loading sentence transformer model...")
        self.embedding_model = SentenceTransformer('all-MiniLM-L6-v2')

        meta_columns = ['text_length', 'word_count', 'avg_word_length']
        self.scaler.fit(train_df[meta_columns].values)
        logger.info("Feature extractors fitted successfully")

    def transform(self, df: pd.DataFrame) -> Dict[str, np.ndarray]:
        """
        Compute every feature view for the rows of `df`.

        Returns:
            Dictionary with keys: 'tfidf', 'embeddings', 'metadata'
        """
        # Densified because downstream code concatenates it with metadata.
        dense_tfidf = self.tfidf_vectorizer.transform(df['text_cleaned']).toarray()

        logger.info(f"Generating embeddings for {len(df)} documents...")
        documents = df['text_cleaned'].tolist()
        vectors = self.embedding_model.encode(
            documents,
            show_progress_bar=True,
            batch_size=32
        )

        meta_columns = ['text_length', 'word_count', 'avg_word_length']
        scaled_meta = self.scaler.transform(df[meta_columns].values)

        return {
            'tfidf': dense_tfidf,
            'embeddings': vectors,
            'metadata': scaled_meta
        }
# Individual ML Agent Models
class TFIDFAgent:
    """
    Agent specializing in TF-IDF features with Logistic Regression.

    Strengths:
    - Fast training and inference
    - Interpretable feature importance
    - Good with sparse, high-dimensional text features

    Limitations:
    - Cannot capture semantic similarity
    - Bag-of-words approach loses word order
    """

    def __init__(self, config: SystemConfig):
        self.config = config
        self.model = LogisticRegression(
            max_iter=config.max_iter,
            random_state=config.random_seed,
            n_jobs=-1
        )
        self.name = "TF-IDF Agent"

    def train(self, X_train: np.ndarray, y_train: np.ndarray,
              X_val: np.ndarray, y_val: np.ndarray) -> Dict:
        """
        Fit the classifier and score it on the validation split.

        Args:
            X_train: Training feature matrix.
            y_train: Training labels.
            X_val: Validation feature matrix.
            y_val: Validation labels.

        Returns:
            Dict with 'accuracy', 'f1_weighted', 'precision_weighted',
            'recall_weighted' (all on the validation split) and
            'training_time' in seconds (fit only).
        """
        logger.info(f"Training {self.name}...")
        start_time = time.time()
        self.model.fit(X_train, y_train)
        training_time = time.time() - start_time

        # Only hard predictions are needed for the metrics below; the former
        # extra predict_proba pass produced a value that was never used.
        y_pred = self.model.predict(X_val)
        metrics = {
            'accuracy': accuracy_score(y_val, y_pred),
            'f1_weighted': f1_score(y_val, y_pred, average='weighted'),
            'precision_weighted': precision_score(y_val, y_pred, average='weighted'),
            'recall_weighted': recall_score(y_val, y_pred, average='weighted'),
            'training_time': training_time
        }
        logger.info(f"{self.name} - Val Accuracy: {metrics['accuracy']:.4f}, "
                    f"F1: {metrics['f1_weighted']:.4f}")
        return metrics

    def predict(self, X: np.ndarray) -> np.ndarray:
        """Return the predicted label for each row of X."""
        return self.model.predict(X)

    def predict_proba(self, X: np.ndarray) -> np.ndarray:
        """Return the per-class probabilities for each row of X."""
        return self.model.predict_proba(X)
class EmbeddingAgent:
    """
    Agent specializing in semantic embeddings with Neural Network.

    Strengths:
    - Captures semantic similarity between documents
    - Works well with dense vector representations
    - Can generalize to similar but unseen words

    Limitations:
    - Requires more training data
    - Slower inference than classical methods
    - Less interpretable
    """

    def __init__(self, config: SystemConfig, n_classes: int):
        self.config = config
        self.n_classes = n_classes
        self.name = "Embedding Agent"
        # Neural network architecture: two hidden layers with ReLU + dropout,
        # followed by a linear layer producing one logit per class.
        self.model = nn.Sequential(
            nn.Linear(config.embedding_dim, config.hidden_dim),
            nn.ReLU(),
            nn.Dropout(config.dropout_rate),
            nn.Linear(config.hidden_dim, config.hidden_dim // 2),
            nn.ReLU(),
            nn.Dropout(config.dropout_rate),
            nn.Linear(config.hidden_dim // 2, n_classes)
        )
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.model.to(self.device)
        self.optimizer = torch.optim.Adam(
            self.model.parameters(),
            lr=config.learning_rate
        )
        self.criterion = nn.CrossEntropyLoss()

    def train(self, X_train: np.ndarray, y_train: np.ndarray,
              X_val: np.ndarray, y_val: np.ndarray) -> Dict:
        """
        Train the network with early stopping on validation loss.

        The parameters from the epoch with the lowest validation loss are
        restored before the final evaluation, so the returned metrics and
        all later predictions come from the best checkpoint rather than from
        the last epoch that was run.

        Args:
            X_train: Training embeddings.
            y_train: Training labels (integer class ids).
            X_val: Validation embeddings.
            y_val: Validation labels.

        Returns:
            Dict with 'accuracy', 'f1_weighted', 'precision_weighted',
            'recall_weighted' (validation split) and 'training_time' in
            seconds.
        """
        logger.info(f"Training {self.name}...")
        # Prepare data loaders using PyTorch's DataLoader
        train_dataset = TensorDataset(
            torch.FloatTensor(X_train),
            torch.LongTensor(y_train)
        )
        train_loader = TorchDataLoader(
            train_dataset,
            batch_size=self.config.batch_size,
            shuffle=True
        )
        val_dataset = TensorDataset(
            torch.FloatTensor(X_val),
            torch.LongTensor(y_val)
        )
        val_loader = TorchDataLoader(
            val_dataset,
            batch_size=self.config.batch_size,
            shuffle=False
        )

        start_time = time.time()
        best_val_loss = float('inf')
        best_state = None
        patience_counter = 0

        for epoch in range(self.config.epochs):
            # Training
            self.model.train()
            train_loss = 0.0
            for batch_X, batch_y in train_loader:
                batch_X, batch_y = batch_X.to(self.device), batch_y.to(self.device)
                self.optimizer.zero_grad()
                outputs = self.model(batch_X)
                loss = self.criterion(outputs, batch_y)
                loss.backward()
                self.optimizer.step()
                train_loss += loss.item()

            # Validation
            self.model.eval()
            val_loss = 0.0
            all_preds = []
            all_labels = []
            with torch.no_grad():
                for batch_X, batch_y in val_loader:
                    batch_X, batch_y = batch_X.to(self.device), batch_y.to(self.device)
                    outputs = self.model(batch_X)
                    loss = self.criterion(outputs, batch_y)
                    val_loss += loss.item()
                    preds = torch.argmax(outputs, dim=1)
                    all_preds.extend(preds.cpu().numpy())
                    all_labels.extend(batch_y.cpu().numpy())

            val_accuracy = accuracy_score(all_labels, all_preds)
            logger.info(f"Epoch {epoch+1}/{self.config.epochs} - "
                        f"Train Loss: {train_loss/len(train_loader):.4f}, "
                        f"Val Loss: {val_loss/len(val_loader):.4f}, "
                        f"Val Acc: {val_accuracy:.4f}")

            # Early stopping
            if val_loss < best_val_loss:
                best_val_loss = val_loss
                # Snapshot the weights; clone() is required because
                # state_dict() returns references to the live tensors.
                best_state = {
                    key: tensor.detach().clone()
                    for key, tensor in self.model.state_dict().items()
                }
                patience_counter = 0
            else:
                patience_counter += 1
                if patience_counter >= self.config.early_stopping_patience:
                    logger.info(f"Early stopping at epoch {epoch+1}")
                    break

        # Roll back to the best checkpoint (no-op if no epoch ever improved).
        if best_state is not None:
            self.model.load_state_dict(best_state)
        training_time = time.time() - start_time

        # Final evaluation
        y_pred = self.predict(X_val)
        metrics = {
            'accuracy': accuracy_score(y_val, y_pred),
            'f1_weighted': f1_score(y_val, y_pred, average='weighted'),
            'precision_weighted': precision_score(y_val, y_pred, average='weighted'),
            'recall_weighted': recall_score(y_val, y_pred, average='weighted'),
            'training_time': training_time
        }
        logger.info(f"{self.name} - Val Accuracy: {metrics['accuracy']:.4f}, "
                    f"F1: {metrics['f1_weighted']:.4f}")
        return metrics

    def _forward_logits(self, X: np.ndarray) -> torch.Tensor:
        """Run X through the network in eval mode, without gradients."""
        self.model.eval()
        with torch.no_grad():
            X_tensor = torch.FloatTensor(X).to(self.device)
            return self.model(X_tensor)

    def predict(self, X: np.ndarray) -> np.ndarray:
        """Return the class id with the highest logit for each row of X."""
        predictions = torch.argmax(self._forward_logits(X), dim=1)
        return predictions.cpu().numpy()

    def predict_proba(self, X: np.ndarray) -> np.ndarray:
        """Return softmax class probabilities for each row of X."""
        probabilities = F.softmax(self._forward_logits(X), dim=1)
        return probabilities.cpu().numpy()
class XGBoostAgent:
    """
    Gradient-boosted tree agent operating on a combined feature matrix.

    Strengths:
    - Copes with mixed feature types
    - Exposes feature importance
    - Fast at inference time

    Limitations:
    - Can overfit small datasets
    - Sensitive to hyperparameter choices
    """

    def __init__(self, config: SystemConfig):
        self.config = config
        booster_options = {
            'n_estimators': config.xgb_n_estimators,
            'max_depth': config.xgb_max_depth,
            'learning_rate': config.xgb_learning_rate,
            'random_state': config.random_seed,
            'n_jobs': -1,
            'use_label_encoder': False,
            'eval_metric': 'mlogloss',
        }
        self.model = xgb.XGBClassifier(**booster_options)
        self.name = "XGBoost Agent"

    def train(self, X_train: np.ndarray, y_train: np.ndarray,
              X_val: np.ndarray, y_val: np.ndarray) -> Dict:
        """
        Fit the booster and report validation metrics.

        The validation split is handed to `fit` as the evaluation set and is
        also used for the returned accuracy / weighted F1 / precision /
        recall. 'training_time' is the fit duration in seconds.
        """
        logger.info(f"Training {self.name}...")
        fit_started = time.time()
        self.model.fit(
            X_train, y_train,
            eval_set=[(X_val, y_val)],
            verbose=False
        )
        training_time = time.time() - fit_started

        val_pred = self.model.predict(X_val)
        metrics = {'accuracy': accuracy_score(y_val, val_pred)}
        weighted_scorers = (
            ('f1_weighted', f1_score),
            ('precision_weighted', precision_score),
            ('recall_weighted', recall_score),
        )
        for key, scorer in weighted_scorers:
            metrics[key] = scorer(y_val, val_pred, average='weighted')
        metrics['training_time'] = training_time

        logger.info(f"{self.name} - Val Accuracy: {metrics['accuracy']:.4f}, "
                    f"F1: {metrics['f1_weighted']:.4f}")
        return metrics

    def predict(self, X: np.ndarray) -> np.ndarray:
        """Return the predicted label for each row of X."""
        labels = self.model.predict(X)
        return labels

    def predict_proba(self, X: np.ndarray) -> np.ndarray:
        """Return the per-class probabilities for each row of X."""
        class_probs = self.model.predict_proba(X)
        return class_probs
# Ensemble Coordinator
class EnsembleCoordinator:
    """
    Coordinates multiple agents through ensemble methods.

    Ensemble Strategies:
    1. Voting: Each agent's probabilities are averaged with equal weight
    2. Weighted Voting: Agents weighted by validation F1
    3. Stacking: Meta-learner combines agent probabilities

    Every method that takes a list of feature matrices expects one matrix
    per agent, in the same order as `agents`.
    """

    def __init__(self, agents: List, config: SystemConfig):
        self.agents = agents
        self.config = config
        # Set by calculate_weights(); None means "use a plain average".
        self.weights = None
        # Set by train_stacking().
        self.meta_learner = None
        self.name = "Ensemble Coordinator"

    def _collect_agent_probas(self, X_list: List[np.ndarray]) -> List[np.ndarray]:
        """Return each agent's class-probability matrix for its own input."""
        return [
            agent.predict_proba(X_list[i])
            for i, agent in enumerate(self.agents)
        ]

    def train_stacking(self, X_train_list: List[np.ndarray], y_train: np.ndarray,
                       X_val_list: List[np.ndarray], y_val: np.ndarray) -> Dict:
        """
        Train a meta-learner that stacks agent predictions.

        Process:
        1. Get class probabilities from all agents on the validation inputs
        2. Concatenate them column-wise as meta-features
        3. Fit a logistic regression on those meta-features

        NOTE: `X_train_list` and `y_train` are accepted for interface
        compatibility but are not used; the meta-learner is fitted on the
        validation data only. The returned metrics are computed on that same
        data, so they are optimistic and not a held-out estimate.

        Returns:
            Dict with 'accuracy', 'f1_weighted', 'precision_weighted' and
            'recall_weighted'.
        """
        logger.info("Training stacking ensemble...")
        # Stack agent probabilities into one meta-feature matrix
        X_meta_val = np.concatenate(self._collect_agent_probas(X_val_list), axis=1)
        # Train meta-learner
        self.meta_learner = LogisticRegression(
            max_iter=self.config.max_iter,
            random_state=self.config.random_seed
        )
        self.meta_learner.fit(X_meta_val, y_val)
        # Evaluate
        y_pred = self.meta_learner.predict(X_meta_val)
        metrics = {
            'accuracy': accuracy_score(y_val, y_pred),
            'f1_weighted': f1_score(y_val, y_pred, average='weighted'),
            'precision_weighted': precision_score(y_val, y_pred, average='weighted'),
            'recall_weighted': recall_score(y_val, y_pred, average='weighted')
        }
        logger.info(f"Stacking Ensemble - Val Accuracy: {metrics['accuracy']:.4f}, "
                    f"F1: {metrics['f1_weighted']:.4f}")
        return metrics

    def calculate_weights(self, agent_metrics: List[Dict]):
        """
        Calculate agent weights proportional to their F1 scores.

        Args:
            agent_metrics: One metrics dict per agent, each containing
                'f1_weighted'.

        When the scores sum to zero (every agent scored 0) the weights fall
        back to a uniform distribution instead of dividing by zero.
        """
        f1_scores = [m['f1_weighted'] for m in agent_metrics]
        total = sum(f1_scores)
        if total > 0:
            self.weights = [f1 / total for f1 in f1_scores]
        else:
            count = len(f1_scores)
            self.weights = [1.0 / count] * count if count else []
        logger.info(f"Agent weights: {self.weights}")

    def predict_voting(self, X_list: List[np.ndarray], weighted: bool = True) -> np.ndarray:
        """
        Make predictions using voting.

        Args:
            X_list: List of feature matrices for each agent
            weighted: Whether to use weighted voting based on F1 scores;
                ignored (plain average) until calculate_weights() has run

        Returns:
            The class index with the highest combined probability per row.
        """
        agent_probas = self._collect_agent_probas(X_list)
        if weighted and self.weights is not None:
            # Weighted average of probabilities
            combined = sum(
                w * proba for w, proba in zip(self.weights, agent_probas)
            )
        else:
            # Simple average
            combined = np.mean(agent_probas, axis=0)
        return np.argmax(combined, axis=1)

    def predict_stacking(self, X_list: List[np.ndarray]) -> np.ndarray:
        """Make predictions using stacking meta-learner."""
        X_meta = np.concatenate(self._collect_agent_probas(X_list), axis=1)
        return self.meta_learner.predict(X_meta)

    def predict_proba_stacking(self, X_list: List[np.ndarray]) -> np.ndarray:
        """Get probabilities using stacking meta-learner."""
        X_meta = np.concatenate(self._collect_agent_probas(X_list), axis=1)
        return self.meta_learner.predict_proba(X_meta)
# Main System
class MultiAgentSystem:
    """
    Main multi-agent classification system.

    Architecture:
    - Multiple specialized agents (TF-IDF, Embedding, XGBoost)
    - Ensemble coordinator for combining predictions
    - Evaluation of every agent and ensemble on a held-out test split

    Agents are stored in a fixed order (TF-IDF, Embedding, XGBoost); the
    per-agent feature lists built by `_agent_inputs` follow the same order.
    """

    def __init__(self, config: SystemConfig):
        self.config = config
        self.data_loader = NewsGroupsDataLoader(config)
        self.feature_engineer = FeatureEngineer(config)
        self.agents = []
        self.coordinator = None
        self.categories = None
        self.is_trained = False
        # Store data and features
        self.train_df = None
        self.val_df = None
        self.test_df = None
        self.train_features = None
        self.val_features = None
        self.test_features = None

    @staticmethod
    def _agent_inputs(features: Dict[str, np.ndarray]) -> List[np.ndarray]:
        """Build the per-agent inputs: TF-IDF, embeddings, TF-IDF + metadata."""
        return [
            features['tfidf'],
            features['embeddings'],
            np.concatenate([features['tfidf'], features['metadata']], axis=1)
        ]

    @staticmethod
    def _weighted_metrics(y_true: np.ndarray, y_pred: np.ndarray) -> Dict:
        """Return accuracy plus weighted F1 / precision / recall."""
        return {
            'accuracy': accuracy_score(y_true, y_pred),
            'f1_weighted': f1_score(y_true, y_pred, average='weighted'),
            'precision_weighted': precision_score(y_true, y_pred, average='weighted'),
            'recall_weighted': recall_score(y_true, y_pred, average='weighted')
        }

    def load_and_prepare_data(self):
        """Load the splits and compute every feature view for each of them."""
        logger.info("=" * 70)
        logger.info("Step 1: Loading and Preparing Data")
        logger.info("=" * 70)
        # Load data
        self.train_df, self.val_df, self.test_df = self.data_loader.load_data()
        self.categories = self.data_loader.categories
        # Extract features; extractors are fitted on the training split only.
        logger.info("\nStep 2: Feature Engineering")
        self.feature_engineer.fit(self.train_df)
        self.train_features = self.feature_engineer.transform(self.train_df)
        self.val_features = self.feature_engineer.transform(self.val_df)
        self.test_features = self.feature_engineer.transform(self.test_df)
        logger.info(f"TF-IDF features shape: {self.train_features['tfidf'].shape}")
        logger.info(f"Embedding features shape: {self.train_features['embeddings'].shape}")
        logger.info(f"Metadata features shape: {self.train_features['metadata'].shape}")

    def train_agents(self):
        """
        Train all individual agents.

        Returns:
            List of validation-metric dicts, one per agent, in agent order.
        """
        logger.info("\n" + "=" * 70)
        logger.info("Step 3: Training Individual Agents")
        logger.info("=" * 70)
        n_classes = len(self.categories)
        y_train = self.train_df['label'].values
        y_val = self.val_df['label'].values
        train_inputs = self._agent_inputs(self.train_features)
        val_inputs = self._agent_inputs(self.val_features)

        # Start from a clean slate so a repeated call does not leave stale
        # agents in front of the freshly trained ones.
        self.agents = []
        agent_metrics = []

        # Agent 1: TF-IDF Agent
        logger.info("\nAgent 1: TF-IDF with Logistic Regression")
        tfidf_agent = TFIDFAgent(self.config)
        metrics_1 = tfidf_agent.train(train_inputs[0], y_train, val_inputs[0], y_val)
        self.agents.append(tfidf_agent)
        agent_metrics.append(metrics_1)

        # Agent 2: Embedding Agent
        logger.info("\nAgent 2: Semantic Embeddings with Neural Network")
        embedding_agent = EmbeddingAgent(self.config, n_classes)
        metrics_2 = embedding_agent.train(train_inputs[1], y_train, val_inputs[1], y_val)
        self.agents.append(embedding_agent)
        agent_metrics.append(metrics_2)

        # Agent 3: XGBoost Agent (TF-IDF and metadata combined)
        logger.info("\nAgent 3: XGBoost with Combined Features")
        xgb_agent = XGBoostAgent(self.config)
        metrics_3 = xgb_agent.train(train_inputs[2], y_train, val_inputs[2], y_val)
        self.agents.append(xgb_agent)
        agent_metrics.append(metrics_3)
        return agent_metrics

    def train_coordinator(self, agent_metrics: List[Dict]):
        """
        Train the ensemble coordinator.

        Args:
            agent_metrics: Validation metrics per agent, as returned by
                `train_agents`; used to derive voting weights.

        Returns:
            Metrics dict reported by the stacking meta-learner.
        """
        logger.info("\n" + "=" * 70)
        logger.info("Step 4: Training Ensemble Coordinator")
        logger.info("=" * 70)
        y_val = self.val_df['label'].values
        # Prepare feature lists for each agent
        X_val_list = self._agent_inputs(self.val_features)
        self.coordinator = EnsembleCoordinator(self.agents, self.config)
        # Calculate weights
        self.coordinator.calculate_weights(agent_metrics)
        # Train stacking ensemble. The meta-learner is fitted on validation
        # predictions, so the validation features are passed in both slots
        # together with their own labels (previously the training labels
        # were paired with validation features, a length mismatch).
        stacking_metrics = self.coordinator.train_stacking(
            X_val_list,
            y_val,
            X_val_list,
            y_val
        )
        return stacking_metrics

    def evaluate_system(self):
        """
        Evaluate every agent and both ensembles on the test set.

        Returns:
            Tuple of (results, y_pred_stacking, y_test) where `results` maps
            a model name to its metrics dict.
        """
        logger.info("\n" + "=" * 70)
        logger.info("Step 5: Final Evaluation on Test Set")
        logger.info("=" * 70)
        y_test = self.test_df['label'].values
        # Prepare test features for each agent
        X_test_list = self._agent_inputs(self.test_features)
        results = {}

        # Evaluate individual agents
        logger.info("\nIndividual Agent Performance:")
        for i, agent in enumerate(self.agents):
            metrics = self._weighted_metrics(y_test, agent.predict(X_test_list[i]))
            results[agent.name] = metrics
            logger.info(f"{agent.name}: Accuracy={metrics['accuracy']:.4f}, "
                        f"F1={metrics['f1_weighted']:.4f}")

        # Evaluate voting ensemble
        logger.info("\nEnsemble Performance:")
        y_pred_voting = self.coordinator.predict_voting(X_test_list, weighted=True)
        voting_metrics = self._weighted_metrics(y_test, y_pred_voting)
        results['Weighted Voting'] = voting_metrics
        logger.info(f"Weighted Voting: Accuracy={voting_metrics['accuracy']:.4f}, "
                    f"F1={voting_metrics['f1_weighted']:.4f}")

        # Evaluate stacking ensemble
        y_pred_stacking = self.coordinator.predict_stacking(X_test_list)
        stacking_metrics = self._weighted_metrics(y_test, y_pred_stacking)
        results['Stacking Ensemble'] = stacking_metrics
        logger.info(f"Stacking Ensemble: Accuracy={stacking_metrics['accuracy']:.4f}, "
                    f"F1={stacking_metrics['f1_weighted']:.4f}")

        # Detailed classification report for the stacking ensemble
        logger.info("\nDetailed Classification Report (Stacking Ensemble):")
        print(classification_report(
            y_test,
            y_pred_stacking,
            target_names=self.categories
        ))
        return results, y_pred_stacking, y_test

    def train_full_system(self):
        """
        Train the complete multi-agent system.

        Returns:
            Dict with 'agent_metrics', 'coordinator_metrics', 'test_results',
            'predictions' (stacking predictions on the test set) and
            'true_labels'.

        Raises:
            Exception: any error from the pipeline is logged with its
                traceback and re-raised unchanged.
        """
        try:
            # Load and prepare data
            self.load_and_prepare_data()
            # Train individual agents
            agent_metrics = self.train_agents()
            # Train coordinator
            coordinator_metrics = self.train_coordinator(agent_metrics)
            # Final evaluation
            results, y_pred, y_true = self.evaluate_system()
            self.is_trained = True
            logger.info("\n" + "=" * 70)
            logger.info("Training Complete!")
            logger.info("=" * 70)
            return {
                'agent_metrics': agent_metrics,
                'coordinator_metrics': coordinator_metrics,
                'test_results': results,
                'predictions': y_pred,
                'true_labels': y_true
            }
        except Exception as e:
            logger.error(f"Error during training: {e}")
            logger.error(traceback.format_exc())
            raise

    def predict_single(self, text: str) -> Dict:
        """
        Predict category for a single document.

        Args:
            text: Raw document text.

        Returns:
            Dict with 'predicted_category', 'confidence', 'top_3_predictions',
            'agent_votes' (agent name -> category) and 'ensemble_method'.

        Raises:
            ValueError: if the system has not been trained yet.
        """
        if not self.is_trained:
            raise ValueError("System must be trained before making predictions")

        # Metadata must be derived from the cleaned text, exactly as it is
        # for the training data; using the raw text here would feed the
        # scaler and XGBoost agent values from a different distribution.
        cleaned = self.data_loader._clean_text(text)
        words = cleaned.split()
        df = pd.DataFrame({
            'text': [text],
            'text_cleaned': [cleaned],
            'text_length': [len(cleaned)],
            'word_count': [len(words)],
            'avg_word_length': [np.mean([len(word) for word in words]) if words else 0]
        })
        # Extract features
        features = self.feature_engineer.transform(df)
        # Prepare features for each agent
        X_list = self._agent_inputs(features)

        # Get predictions from each agent
        agent_predictions = []
        agent_probas = []
        for i, agent in enumerate(self.agents):
            agent_predictions.append(agent.predict(X_list[i])[0])
            agent_probas.append(agent.predict_proba(X_list[i])[0])

        # Get ensemble prediction
        ensemble_pred = self.coordinator.predict_stacking(X_list)[0]
        ensemble_proba = self.coordinator.predict_proba_stacking(X_list)[0]

        # Get top 3 predictions, most probable first
        top_3_indices = np.argsort(ensemble_proba)[-3:][::-1]
        top_3_categories = [self.categories[i] for i in top_3_indices]
        top_3_scores = [ensemble_proba[i] for i in top_3_indices]

        result = {
            'predicted_category': self.categories[ensemble_pred],
            'confidence': float(ensemble_proba[ensemble_pred]),
            'top_3_predictions': [
                {'category': cat, 'confidence': float(score)}
                for cat, score in zip(top_3_categories, top_3_scores)
            ],
            'agent_votes': {
                agent.name: self.categories[pred]
                for agent, pred in zip(self.agents, agent_predictions)
            },
            'ensemble_method': 'Stacking'
        }
        return result
# Visualization functions
def create_performance_comparison(results: Dict) -> go.Figure:
    """
    Build a grouped bar chart comparing models across four metrics.

    Args:
        results: Mapping of model name to a metrics dict containing
            'accuracy', 'f1_weighted', 'precision_weighted' and
            'recall_weighted'.

    Returns:
        A Plotly figure with one bar group per model and one trace per metric.
    """
    model_names = list(results.keys())
    figure = go.Figure()
    for metric_key in ('accuracy', 'f1_weighted', 'precision_weighted', 'recall_weighted'):
        scores = [results[name][metric_key] for name in model_names]
        bar_labels = [f'{score:.3f}' for score in scores]
        trace = go.Bar(
            name=metric_key.replace('_', ' ').title(),
            x=model_names,
            y=scores,
            text=bar_labels,
            textposition='auto'
        )
        figure.add_trace(trace)

    # Scores are fractions, so the y-axis is pinned to [0, 1].
    layout_options = dict(
        title='Model Performance Comparison on Test Set',
        xaxis_title='Model',
        yaxis_title='Score',
        barmode='group',
        height=500,
        showlegend=True,
        yaxis=dict(range=[0, 1])
    )
    figure.update_layout(**layout_options)
    return figure
def create_confusion_matrix(y_true: np.ndarray, y_pred: np.ndarray,
                            categories: List[str]) -> go.Figure:
    """
    Create a row-normalized confusion matrix heatmap.

    Args:
        y_true: True labels.
        y_pred: Predicted labels.
        categories: Category names used for both axes.
            NOTE(review): labels are assumed to be the integer indices
            0..len(categories)-1 into this list, as they are elsewhere in
            this file — confirm for any other caller.

    Returns:
        A Plotly heatmap colored by the per-true-class fraction, with the
        raw counts shown as cell text.
    """
    # Pin the label set so the matrix is always len(categories) square and
    # lines up with the axis names even if a class is absent from the data.
    label_ids = list(range(len(categories)))
    cm = confusion_matrix(y_true, y_pred, labels=label_ids)

    # Normalize each row by its support; rows with no true samples stay at
    # zero instead of turning into NaN through a division by zero.
    row_totals = cm.sum(axis=1)[:, np.newaxis]
    cm_normalized = np.divide(
        cm.astype('float'),
        row_totals,
        out=np.zeros(cm.shape, dtype=float),
        where=row_totals != 0
    )

    fig = go.Figure(data=go.Heatmap(
        z=cm_normalized,
        x=categories,
        y=categories,
        colorscale='Blues',
        text=cm,
        texttemplate='%{text}',
        textfont={"size": 8},
        colorbar=dict(title="Normalized Count")
    ))
    fig.update_layout(
        title='Confusion Matrix (Stacking Ensemble)',
        xaxis_title='Predicted Category',
        yaxis_title='True Category',
        height=800,
        width=900
    )
    return fig
# Gradio interface
def create_gradio_interface(system: MultiAgentSystem, training_results: Dict):
    """Create Gradio interface for the system.

    Args:
        system: The system used for inference. This function calls
            ``system.predict_single`` and reads ``system.categories``,
            ``system.train_df``, ``system.val_df`` and ``system.test_df``.
        training_results: Training output. The keys read here are
            ``'test_results'``, ``'true_labels'``, ``'predictions'`` and
            ``'agent_metrics'`` (indexed 0..2, each entry providing
            ``'training_time'``, ``'accuracy'`` and ``'f1_weighted'``).

    Returns:
        The assembled ``gr.Blocks`` interface; it is not launched here.
    """
    def predict_text(text):
        """Prediction function for Gradio.

        Returns a 3-tuple (markdown text, confidence figure, None); the
        trailing None feeds the hidden placeholder output wired up below.
        """
        if not text or not text.strip():
            return "Please enter some text to classify.", None, None
        try:
            result = system.predict_single(text)
            top_predictions = result['top_3_predictions']

            # Format output (joined once instead of repeated string +=).
            lines = [
                "",
                f"**Predicted Category:** {result['predicted_category']}",
                f"**Confidence:** {result['confidence']:.2%}",
                "**Top 3 Predictions:**",
            ]
            lines.extend(
                f"- {pred['category']}: {pred['confidence']:.2%}"
                for pred in top_predictions
            )
            lines.extend(["", "**Agent Votes:**"])
            lines.extend(
                f"- {agent_name}: {vote}"
                for agent_name, vote in result['agent_votes'].items()
            )
            lines.extend(["", f"**Ensemble Method:** {result['ensemble_method']}"])
            output_text = "\n".join(lines)

            # Create confidence bar chart
            categories = [p['category'] for p in top_predictions]
            confidences = [p['confidence'] for p in top_predictions]
            fig = go.Figure(data=[
                go.Bar(x=categories, y=confidences,
                       text=[f'{c:.2%}' for c in confidences],
                       textposition='auto')
            ])
            fig.update_layout(
                title='Top 3 Prediction Confidences',
                xaxis_title='Category',
                yaxis_title='Confidence',
                yaxis=dict(range=[0, 1]),
                height=400
            )
            return output_text, fig, None
        except Exception as e:
            # UI boundary: report the failure to the user, but keep the
            # traceback in the logs so it is not lost.
            logging.getLogger(__name__).exception("Prediction failed")
            return f"Error making prediction: {str(e)}", None, None

    # Create performance visualizations
    perf_fig = create_performance_comparison(training_results['test_results'])
    cm_fig = create_confusion_matrix(
        training_results['true_labels'],
        training_results['predictions'],
        system.categories
    )

    # Example texts
    examples = [
        "The new graphics card delivers excellent performance for gaming with ray tracing enabled.",
        "The patient showed improvement after the medication was administered.",
        "The stock market experienced significant volatility due to economic uncertainty.",
        "The team scored a last-minute goal to win the championship.",
        "Scientists discovered a new species in the Amazon rainforest."
    ]

    # Per-agent validation metrics table. Rows are looked up by position in
    # training_results['agent_metrics'], in this fixed agent order.
    agent_labels = ['TF-IDF Agent', 'Embedding Agent', 'XGBoost Agent']
    agent_metrics = training_results['agent_metrics']
    metrics_df = pd.DataFrame([
        {
            'Agent': label,
            'Training Time (s)': f"{agent_metrics[idx]['training_time']:.2f}",
            'Validation Accuracy': f"{agent_metrics[idx]['accuracy']:.4f}",
            'Validation F1': f"{agent_metrics[idx]['f1_weighted']:.4f}"
        }
        for idx, label in enumerate(agent_labels)
    ])

    # Create interface
    with gr.Blocks(title="Multi-Agent Document Classification System", theme=gr.themes.Soft()) as interface:
        gr.Markdown("""
# Multi-Agent AI Collaboration System for Document Classification
## Author: Spencer Purdy
This system uses multiple specialized machine learning models (agents) that collaborate
to classify documents into 20 different categories from the newsgroups dataset.
### System Architecture:
- **TF-IDF Agent**: Specializes in statistical text features using Logistic Regression
- **Embedding Agent**: Captures semantic meaning using neural networks and sentence embeddings
- **XGBoost Agent**: Handles mixed features with gradient boosting
- **Ensemble Coordinator**: Combines agent predictions using stacking for optimal performance
### Dataset:
- 20 Newsgroups dataset (publicly available, approx. 18,000 documents)
- 20 categories covering various topics (technology, sports, politics, etc.)
""")
        with gr.Tab("Document Classification"):
            gr.Markdown("### Enter text to classify:")
            with gr.Row():
                with gr.Column(scale=2):
                    text_input = gr.Textbox(
                        label="Input Text",
                        placeholder="Enter document text here...",
                        lines=10
                    )
                    classify_btn = gr.Button("Classify Document", variant="primary")
                    gr.Examples(
                        examples=examples,
                        inputs=text_input,
                        label="Example Documents"
                    )
                with gr.Column(scale=1):
                    output_text = gr.Markdown(label="Prediction Results")
                    confidence_plot = gr.Plot(label="Confidence Scores")
            # predict_text returns three values; the hidden textbox receives
            # the third, which is always None.
            classify_btn.click(
                fn=predict_text,
                inputs=[text_input],
                outputs=[output_text, confidence_plot, gr.Textbox(visible=False)]
            )
        with gr.Tab("System Performance"):
            gr.Markdown("""
### Model Performance on Test Set
The system was evaluated on a held-out test set. Below are the performance metrics
for individual agents and ensemble methods.
""")
            gr.Plot(value=perf_fig, label="Performance Comparison")
            gr.Markdown("""
### Performance Summary:
Individual agents show good performance, with each specializing in different aspects:
- TF-IDF Agent: Fast, interpretable, good with keyword-based classification
- Embedding Agent: Captures semantic similarity, handles paraphrasing well
- XGBoost Agent: Robust with mixed features, handles complex patterns
Ensemble methods combine agent strengths:
- Weighted Voting: Simple combination based on validation performance
- Stacking: Meta-learner optimally combines agent predictions
The stacking ensemble typically achieves the best performance by learning
how to weight each agent for different types of documents.
""")
        with gr.Tab("Confusion Matrix"):
            gr.Markdown("""
### Confusion Matrix
Shows where the stacking ensemble makes correct and incorrect predictions.
Darker colors indicate more predictions in that cell.
""")
            gr.Plot(value=cm_fig, label="Confusion Matrix")
        with gr.Tab("Model Information"):
            gr.Markdown(f"""
### System Information
**Training Date:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
**Configuration:**
- Random Seed: {config.random_seed}
- Training Set Size: {len(system.train_df)} documents
- Validation Set Size: {len(system.val_df)} documents
- Test Set Size: {len(system.test_df)} documents
- Number of Categories: {len(system.categories)}
**Categories:**
{', '.join(system.categories)}
**Agent Training Times:**
""")
            gr.DataFrame(value=metrics_df, label="Agent Training Metrics")
            # Must be an f-string: the Reproducibility paragraph interpolates
            # config.random_seed (it was previously rendered as literal braces).
            gr.Markdown(f"""
### Model Limitations and Failure Cases
**Known Limitations:**
1. **Domain Specificity**: Trained on newsgroup data, may not generalize well to
significantly different domains (e.g., legal documents, medical reports)
2. **Short Text**: Performance may degrade on very short documents (< 50 words)
3. **Ambiguous Content**: Documents covering multiple topics may be misclassified
4. **Training Data Bias**: Performance reflects biases present in training data
5. **Language**: Only trained on English text
**Expected Failure Cases:**
- Documents mixing multiple topics from different categories
- Highly technical jargon not present in training data
- Sarcasm, irony, or implicit meaning
- Very long documents (> 10,000 words) may lose context
- Non-English text or code-switched content
**Uncertainty Indicators:**
- Confidence < 50%: Prediction is highly uncertain, consider human review
- Top 2 predictions very close: Document may belong to multiple categories
- Agent votes disagree significantly: Complex or ambiguous document
### Ethical Considerations
This system should be used responsibly:
- Not suitable for high-stakes decisions without human oversight
- May perpetuate biases present in training data
- Should be regularly monitored and updated with new data
- Users should verify important predictions
### Technical Details
**Feature Engineering:**
- TF-IDF: 5000 features, bigrams, sublinear TF scaling
- Embeddings: 384-dimensional sentence-transformers (all-MiniLM-L6-v2)
- Metadata: Document length, word count, average word length
**Model Architectures:**
- TF-IDF Agent: Logistic Regression (L2 regularization)
- Embedding Agent: 2-layer neural network (384 -> 256 -> 128 -> 20)
- XGBoost Agent: 200 estimators, max depth 6, learning rate 0.1
- Meta-learner: Logistic Regression on stacked predictions
**Reproducibility:**
All random seeds are set to {config.random_seed} for reproducibility.
Training on the same data with same configuration should yield very similar results.
""")
        with gr.Tab("About"):
            gr.Markdown("""
### About This System
**Project:** Multi-Agent AI Collaboration System for Document Classification
**Author:** Spencer Purdy
**Purpose:** Demonstrate genuine multi-model machine learning collaboration
for document classification and routing.
**Real-World Applications:**
- Customer support ticket routing
- Email categorization
- Content moderation
- Document management systems
- News article classification
**Dataset:**
- 20 Newsgroups dataset
- Publicly available via Hugging Face
- Approximately 18,000 newsgroup posts
- 20 categories covering diverse topics
- No personal or sensitive information
**Technology Stack:**
- scikit-learn: Classical ML algorithms and pipelines
- PyTorch: Neural network implementation
- sentence-transformers: Semantic embeddings
- XGBoost: Gradient boosting
- Gradio: User interface
**Development:**
- Developed and tested in Google Colab
- Can be deployed to Hugging Face Spaces
- All dependencies explicitly versioned
- Code is documented and follows best practices
**License:**
- Code: MIT License
- Dataset: Public domain (20 Newsgroups)
**Contact:**
For questions or issues, please contact Spencer Purdy.
**Acknowledgments:**
- 20 Newsgroups dataset creators
- scikit-learn team
- Hugging Face for sentence-transformers and dataset hosting
- Open source ML community
""")
    return interface
# Main execution
if __name__ == "__main__":
    # Startup banner followed by basic environment diagnostics.
    banner_rule = "=" * 70
    for banner_line in (
        banner_rule,
        "Multi-Agent AI Collaboration System",
        "Author: Spencer Purdy",
        banner_rule,
    ):
        logger.info(banner_line)
    logger.info(f"Random seed: {RANDOM_SEED}")
    logger.info(f"PyTorch version: {torch.__version__}")
    cuda_ready = torch.cuda.is_available()
    logger.info(f"CUDA available: {cuda_ready}")
    if cuda_ready:
        logger.info(f"CUDA device: {torch.cuda.get_device_name(0)}")

    # Build the system from the module-level config, then train it.
    logger.info("\nInitializing system...")
    system = MultiAgentSystem(config)
    logger.info("\nStarting training process...")
    training_results = system.train_full_system()

    # Assemble the UI from the trained system and its training results.
    logger.info("\nCreating Gradio interface...")
    interface = create_gradio_interface(system, training_results)

    # Serve on all network interfaces at port 7860, with a share link requested.
    logger.info("\nLaunching interface...")
    launch_options = {
        "share": True,
        "server_name": "0.0.0.0",
        "server_port": 7860,
        "show_error": True,
    }
    interface.launch(**launch_options)