import torch import torch.nn as nn import torch.nn.functional as F from transformers import BertTokenizer, BertModel import pickle import re import os import sys import numpy as np from collections import defaultdict # ============================================================================= # 1. 모델 클래스 정의 # ============================================================================= # (1) 규칙 기반 스코어러 클래스 class RuleBasedScorer: def __init__(self): # 패턴별 단어 사전 self.patterns = { 11: defaultdict(float), 12: defaultdict(float), 13: defaultdict(float), 14: defaultdict(float) } self.pattern_names = { 11: '의문 유발형(부호)', 12: '의문 유발형(은닉)', 13: '선정표현 사용형', 14: '속어/줄임말 사용형' } # 부호 패턴 (단순 물음표 제외, 과장된 부호만) self.symbol_patterns = { 'repeated': re.compile(r'([!?…~])\1+'), # 반복 부호 (??, !!) 'ellipsis': re.compile(r'\.\.\.|…') # 말줄임표 } def get_score(self, title): # 1. 텍스트 토큰화 (단순 띄어쓰기 및 문자 추출) words = re.findall(r'[가-힣A-Za-z0-9]+', str(title)) scores = {} # 2. 부호 점수 계산 rep = len(self.symbol_patterns['repeated'].findall(title)) ell = len(self.symbol_patterns['ellipsis'].findall(title)) symbol_score = (rep * 30) + (ell * 10) # 3. 패턴별(11~14) 점수 계산 for p in [11, 12, 13, 14]: word_score = 0 # 단어 매칭 점수 (사전에 있는 단어인지 확인) if p in self.patterns: # 안전장치 for word in words: if word in self.patterns[p]: # 가중치 적용 (로그 스케일) word_score += np.log1p(self.patterns[p][word]) * 2 total = 0 # 패턴별 점수 합산 로직 if p == 11: # 의문부호형 total = symbol_score # 오직 부호만 봄 elif p == 12: # 의문은닉형 ("...이유는") total = word_score + (symbol_score * 0.5) else: # 13(선정), 14(속어) total = word_score # 오직 단어만 봄 scores[p] = total # 4. 최종 점수 산출 (가장 높은 점수 선택) if not scores: return {'score': 0, 'pattern': 0, 'pattern_name': '정상'} max_pattern = max(scores, key=scores.get) max_score = min(100, scores[max_pattern]) # 100점 만점 return { 'score': max_score, 'pattern': max_pattern, 'pattern_name': self.pattern_names.get(max_pattern, '알 수 없음') } # 🚨 Pickle 로딩 에러 방지용 import __main__ setattr(__main__, "RuleBasedScorer", RuleBasedScorer) # (2) KoBERT 모델 클래스 class FishingClassifier(nn.Module): def __init__(self, bert, num_classes=2): super().__init__() self.bert = bert self.dropout = nn.Dropout(0.3) self.fc = nn.Linear(768, num_classes) def forward(self, input_ids, mask): _, pooled = self.bert(input_ids=input_ids, attention_mask=mask, return_dict=False) return self.fc(self.dropout(pooled)) # ============================================================================= # 2. 모델 로드 # ============================================================================= print("[AggroModel] 시스템 로딩 시작...") aggro_model = None tokenizer = None rule_scorer = None device = torch.device("cpu") # 절대 경로 설정 BASE_DIR = os.path.dirname(os.path.abspath(__file__)) # --- A. 규칙 기반 모델 로드 (.pkl) --- try: pkl_path = os.path.join(BASE_DIR, "rule_based_scorer.pkl") if os.path.exists(pkl_path): with open(pkl_path, "rb") as f: rule_scorer = pickle.load(f) print(f"✅ [Aggro] 규칙 기반 모델 로드 성공!") else: print(f"⚠️ [Aggro] 규칙 파일 없음: {pkl_path}") except Exception as e: print(f"🚨 [Aggro] 규칙 모델 로드 실패: {e}") # --- B. KoBERT 모델 로드 (.pth) --- try: MODEL_NAME = 'skt/kobert-base-v1' try: tokenizer = BertTokenizer.from_pretrained(MODEL_NAME) bert_base = BertModel.from_pretrained(MODEL_NAME) except: print("⚠️ skt 모델 로드 실패, monologg로 재시도...") tokenizer = BertTokenizer.from_pretrained('monologg/kobert') bert_base = BertModel.from_pretrained('monologg/kobert') aggro_model = FishingClassifier(bert_base).to(device) pth_path = os.path.join(BASE_DIR, "bert_fishing_model_best.pth") if os.path.exists(pth_path): state_dict = torch.load(pth_path, map_location=device) aggro_model.load_state_dict(state_dict) aggro_model.eval() print(f"✅ [Aggro] KoBERT 모델 로드 성공!") else: print(f"⚠️ [Aggro] KoBERT 파일 없음: {pth_path}") aggro_model = None except Exception as e: print(f"🚨 [Aggro] KoBERT 로드 실패: {e}") aggro_model = None # ============================================================================= # 3. 메인 함수 # ============================================================================= def get_aggro_score(title: str) -> dict: print(f"\n[DEBUG] 분석할 제목: {title}") # 1. 제목이 잘 들어왔나 확인 # 1. 규칙 기반 점수 rule_score = 0.0 rule_pattern = "분석 불가" try: res = rule_scorer.get_score(title) rule_score = res['score'] # 0~100점 rule_pattern = res.get('pattern_name', '알 수 없음') except Exception as e: print(f"[DEBUG] 규칙 계산 에러: {e}") rule_score = 0.0 print(f"[DEBUG] 1. 규칙 점수: {rule_score}") # 2. 규칙 점수 확인 # 2. KoBERT 점수 bert_score = 0.0 if aggro_model and tokenizer: try: inputs = tokenizer( title, return_tensors='pt', padding="max_length", truncation=True, max_length=64 ) input_ids = inputs['input_ids'].to(device) mask = inputs['attention_mask'].to(device) # 토큰화 결과 확인 (제대로 잘렸는지) # print(f"[DEBUG] 토큰화 결과: {inputs['input_ids'][0][:10]}") with torch.no_grad(): outputs = aggro_model(input_ids, mask) # 🚨 원본 로직 (Logits 값 확인) print(f"[DEBUG] 모델 출력값(Logits): {outputs}") # Temperature Scaling 적용 전/후 비교 probs = F.softmax(outputs / 2.0, dim=1) bert_score = probs[0][1].item() * 100 print(f"[DEBUG] 2. BERT 원본 점수: {bert_score}") # 3. AI 점수 확인 except Exception as e: print(f"[Aggro] KoBERT 예측 오류: {e}") bert_score = 50.0 # 3. Safety Net (점수 깎기) if rule_score < 5: print("[DEBUG] Safety Net 발동! (규칙 점수 미달 -> AI 점수 70% 삭감)") bert_score *= 0.3 elif rule_score < 20: print("[DEBUG] Safety Net 발동! (규칙 점수 낮음 -> AI 점수 20% 삭감)") bert_score *= 0.8 print(f"[DEBUG] 3. 보정된 BERT 점수: {bert_score}") # 4. 깎인 점수 확인 # 4. 최종 합산 w_rule = 0.4 w_bert = 0.6 final_score = (rule_score * w_rule) + (bert_score * w_bert) print(f"[DEBUG] 4. 최종 합산 점수: {final_score}") # # 1. 규칙 기반 점수 # rule_score = 0.0 # rule_pattern = "분석 불가" # if rule_scorer: # try: # res = rule_scorer.get_score(title) # rule_score = res['score'] # rule_pattern = res.get('pattern_name', '알 수 없음') # except Exception as e: # print(f"규칙 계산 에러: {e}") # rule_score = 50.0 # # 2. KoBERT 점수 # bert_score = 0.0 # if aggro_model and tokenizer: # try: # inputs = tokenizer( # title, return_tensors='pt', padding="max_length", truncation=True, max_length=64 # ) # input_ids = inputs['input_ids'].to(device) # mask = inputs['attention_mask'].to(device) # with torch.no_grad(): # outputs = aggro_model(input_ids, mask) # probs = F.softmax(outputs / 2.0, dim=1) # bert_score = probs[0][1].item() * 100 # except: # bert_score = 50.0 # # Safety Net 적용 (규칙 점수가 낮으면 AI 점수도 깎음) # if rule_score < 5: # bert_score *= 0.3 # 규칙 점수가 거의 없으면 AI 점수 70% 삭감 # elif rule_score < 20: # bert_score *= 0.8 # 규칙 점수가 낮으면 AI 점수 20% 삭감 # #3. 합산 # w_rule = 0.0 # w_bert = 1.0 # final_score = (rule_score * w_rule) + (bert_score * w_bert) # 4. 결과 normalized_score = min(final_score / 100.0, 1.0) # 5. 등급 판정 if final_score >= 80: reason = f"매우 높음 🔴 (AI: {bert_score:.2f}점, 규칙: {rule_score:.0f}점,{rule_pattern})" recommendation = "전면 수정 권장" elif final_score >= 60: reason = f"높음 🟠 (AI: {bert_score:.2f}점, 규칙: {rule_score:.0f}점,{rule_pattern})" recommendation = "과장된 표현 수정 필요" elif final_score >= 40: reason = f"보통 🟡 (AI: {bert_score:.2f}점, 규칙: {rule_score:.0f}점,{rule_pattern})" recommendation = "일부 표현 중립화 권장" else: reason = "낮음 🟢" recommendation = "적절한 제목입니다" return { "score": round(normalized_score, 4), "reason": reason, "recommendation": recommendation }