import os

import torch
import torch.nn.functional as F
from transformers import AutoTokenizer, AutoModelForCausalLM

# Silence tokenizer fork warnings when the detector runs inside a multiprocess server.
os.environ["TOKENIZERS_PARALLELISM"] = "false"


class CustomDetector:
    def __init__(self, model_name="tiiuae/falcon-rw-1b", max_length=512, batch_size=80):
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        self.model_name = model_name
        self.max_length = max_length
        self.batch_size = batch_size
        # Half precision on GPU; fall back to full precision on CPU, where fp16
        # inference is slow or unsupported for some ops.
        dtype = torch.float16 if self.device == "cuda" else torch.float32
        try:
            self.tokenizer = AutoTokenizer.from_pretrained(model_name)
            self.model = AutoModelForCausalLM.from_pretrained(model_name, torch_dtype=dtype)
        except Exception as e:
            raise RuntimeError(f"Failed to load model {model_name}: {e}")
        self.model.to(self.device)
        self.model.eval()
        # Falcon-RW ships without a pad token; reuse EOS so batched padding works.
        if self.tokenizer.pad_token is None:
            self.tokenizer.pad_token = self.tokenizer.eos_token

    def my_detector(self, texts: list[str]) -> list[float]:
        if isinstance(texts, str):
            texts = [texts]
        try:
            with torch.no_grad():
                tokenized = self.tokenizer(
                    texts,
                    truncation=True,
                    padding=True,
                    max_length=self.max_length,
                    return_tensors="pt",
                )
                tokenized = {k: v.to(self.device) for k, v in tokenized.items()}
                outputs = self.model(**tokenized)

                # Shift so that the logits at position t predict the token at t+1.
                logits = outputs.logits[:, :-1, :]
                labels = tokenized["input_ids"][:, 1:]
                attention_mask = tokenized["attention_mask"][:, 1:]

                # Mean log-likelihood of the observed tokens, ignoring padding.
                log_probs = F.log_softmax(logits, dim=-1)
                ll_per_token = log_probs.gather(2, labels.unsqueeze(-1)).squeeze(-1)
                ll_per_sample = (ll_per_token * attention_mask).sum(dim=-1) / attention_mask.sum(dim=1).clamp(min=1)

                # Mean predictive entropy over the same (non-padding) positions.
                neg_entropy = log_probs.exp() * log_probs
                entropy_per_sample = -(neg_entropy.sum(dim=-1) * attention_mask).sum(dim=-1) / attention_mask.sum(dim=1).clamp(min=1)

                # Detection score: mean entropy plus mean log-likelihood, one float per text.
                scores = (entropy_per_sample + ll_per_sample).cpu().tolist()
                return scores
        except Exception as e:
            raise RuntimeError(f"Error computing score: {e}")

    def batch_gpu_detector(self, all_texts):
        # Score texts in fixed-size chunks to bound GPU memory use.
        results = []
        for i in range(0, len(all_texts), self.batch_size):
            batch_texts = all_texts[i:i + self.batch_size]
            results.extend(self.my_detector(batch_texts))
        return results
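

# Minimal usage sketch (assumptions: run as a script with the class above in scope;
# the sample texts are illustrative, and how the entropy-plus-likelihood score is
# thresholded into a human/model decision is handled elsewhere in the Space, not here).
if __name__ == "__main__":
    detector = CustomDetector()
    samples = [
        "The mitochondria is the powerhouse of the cell.",
        "In recent years, large language models have achieved remarkable results.",
    ]
    # batch_gpu_detector chunks the inputs by batch_size before scoring each chunk.
    scores = detector.batch_gpu_detector(samples)
    for text, score in zip(samples, scores):
        print(f"{score:+.3f}  {text[:60]}")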