# src/semantic_cache.py
from __future__ import annotations
import sqlite3
import hashlib
from pathlib import Path
from typing import List, Dict, Tuple
import numpy as np

# Absolute path to the DB: <project root>/data/cache/embeddings.sqlite
ROOT = Path(__file__).resolve().parents[1]
DB_PATH = ROOT / "data" / "cache" / "embeddings.sqlite"
def _norm_text(t: str) -> str:
    # Collapse whitespace runs to single spaces and trim the ends
    return " ".join((t or "").strip().split())

def _hash_text(t: str) -> str:
    # Stable cache key: blake2s digest of the normalized text
    return hashlib.blake2s(_norm_text(t).encode("utf-8")).hexdigest()
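# Note (illustrative): because of the normalization above, texts that differ
# only in whitespace collapse to the same cache key, e.g.
#     _hash_text("hello   world") == _hash_text("  hello world\n")  # True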
def _ensure_db() -> None:
    DB_PATH.parent.mkdir(parents=True, exist_ok=True)
    with sqlite3.connect(DB_PATH) as con:
        # A few pragmas so writes are more reliable on Windows
        con.execute("PRAGMA journal_mode = WAL;")
        con.execute("PRAGMA synchronous = NORMAL;")
        con.execute("""
            CREATE TABLE IF NOT EXISTS emb (
                h    TEXT PRIMARY KEY,
                text TEXT NOT NULL,
                dim  INTEGER NOT NULL,
                vec  BLOB NOT NULL
            )
        """)
        con.commit()
def fetch_from_cache(texts: List[str]) -> Dict[str, np.ndarray]:
    _ensure_db()
    hashes = [_hash_text(t) for t in texts]
    out: Dict[str, np.ndarray] = {}
    if not hashes:
        return out
    with sqlite3.connect(DB_PATH) as con:
        # SQLite caps the number of bound parameters per statement,
        # so look the hashes up in chunks
        for s in range(0, len(hashes), 500):
            chunk = hashes[s:s + 500]
            q = "SELECT h, dim, vec FROM emb WHERE h IN ({})".format(",".join("?" * len(chunk)))
            for h, dim, blob in con.execute(q, chunk):
                # frombuffer returns a read-only float32 view over the blob
                out[h] = np.frombuffer(blob, dtype=np.float32).reshape(dim)
    return out
def write_to_cache(items: List[Tuple[str, str, np.ndarray]]) -> None:
    if not items:
        return
    _ensure_db()
    with sqlite3.connect(DB_PATH) as con:
        # Store each vector as raw float32 bytes; dim lets us restore the shape
        con.executemany(
            "INSERT OR REPLACE INTO emb(h, text, dim, vec) VALUES (?,?,?,?)",
            [(h, _norm_text(t), v.size, v.astype(np.float32).tobytes()) for h, t, v in items]
        )
        con.commit()
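# Round-trip sketch (illustrative values, not part of the module):
#     v = np.random.rand(4).astype(np.float32)
#     write_to_cache([(_hash_text("demo"), "demo", v)])
#     fetch_from_cache(["demo"])[_hash_text("demo")]  # -> v, as float32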
def embed_with_cache(texts: List[str], model, batch_size: int = 16, verbose: bool = True) -> np.ndarray:
    """
    Return embeddings for texts: cached vectors are fetched first,
    and the missing ones are computed with the model and written back.
    """
    _ensure_db()
    if not texts:
        # np.vstack on an empty list would raise, so bail out early
        return np.zeros((0, 0), dtype=np.float32)
    hashes = [_hash_text(t) for t in texts]
    cached = fetch_from_cache(texts)  # hash -> vec
    out: List[np.ndarray | None] = [None] * len(texts)
    missing_idx = [i for i, h in enumerate(hashes) if h not in cached]
    if verbose:
        print(f"[cache] DB: {DB_PATH}")
        print(f"[cache] total texts: {len(texts)} | found in cache: {len(texts) - len(missing_idx)} | to compute: {len(missing_idx)}")

    # Fill the positions the cache already covers
    for i, h in enumerate(hashes):
        if h in cached:
            out[i] = cached[h]

    # Compute the rest with the model
    if missing_idx:
        to_compute = [texts[i] for i in missing_idx]
        vecs = []
        if verbose:
            print(f"[cache] computing with the model in batches of {batch_size}...")
        for b in range(0, len(to_compute), batch_size):
            chunk = to_compute[b:b + batch_size]
            if verbose:
                print(f"[cache] batch {b//batch_size + 1}/{(len(to_compute)+batch_size-1)//batch_size} | {len(chunk)} texts")
            vecs_chunk = model.encode(
                chunk,
                convert_to_numpy=True,
                normalize_embeddings=True,
                show_progress_bar=True  # enable the progress bar
            )
            vecs.append(vecs_chunk)
        vecs = np.vstack(vecs)

        # Write the results into out and into the cache
        items_for_cache: List[Tuple[str, str, np.ndarray]] = []
        for j, idx in enumerate(missing_idx):
            v = vecs[j]
            out[idx] = v
            items_for_cache.append((hashes[idx], texts[idx], v))
        write_to_cache(items_for_cache)
        if verbose:
            print(f"[cache] written to cache: {len(items_for_cache)} vectors")

    # Stack into a single (n_texts, dim) float32 matrix
    result = np.vstack(out).astype(np.float32)
    if verbose:
        print(f"[cache] done: shape={result.shape}, dtype={result.dtype}")
    return result
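
if __name__ == "__main__":
    # Usage sketch, not part of the module API: assumes the
    # sentence-transformers package, whose encode(convert_to_numpy=...,
    # normalize_embeddings=..., show_progress_bar=...) signature matches
    # the call above; the model name is only an illustrative choice.
    from sentence_transformers import SentenceTransformer

    model = SentenceTransformer("all-MiniLM-L6-v2")
    vecs = embed_with_cache(["first query", "second query", "first query"], model)
    print(vecs.shape)  # rows align with the input order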