import os
import re
from typing import List

import gradio as gr
import Levenshtein
import requests
import spacy
from presidio_analyzer import (
    AnalyzerEngine,
    EntityRecognizer,
    Pattern,
    PatternRecognizer,
    RecognizerResult,
)
from presidio_analyzer.nlp_engine import (
    NerModelConfiguration,
    NlpArtifacts,
    SpacyNlpEngine,
)
from spacy.cli.download import download
from spacy.matcher import Matcher
from spaczz.matcher import FuzzyMatcher

from tools.config import (
    CUSTOM_ENTITIES,
    DEFAULT_LANGUAGE,
    SPACY_MODEL_PATH,
    TESSERACT_DATA_FOLDER,
)

# Use the GPU for spaCy if one is available; otherwise spaCy falls back to the CPU.
spacy.prefer_gpu()

# Minimum confidence score for Presidio analyzer results to be returned.
score_threshold = 0.001
custom_entities = CUSTOM_ENTITIES

class LoadedSpacyNlpEngine(SpacyNlpEngine):
    """Presidio NLP engine that wraps an already-loaded spaCy model."""

    def __init__(self, loaded_spacy_model, language_code: str):
        super().__init__(
            ner_model_configuration=NerModelConfiguration(
                labels_to_ignore=["CARDINAL", "ORDINAL"]
            )
        )
        self.nlp = {language_code: loaded_spacy_model}


def _base_language_code(language: str) -> str:
    """Return the base language code, e.g. "en" for "en_lg"."""
    lang = _normalize_language_input(language)
    if "_" in lang:
        return lang.split("_")[0]
    return lang

def load_spacy_model(language: str = DEFAULT_LANGUAGE):
    """
    Load a spaCy model for the requested language and return it as `nlp`.

    Accepts common inputs such as: "en", "en_lg", "en_sm", "de", "fr", "es", "it",
    "nl", "pt", "zh", "ja", "xx".
    Falls back through sensible candidate models and downloads a model if none is
    installed locally.
    """

    # Point spaCy at a custom model directory if one has been configured.
    if SPACY_MODEL_PATH and SPACY_MODEL_PATH.strip():
        os.environ["SPACY_DATA"] = SPACY_MODEL_PATH
        print(f"Setting spaCy model path to: {SPACY_MODEL_PATH}")
    else:
        print("Using default spaCy model storage location")

    # Map full language names onto two-letter codes.
    synonyms = {
        "english": "en",
        "catalan": "ca",
        "danish": "da",
        "german": "de",
        "french": "fr",
        "greek": "el",
        "finnish": "fi",
        "croatian": "hr",
        "lithuanian": "lt",
        "macedonian": "mk",
        "norwegian_bokmaal": "nb",
        "polish": "pl",
        "russian": "ru",
        "slovenian": "sl",
        "swedish": "sv",
        "dutch": "nl",
        "portuguese": "pt",
        "chinese": "zh",
        "japanese": "ja",
        "multilingual": "xx",
    }

    lang_norm = _normalize_language_input(language)
    lang_norm = synonyms.get(lang_norm, lang_norm)
    base_lang = _base_language_code(lang_norm)

    # Candidate model names per language, ordered by preference (largest first).
    candidates_by_lang = {
        "en": [
            "en_core_web_lg",
            "en_core_web_trf",
            "en_core_web_md",
            "en_core_web_sm",
        ],
        "en_lg": ["en_core_web_lg"],
        "en_trf": ["en_core_web_trf"],
        "en_md": ["en_core_web_md"],
        "en_sm": ["en_core_web_sm"],
        "ca": ["ca_core_news_lg", "ca_core_news_md", "ca_core_news_sm"],
        "da": ["da_core_news_lg", "da_core_news_md", "da_core_news_sm"],
        "de": ["de_core_news_lg", "de_core_news_md", "de_core_news_sm"],
        "el": ["el_core_news_lg", "el_core_news_md", "el_core_news_sm"],
        "es": ["es_core_news_lg", "es_core_news_md", "es_core_news_sm"],
        "fi": ["fi_core_news_lg", "fi_core_news_md", "fi_core_news_sm"],
        "fr": ["fr_core_news_lg", "fr_core_news_md", "fr_core_news_sm"],
        "hr": ["hr_core_news_lg", "hr_core_news_md", "hr_core_news_sm"],
        "it": ["it_core_news_lg", "it_core_news_md", "it_core_news_sm"],
        "ja": ["ja_core_news_lg", "ja_core_news_md", "ja_core_news_sm"],
        "ko": ["ko_core_news_lg", "ko_core_news_md", "ko_core_news_sm"],
        "lt": ["lt_core_news_lg", "lt_core_news_md", "lt_core_news_sm"],
        "mk": ["mk_core_news_lg", "mk_core_news_md", "mk_core_news_sm"],
        "nb": ["nb_core_news_lg", "nb_core_news_md", "nb_core_news_sm"],
        "nl": ["nl_core_news_lg", "nl_core_news_md", "nl_core_news_sm"],
        "pl": ["pl_core_news_lg", "pl_core_news_md", "pl_core_news_sm"],
        "pt": ["pt_core_news_lg", "pt_core_news_md", "pt_core_news_sm"],
        "ro": ["ro_core_news_lg", "ro_core_news_md", "ro_core_news_sm"],
        "ru": ["ru_core_news_lg", "ru_core_news_md", "ru_core_news_sm"],
        "sl": ["sl_core_news_lg", "sl_core_news_md", "sl_core_news_sm"],
        "sv": ["sv_core_news_lg", "sv_core_news_md", "sv_core_news_sm"],
        "uk": ["uk_core_news_lg", "uk_core_news_md", "uk_core_news_sm"],
        "zh": [
            "zh_core_web_lg",
            "zh_core_web_md",
            "zh_core_web_sm",
            "zh_core_web_trf",
        ],
        "xx": ["xx_ent_wiki_sm"],
    }

    if lang_norm in candidates_by_lang:
        candidates = candidates_by_lang[lang_norm]
    elif base_lang in candidates_by_lang:
        candidates = candidates_by_lang[base_lang]
    else:
        # Fall back to the multilingual NER model for unknown languages.
        candidates = candidates_by_lang["xx"]

    last_error = None
    if language != "en":
        print(
            f"Attempting to load spaCy model for language '{language}' with candidates: {candidates}"
        )
        print(
            "Note: Models are prioritized by size (lg > md > sm) - will stop after first successful load"
        )

    for i, candidate in enumerate(candidates):
        if language != "en":
            print(f"Trying candidate {i + 1}/{len(candidates)}: {candidate}")

        # First try importing the model as an installed package.
        try:
            module = __import__(candidate)
            print(f"✓ Successfully imported spaCy model: {candidate}")
            return module.load()
        except Exception as e:
            last_error = e

        # Then try spacy.load, downloading the model if it is missing.
        try:
            nlp = spacy.load(candidate)
            print(f"✓ Successfully loaded spaCy model via spacy.load: {candidate}")
            return nlp
        except OSError:
            print(f"Model {candidate} not found, attempting to download...")
            try:
                download(candidate)
                print(f"✓ Successfully downloaded spaCy model: {candidate}")

                # Refresh module state so the newly installed package is visible.
                import importlib
                import sys

                importlib.reload(spacy)
                if candidate in sys.modules:
                    del sys.modules[candidate]

                # Give the installation a moment to settle before loading.
                import time

                time.sleep(0.5)

                nlp = spacy.load(candidate)
                print(f"✓ Successfully loaded downloaded spaCy model: {candidate}")
                return nlp
            except Exception as download_error:
                print(f"✗ Failed to download or load {candidate}: {download_error}")

                # Try importing the freshly-installed package directly.
                try:
                    module = __import__(candidate)
                    print(
                        f"✓ Successfully loaded {candidate} via direct import after download"
                    )
                    return module.load()
                except Exception as import_error:
                    print(f"✗ Direct import also failed: {import_error}")

                # As a last resort, try loading the model from its install path.
                try:
                    from spacy.util import get_model_path

                    model_path = get_model_path(candidate)
                    if model_path and os.path.exists(model_path):
                        print(f"Found model at path: {model_path}")
                        nlp = spacy.load(model_path)
                        print(
                            f"✓ Successfully loaded {candidate} from path: {model_path}"
                        )
                        return nlp
                except Exception as path_error:
                    print(f"✗ Path-based loading also failed: {path_error}")

                last_error = download_error
                continue
        except Exception as e:
            print(f"✗ Failed to load {candidate}: {e}")
            last_error = e
            continue

    # Every candidate failed: report what was tried and why.
    error_msg = f"Failed to load spaCy model for language '{language}'"
    if last_error:
        error_msg += f". Last error: {last_error}"
    error_msg += f". Tried candidates: {candidates}"

    raise RuntimeError(error_msg)

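# Illustrative usage sketch (not executed at import time); assumes an English
# pipeline such as en_core_web_lg is installed or downloadable in this environment:
#
#     nlp_en = load_spacy_model("en")
#     doc = nlp_en("Dr Jane Doe lives at 42 Example Street, SW1A 1AA.")
#     print([(ent.text, ent.label_) for ent in doc.ents])
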
def _normalize_language_input(language: str) -> str:
    """Lower-case the language input and convert hyphens to underscores."""
    return language.strip().lower().replace("-", "_")


ACTIVE_LANGUAGE_CODE = _base_language_code(DEFAULT_LANGUAGE)
# Placeholder; the module-level model is assigned when create_nlp_analyser runs below.
nlp = None

def get_tesseract_lang_code(short_code: str):
    """
    Map a two-letter language code to the corresponding Tesseract OCR code.

    Args:
        short_code (str): The two-letter language code (e.g., "en", "de").

    Returns:
        str or None: The Tesseract language code (e.g., "eng", "deu"),
            or None if no mapping is found.
    """

    lang_map = {
        "en": "eng",
        "de": "deu",
        "fr": "fra",
        "es": "spa",
        "it": "ita",
        "nl": "nld",
        "pt": "por",
        "zh": "chi_sim",
        "ja": "jpn",
        "ko": "kor",
        "lt": "lit",
        "mk": "mkd",
        "nb": "nor",
        "pl": "pol",
        "ro": "ron",
        "ru": "rus",
        "sl": "slv",
        "sv": "swe",
        "uk": "ukr",
    }

    return lang_map.get(short_code)

def download_tesseract_lang_pack(
    short_lang_code: str, tessdata_dir=TESSERACT_DATA_FOLDER
):
    """
    Download a Tesseract language pack to a local directory.

    Args:
        short_lang_code (str): The two-letter language code (e.g., "en", "fr").
        tessdata_dir (str, optional): The directory in which to save the language
            pack. Defaults to TESSERACT_DATA_FOLDER.

    Returns:
        str or None: The path to the downloaded (or existing) .traineddata file,
            or None if the download failed.
    """

    if not os.path.exists(tessdata_dir):
        os.makedirs(tessdata_dir)

    # Translate the two-letter code into Tesseract's three-letter code.
    lang_code = get_tesseract_lang_code(short_lang_code)

    if lang_code is None:
        raise ValueError(
            f"Language code {short_lang_code} not found in Tesseract language map"
        )

    file_path = os.path.join(tessdata_dir, f"{lang_code}.traineddata")

    # Skip the download if the language pack is already present.
    if os.path.exists(file_path):
        print(f"Language pack {lang_code}.traineddata already exists at {file_path}")
        return file_path

    url = f"https://raw.githubusercontent.com/tesseract-ocr/tessdata/main/{lang_code}.traineddata"

    try:
        response = requests.get(url, stream=True, timeout=60)
        response.raise_for_status()

        with open(file_path, "wb") as f:
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)

        print(f"Successfully downloaded {lang_code}.traineddata to {file_path}")
        return file_path

    except requests.exceptions.RequestException as e:
        print(f"Error downloading {lang_code}.traineddata: {e}")
        return None

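# Illustrative usage sketch (not executed at import time); downloads French OCR
# data if it is not already present in TESSERACT_DATA_FOLDER:
#
#     traineddata_path = download_tesseract_lang_pack("fr")
#     print(traineddata_path)  # e.g. <TESSERACT_DATA_FOLDER>/fra.traineddata
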
def _is_regex_pattern(term: str) -> bool:
    """
    Detect whether a term is intended as a regex pattern or a literal string.

    Args:
        term: The term to check.

    Returns:
        True if the term appears to be a regex pattern, False if it is a literal string.
    """
    term = term.strip()
    if not term:
        return False

    # Anything that does not compile cannot be treated as a regex.
    try:
        re.compile(term)
    except re.error:
        return False

    # Characters that only make sense in a regex when unescaped.
    regex_metacharacters = [
        "+",
        "*",
        "?",
        "{",
        "}",
        "[",
        "]",
        "(",
        ")",
        "|",
        "^",
        "$",
        ".",
    ]

    # Escape sequences that indicate regex intent.
    regex_escape_sequences = [
        "\\d",
        "\\w",
        "\\s",
        "\\D",
        "\\W",
        "\\S",
        "\\b",
        "\\B",
        "\\n",
        "\\t",
        "\\r",
    ]

    has_metacharacters = False
    has_escape_sequences = False

    # Walk the term, treating backslash escapes as two-character units.
    i = 0
    while i < len(term):
        if term[i] == "\\" and i + 1 < len(term):
            escape_seq = term[i : i + 2]
            if escape_seq in regex_escape_sequences:
                has_escape_sequences = True
            i += 2
            continue
        if term[i] in regex_metacharacters:
            has_metacharacters = True
        i += 1

    # The term compiled successfully, so treat it as a regex if it uses any
    # regex-specific syntax; otherwise it is a plain literal.
    return has_metacharacters or has_escape_sequences

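# Illustrative classification (not executed at import time):
#
#     _is_regex_pattern(r"\d{2}-\d{4}")   # True  - uses escapes and quantifiers
#     _is_regex_pattern("John Smith")     # False - plain literal text
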
def custom_word_list_recogniser(custom_list: List[str] = list()):
    """Build a Presidio PatternRecognizer for a user-supplied list of terms."""

    # Allow straight or curly double quotes to match interchangeably in literal terms.
    quote_str = '"'
    replace_str = '(?:"|“|”)'

    regex_patterns = []
    literal_patterns = []

    # Split the custom terms into regex patterns and literal strings.
    for term in custom_list:
        term = term.strip()
        if not term:
            continue

        if _is_regex_pattern(term):
            # Use the term as a regex pattern as-is.
            regex_patterns.append(term)
        else:
            # Escape the literal and bound it so it only matches whole words.
            escaped_term = re.escape(term).replace(quote_str, replace_str)
            literal_patterns.append(rf"(?<!\w){escaped_term}(?!\w)")

    all_patterns = []

    # Wrap each regex pattern in a group so alternation behaves as expected.
    for pattern in regex_patterns:
        all_patterns.append(f"({pattern})")

    all_patterns.extend(literal_patterns)

    if not all_patterns:
        # No terms supplied: use a pattern that can never match.
        custom_pattern = Pattern(name="custom_pattern", regex="(?!)", score=1)
    else:
        custom_regex = "|".join(all_patterns)
        custom_pattern = Pattern(name="custom_pattern", regex=custom_regex, score=1)

    custom_recogniser = PatternRecognizer(
        supported_entity="CUSTOM",
        name="CUSTOM",
        patterns=[custom_pattern],
        global_regex_flags=re.DOTALL | re.MULTILINE | re.IGNORECASE,
    )

    return custom_recogniser

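# Illustrative usage sketch (not executed at import time); "Project Apollo" is
# treated as a literal term and r"\bAC\d{6}\b" as a regex pattern:
#
#     recogniser = custom_word_list_recogniser(["Project Apollo", r"\bAC\d{6}\b"])
#     # The recogniser can then be added to an AnalyzerEngine registry, as done
#     # in create_nlp_analyser below.
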
# Module-level recogniser with an empty custom list; it is rebuilt with user terms
# inside create_nlp_analyser.
custom_recogniser = custom_word_list_recogniser()

# Recogniser for personal titles.
titles_list = [
    "Sir",
    "Ma'am",
    "Madam",
    "Mr",
    "Mr.",
    "Mrs",
    "Mrs.",
    "Ms",
    "Ms.",
    "Miss",
    "Dr",
    "Dr.",
    "Professor",
]
titles_regex = (
    "\\b" + "\\b|\\b".join(rf"{re.escape(title)}" for title in titles_list) + "\\b"
)
titles_pattern = Pattern(name="titles_pattern", regex=titles_regex, score=1)
titles_recogniser = PatternRecognizer(
    supported_entity="TITLES",
    name="TITLES",
    patterns=[titles_pattern],
    global_regex_flags=re.DOTALL | re.MULTILINE,
)

# Recogniser for UK postcodes (e.g. "SW1A 1AA").
ukpostcode_pattern = Pattern(
    name="ukpostcode_pattern",
    regex=r"\b([A-Z]{1,2}\d[A-Z\d]? ?\d[A-Z]{2}|GIR ?0AA)\b",
    score=1,
)

ukpostcode_recogniser = PatternRecognizer(
    supported_entity="UKPOSTCODE", name="UKPOSTCODE", patterns=[ukpostcode_pattern]
)

def extract_street_name(text: str):
    """
    Extract street names and their preceding word (which should contain at least
    one number, e.g. a house number) from the given text.

    Returns:
        Two lists containing the start and end character positions of each match.
    """

    street_types = [
        "Street",
        "St",
        "Boulevard",
        "Blvd",
        "Highway",
        "Hwy",
        "Broadway",
        "Freeway",
        "Causeway",
        "Cswy",
        "Expressway",
        "Way",
        "Walk",
        "Lane",
        "Ln",
        "Road",
        "Rd",
        "Avenue",
        "Ave",
        "Circle",
        "Cir",
        "Cove",
        "Cv",
        "Drive",
        "Dr",
        "Parkway",
        "Pkwy",
        "Park",
        "Court",
        "Ct",
        "Square",
        "Sq",
        "Loop",
        "Place",
        "Pl",
        "Parade",
        "Estate",
        "Alley",
        "Arcade",
        "Bay",
        "Bend",
        "Brae",
        "Byway",
        "Close",
        "Corner",
        "Crescent",
        "Cres",
        "Cul-de-sac",
        "Dell",
        "Esplanade",
        "Glen",
        "Green",
        "Grove",
        "Heights",
        "Hts",
        "Mews",
        "Path",
        "Piazza",
        "Promenade",
        "Quay",
        "Ridge",
        "Row",
        "Terrace",
        "Ter",
        "Track",
        "Trail",
        "View",
        "Villas",
        "Marsh",
        "Embankment",
        "Cut",
        "Hill",
        "Passage",
        "Rise",
        "Vale",
        "Side",
    ]

    # Build a regex alternation of all street types.
    street_types_pattern = "|".join(
        rf"{re.escape(street_type)}" for street_type in street_types
    )

    # A word containing a digit (e.g. a house number), followed by a street name
    # ending in one of the street types above.
    pattern = r"(?P<preceding_word>\w*\d\w*)\s*"
    pattern += rf"(?P<street_name>\w+\s*\b(?:{street_types_pattern})\b)"

    matches = re.finditer(pattern, text, re.DOTALL | re.MULTILINE | re.IGNORECASE)

    start_positions = list()
    end_positions = list()

    for match in matches:
        start_positions.append(match.start())
        end_positions.append(match.end())

    return start_positions, end_positions

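# Illustrative behaviour (not executed at import time):
#
#     starts, ends = extract_street_name("She lives at 123 Oxford Street in London.")
#     # starts/ends give the character offsets of "123 Oxford Street"
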
class StreetNameRecognizer(EntityRecognizer):
    """Presidio recognizer that flags street addresses found by extract_street_name."""

    def load(self) -> None:
        """No loading is required."""
        pass

    def analyze(
        self, text: str, entities: List[str], nlp_artifacts: NlpArtifacts
    ) -> List[RecognizerResult]:
        """
        Logic for detecting street names in the text.
        """

        start_pos, end_pos = extract_street_name(text)

        results = list()

        for i in range(0, len(start_pos)):
            result = RecognizerResult(
                entity_type="STREETNAME", start=start_pos[i], end=end_pos[i], score=1
            )
            results.append(result)

        return results


street_recogniser = StreetNameRecognizer(supported_entities=["STREETNAME"])

def custom_fuzzy_word_list_regex(text: str, custom_list: List[str] = list()):
    """Find exact (non-fuzzy) matches for the custom terms and return their positions."""

    # Allow straight or curly double quotes to match interchangeably.
    quote_str = '"'
    replace_str = '(?:"|“|”)'

    custom_regex_pattern = "|".join(
        rf"(?<!\w){re.escape(term.strip()).replace(quote_str, replace_str)}(?!\w)"
        for term in custom_list
    )

    matches = re.finditer(
        custom_regex_pattern, text, re.DOTALL | re.MULTILINE | re.IGNORECASE
    )

    start_positions = list()
    end_positions = list()

    for match in matches:
        start_positions.append(match.start())
        end_positions.append(match.end())

    return start_positions, end_positions

class CustomWordFuzzyRecognizer(EntityRecognizer):
    """Presidio recognizer that fuzzily matches a custom list of words or phrases."""

    def __init__(
        self,
        supported_entities: List[str],
        custom_list: List[str] = list(),
        spelling_mistakes_max: int = 1,
        search_whole_phrase: bool = True,
    ):
        super().__init__(supported_entities=supported_entities)
        self.custom_list = custom_list
        self.spelling_mistakes_max = spelling_mistakes_max
        self.search_whole_phrase = search_whole_phrase

    def load(self) -> None:
        """No loading is required."""
        pass

    def analyze(
        self, text: str, entities: List[str], nlp_artifacts: NlpArtifacts
    ) -> List[RecognizerResult]:
        """
        Logic for detecting fuzzy matches against the custom word list.
        """
        start_pos, end_pos = spacy_fuzzy_search(
            text, self.custom_list, self.spelling_mistakes_max, self.search_whole_phrase
        )

        results = list()

        for i in range(0, len(start_pos)):
            result = RecognizerResult(
                entity_type="CUSTOM_FUZZY", start=start_pos[i], end=end_pos[i], score=1
            )
            results.append(result)

        return results


custom_list_default = list()
custom_word_fuzzy_recognizer = CustomWordFuzzyRecognizer(
    supported_entities=["CUSTOM_FUZZY"], custom_list=custom_list_default
)

# Placeholder engine created before any model has been loaded; create_nlp_analyser
# builds the engine that is actually used.
loaded_nlp_engine = LoadedSpacyNlpEngine(
    loaded_spacy_model=nlp, language_code=ACTIVE_LANGUAGE_CODE
)


def create_nlp_analyser(
    language: str = DEFAULT_LANGUAGE,
    custom_list: List[str] = None,
    spelling_mistakes_max: int = 1,
    search_whole_phrase: bool = True,
    existing_nlp_analyser: AnalyzerEngine = None,
    return_also_model: bool = False,
):
    """
    Create an nlp_analyser object based on the specified language input.

    Args:
        language (str): Language code (e.g., "en", "de", "fr", "es", etc.)
        custom_list (List[str], optional): List of custom words to recognize. Defaults to None.
        spelling_mistakes_max (int, optional): Maximum number of spelling mistakes for fuzzy matching. Defaults to 1.
        search_whole_phrase (bool, optional): Whether to search for whole phrases or individual words. Defaults to True.
        existing_nlp_analyser (AnalyzerEngine, optional): Existing nlp_analyser object to reuse. Defaults to None.
        return_also_model (bool, optional): Whether to return the nlp_model object as well. Defaults to False.

    Returns:
        AnalyzerEngine: Configured nlp_analyser object with custom recognizers
    """

    # Reuse an existing analyser if it already supports the requested language.
    if (
        existing_nlp_analyser is not None
        and existing_nlp_analyser.supported_languages[0] == language
    ):
        nlp_analyser = existing_nlp_analyser
        print(f"Using existing nlp_analyser for {language}")
        return nlp_analyser

    nlp_model = load_spacy_model(language)

    base_lang_code = _base_language_code(language)

    if custom_list is None:
        custom_list = list()

    # Build recognisers for exact and fuzzy matching of the custom word list.
    custom_recogniser = custom_word_list_recogniser(custom_list)
    custom_word_fuzzy_recognizer = CustomWordFuzzyRecognizer(
        supported_entities=["CUSTOM_FUZZY"],
        custom_list=custom_list,
        spelling_mistakes_max=spelling_mistakes_max,
        search_whole_phrase=search_whole_phrase,
    )

    loaded_nlp_engine = LoadedSpacyNlpEngine(
        loaded_spacy_model=nlp_model, language_code=base_lang_code
    )

    nlp_analyser = AnalyzerEngine(
        nlp_engine=loaded_nlp_engine,
        default_score_threshold=score_threshold,
        supported_languages=[base_lang_code],
        log_decision_process=False,
    )

    # Register the custom recognisers with the analyser.
    nlp_analyser.registry.add_recognizer(custom_recogniser)
    nlp_analyser.registry.add_recognizer(custom_word_fuzzy_recognizer)

    # Street name, UK postcode, and titles recognisers are English-specific.
    if base_lang_code == "en":
        nlp_analyser.registry.add_recognizer(street_recogniser)
        nlp_analyser.registry.add_recognizer(ukpostcode_recogniser)
        nlp_analyser.registry.add_recognizer(titles_recogniser)

    if return_also_model:
        return nlp_analyser, nlp_model

    return nlp_analyser


# Build the default analyser and model for the configured language.
nlp_analyser, nlp = create_nlp_analyser(DEFAULT_LANGUAGE, return_also_model=True)

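# Illustrative usage of the configured analyser (not executed at import time);
# assumes DEFAULT_LANGUAGE resolves to English so the English-only recognisers
# listed here are registered:
#
#     results = nlp_analyser.analyze(
#         text="Contact Dr Smith at 10 Downing Street, SW1A 2AA.",
#         language="en",
#         entities=["TITLES", "STREETNAME", "UKPOSTCODE"],
#     )
#     for res in results:
#         print(res.entity_type, res.start, res.end, res.score)
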
def spacy_fuzzy_search(
    text: str,
    custom_query_list: List[str] = list(),
    spelling_mistakes_max: int = 1,
    search_whole_phrase: bool = True,
    nlp=nlp,
    progress=gr.Progress(track_tqdm=True),
):
    """Conduct a fuzzy match of the custom query terms against a block of text."""

    all_matches = list()
    all_start_positions = list()
    all_end_positions = list()
    all_ratios = list()

    if not text:
        out_message = "No text data found. Skipping page."
        print(out_message)
        return all_start_positions, all_end_positions

    for string_query in custom_query_list:

        query = nlp(string_query)

        if search_whole_phrase is False:
            # Keep only content-bearing tokens from the query.
            token_query = [
                token.text
                for token in query
                if not token.is_space and not token.is_stop and not token.is_punct
            ]

            spelling_mistakes_fuzzy_pattern = "FUZZY" + str(spelling_mistakes_max)

            if len(token_query) > 1:
                # Match any of the query tokens, allowing the set number of edits.
                pattern_fuzz = [
                    {"TEXT": {spelling_mistakes_fuzzy_pattern: {"IN": token_query}}}
                ]
            else:
                pattern_fuzz = [
                    {"TEXT": {spelling_mistakes_fuzzy_pattern: token_query[0]}}
                ]

            matcher = Matcher(nlp.vocab)
            matcher.add(string_query, [pattern_fuzz])

        else:
            # Fuzzy-match the whole phrase with spaczz.
            matcher = FuzzyMatcher(nlp.vocab)
            patterns = [nlp.make_doc(string_query)]
            matcher.add("PHRASE", patterns, [{"ignore_case": True}])

        batch_size = 256
        docs = nlp.pipe([text], batch_size=batch_size)

        for doc in docs:
            matches = matcher(doc)
            match_count = len(matches)

            if search_whole_phrase is False:
                all_matches.append(match_count)

                for match_id, start, end in matches:
                    # Convert token offsets to character offsets.
                    start_char = doc[start].idx
                    end_char = doc[end - 1].idx + len(doc[end - 1])

                    all_matches.append(match_count)
                    all_start_positions.append(start_char)
                    all_end_positions.append(end_char)

            else:
                for match_id, start, end, ratio, pattern in matches:
                    span = str(doc[start:end]).strip()
                    query_search = str(query).strip()

                    # Keep only matches within the allowed edit distance.
                    distance = Levenshtein.distance(query_search.lower(), span.lower())

                    if distance > spelling_mistakes_max:
                        match_count = match_count - 1
                    else:
                        # Convert token offsets to character offsets.
                        start_char = doc[start].idx
                        end_char = doc[end - 1].idx + len(doc[end - 1])

                        all_matches.append(match_count)
                        all_start_positions.append(start_char)
                        all_end_positions.append(end_char)
                        all_ratios.append(ratio)

    return all_start_positions, all_end_positions

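# Illustrative usage of the fuzzy search (not executed at import time);
# "Jon Smyth" is within two edits of "John Smith", so it should be reported when
# up to two spelling mistakes are allowed:
#
#     starts, ends = spacy_fuzzy_search(
#         "Please contact John Smith about the invoice.",
#         ["Jon Smyth"],
#         spelling_mistakes_max=2,
#         search_whole_phrase=True,
#     )
#     # starts/ends hold the character offsets of the fuzzy matches found.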