# feature_engineering.py
from __future__ import annotations

import re
from typing import Iterable, List, Optional, Tuple

import pandas as pd

try:
    from sentence_transformers import SentenceTransformer, util as sbert_util
except Exception:  # keep the module importable when the dependency is missing
    SentenceTransformer = None  # type: ignore
    sbert_util = None  # type: ignore

try:
    import language_tool_python
except Exception:
    language_tool_python = None  # type: ignore

_HTML_TAG_RE = re.compile(r"<[^>]+>")
_WS_RE = re.compile(r"\s+")
_PUNCT_RE = re.compile(r"[^\w\s?!.,:;ёЁа-яА-Я-]", re.UNICODE)

# Mini-lexicons matching the scoring criteria
POLITE_WORDS = {"здравствуйте", "здравствуй", "пожалуйста", "спасибо", "будьте добры"}
APOLOGY_WORDS = {"извините", "простите", "прошу прощения"}
FAMILY_WORDS = {"семья", "сын", "дочь", "дети", "ребёнок", "муж", "жена", "родители"}
SEASON_WORDS = {"зима", "весна", "лето", "осень"}
SHOP_WORDS = {"рассрочка", "гарантия", "характеристики", "документы", "касса"}
YESNO_WORDS = {"да", "нет", "наверное", "возможно"}


def _strip_html(s: str) -> str:
    s = _HTML_TAG_RE.sub(" ", s)
    s = _WS_RE.sub(" ", s).strip()
    return s


def _only_text(s: str) -> str:
    s = s.lower()
    s = _strip_html(s)
    s = _PUNCT_RE.sub(" ", s)
    s = _WS_RE.sub(" ", s).strip()
    return s


def _split_sentences(s: str) -> List[str]:
    # Naive segmentation on sentence-final punctuation
    parts = re.split(r"(?<=[.!?])\s+", s)
    return [p.strip() for p in parts if p.strip()]


def _strip_examiner_lines(text: str) -> str:
    """
    Drop likely examiner turns: sentences containing '?' and short
    control phrases ("хорошо.", "итак, ...").
    """
    sents = _split_sentences(text)
    kept = []
    for sent in sents:
        low = sent.lower()
        if "?" in sent:
            continue
        if low in {"хорошо.", "отлично.", "прекрасно.", "молодец."}:
            continue
        # Control-phrase openers; sentences with '?' are already gone, so no
        # extra question-mark check is needed here.
        if low.startswith(("итак", "следующий", "теперь", "будьте", "ответьте")):
            continue
        kept.append(sent)
    return " ".join(kept) if kept else text


def _count_matches(words: Iterable[str], tokens: Iterable[str]) -> int:
    # _only_text keeps sentence punctuation, so tokens may carry trailing
    # ".," etc.; strip it before comparing. Multiword lexicon entries
    # ("будьте добры", "прошу прощения") can never match token-by-token,
    # so they are matched as substrings of the joined text instead.
    toks = [t.strip("?!.,:;-").lower() for t in tokens]
    text = " ".join(toks)
    count = 0
    for w in words:
        w = w.lower()
        if " " in w:
            count += text.count(w)
        else:
            count += sum(1 for t in toks if t == w)
    return count
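
# Example of the examiner heuristic (hypothetical transcript, shown as a
# comment rather than a test):
#   _strip_examiner_lines("Итак, начнём. Как вас зовут? Меня зовут Анна.")
#   -> "Меня зовут Анна."
# "Итак, начнём." is dropped as a control phrase, "Как вас зовут?" as a
# question; only the candidate's own statement survives.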


class FeatureExtractor:
    """
    Lightweight feature extractor:
    - text/HTML cleanup
    - examiner-turn removal (heuristic)
    - semantic question-answer similarity (SBERT)
    - lengths, sentence counts, question/exclamation marks, etc.
    - per-task indicators (politeness, apology, family, installment plan, ...)
    - (optional) grammar_error_count via LanguageTool
    """

    def __init__(
        self,
        sbert_model_name: str = "cointegrated/rubert-tiny",
        use_grammar: bool = False,
        strip_examiner: bool = True,
    ) -> None:
        self.strip_examiner = strip_examiner

        # SBERT
        self.sbert: Optional[SentenceTransformer]
        if SentenceTransformer is None:
            self.sbert = None
        else:
            self.sbert = SentenceTransformer(sbert_model_name)

        # Grammar
        self.grammar = None
        if use_grammar and language_tool_python is not None:
            try:
                self.grammar = language_tool_python.LanguageTool("ru")
            except Exception:
                self.grammar = None  # disable safely

    # --------- primitive features ----------
    def _basic_text_stats(self, text: str) -> Tuple[int, int, int, int, int, float]:
        cleaned = _only_text(text)
        tokens = cleaned.split()
        sents = _split_sentences(text)
        qmarks = text.count("?")
        emarks = text.count("!")
        avg_sent_len = (len(tokens) / max(len(sents), 1)) if tokens else 0.0
        return len(tokens), len(sents), qmarks, emarks, len(set(tokens)), float(avg_sent_len)

    def _semantic_sim(self, q: str, a: str) -> float:
        if not self.sbert or sbert_util is None:
            return 0.0
        try:
            emb_q = self.sbert.encode([q], convert_to_tensor=True, normalize_embeddings=True)
            emb_a = self.sbert.encode([a], convert_to_tensor=True, normalize_embeddings=True)
            sim = float(sbert_util.cos_sim(emb_q, emb_a)[0][0].cpu().item())
            # Map cosine from [-1, 1] to roughly [0, 1]
            return max(0.0, min(1.0, (sim + 1.0) / 2.0))
        except Exception:
            return 0.0

    def _grammar_errors(self, text: str) -> int:
        if not self.grammar:
            return 0
        try:
            return len(self.grammar.check(text))
        except Exception:
            return 0
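
    # NOTE: the indicators below are cheap lexicon hits that approximate rubric
    # criteria (did the speaker apologize, ask a question, use shop vocabulary)
    # rather than measure them reliably. Token thresholds such as "> 12" and
    # "> 25" are heuristic guesses, not validated cutoffs.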

    # --------- per-task features ----------
    def _question_specific_flags(self, qnum: int, answer_text: str, question_text: str) -> dict:
        a_clean = _only_text(answer_text)
        a_tokens = a_clean.split()
        flags = {
            "has_politeness": int(_count_matches(POLITE_WORDS, a_tokens) > 0),
            "has_apology": int(_count_matches(APOLOGY_WORDS, a_tokens) > 0),
            "has_yesno": int(_count_matches(YESNO_WORDS, a_tokens) > 0),
            "mentions_family": int(_count_matches(FAMILY_WORDS, a_tokens) > 0),
            "mentions_season": int(_count_matches(SEASON_WORDS, a_tokens) > 0),
            "mentions_shop": int(_count_matches(SHOP_WORDS, a_tokens) > 0),
            "has_question_mark": int("?" in answer_text),
        }

        # Light per-task rules. Initialize every task flag to 0 so all rows
        # share the same columns regardless of question number.
        for i in (1, 2, 3, 4):
            flags[f"task_completed_like_q{i}"] = 0
        if qnum == 1:  # apologize + ask a question
            flags["task_completed_like_q1"] = int(flags["has_apology"] and flags["has_question_mark"])
        elif qnum == 2:  # dialogue replies
            flags["task_completed_like_q2"] = int(flags["has_yesno"] or len(a_tokens) > 12)
        elif qnum == 3:  # shop: documents / installment plan / specs
            flags["task_completed_like_q3"] = int(flags["mentions_shop"] or len(a_tokens) > 25)
        elif qnum == 4:  # picture description + family/children
            flags["task_completed_like_q4"] = int(flags["mentions_family"] or flags["mentions_season"])

        # Question-answer semantics
        flags["qa_semantic_sim"] = self._semantic_sim(question_text, answer_text)
        return flags

    # --------- public API ----------
    def extract_row_features(self, row: pd.Series) -> dict:
        qnum = int(row.get("№ вопроса") or row.get("question_number") or 0)
        qtext_raw = str(row.get("Текст вопроса") or row.get("question_text") or "")
        atext_raw = str(row.get("Транскрибация") or row.get("transcript") or row.get("answer_text") or "")
        qtext = _strip_html(qtext_raw)
        atext = _strip_html(atext_raw)
        if self.strip_examiner:
            atext = _strip_examiner_lines(atext)

        tok_len, sent_cnt, qmarks, emarks, uniq, avg_sent = self._basic_text_stats(atext)
        grams = self._grammar_errors(atext)

        base = {
            "question_number": qnum,
            "question_text": qtext,
            "answer_text": atext,
            "tokens_len": tok_len,
            "sent_count": sent_cnt,
            "q_mark_count": qmarks,
            "excl_mark_count": emarks,
            "uniq_tokens": uniq,
            "avg_sent_len": avg_sent,
            "grammar_errors": grams,
            "answer_len_chars": len(atext),
        }
        base.update(self._question_specific_flags(qnum, atext, qtext))
        return base

    def extract_all_features(self, df: pd.DataFrame) -> pd.DataFrame:
        feats = [self.extract_row_features(r) for _, r in df.iterrows()]
        out = pd.DataFrame(feats)

        # Guard against NaN and mixed dtypes in the numeric columns.
        text_cols = {"question_text", "answer_text"}
        num_cols = [c for c in out.columns if c not in text_cols]
        for c in num_cols:
            out[c] = pd.to_numeric(out[c], errors="coerce")
        out[num_cols] = out[num_cols].fillna(0)
        return out
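

if __name__ == "__main__":
    # Minimal smoke test on hypothetical data, not part of the pipeline above.
    # Column names mirror the Russian headers that extract_row_features reads.
    # strip_examiner is disabled here so the candidate's own question in the
    # first answer is not filtered out. If sentence-transformers is installed,
    # the SBERT model downloads on first use; otherwise qa_semantic_sim is 0.0.
    demo = pd.DataFrame(
        [
            {
                "№ вопроса": 1,
                "Текст вопроса": "Вы опоздали на встречу. Что вы скажете?",
                "Транскрибация": "Извините, пожалуйста, за опоздание. Можно войти?",
            },
            {
                "№ вопроса": 4,
                "Текст вопроса": "Опишите картинку.",
                "Транскрибация": "На картинке семья: муж, жена и дети. Сейчас лето.",
            },
        ]
    )
    extractor = FeatureExtractor(use_grammar=False, strip_examiner=False)
    features = extractor.extract_all_features(demo)
    print(features[["question_number", "tokens_len", "has_apology", "qa_semantic_sim"]])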