|
|
import joblib |
|
|
import numpy as np |
|
|
import torch |
|
|
import os |
|
|
from transformers import AutoTokenizer, AutoModel |
|
|
from sklearn.preprocessing import StandardScaler |
|
|
|
|
|
|
|
|
_device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') |
|
|
_models = {} |
|
|
_tokenizers = {} |
|
|
_classifiers = {} |
|
|
_scalers = {} |
|
|
|
|
|
def initialize_models(): |
|
|
"""Pre-load all models at startup""" |
|
|
model_configs = { |
|
|
'Distilbert': 'distilbert-base-uncased', |
|
|
'Roberta': 'roberta-base', |
|
|
'Longformer': 'allenai/longformer-base-4096' |
|
|
} |
|
|
|
|
|
for name, path in model_configs.items(): |
|
|
key = name.lower() |
|
|
print(f"Loading {name}...") |
|
|
|
|
|
|
|
|
_tokenizers[key] = AutoTokenizer.from_pretrained(path) |
|
|
_models[key] = AutoModel.from_pretrained(path).to(_device).eval() |
|
|
|
|
|
|
|
|
clf_path = f"{name}_xgboost_model.pkl" |
|
|
if not os.path.exists(clf_path): |
|
|
raise FileNotFoundError(f"Missing classifier: {clf_path}") |
|
|
_classifiers[key] = joblib.load(clf_path) |
|
|
|
|
|
scaler_path = f"{name}_scaler.pkl" |
|
|
if os.path.exists(scaler_path): |
|
|
_scalers[key] = joblib.load(scaler_path) |
|
|
else: |
|
|
_scalers[key] = StandardScaler().fit(np.eye(768)) |
|
|
|
|
|
def get_embedding(text, model_name): |
|
|
"""Generate standardized embeddings with proper error handling""" |
|
|
try: |
|
|
model_key = model_name.lower() |
|
|
if model_key not in _models: |
|
|
raise ValueError(f"Model {model_name} not initialized") |
|
|
|
|
|
inputs = _tokenizers[model_key]( |
|
|
text, |
|
|
return_tensors="pt", |
|
|
truncation=True, |
|
|
padding=True, |
|
|
max_length=512 |
|
|
).to(_device) |
|
|
|
|
|
with torch.no_grad(): |
|
|
outputs = _models[model_key](**inputs) |
|
|
last_hidden = outputs.last_hidden_state |
|
|
attention_mask = inputs["attention_mask"].unsqueeze(-1) |
|
|
pooled = (last_hidden * attention_mask).sum(1) / attention_mask.sum(1) |
|
|
|
|
|
embedding = pooled.cpu().numpy().squeeze(0) |
|
|
return _scalers[model_key].transform(embedding.reshape(1, -1))[0] |
|
|
|
|
|
except Exception as e: |
|
|
print(f"Embedding error: {str(e)}") |
|
|
return np.zeros(768) |
|
|
|
|
|
def get_prediction(text, model_name): |
|
|
try: |
|
|
model_key = model_name.lower() |
|
|
if model_key not in _classifiers: |
|
|
raise ValueError(f"Classifier for {model_name} not loaded") |
|
|
|
|
|
embedding = get_embedding(text, model_name).reshape(1, -1) |
|
|
proba = _classifiers[model_key].predict_proba(embedding)[0][1] |
|
|
|
|
|
threshold = 0.5 |
|
|
return { |
|
|
"prediction": "π Jailbreak" if proba > threshold else "β
Benign", |
|
|
} |
|
|
|
|
|
except Exception as e: |
|
|
print(f"Prediction error: {str(e)}") |
|
|
return {"error": str(e)} |
|
|
|
|
|
|
|
|
initialize_models() |