# feature_engineering.py
from __future__ import annotations

import re
from typing import Iterable, List, Tuple, Optional

import numpy as np
import pandas as pd

try:
    from sentence_transformers import SentenceTransformer, util as sbert_util
except Exception:  # don't crash if the package is not installed
    SentenceTransformer = None  # type: ignore
    sbert_util = None  # type: ignore

try:
    import language_tool_python
except Exception:
    language_tool_python = None  # type: ignore

_HTML_TAG_RE = re.compile(r"<[^>]+>")
_WS_RE = re.compile(r"\s+")
_PUNCT_RE = re.compile(r"[^\w\s?!.,:;ёЁа-яА-Я-]", re.UNICODE)

# mini-lexicons for the scoring criteria
POLITE_WORDS = {"здравствуйте", "здравствуй", "пожалуйста", "спасибо", "будьте добры"}
APOLOGY_WORDS = {"извините", "простите", "прошу прощения"}
FAMILY_WORDS = {"семья", "сын", "дочь", "дети", "ребёнок", "муж", "жена", "родители"}
SEASON_WORDS = {"зима", "весна", "лето", "осень"}
SHOP_WORDS = {"рассрочка", "гарантия", "характеристики", "документы", "касса"}
YESNO_WORDS = {"да", "нет", "наверное", "возможно"}


def _strip_html(s: str) -> str:
    s = _HTML_TAG_RE.sub(" ", s)
    s = _WS_RE.sub(" ", s).strip()
    return s


def _only_text(s: str) -> str:
    s = s.lower()
    s = _strip_html(s)
    s = _PUNCT_RE.sub(" ", s)
    s = _WS_RE.sub(" ", s).strip()
    return s


def _split_sentences(s: str) -> List[str]:
    # naive segmentation on terminal punctuation
    parts = re.split(r"(?<=[.!?])\s+", s)
    return [p.strip() for p in parts if p.strip()]


def _strip_examiner_lines(text: str) -> str:
    """
    Drop likely examiner turns: sentences containing '?' and short
    control phrases ("хорошо.", "итак, ...").
    """
    sents = _split_sentences(text)
    kept = []
    for sent in sents:
        low = sent.lower()
        if "?" in sent:
            continue
        if low in {"хорошо.", "отлично.", "прекрасно.", "молодец."}:
            continue
        # prefixes typical of examiner prompts; questions were already
        # filtered above, so re-checking for '?' here would be dead code
        if low.startswith(("итак", "следующий", "теперь", "будьте", "ответьте")):
            continue
        kept.append(sent)
    return " ".join(kept) if kept else text


def _count_matches(words: Iterable[str], text: str) -> int:
    # multi-word entries ("будьте добры") match as substrings of the cleaned
    # text; pure token matching could never find them. Single words match tokens.
    tokens = text.split()
    return sum(text.count(w) if " " in w else tokens.count(w) for w in map(str.lower, words))


class FeatureExtractor:
    """
    Lightweight feature extractor:
    - text/HTML cleanup
    - stripping examiner turns (heuristic)
    - question-answer semantic similarity (SBERT)
    - lengths, sentence counts, question/exclamation marks, etc.
    - per-task indicators (politeness, apology, family, installment plan, ...)
    - (optional) grammar_errors via LanguageTool
    """

    def __init__(
        self,
        sbert_model_name: str = "cointegrated/rubert-tiny",
        use_grammar: bool = False,
        strip_examiner: bool = True,
    ) -> None:
        self.strip_examiner = strip_examiner
        # SBERT
        self.sbert: Optional[SentenceTransformer]
        if SentenceTransformer is None:
            self.sbert = None
        else:
            self.sbert = SentenceTransformer(sbert_model_name)
        # Grammar checker
        self.grammar = None
        if use_grammar and language_tool_python is not None:
            try:
                self.grammar = language_tool_python.LanguageTool("ru")
            except Exception:
                self.grammar = None  # disable safely

    # --------- basic text features ----------
    def _basic_text_stats(self, text: str) -> Tuple[int, int, int, int, int, float]:
        cleaned = _only_text(text)
        tokens = cleaned.split()
        sents = _split_sentences(text)
        qmarks = text.count("?")
        emarks = text.count("!")
        avg_sent_len = (len(tokens) / max(len(sents), 1)) if tokens else 0.0
        return len(tokens), len(sents), qmarks, emarks, len(set(tokens)), float(avg_sent_len)

    def _semantic_sim(self, q: str, a: str) -> float:
        if not self.sbert or sbert_util is None:
            return 0.0
        try:
            emb_q = self.sbert.encode([q], convert_to_tensor=True, normalize_embeddings=True)
            emb_a = self.sbert.encode([a], convert_to_tensor=True, normalize_embeddings=True)
            sim = float(sbert_util.cos_sim(emb_q, emb_a)[0][0].cpu().item())
            # map cosine similarity from [-1..1] roughly onto [0..1]
            return max(0.0, min(1.0, (sim + 1.0) / 2.0))
        except Exception:
            return 0.0

    def _grammar_errors(self, text: str) -> int:
        if not self.grammar:
            return 0
        try:
            matches = self.grammar.check(text)
            return len(matches)
        except Exception:
            return 0

    # --------- task-specific features ----------
    def _question_specific_flags(self, qnum: int, answer_text: str, question_text: str) -> dict:
        a_clean = _only_text(answer_text)
        a_tokens = a_clean.split()
        flags = {
            "has_politeness": int(_count_matches(POLITE_WORDS, a_clean) > 0),
            "has_apology": int(_count_matches(APOLOGY_WORDS, a_clean) > 0),
            "has_yesno": int(_count_matches(YESNO_WORDS, a_clean) > 0),
            "mentions_family": int(_count_matches(FAMILY_WORDS, a_clean) > 0),
            "mentions_season": int(_count_matches(SEASON_WORDS, a_clean) > 0),
            "mentions_shop": int(_count_matches(SHOP_WORDS, a_clean) > 0),
            "has_question_mark": int("?" in answer_text),
        }
        # lightweight per-task rules
        if qnum == 1:  # apologize + ask a question
            flags["task_completed_like_q1"] = int(flags["has_apology"] and flags["has_question_mark"])
        elif qnum == 2:  # dialogue responses
            flags["task_completed_like_q2"] = int(flags["has_yesno"] or len(a_tokens) > 12)
        elif qnum == 3:  # shop: documents / installment plan / specs
            flags["task_completed_like_q3"] = int(flags["mentions_shop"] or len(a_tokens) > 25)
        elif qnum == 4:  # picture description + family/children
            flags["task_completed_like_q4"] = int(flags["mentions_family"] or flags["mentions_season"])
        else:
            flags["task_completed_like_q1"] = 0
        # question-answer semantic similarity
        flags["qa_semantic_sim"] = self._semantic_sim(question_text, answer_text)
        return flags

    # --------- public API ----------
    def extract_row_features(self, row: pd.Series) -> dict:
        qnum = int(row.get("№ вопроса") or row.get("question_number") or 0)
        qtext_raw = str(row.get("Текст вопроса") or row.get("question_text") or "")
        atext_raw = str(row.get("Транскрибация") or row.get("transcript") or row.get("answer_text") or "")
        qtext = _strip_html(qtext_raw)
        atext = _strip_html(atext_raw)
        if self.strip_examiner:
            atext = _strip_examiner_lines(atext)
        tok_len, sent_cnt, qmarks, emarks, uniq, avg_sent = self._basic_text_stats(atext)
        grams = self._grammar_errors(atext)
        base = {
            "question_number": qnum,
            "question_text": qtext,
            "answer_text": atext,
            "tokens_len": tok_len,
            "sent_count": sent_cnt,
            "q_mark_count": qmarks,
            "excl_mark_count": emarks,
            "uniq_tokens": uniq,
            "avg_sent_len": avg_sent,
            "grammar_errors": grams,
            "answer_len_chars": len(atext),
        }
        base.update(self._question_specific_flags(qnum, atext, qtext))
        return base

    def extract_all_features(self, df: pd.DataFrame) -> pd.DataFrame:
        feats = [self.extract_row_features(r) for _, r in df.iterrows()]
        out = pd.DataFrame(feats)
        # guard against NaNs and mixed dtypes in the numeric columns
        text_cols = {"question_text", "answer_text"}
        num_cols = [c for c in out.columns if c not in text_cols]
        for c in num_cols:
            out[c] = pd.to_numeric(out[c], errors="coerce")
        out[num_cols] = out[num_cols].fillna(0)
        return out
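

if __name__ == "__main__":
    # Minimal usage sketch. The sample rows below are invented for
    # illustration; the Russian column names mirror the ones that
    # extract_row_features() already looks up.
    demo = pd.DataFrame(
        [
            {
                "№ вопроса": 1,
                "Текст вопроса": "Извинитесь и спросите, где касса.",
                "Транскрибация": "Извините, пожалуйста. Скажите, где касса?",
            },
            {
                "№ вопроса": 4,
                "Текст вопроса": "Опишите картинку.",
                "Транскрибация": "На картинке зима. Дети гуляют с родителями.",
            },
        ]
    )
    # strip_examiner=False keeps the student's own question in answer 1;
    # grammar checking stays off because LanguageTool needs a local Java
    # runtime and downloads its rule set on first use.
    fx = FeatureExtractor(use_grammar=False, strip_examiner=False)
    print(fx.extract_all_features(demo).T)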