Liketropy-LLM-Detector / detector.py
tararad's picture
Update detector.py
f14f7ab verified
raw
history blame
2.68 kB
import torch
import torch.nn.functional as F
from transformers import AutoTokenizer, AutoModelForCausalLM
import os
os.environ["TOKENIZERS_PARALLELISM"] = "false"
class CustomDetector:
    """Score texts with a causal language model.

    For every text the detector computes two per-token quantities from the
    model's next-token distribution and averages each over the non-padding
    predicted positions:

    * the log-likelihood of the token that actually follows, and
    * the entropy of the predicted distribution.

    The returned score is ``mean_entropy + mean_log_likelihood``.  How the
    score is thresholded or interpreted is left to the caller.
    """

    def __init__(self, model_name="tiiuae/falcon-rw-1b", max_length=512, batch_size=80):
        """Load the tokenizer and model and move the model to the device.

        Args:
            model_name: Identifier passed to ``from_pretrained`` for both the
                tokenizer and the causal LM.
            max_length: Truncation length (in tokens) used when tokenizing.
            batch_size: Number of texts scored per forward pass by
                ``batch_gpu_detector``.

        Raises:
            RuntimeError: If the tokenizer or the model cannot be loaded.
        """
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        self.model_name = model_name
        self.max_length = max_length
        self.batch_size = batch_size
        try:
            self.tokenizer = AutoTokenizer.from_pretrained(model_name)
            # NOTE(review): weights are loaded as float16 regardless of the
            # selected device; confirm the installed PyTorch build supports
            # half-precision kernels on CPU if CUDA is unavailable.
            self.model = AutoModelForCausalLM.from_pretrained(model_name, torch_dtype=torch.float16)
        except Exception as e:
            # Chain explicitly so the original loading error stays attached.
            raise RuntimeError(f"Failed to load model {model_name}: {str(e)}") from e
        self.model.to(self.device)
        self.model.eval()
        # Batched tokenization with padding=True needs a pad token; fall back
        # to the EOS token when the tokenizer does not define one.
        if self.tokenizer.pad_token is None:
            self.tokenizer.pad_token = self.tokenizer.eos_token

    def my_detector(self, texts: list[str]) -> list[float]:
        """Return one score per text for a single batch.

        Args:
            texts: The texts to score.  A single ``str`` is also accepted and
                treated as a one-element batch.

        Returns:
            A list of floats, one per input text, in input order.  An empty
            input yields an empty list.

        Raises:
            RuntimeError: If tokenization, the forward pass, or the score
                computation fails.
        """
        if isinstance(texts, str):
            texts = [texts]
        try:
            # Nothing to score: skip tokenizer/model, which are not meant to
            # be called with an empty batch.
            if len(texts) == 0:
                return []
            with torch.no_grad():
                tokenized = self.tokenizer(
                    texts,
                    truncation=True,
                    padding=True,
                    max_length=self.max_length,
                    return_tensors="pt",
                )
                tokenized = {k: v.to(self.device) for k, v in tokenized.items()}
                outputs = self.model(**tokenized)

                # Position t predicts token t+1: drop the last logit and the
                # first label/mask entry so the three tensors line up.
                logits = outputs.logits[:, :-1, :]
                labels = tokenized["input_ids"][:, 1:]
                attention_mask = tokenized["attention_mask"][:, 1:]
                # Number of scored (non-padding) positions per sample; the
                # clamp avoids dividing by zero for one-token inputs.
                token_counts = attention_mask.sum(dim=1).clamp(min=1)

                log_probs = F.log_softmax(logits, dim=-1)

                # Mean log-likelihood of the observed next tokens.
                ll_per_token = log_probs.gather(2, labels.unsqueeze(-1)).squeeze(-1)
                ll_per_sample = (ll_per_token * attention_mask).sum(dim=-1) / token_counts

                # Mean entropy of the predicted distributions:
                # H = -sum_v p(v) * log p(v), masked and averaged over tokens.
                neg_entropy = log_probs.exp() * log_probs
                entropy_per_sample = -(neg_entropy.sum(dim=-1) * attention_mask).sum(-1) / token_counts

                scores = (entropy_per_sample + ll_per_sample).cpu().tolist()
                return scores
        except Exception as e:
            raise RuntimeError(f"Error computing score: {str(e)}") from e

    def batch_gpu_detector(self, all_texts):
        """Score an arbitrary number of texts in chunks of ``self.batch_size``.

        Args:
            all_texts: A sized, sliceable sequence of texts.

        Returns:
            A flat list with one score per text, in input order.

        Raises:
            ValueError: If ``self.batch_size`` is smaller than 1.
            RuntimeError: Propagated from ``my_detector`` when a batch fails.
        """
        # A zero step makes range() raise an unexplained error and a negative
        # step silently yields no batches; reject both up front.
        if self.batch_size < 1:
            raise ValueError(f"batch_size must be a positive integer, got {self.batch_size}")
        results = []
        for i in range(0, len(all_texts), self.batch_size):
            batch_texts = all_texts[i:i + self.batch_size]
            results.extend(self.my_detector(batch_texts))
        return results