import torch
import torch.nn as nn
# import jieba
import string
import numpy as np
from copy import deepcopy
from tqdm import tqdm
import time
from datetime import datetime
import os
from sklearn.linear_model import LinearRegression
from torch.multiprocessing import Process, Pool
from transformers import BertTokenizer

os.environ['TOKENIZERS_PARALLELISM'] = 'True'
# torch.autograd.set_detect_anomaly(True)


class BaseAttack:
    def __init__(self, name, model, tokenizer, device, max_per, padding, max_length,
                 label_to_id, sentence1_key, sentence2_key):
        self.name = name
        self.model = model
        self.tokenizer = tokenizer
        self.device = device
        self.model = self.model.to(self.device)
        self.model.eval()
        self.padding = padding
        self.max_length = max_length
        self.label_to_id = label_to_id
        self.sentence1_key = sentence1_key
        self.sentence2_key = sentence2_key
        # maximum number of tokens that may be modified
        self.max_per = max_per
        # linear regression model initialization
        # self.linear_regression()
        self.random_tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')

    def run_attack(self, x):
        pass

    def compute_loss(self, x):
        pass

    def preprocess_function(self, examples, to_device=True):
        # Tokenize the texts
        texts = ((examples[0],) if self.sentence2_key is None else (examples[0], examples[1]))
        result = self.tokenizer(*texts, padding=self.padding, max_length=self.max_length, truncation=True)
        new_result = {}
        for key, item in result.items():
            if to_device:
                new_result[key] = torch.tensor(item).unsqueeze(0).to(self.device)
            else:
                new_result[key] = torch.tensor(item).unsqueeze(0)
        return new_result
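
    # Illustrative sketch (not from the original code): for a sentence-pair task with
    # padding='max_length' and max_length=128, a call such as
    #   self.preprocess_function(['a premise .', 'a hypothesis .'])
    # is expected to return a dict like
    #   {'input_ids': LongTensor of shape (1, 128), 'attention_mask': ..., 'token_type_ids': ...}
    # with every tensor already moved to self.device.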

    def get_pred(self, input_):
        # get_prob returns (outputs, running_time); use the model outputs only
        return self.get_prob(input_)[0].logits.argmax(dim=-1)

    def get_prob(self, input_):
        tic = datetime.now()
        batch = self.preprocess_function(input_)
        # batch['gumbel_softmax']=gradient
        # print(batch)
        outputs = self.model(**batch)  # get all logits
        toc = datetime.now()
        running_time = (toc - tic).total_seconds()
        return outputs, running_time

    def output_analysis(self, outputs):
        # The model outputs are expected to expose skim_loss, tokens_remained and a
        # per-layer layer_tokens_remained list (i.e. a token-skimming model).
        # print(outputs)
        all_skim_loss, all_tokens_remained = list(), list()
        all_layer_tokens_remained = [[] for _ in range(len(outputs.layer_tokens_remained))]
        all_skim_loss.append(outputs.skim_loss)
        all_tokens_remained.append(outputs.tokens_remained)
        for layer_idx, mac in enumerate(outputs.layer_tokens_remained):
            all_layer_tokens_remained[layer_idx].append(mac)
        skim_loss = torch.mean(torch.stack(all_skim_loss))
        tokens_remained = torch.mean(torch.stack(all_tokens_remained))
        layers_result = [torch.mean(torch.stack(macs)) for macs in all_layer_tokens_remained]
        return skim_loss, tokens_remained, layers_result

    def load_data(self, model_path_key, mode='train'):
        path = f'flops_count/{model_path_key}/{mode}'
        if os.path.exists(f'{path}/process_data.pth'):
            print(f'loading data from {path}')
            data = torch.load(f'{path}/process_data.pth')
        else:
            time_list = torch.load(f'{path}/time_list.pth')
            ratio_list = torch.load(f'{path}/ratio_list.pth')
            token_num_list = torch.load(f'{path}/text_len_list_tokenizer.pth')
            ratio_list_ = []
            for ratio in ratio_list:
                ratio_list_.append(ratio.item())
            y = np.expand_dims(np.array(ratio_list_), axis=1)
            # print(x.shape)
            time_list_ = []
            # normalize each measured time by its token count (scaled by 1e8);
            # renamed the loop variable so it no longer shadows the time module
            for elapsed, token_num in zip(time_list, token_num_list):
                time_list_.append(elapsed / (token_num * (10 ** 8)))
            x = np.expand_dims(np.array(time_list_), axis=1)
            # print(y.shape)
            data = dict()
            data['x'] = x
            data['y'] = y
            torch.save(data, f'{path}/process_data.pth')
        return data

    def predict(self, x):
        return self.w * x + self.b

    def linear_regression(self):
        print("=" * 20)
        print('Linear Regression Generation')
        data_train = self.load_data(self.name, mode='train')
        data_test = self.load_data(self.name, mode='test')
        # print(data_train,data_test)
        reg = LinearRegression().fit(data_train['x'], data_train['y'])
        train_score = reg.score(data_train['x'], data_train['y'])
        test_score = reg.score(data_test['x'], data_test['y'])
        print(f'train set score: {train_score}')
        print(f'test set score: {test_score}')
        self.w = reg.coef_[0][0]
        self.b = reg.intercept_[0]
        print("w:", self.w)
        print("b:", self.b)
        print(self.predict(0.8))
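
    # Usage sketch (an assumption for illustration, not from the original code):
    # linear_regression() fits ratio ≈ w * normalized_time + b on the dumps under
    # flops_count/<name>/{train,test}/ produced elsewhere in this project, so that
    #   self.predict(t)   # with t = measured_time / (token_num * 1e8)
    # estimates the token-remaining ratio from a normalized per-token latency.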


class MyAttack(BaseAttack):
    def __init__(self, name, model, tokenizer, device, max_per, padding, max_length,
                 label_to_id, sentence1_key, sentence2_key):
        super(MyAttack, self).__init__(name, model, tokenizer, device, max_per, padding,
                                       max_length, label_to_id, sentence1_key, sentence2_key)
        # self.insert_character = string.punctuation
        self.insert_character = string.digits
        self.insert_character += string.ascii_letters
        # self.insert_character -= """"'/\\"""
        # print(self.insert_character)
        self.origin_ratio = []
        self.attack_ratio = []
        self.layer_result = []
        self.origin_layer_result = []

    # @torch.no_grad()
    # def select_best(self, new_strings):
    #     best_string = None
    #     best_loss = 0
    #     for new_string in new_strings:
    #         new_predicted_loss = self.compute_loss(new_string)
    #         if new_predicted_loss > best_loss:
    #             best_loss = new_predicted_loss
    #             best_string = new_string
    #     assert best_string is not None
    #     return best_string, best_loss

    def select_best(self, new_strings):
        # self.model.to('cpu')
        best_string = None
        best_loss = 0
        # Pool.map sends self.compute_loss (and therefore the model held on self) to
        # the worker processes and evaluates every candidate text in parallel.
        with Pool(processes=4) as pool:
            loss_list = pool.map(self.compute_loss, new_strings)
        idx = np.argmax(np.array(loss_list))
        best_loss = loss_list[idx]
        best_string = new_strings[idx]
        # self.model.to(self.device)
        # for new_string in new_strings:
        #     new_predicted_loss = self.compute_loss(new_string)
        #     if new_predicted_loss > best_loss:
        #         best_loss = new_predicted_loss
        #         best_string = new_string
        assert best_string is not None
        # self.model.to(self.device)
        return best_string, best_loss

    def compute_loss(self, xxx):
        raise NotImplementedError

    # grad and modify_pos default to None so the base signature matches the
    # single-argument call made in run_attack below.
    def mutation(self, current_adv_text, grad=None, modify_pos=None):
        raise NotImplementedError

    def run_attack(self, text):
        # assert len(text) == 1
        # print(text)
        text[0] = text[0].strip(" .")
        text[1] = text[1].strip(" .")
        print(f'Origin Text: {text}')
        current_adv_text = deepcopy(text)
        # initialize the best candidate so it is defined even if no mutation improves the loss
        best_adv_text = deepcopy(text)
        # max_per: maximum number of words to perturb
        # pbar = tqdm(range(self.max_per))
        best_loss = 0
        best_tokens_remained = 0
        best_layer_result = None
        output, _ = self.get_prob(current_adv_text)
        origin_skim_loss, origin_ratio_, origin_layer_result_ = self.output_analysis(output)
        print(origin_skim_loss, origin_ratio_)
        self.origin_ratio.append(origin_ratio_.item())
        self.origin_layer_result.append(origin_layer_result_)
        # for it in pbar:
        for _ in range(self.max_per):
            # generate candidate texts, one for each position that can be modified
            new_strings = self.mutation(current_adv_text)
            # print(new_strings)
            current_adv_text, current_loss = self.select_best(new_strings)
            # print(current_adv_text,current_loss,current_tokens_remained)
            if current_loss > best_loss:
                best_adv_text = deepcopy(current_adv_text)
                best_loss = current_loss
        print(best_adv_text)
        output, _ = self.get_prob(best_adv_text)
        _, best_tokens_remained, best_layer_result = self.output_analysis(output)
        self.attack_ratio.append(best_tokens_remained.item())
        self.layer_result.append(best_layer_result)
        print(f'Malicious Text: {best_adv_text}')
        print(f'Origin Ratio: {self.origin_ratio[-1]} Attack Ratio: {self.attack_ratio[-1]}')
        print(f'Layer Result: {self.layer_result[-1]}')
        return best_adv_text, best_loss, best_tokens_remained, best_layer_result
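

# ---------------------------------------------------------------------------
# Minimal usage sketch (an illustrative assumption, not part of the original
# attack code): a concrete subclass has to supply compute_loss and mutation.
# `CharInsertAttack`, the random character-insertion strategy, and the task keys
# below are hypothetical; the real attack may pick positions and characters
# differently.
# ---------------------------------------------------------------------------
# import random
#
# class CharInsertAttack(MyAttack):
#     def compute_loss(self, text):
#         outputs, _ = self.get_prob(text)
#         _, tokens_remained, _ = self.output_analysis(outputs)
#         # a larger value means fewer tokens are skimmed away, i.e. more compute
#         return tokens_remained.item()
#
#     def mutation(self, current_adv_text, grad=None, modify_pos=None):
#         # return candidate pairs with one random character inserted into sentence 1
#         candidates = []
#         for _ in range(16):
#             s = current_adv_text[0]
#             pos = random.randrange(len(s) + 1)
#             ch = random.choice(self.insert_character)
#             candidates.append([s[:pos] + ch + s[pos:], current_adv_text[1]])
#         return candidates
#
# attack = CharInsertAttack('bert-base-uncased', model, tokenizer, 'cuda', max_per=5,
#                           padding='max_length', max_length=128, label_to_id=None,
#                           sentence1_key='premise', sentence2_key='hypothesis')
# attack.run_attack(['A man is playing a guitar .', 'A person plays an instrument .'])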