File size: 2,684 Bytes
5cc63c0
 
 
f14f7ab
 
 
5cc63c0
 
 
 
 
 
 
 
f14f7ab
 
 
 
 
5cc63c0
 
 
 
 
 
 
f14f7ab
5cc63c0
 
 
f14f7ab
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
import torch
import torch.nn.functional as F
from transformers import AutoTokenizer, AutoModelForCausalLM
import os

os.environ["TOKENIZERS_PARALLELISM"] = "false"

class CustomDetector:
    def __init__(self, model_name="tiiuae/falcon-rw-1b", max_length=512, batch_size=80):
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        self.model_name = model_name
        self.max_length = max_length
        self.batch_size = batch_size

        try:
            self.tokenizer = AutoTokenizer.from_pretrained(model_name)
            self.model = AutoModelForCausalLM.from_pretrained(model_name, torch_dtype=torch.float16)
        except Exception as e:
            raise RuntimeError(f"Failed to load model {model_name}: {str(e)}")

        self.model.to(self.device)
        self.model.eval()

        if self.tokenizer.pad_token is None:
            self.tokenizer.pad_token = self.tokenizer.eos_token

    def my_detector(self, texts: list[str]) -> list[float]:
        if isinstance(texts, str):
            texts = [texts]

        try:
            with torch.no_grad():
                tokenized = self.tokenizer(
                    texts,
                    truncation=True,
                    padding=True,
                    max_length=self.max_length,
                    return_tensors="pt",
                )
                tokenized = {k: v.to(self.device) for k, v in tokenized.items()}
                input_ids = tokenized["input_ids"]
                attention_mask = tokenized["attention_mask"]

                outputs = self.model(**tokenized)
                logits = outputs.logits[:, :-1, :]
                labels = tokenized["input_ids"][:, 1:]

                log_probs = F.log_softmax(logits, dim=-1)
                ll_per_token = log_probs.gather(2, labels.unsqueeze(-1)).squeeze(-1)
                attention_mask = tokenized["attention_mask"][:, 1:]
                ll_per_sample = (ll_per_token * attention_mask).sum(dim=-1) / attention_mask.sum(dim=1).clamp(min=1)

                neg_entropy = (log_probs.exp() * log_probs)
                entropy_per_sample = -(neg_entropy.sum(dim=-1) * attention_mask).sum(-1) / attention_mask.sum(dim=1).clamp(min=1)

                scores = (entropy_per_sample + ll_per_sample).cpu().tolist()

            return scores
        except Exception as e:
            raise RuntimeError(f"Error computing score: {str(e)}")

    def batch_gpu_detector(self, all_texts):
        results = []
        for i in range(0, len(all_texts), self.batch_size):
            batch_texts = all_texts[i:i + self.batch_size]
            results.extend(self.my_detector(batch_texts))
        return results