|
|
import os |
|
|
from typing import Dict, List, Tuple |
|
|
|
|
|
import cv2 |
|
|
import numpy as np |
|
|
|
|
|
from tools.config import OUTPUT_FOLDER, SAVE_WORD_SEGMENTER_OUTPUT_IMAGES |
|
|
|
|
|
|
|
|
# Adaptive thresholding parameters: block size as a multiple of the estimated
# average character width, and the constant subtracted from the local mean.
BLOCK_SIZE_FACTOR = 1.5
C_VALUE = 2
|
|
|
|
|
|
|
|
# Two-stage adaptive search parameters: starting points for the kernel-width
# and valley-threshold sweeps, the main valley threshold, the minimum
# inter-word space as a fraction of the average character width, and the
# allowed word-count mismatch.
INITIAL_KERNEL_WIDTH_FACTOR = 0.0
INITIAL_VALLEY_THRESHOLD_FACTOR = 0.0
MAIN_VALLEY_THRESHOLD_FACTOR = 0.15
MIN_SPACE_FACTOR = 0.2
MATCH_TOLERANCE = 0
|
|
|
|
|
|
|
|
# Minimum connected-component area kept, and the default fraction trimmed
# from a line's extremes.
MIN_AREA_THRESHOLD = 6
DEFAULT_TRIM_PERCENTAGE = 0.2
|
|
|
|
|
|
|
|
# Skew corrections smaller than MIN or larger than MAX degrees are ignored.
MIN_SKEW_THRESHOLD = 0.5
MAX_SKEW_THRESHOLD = 15.0
|
|
|
|
|
|
|
|
def _sanitize_filename(filename: str, max_length: int = 100) -> str: |
|
|
""" |
|
|
Sanitizes a string to be used as a valid filename. |
|
|
Removes or replaces invalid characters for Windows/Linux file systems. |
|
|
|
|
|
Args: |
|
|
filename: The string to sanitize |
|
|
max_length: Maximum length of the sanitized filename |
|
|
|
|
|
Returns: |
|
|
A sanitized string safe for use in file names |
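
    Example (illustrative):
        >>> _sanitize_filename("a b:c")
        'a_b_c'
        >>> _sanitize_filename("")
        'unnamed'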
|
|
""" |
|
|
if not filename: |
|
|
return "unnamed" |
|
|
|
|
|
|
|
|
sanitized = filename.replace(" ", "_") |
|
|
|
|
|
|
|
|
|
|
|
invalid_chars = '<>:"/\\|?*' |
|
|
for char in invalid_chars: |
|
|
sanitized = sanitized.replace(char, "_") |
|
|
|
|
|
|
|
|
sanitized = "".join( |
|
|
char for char in sanitized if ord(char) >= 32 or char in "\n\r\t" |
|
|
) |
|
|
|
|
|
|
|
|
sanitized = sanitized.strip(". ") |
|
|
|
|
|
|
|
|
while "__" in sanitized: |
|
|
sanitized = sanitized.replace("__", "_") |
|
|
|
|
|
|
|
|
if len(sanitized) > max_length: |
|
|
sanitized = sanitized[:max_length] |
|
|
|
|
|
|
|
|
if not sanitized: |
|
|
sanitized = "unnamed" |
|
|
|
|
|
return sanitized |
|
|
|
|
|
|
|
|
class AdaptiveSegmenter: |
|
|
""" |
|
|
    Line-to-word segmentation pipeline. It features:
|
|
1. Adaptive Thresholding. |
|
|
2. Targeted Noise Removal using Connected Component Analysis. |
|
|
3. The robust two-stage adaptive search (Valley -> Kernel). |
|
|
4. CCA for final pixel-perfect refinement. |
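
    Typical usage (illustrative):

        segmenter = AdaptiveSegmenter()
        word_data, used_fallback = segmenter.segment(line_data, line_image)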
|
|
""" |
|
|
|
|
|
def __init__(self, output_folder: str = OUTPUT_FOLDER): |
|
|
self.output_folder = output_folder |
|
|
self.fallback_segmenter = HybridWordSegmenter() |
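
    def _binarize_for_analysis(self, gray_image: np.ndarray) -> np.ndarray:
        """
        Shared helper factored out of the previously duplicated threshold
        logic in _correct_orientation and _deskew_image: adaptive Gaussian
        thresholding for crops tall enough to support it, Otsu otherwise,
        followed by a small opening to remove speckle.
        """
        h, _ = gray_image.shape
        block_size = 21
        if h < block_size:
            block_size = h if h % 2 != 0 else h - 1

        if block_size > 3:
            binary = cv2.adaptiveThreshold(
                gray_image,
                255,
                cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
                cv2.THRESH_BINARY_INV,
                block_size,
                4,
            )
        else:
            _, binary = cv2.threshold(
                gray_image, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU
            )

        opening_kernel = np.ones((2, 2), np.uint8)
        return cv2.morphologyEx(binary, cv2.MORPH_OPEN, opening_kernel)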
|
|
|
|
|
def _correct_orientation( |
|
|
self, gray_image: np.ndarray |
|
|
) -> Tuple[np.ndarray, np.ndarray]: |
|
|
""" |
|
|
Detects and corrects 90-degree orientation issues. |
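
        Heuristic: if the ink bounding box is taller than it is wide, the
        line is assumed to be rotated by 90 degrees and is rotated back.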
|
|
""" |
|
|
h, w = gray_image.shape |
|
|
center = (w // 2, h // 2) |
|
|
|
|
|
        binary = self._binarize_for_analysis(gray_image)
|
|
|
|
|
coords = np.column_stack(np.where(binary > 0)) |
|
|
if len(coords) < 50: |
|
|
M_orient = cv2.getRotationMatrix2D(center, 0, 1.0) |
|
|
return gray_image, M_orient |
|
|
|
|
|
ymin, xmin = coords.min(axis=0) |
|
|
ymax, xmax = coords.max(axis=0) |
|
|
box_height = ymax - ymin |
|
|
box_width = xmax - xmin |
|
|
|
|
|
orientation_angle = 0.0 |
|
|
if box_height > box_width: |
|
|
orientation_angle = 90.0 |
|
|
else: |
|
|
M_orient = cv2.getRotationMatrix2D(center, 0, 1.0) |
|
|
return gray_image, M_orient |
|
|
|
|
|
M_orient = cv2.getRotationMatrix2D(center, orientation_angle, 1.0) |
|
|
new_w, new_h = h, w |
|
|
M_orient[0, 2] += (new_w - w) / 2 |
|
|
M_orient[1, 2] += (new_h - h) / 2 |
|
|
|
|
|
oriented_gray = cv2.warpAffine( |
|
|
gray_image, |
|
|
M_orient, |
|
|
(new_w, new_h), |
|
|
flags=cv2.INTER_CUBIC, |
|
|
borderMode=cv2.BORDER_REPLICATE, |
|
|
) |
|
|
|
|
|
return oriented_gray, M_orient |
|
|
|
|
|
def _deskew_image(self, gray_image: np.ndarray) -> Tuple[np.ndarray, np.ndarray]: |
|
|
""" |
|
|
Detects skew using a robust method that normalizes minAreaRect. |
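
        For example, a wide text rect reported by cv2.minAreaRect as
        (w=180, h=20) at angle 88 folds to a skew of -2 degrees; corrections
        below MIN_SKEW_THRESHOLD or above MAX_SKEW_THRESHOLD are discarded.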
|
|
""" |
|
|
h, w = gray_image.shape |
|
|
|
|
|
        binary = self._binarize_for_analysis(gray_image)
|
|
|
|
|
coords = np.column_stack(np.where(binary > 0)) |
|
|
if len(coords) < 50: |
|
|
M = cv2.getRotationMatrix2D((w // 2, h // 2), 0, 1.0) |
|
|
return gray_image, M |
|
|
|
|
|
rect = cv2.minAreaRect(coords[:, ::-1]) |
|
|
rect_width, rect_height = rect[1] |
|
|
angle = rect[2] |
|
|
|
|
|
if rect_width < rect_height: |
|
|
rect_width, rect_height = rect_height, rect_width |
|
|
angle += 90 |
|
|
|
|
|
if angle > 45: |
|
|
angle -= 90 |
|
|
elif angle < -45: |
|
|
angle += 90 |
|
|
|
|
|
correction_angle = angle |
|
|
|
|
|
if abs(correction_angle) < MIN_SKEW_THRESHOLD: |
|
|
correction_angle = 0.0 |
|
|
elif abs(correction_angle) > MAX_SKEW_THRESHOLD: |
|
|
correction_angle = 0.0 |
|
|
|
|
|
center = (w // 2, h // 2) |
|
|
M = cv2.getRotationMatrix2D(center, correction_angle, 1.0) |
|
|
|
|
|
deskewed_gray = cv2.warpAffine( |
|
|
gray_image, |
|
|
M, |
|
|
(w, h), |
|
|
flags=cv2.INTER_CUBIC, |
|
|
borderMode=cv2.BORDER_REPLICATE, |
|
|
) |
|
|
|
|
|
return deskewed_gray, M |
|
|
|
|
|
def _get_boxes_from_profile( |
|
|
self, |
|
|
binary_image: np.ndarray, |
|
|
stable_avg_char_width: float, |
|
|
min_space_factor: float, |
|
|
valley_threshold_factor: float, |
|
|
) -> List: |
|
|
""" |
|
|
Extracts word bounding boxes from vertical projection profile. |
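
        Worked example: with projection [5, 0, 6, 0, 0, 0, 4], a valley
        threshold of 0 and a minimum space width of 2, the one-column gap at
        index 1 is patched over, yielding two boxes: columns 0-2 and column 6.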
|
|
""" |
|
|
img_h, img_w = binary_image.shape |
|
|
vertical_projection = np.sum(binary_image, axis=0) |
|
|
peaks = vertical_projection[vertical_projection > 0] |
|
|
if len(peaks) == 0: |
|
|
return [] |
|
|
avg_peak_height = np.mean(peaks) |
|
|
valley_threshold = int(avg_peak_height * valley_threshold_factor) |
|
|
min_space_width = int(stable_avg_char_width * min_space_factor) |
|
|
|
|
|
patched_projection = vertical_projection.copy() |
|
|
in_gap = False |
|
|
gap_start = 0 |
|
|
|
|
|
for x, col_sum in enumerate(patched_projection): |
|
|
if col_sum <= valley_threshold and not in_gap: |
|
|
in_gap = True |
|
|
gap_start = x |
|
|
elif col_sum > valley_threshold and in_gap: |
|
|
in_gap = False |
|
|
if (x - gap_start) < min_space_width: |
|
|
patched_projection[gap_start:x] = int(avg_peak_height) |
|
|
|
|
|
unlabeled_boxes = [] |
|
|
in_word = False |
|
|
start_x = 0 |
|
|
for x, col_sum in enumerate(patched_projection): |
|
|
if col_sum > valley_threshold and not in_word: |
|
|
start_x = x |
|
|
in_word = True |
|
|
elif col_sum <= valley_threshold and in_word: |
|
|
|
|
|
unlabeled_boxes.append((start_x, 0, x - start_x, img_h)) |
|
|
in_word = False |
|
|
if in_word: |
|
|
unlabeled_boxes.append((start_x, 0, img_w - start_x, img_h)) |
|
|
return unlabeled_boxes |
|
|
|
|
|
def _enforce_logical_constraints( |
|
|
self, output: Dict[str, List], image_width: int, image_height: int |
|
|
) -> Dict[str, List]: |
|
|
""" |
|
|
Enforces geometric sanity checks with 2D awareness. |
|
|
""" |
|
|
if not output or not output["text"]: |
|
|
return output |
|
|
|
|
|
num_items = len(output["text"]) |
|
|
boxes = [] |
|
|
for i in range(num_items): |
|
|
boxes.append( |
|
|
{ |
|
|
"text": output["text"][i], |
|
|
"left": int(output["left"][i]), |
|
|
"top": int(output["top"][i]), |
|
|
"width": int(output["width"][i]), |
|
|
"height": int(output["height"][i]), |
|
|
"conf": output["conf"][i], |
|
|
} |
|
|
) |
|
|
|
|
|
valid_boxes = [] |
|
|
for box in boxes: |
|
|
x0 = max(0, box["left"]) |
|
|
y0 = max(0, box["top"]) |
|
|
x1 = min(image_width, box["left"] + box["width"]) |
|
|
y1 = min(image_height, box["top"] + box["height"]) |
|
|
|
|
|
w = x1 - x0 |
|
|
h = y1 - y0 |
|
|
|
|
|
if w > 0 and h > 0: |
|
|
box["left"] = x0 |
|
|
box["top"] = y0 |
|
|
box["width"] = w |
|
|
box["height"] = h |
|
|
valid_boxes.append(box) |
|
|
boxes = valid_boxes |
|
|
|
|
|
is_vertical = image_height > (image_width * 1.2) |
|
|
if is_vertical: |
|
|
boxes.sort(key=lambda b: (b["top"], b["left"])) |
|
|
else: |
|
|
boxes.sort(key=lambda b: (b["left"], -b["width"])) |
|
|
|
|
|
final_pass_boxes = [] |
|
|
if boxes: |
|
|
keep_indices = [True] * len(boxes) |
|
|
for i in range(len(boxes)): |
|
|
for j in range(len(boxes)): |
|
|
if i == j: |
|
|
continue |
|
|
b1 = boxes[i] |
|
|
b2 = boxes[j] |
|
|
|
|
|
x_nested = (b1["left"] >= b2["left"] - 2) and ( |
|
|
b1["left"] + b1["width"] <= b2["left"] + b2["width"] + 2 |
|
|
) |
|
|
y_nested = (b1["top"] >= b2["top"] - 2) and ( |
|
|
b1["top"] + b1["height"] <= b2["top"] + b2["height"] + 2 |
|
|
) |
|
|
|
|
|
                    if x_nested and y_nested:
                        if b1["text"] == b2["text"]:
                            a1 = b1["width"] * b1["height"]
                            a2 = b2["width"] * b2["height"]
                            # Drop the smaller duplicate; break area ties by
                            # index so identical boxes are not both removed.
                            if a1 < a2 or (a1 == a2 and i > j):
                                keep_indices[i] = False
|
|
|
|
|
for i, keep in enumerate(keep_indices): |
|
|
if keep: |
|
|
final_pass_boxes.append(boxes[i]) |
|
|
|
|
|
boxes = final_pass_boxes |
|
|
|
|
|
if is_vertical: |
|
|
boxes.sort(key=lambda b: (b["top"], b["left"])) |
|
|
else: |
|
|
boxes.sort(key=lambda b: (b["left"], -b["width"])) |
|
|
|
|
|
for i in range(len(boxes)): |
|
|
for j in range(i + 1, len(boxes)): |
|
|
b1 = boxes[i] |
|
|
b2 = boxes[j] |
|
|
|
|
|
x_overlap = min( |
|
|
b1["left"] + b1["width"], b2["left"] + b2["width"] |
|
|
) - max(b1["left"], b2["left"]) |
|
|
y_overlap = min( |
|
|
b1["top"] + b1["height"], b2["top"] + b2["height"] |
|
|
) - max(b1["top"], b2["top"]) |
|
|
|
|
|
if x_overlap > 0 and y_overlap > 0: |
|
|
if is_vertical: |
|
|
if b1["top"] < b2["top"]: |
|
|
new_h = max(1, b2["top"] - b1["top"]) |
|
|
b1["height"] = new_h |
|
|
else: |
|
|
if b1["left"] < b2["left"]: |
|
|
b1_right = b1["left"] + b1["width"] |
|
|
b2_right = b2["left"] + b2["width"] |
|
|
left_slice_width = max(0, b2["left"] - b1["left"]) |
|
|
right_slice_width = max(0, b1_right - b2_right) |
|
|
|
|
|
if ( |
|
|
b1_right > b2_right |
|
|
and right_slice_width > left_slice_width |
|
|
): |
|
|
b1["left"] = b2_right |
|
|
b1["width"] = right_slice_width |
|
|
else: |
|
|
b1["width"] = max(1, left_slice_width) |
|
|
|
|
|
cleaned_output = { |
|
|
k: [] for k in ["text", "left", "top", "width", "height", "conf"] |
|
|
} |
|
|
if is_vertical: |
|
|
boxes.sort(key=lambda b: (b["top"], b["left"])) |
|
|
else: |
|
|
boxes.sort(key=lambda b: (b["left"], -b["width"])) |
|
|
|
|
|
for box in boxes: |
|
|
for key in cleaned_output.keys(): |
|
|
cleaned_output[key].append(box[key]) |
|
|
|
|
|
return cleaned_output |
|
|
|
|
|
def _is_geometry_valid( |
|
|
self, |
|
|
boxes: List[Tuple[int, int, int, int]], |
|
|
words: List[str], |
|
|
expected_height: float = 0, |
|
|
) -> bool: |
|
|
""" |
|
|
Validates if the detected boxes are physically plausible. |
|
|
[FIX] Improved robustness for punctuation and mixed-case text. |
|
|
""" |
|
|
if len(boxes) != len(words): |
|
|
return False |
|
|
|
|
|
baseline = expected_height |
|
|
|
|
|
if baseline < 5: |
|
|
heights = [b[3] for b in boxes] |
|
|
if heights: |
|
|
baseline = np.median(heights) |
|
|
|
|
|
if baseline < 5: |
|
|
return True |
|
|
|
|
|
for i, box in enumerate(boxes): |
|
|
word = words[i] |
|
|
|
|
|
|
|
|
|
|
|
is_punctuation = not any(c.isalnum() for c in word) |
|
|
if is_punctuation: |
|
|
continue |
|
|
|
|
|
|
|
|
num_chars = len(word) |
|
|
if num_chars < 1: |
|
|
continue |
|
|
|
|
|
width = box[2] |
|
|
height = box[3] |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if height < (baseline * 0.20): |
|
|
return False |
|
|
|
|
|
avg_char_width = width / num_chars |
|
|
min_expected = baseline * 0.20 |
|
|
|
|
|
|
|
|
if avg_char_width < min_expected and avg_char_width < 4: |
|
|
|
|
|
if num_chars == 1 and avg_char_width >= 2: |
|
|
continue |
|
|
return False |
|
|
|
|
|
return True |
|
|
|
|
|
def segment( |
|
|
self, |
|
|
line_data: Dict[str, List], |
|
|
line_image: np.ndarray, |
|
|
min_space_factor=MIN_SPACE_FACTOR, |
|
|
match_tolerance=MATCH_TOLERANCE, |
|
|
image_name: str = None, |
|
|
) -> Tuple[Dict[str, List], bool]: |
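        """
        Segments a single OCR line into word-level boxes.

        Returns the word-level output dict and a flag indicating whether the
        fallback segmenter was used.
        """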
|
|
|
|
|
if ( |
|
|
line_image is None |
|
|
or not isinstance(line_image, np.ndarray) |
|
|
or line_image.size == 0 |
|
|
): |
|
|
return ({}, False) |
|
|
|
|
|
if len(line_image.shape) < 2: |
|
|
return ({}, False) |
|
|
if not line_data or not line_data.get("text") or len(line_data["text"]) == 0: |
|
|
return ({}, False) |
|
|
|
|
|
line_text = line_data["text"][0] |
|
|
words = line_text.split() |
|
|
|
|
|
|
|
|
if len(words) <= 1: |
|
|
img_h, img_w = line_image.shape[:2] |
|
|
one_word_result = self.fallback_segmenter.convert_line_to_word_level( |
|
|
line_data, img_w, img_h |
|
|
) |
|
|
return (one_word_result, False) |
|
|
|
|
|
        line_number = line_data.get("line", [0])[0]
|
|
safe_image_name = _sanitize_filename(image_name or "image", max_length=50) |
|
|
safe_line_number = _sanitize_filename(str(line_number), max_length=10) |
|
|
safe_shortened_line_text = _sanitize_filename(line_text, max_length=10) |
|
|
|
|
|
if SAVE_WORD_SEGMENTER_OUTPUT_IMAGES: |
|
|
os.makedirs(self.output_folder, exist_ok=True) |
|
|
output_path = f"{self.output_folder}/word_segmentation/{safe_image_name}_{safe_line_number}_{safe_shortened_line_text}_original.png" |
|
|
os.makedirs(f"{self.output_folder}/word_segmentation", exist_ok=True) |
|
|
cv2.imwrite(output_path, line_image) |
|
|
|
|
|
if len(line_image.shape) == 3: |
|
|
gray = cv2.cvtColor(line_image, cv2.COLOR_BGR2GRAY) |
|
|
else: |
|
|
gray = line_image.copy() |
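
        # Normalize geometry first: undo any 90-degree orientation error,
        # then remove residual skew; both transforms are kept so boxes can be
        # mapped back to the original image at the end.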
|
|
|
|
|
|
|
|
|
|
|
|
|
|
oriented_gray, M_orient = self._correct_orientation(gray) |
|
|
deskewed_gray, M_skew = self._deskew_image(oriented_gray) |
|
|
|
|
|
|
|
|
        # Compose the two affine transforms (orientation, then skew) into a
        # single 2x3 matrix by lifting them to 3x3 homogeneous form.
        M_orient_3x3 = np.vstack([M_orient, [0, 0, 1]])
        M_skew_3x3 = np.vstack([M_skew, [0, 0, 1]])
        M_total_3x3 = M_skew_3x3 @ M_orient_3x3
        M = M_total_3x3[0:2, :]
|
|
|
|
|
|
|
|
h, w = deskewed_gray.shape |
|
|
deskewed_line_image = cv2.warpAffine( |
|
|
line_image, |
|
|
M, |
|
|
(w, h), |
|
|
flags=cv2.INTER_CUBIC, |
|
|
borderMode=cv2.BORDER_REPLICATE, |
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
        # Rebuild the line data in the deskewed frame: the line now spans the
        # whole warped image, so left/top reset to zero.
        local_line_data = {
            "text": line_data["text"],
            "conf": line_data["conf"],
            "left": [0],
            "top": [0],
            "width": [w],
            "height": [h],
            "line": line_data.get("line", [0]),
        }
|
|
|
|
|
if SAVE_WORD_SEGMENTER_OUTPUT_IMAGES: |
|
|
os.makedirs(self.output_folder, exist_ok=True) |
|
|
output_path = f"{self.output_folder}/word_segmentation/{safe_image_name}_{safe_line_number}_{safe_shortened_line_text}_deskewed.png" |
|
|
cv2.imwrite(output_path, deskewed_line_image) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
approx_char_count = len(line_data["text"][0].replace(" ", "")) |
|
|
if approx_char_count == 0: |
|
|
return {}, False |
|
|
|
|
|
img_h, img_w = deskewed_gray.shape |
|
|
estimated_char_height = img_h * 0.6 |
|
|
avg_char_width_approx = img_w / approx_char_count |
|
|
|
|
|
block_size = int(avg_char_width_approx * BLOCK_SIZE_FACTOR) |
|
|
if block_size % 2 == 0: |
|
|
block_size += 1 |
|
|
if block_size < 3: |
|
|
block_size = 3 |
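
        # Combine an adaptive (local) threshold with a stricter-than-Otsu
        # global threshold; AND-ing the two suppresses background noise that
        # either method alone lets through.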
|
|
|
|
|
|
|
|
binary_adaptive = cv2.adaptiveThreshold( |
|
|
deskewed_gray, |
|
|
255, |
|
|
cv2.ADAPTIVE_THRESH_GAUSSIAN_C, |
|
|
cv2.THRESH_BINARY_INV, |
|
|
block_size, |
|
|
C_VALUE, |
|
|
) |
|
|
otsu_thresh_val, _ = cv2.threshold( |
|
|
deskewed_gray, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU |
|
|
) |
|
|
strict_thresh_val = otsu_thresh_val * 0.75 |
|
|
_, binary_strict = cv2.threshold( |
|
|
deskewed_gray, strict_thresh_val, 255, cv2.THRESH_BINARY_INV |
|
|
) |
|
|
binary = cv2.bitwise_and(binary_adaptive, binary_strict) |
|
|
|
|
|
if SAVE_WORD_SEGMENTER_OUTPUT_IMAGES: |
|
|
output_path = f"{self.output_folder}/word_segmentation/{safe_image_name}_{safe_line_number}_{safe_shortened_line_text}_binary.png" |
|
|
cv2.imwrite(output_path, binary) |
|
|
|
|
|
|
|
|
morph_width = max(3, int(avg_char_width_approx * 0.40)) |
|
|
morph_height = max(2, int(avg_char_width_approx * 0.1)) |
|
|
kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (morph_width, morph_height)) |
|
|
closed_binary = cv2.morphologyEx(binary, cv2.MORPH_CLOSE, kernel, iterations=1) |
|
|
|
|
|
|
|
|
num_labels, labels, stats, _ = cv2.connectedComponentsWithStats( |
|
|
closed_binary, 8, cv2.CV_32S |
|
|
) |
|
|
clean_binary = np.zeros_like(binary) |
|
|
|
|
|
force_fallback = False |
|
|
significant_labels = 0 |
|
|
if num_labels > 1: |
|
|
|
|
|
significant_labels = np.sum(stats[1:, cv2.CC_STAT_AREA] > 3) |
|
|
|
|
|
if approx_char_count > 0 and significant_labels > (approx_char_count * 12): |
|
|
force_fallback = True |
|
|
|
|
|
if num_labels > 1: |
|
|
areas = stats[1:, cv2.CC_STAT_AREA] |
|
|
if len(areas) == 0: |
|
|
clean_binary = binary |
|
|
areas = np.array([0]) |
|
|
else: |
|
|
p1 = np.percentile(areas, 1) |
|
|
img_h, img_w = binary.shape |
|
|
estimated_char_height = img_h * 0.7 |
|
|
estimated_min_letter_area = max( |
|
|
2, int(estimated_char_height * 0.2 * estimated_char_height * 0.15) |
|
|
) |
|
|
area_threshold = max( |
|
|
MIN_AREA_THRESHOLD, min(p1, estimated_min_letter_area) |
|
|
) |
|
|
|
|
|
|
|
|
sorted_areas = np.sort(areas) |
|
|
area_diffs = np.diff(sorted_areas) |
|
|
if len(sorted_areas) > 10 and len(area_diffs) > 0: |
|
|
jump_threshold = np.percentile(area_diffs, 95) |
|
|
significant_jump_thresh = max(10, jump_threshold * 3) |
|
|
jump_indices = np.where(area_diffs > significant_jump_thresh)[0] |
|
|
if len(jump_indices) > 0: |
|
|
gap_idx = jump_indices[0] |
|
|
area_before_gap = sorted_areas[gap_idx] |
|
|
final_threshold = max(area_before_gap + 1, area_threshold) |
|
|
final_threshold = min(final_threshold, 15) |
|
|
area_threshold = final_threshold |
|
|
|
|
|
for i in range(1, num_labels): |
|
|
if stats[i, cv2.CC_STAT_AREA] >= area_threshold: |
|
|
clean_binary[labels == i] = 255 |
|
|
else: |
|
|
clean_binary = binary |
|
|
|
|
|
|
|
|
horizontal_projection = np.sum(clean_binary, axis=1) |
|
|
y_start = 0 |
|
|
non_zero_rows = np.where(horizontal_projection > 0)[0] |
|
|
if len(non_zero_rows) > 0: |
|
|
p_top = int(np.percentile(non_zero_rows, 5)) |
|
|
p_bottom = int(np.percentile(non_zero_rows, 95)) |
|
|
core_height = p_bottom - p_top |
|
|
trim_pixels = int(core_height * 0.1) |
|
|
y_start = max(0, p_top + trim_pixels) |
|
|
y_end = min(clean_binary.shape[0], p_bottom - trim_pixels) |
|
|
if y_end - y_start < 5: |
|
|
y_start = p_top |
|
|
y_end = p_bottom |
|
|
analysis_image = clean_binary[y_start:y_end, :] |
|
|
else: |
|
|
analysis_image = clean_binary |
|
|
|
|
|
if SAVE_WORD_SEGMENTER_OUTPUT_IMAGES: |
|
|
output_path = f"{self.output_folder}/word_segmentation/{safe_image_name}_{safe_line_number}_{safe_shortened_line_text}_clean_binary.png" |
|
|
cv2.imwrite(output_path, analysis_image) |
|
|
|
|
|
|
|
|
best_boxes = None |
|
|
successful_binary_image = None |
|
|
|
|
|
if not force_fallback: |
|
|
words = line_data["text"][0].split() |
|
|
target = len(words) |
|
|
backup_boxes_s1 = None |
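
            # Stage 1: sweep the valley threshold upward until the number of
            # detected boxes matches the number of OCR words.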
|
|
|
|
|
|
|
|
for v_factor in np.arange(INITIAL_VALLEY_THRESHOLD_FACTOR, 0.60, 0.02): |
|
|
curr_boxes = self._get_boxes_from_profile( |
|
|
analysis_image, avg_char_width_approx, min_space_factor, v_factor |
|
|
) |
|
|
diff = abs(target - len(curr_boxes)) |
|
|
is_geom_valid = self._is_geometry_valid( |
|
|
curr_boxes, words, estimated_char_height |
|
|
) |
|
|
|
|
|
if diff == 0: |
|
|
if is_geom_valid: |
|
|
best_boxes = curr_boxes |
|
|
successful_binary_image = analysis_image |
|
|
break |
|
|
else: |
|
|
if backup_boxes_s1 is None: |
|
|
backup_boxes_s1 = curr_boxes |
|
|
if diff == 1 and backup_boxes_s1 is None and is_geom_valid: |
|
|
backup_boxes_s1 = curr_boxes |
|
|
|
|
|
|
|
|
if best_boxes is None: |
|
|
backup_boxes_s2 = None |
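
                # Stage 2: progressively widen a horizontal closing kernel to
                # bridge intra-word gaps, re-running the profile scan each time.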
|
|
for k_factor in np.arange(INITIAL_KERNEL_WIDTH_FACTOR, 0.5, 0.02): |
|
|
k_w = max(1, int(avg_char_width_approx * k_factor)) |
|
|
s2_bin = cv2.morphologyEx( |
|
|
clean_binary, cv2.MORPH_CLOSE, np.ones((1, k_w), np.uint8) |
|
|
) |
|
|
s2_img = ( |
|
|
s2_bin[y_start:y_end, :] if len(non_zero_rows) > 0 else s2_bin |
|
|
) |
|
|
|
|
|
if s2_img is None or s2_img.size == 0: |
|
|
continue |
|
|
|
|
|
curr_boxes = self._get_boxes_from_profile( |
|
|
s2_img, |
|
|
avg_char_width_approx, |
|
|
min_space_factor, |
|
|
MAIN_VALLEY_THRESHOLD_FACTOR, |
|
|
) |
|
|
diff = abs(target - len(curr_boxes)) |
|
|
is_geom_valid = self._is_geometry_valid( |
|
|
curr_boxes, words, estimated_char_height |
|
|
) |
|
|
|
|
|
if diff == 0 and is_geom_valid: |
|
|
best_boxes = curr_boxes |
|
|
successful_binary_image = s2_bin |
|
|
break |
|
|
|
|
|
if diff == 1 and backup_boxes_s2 is None and is_geom_valid: |
|
|
backup_boxes_s2 = curr_boxes |
|
|
|
|
|
if best_boxes is None: |
|
|
if backup_boxes_s1 is not None: |
|
|
best_boxes = backup_boxes_s1 |
|
|
successful_binary_image = analysis_image |
|
|
elif backup_boxes_s2 is not None: |
|
|
best_boxes = backup_boxes_s2 |
|
|
successful_binary_image = clean_binary |
|
|
|
|
|
final_output = None |
|
|
used_fallback = False |
|
|
|
|
|
if best_boxes is None: |
|
|
|
|
|
used_fallback = True |
|
|
|
|
|
final_output = self.fallback_segmenter.refine_words_bidirectional( |
|
|
local_line_data, deskewed_line_image |
|
|
) |
|
|
else: |
|
|
|
|
|
unlabeled_boxes = best_boxes |
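
            # Refine the profile-based boxes via connected components: each
            # sufficiently large component votes for the box that best
            # contains its horizontal center.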
|
|
if successful_binary_image is analysis_image: |
|
|
cca_source_image = clean_binary |
|
|
else: |
|
|
cca_source_image = successful_binary_image |
|
|
|
|
|
num_labels, _, stats, _ = cv2.connectedComponentsWithStats( |
|
|
cca_source_image, 8, cv2.CV_32S |
|
|
) |
|
|
cca_img_h, cca_img_w = cca_source_image.shape[:2] |
|
|
|
|
|
component_assignments = {} |
|
|
num_proc = min(len(words), len(unlabeled_boxes)) |
|
|
min_valid_component_area = estimated_char_height * 2 |
|
|
|
|
|
for j in range(1, num_labels): |
|
|
comp_x = stats[j, cv2.CC_STAT_LEFT] |
|
|
comp_w = stats[j, cv2.CC_STAT_WIDTH] |
|
|
comp_area = stats[j, cv2.CC_STAT_AREA] |
|
|
comp_r = comp_x + comp_w |
|
|
comp_center_x = comp_x + comp_w / 2 |
|
|
comp_y = stats[j, cv2.CC_STAT_TOP] |
|
|
comp_h = stats[j, cv2.CC_STAT_HEIGHT] |
|
|
comp_center_y = comp_y + comp_h / 2 |
|
|
|
|
|
if comp_center_y < cca_img_h * 0.1 or comp_center_y > cca_img_h * 0.9: |
|
|
continue |
|
|
if comp_area < min_valid_component_area: |
|
|
continue |
|
|
|
|
|
best_box_idx = None |
|
|
max_overlap = 0 |
|
|
best_center_distance = float("inf") |
|
|
component_center_in_box = False |
|
|
|
|
|
|
|
|
|
|
|
|
|
                for i in range(num_proc):
|
|
box_x, box_y, box_w, box_h = unlabeled_boxes[i] |
|
|
box_r = box_x + box_w |
|
|
box_center_x = box_x + box_w / 2 |
|
|
|
|
|
if comp_w > box_w * 1.5: |
|
|
continue |
|
|
|
|
|
if comp_x < box_r and box_x < comp_r: |
|
|
overlap_start = max(comp_x, box_x) |
|
|
overlap_end = min(comp_r, box_r) |
|
|
overlap = overlap_end - overlap_start |
|
|
|
|
|
if overlap > 0: |
|
|
center_in_box = box_x <= comp_center_x < box_r |
|
|
center_distance = abs(comp_center_x - box_center_x) |
|
|
|
|
|
if center_in_box: |
|
|
if not component_center_in_box or overlap > max_overlap: |
|
|
component_center_in_box = True |
|
|
best_center_distance = center_distance |
|
|
max_overlap = overlap |
|
|
best_box_idx = i |
|
|
elif not component_center_in_box: |
|
|
if center_distance < best_center_distance or ( |
|
|
center_distance == best_center_distance |
|
|
and overlap > max_overlap |
|
|
): |
|
|
best_center_distance = center_distance |
|
|
max_overlap = overlap |
|
|
best_box_idx = i |
|
|
|
|
|
if best_box_idx is not None: |
|
|
component_assignments[j] = best_box_idx |
|
|
|
|
|
refined_boxes_list = [] |
|
|
for i in range(num_proc): |
|
|
word_label = words[i] |
|
|
components_in_box = [ |
|
|
stats[j] for j, b in component_assignments.items() if b == i |
|
|
] |
|
|
|
|
|
use_original_box = False |
|
|
if not components_in_box: |
|
|
use_original_box = True |
|
|
else: |
|
|
min_x = min(c[cv2.CC_STAT_LEFT] for c in components_in_box) |
|
|
min_y = min(c[cv2.CC_STAT_TOP] for c in components_in_box) |
|
|
max_r = max( |
|
|
c[cv2.CC_STAT_LEFT] + c[cv2.CC_STAT_WIDTH] |
|
|
for c in components_in_box |
|
|
) |
|
|
max_b = max( |
|
|
c[cv2.CC_STAT_TOP] + c[cv2.CC_STAT_HEIGHT] |
|
|
for c in components_in_box |
|
|
) |
|
|
cca_h = max(1, max_b - min_y) |
|
|
if cca_h < (estimated_char_height * 0.35): |
|
|
use_original_box = True |
|
|
|
|
|
if use_original_box: |
|
|
box_x, box_y, box_w, box_h = unlabeled_boxes[i] |
|
|
adjusted_box_y = y_start + box_y |
|
|
refined_boxes_list.append( |
|
|
{ |
|
|
"text": word_label, |
|
|
"left": box_x, |
|
|
"top": adjusted_box_y, |
|
|
"width": box_w, |
|
|
"height": box_h, |
|
|
"conf": line_data["conf"][0], |
|
|
} |
|
|
) |
|
|
else: |
|
|
refined_boxes_list.append( |
|
|
{ |
|
|
"text": word_label, |
|
|
"left": min_x, |
|
|
"top": min_y, |
|
|
"width": max(1, max_r - min_x), |
|
|
"height": cca_h, |
|
|
"conf": line_data["conf"][0], |
|
|
} |
|
|
) |
|
|
|
|
|
|
|
|
            # Accept the CCA-refined boxes unless the geometry is badly wrong
            # (off by more than one box); otherwise fall back below.
            cca_check_list = [
                (b["left"], b["top"], b["width"], b["height"])
                for b in refined_boxes_list
            ]
            geometry_ok = self._is_geometry_valid(
                cca_check_list, words, estimated_char_height
            )
            if not geometry_ok and abs(len(refined_boxes_list) - len(words)) > 1:
                best_boxes = None
            else:
                final_output = {
                    k: [] for k in ["text", "left", "top", "width", "height", "conf"]
                }
                for box in refined_boxes_list:
                    for key in final_output.keys():
                        final_output[key].append(box[key])
|
|
|
|
|
|
|
|
if best_boxes is None and not used_fallback: |
|
|
used_fallback = True |
|
|
|
|
|
final_output = self.fallback_segmenter.refine_words_bidirectional( |
|
|
local_line_data, deskewed_line_image |
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
M_inv = cv2.invertAffineTransform(M) |
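
        # Map boxes from the deskewed frame back to the original image by
        # transforming all four corners and taking their axis-aligned bounds.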
|
|
remapped_boxes_list = [] |
|
|
for i in range(len(final_output["text"])): |
|
|
left, top = final_output["left"][i], final_output["top"][i] |
|
|
width, height = final_output["width"][i], final_output["height"][i] |
|
|
|
|
|
|
|
|
corners = np.array( |
|
|
[ |
|
|
[left, top], |
|
|
[left + width, top], |
|
|
[left + width, top + height], |
|
|
[left, top + height], |
|
|
], |
|
|
dtype="float32", |
|
|
) |
|
|
corners_expanded = np.expand_dims(corners, axis=1) |
|
|
original_corners = cv2.transform(corners_expanded, M_inv) |
|
|
squeezed_corners = original_corners.squeeze(axis=1) |
|
|
|
|
|
|
|
|
min_x = int(np.min(squeezed_corners[:, 0])) |
|
|
max_x = int(np.max(squeezed_corners[:, 0])) |
|
|
min_y = int(np.min(squeezed_corners[:, 1])) |
|
|
max_y = int(np.max(squeezed_corners[:, 1])) |
|
|
|
|
|
remapped_boxes_list.append( |
|
|
{ |
|
|
"text": final_output["text"][i], |
|
|
"left": min_x, |
|
|
"top": min_y, |
|
|
"width": max_x - min_x, |
|
|
"height": max_y - min_y, |
|
|
"conf": final_output["conf"][i], |
|
|
} |
|
|
) |
|
|
|
|
|
remapped_output = {k: [] for k in final_output.keys()} |
|
|
for box in remapped_boxes_list: |
|
|
for key in remapped_output.keys(): |
|
|
remapped_output[key].append(box[key]) |
|
|
|
|
|
img_h, img_w = line_image.shape[:2] |
|
|
remapped_output = self._enforce_logical_constraints( |
|
|
remapped_output, img_w, img_h |
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
words = line_data["text"][0].split() |
|
|
target_count = len(words) |
|
|
current_count = len(remapped_output["text"]) |
|
|
has_collapsed_boxes = any(w < 3 for w in remapped_output["width"]) |
|
|
|
|
|
if current_count > 0: |
|
|
total_text_len = sum(len(t) for t in remapped_output["text"]) |
|
|
total_box_width = sum(remapped_output["width"]) |
|
|
avg_width_pixels = total_box_width / max(1, total_text_len) |
|
|
else: |
|
|
avg_width_pixels = 0 |
|
|
is_suspiciously_thin = avg_width_pixels < 4 |
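
        # Final sanity check: if the box count disagrees with the word count,
        # or boxes are implausibly thin, discard the result and fall back.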
|
|
|
|
|
if current_count != target_count or is_suspiciously_thin or has_collapsed_boxes: |
|
|
used_fallback = True |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
temp_local_output = self.fallback_segmenter.refine_words_bidirectional( |
|
|
local_line_data, deskewed_line_image |
|
|
) |
|
|
|
|
|
|
|
|
if len(temp_local_output["text"]) != target_count: |
|
|
h, w = deskewed_line_image.shape[:2] |
|
|
temp_local_output = self.fallback_segmenter.convert_line_to_word_level( |
|
|
local_line_data, w, h |
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
remapped_boxes_list = [] |
|
|
for i in range(len(temp_local_output["text"])): |
|
|
left, top = temp_local_output["left"][i], temp_local_output["top"][i] |
|
|
width, height = ( |
|
|
temp_local_output["width"][i], |
|
|
temp_local_output["height"][i], |
|
|
) |
|
|
|
|
|
corners = np.array( |
|
|
[ |
|
|
[left, top], |
|
|
[left + width, top], |
|
|
[left + width, top + height], |
|
|
[left, top + height], |
|
|
], |
|
|
dtype="float32", |
|
|
) |
|
|
corners_expanded = np.expand_dims(corners, axis=1) |
|
|
original_corners = cv2.transform(corners_expanded, M_inv) |
|
|
squeezed_corners = original_corners.squeeze(axis=1) |
|
|
|
|
|
min_x = int(np.min(squeezed_corners[:, 0])) |
|
|
max_x = int(np.max(squeezed_corners[:, 0])) |
|
|
min_y = int(np.min(squeezed_corners[:, 1])) |
|
|
max_y = int(np.max(squeezed_corners[:, 1])) |
|
|
|
|
|
remapped_boxes_list.append( |
|
|
{ |
|
|
"text": temp_local_output["text"][i], |
|
|
"left": min_x, |
|
|
"top": min_y, |
|
|
"width": max_x - min_x, |
|
|
"height": max_y - min_y, |
|
|
"conf": temp_local_output["conf"][i], |
|
|
} |
|
|
) |
|
|
|
|
|
remapped_output = {k: [] for k in temp_local_output.keys()} |
|
|
for box in remapped_boxes_list: |
|
|
for key in remapped_output.keys(): |
|
|
remapped_output[key].append(box[key]) |
|
|
|
|
|
if SAVE_WORD_SEGMENTER_OUTPUT_IMAGES: |
|
|
output_path = f"{self.output_folder}/word_segmentation/{safe_image_name}_{safe_shortened_line_text}_final_boxes.png" |
|
|
os.makedirs(f"{self.output_folder}/word_segmentation", exist_ok=True) |
|
|
output_image_vis = line_image.copy() |
|
|
for i in range(len(remapped_output["text"])): |
|
|
x, y, w, h = ( |
|
|
int(remapped_output["left"][i]), |
|
|
int(remapped_output["top"][i]), |
|
|
int(remapped_output["width"][i]), |
|
|
int(remapped_output["height"][i]), |
|
|
) |
|
|
cv2.rectangle(output_image_vis, (x, y), (x + w, y + h), (0, 255, 0), 2) |
|
|
cv2.imwrite(output_path, output_image_vis) |
|
|
|
|
|
return remapped_output, used_fallback |
|
|
|
|
|
|
|
|
class HybridWordSegmenter: |
|
|
""" |
|
|
Implements a two-step approach for word segmentation: |
|
|
1. Proportional estimation based on text. |
|
|
2. Image-based refinement with a "Bounded Scan" to prevent |
|
|
over-correction. |
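
    Typical usage (illustrative):

        segmenter = HybridWordSegmenter()
        word_data = segmenter.refine_words_bidirectional(line_data, line_image)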
|
|
""" |
|
|
|
|
|
def convert_line_to_word_level( |
|
|
self, line_data: Dict[str, List], image_width: int, image_height: int |
|
|
) -> Dict[str, List]: |
|
|
""" |
|
|
Step 1: Converts line-level OCR results to word-level by using a |
|
|
robust proportional estimation method. |
|
|
Guarantees output box count equals input word count. |
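
        Worked example: for line_left=0, line_width=100 and text "ab cd",
        num_chars=4 and num_spaces=1, so the estimated space width is
        100 / (4*2 + 1) ≈ 11.1 and the average character width ≈ 22.2;
        "ab" gets a box of width ≈ 44.4 at the left edge and "cd" starts
        at ≈ 55.6.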
|
|
""" |
|
|
output = { |
|
|
"text": list(), |
|
|
"left": list(), |
|
|
"top": list(), |
|
|
"width": list(), |
|
|
"height": list(), |
|
|
"conf": list(), |
|
|
} |
|
|
|
|
|
if not line_data or not line_data.get("text"): |
|
|
return output |
|
|
|
|
|
i = 0 |
|
|
line_text = line_data["text"][i] |
|
|
line_left = float(line_data["left"][i]) |
|
|
line_top = float(line_data["top"][i]) |
|
|
line_width = float(line_data["width"][i]) |
|
|
line_height = float(line_data["height"][i]) |
|
|
line_conf = line_data["conf"][i] |
|
|
|
|
|
if not line_text.strip(): |
|
|
return output |
|
|
words = line_text.split() |
|
|
if not words: |
|
|
return output |
|
|
num_chars = len("".join(words)) |
|
|
num_spaces = len(words) - 1 |
|
|
if num_chars == 0: |
|
|
return output |
|
|
|
|
|
        # Assume each character is roughly twice as wide as a space and solve
        # for the space width from the measured line width. num_chars > 0 is
        # guaranteed by the check above, so the denominator is never zero.
        char_space_ratio = 2.0
        estimated_space_width = line_width / (
            num_chars * char_space_ratio + num_spaces
        )
        avg_char_width = estimated_space_width * char_space_ratio
|
|
|
|
|
|
|
|
avg_char_width = max(3.0, avg_char_width) |
|
|
min_word_width = max(5.0, avg_char_width * 0.5) |
|
|
|
|
|
current_left = line_left |
|
|
for word in words: |
|
|
raw_word_width = len(word) * avg_char_width |
|
|
|
|
|
|
|
|
word_width = max(min_word_width, raw_word_width) |
|
|
|
|
|
clamped_left = max(0, min(current_left, image_width)) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
output["text"].append(word) |
|
|
output["left"].append(clamped_left) |
|
|
output["top"].append(line_top) |
|
|
output["width"].append(word_width) |
|
|
output["height"].append(line_height) |
|
|
output["conf"].append(line_conf) |
|
|
current_left += word_width + estimated_space_width |
|
|
|
|
|
return output |
|
|
|
|
|
def _run_single_pass( |
|
|
self, |
|
|
initial_boxes: List[Dict], |
|
|
vertical_projection: np.ndarray, |
|
|
max_scan_distance: int, |
|
|
img_w: int, |
|
|
direction: str = "ltr", |
|
|
) -> List[Dict]: |
|
|
""" |
|
|
Helper function to run one pass of refinement. |
|
|
IMPROVED: Uses local minima detection for cursive script where |
|
|
perfect zero-gaps (white space) might not exist. |
|
|
""" |
|
|
|
|
|
refined_boxes = [box.copy() for box in initial_boxes] |
|
|
|
|
|
if direction == "ltr": |
|
|
last_corrected_right_edge = 0 |
|
|
indices = range(len(refined_boxes)) |
|
|
else: |
|
|
next_corrected_left_edge = img_w |
|
|
indices = range(len(refined_boxes) - 1, -1, -1) |
|
|
|
|
|
for i in indices: |
|
|
box = refined_boxes[i] |
|
|
left = int(box["left"]) |
|
|
right = int(box["left"] + box["width"]) |
|
|
|
|
|
left = max(0, min(left, img_w - 1)) |
|
|
right = max(0, min(right, img_w - 1)) |
|
|
|
|
|
new_left, new_right = left, right |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if direction == "ltr" or direction == "both": |
|
|
if right < img_w: |
|
|
scan_limit = min(img_w, right + max_scan_distance) |
|
|
search_range = range(right, scan_limit) |
|
|
|
|
|
best_x = right |
|
|
min_density = float("inf") |
|
|
found_zero = False |
|
|
|
|
|
|
|
|
for x in search_range: |
|
|
density = vertical_projection[x] |
|
|
if density == 0: |
|
|
new_right = x |
|
|
found_zero = True |
|
|
break |
|
|
if density < min_density: |
|
|
min_density = density |
|
|
best_x = x |
|
|
|
|
|
if not found_zero: |
|
|
|
|
|
new_right = best_x |
|
|
|
|
|
if direction == "rtl" or direction == "both": |
|
|
if left > 0: |
|
|
scan_limit = max(0, left - max_scan_distance) |
|
|
search_range = range(left, scan_limit, -1) |
|
|
|
|
|
best_x = left |
|
|
min_density = float("inf") |
|
|
found_zero = False |
|
|
|
|
|
for x in search_range: |
|
|
density = vertical_projection[x] |
|
|
if density == 0: |
|
|
new_left = x |
|
|
found_zero = True |
|
|
break |
|
|
if density < min_density: |
|
|
min_density = density |
|
|
best_x = x |
|
|
|
|
|
if not found_zero: |
|
|
new_left = best_x |
|
|
|
|
|
|
|
|
if direction == "ltr": |
|
|
if new_left < last_corrected_right_edge: |
|
|
new_left = last_corrected_right_edge |
|
|
|
|
|
if new_right <= new_left: |
|
|
new_right = new_left + 1 |
|
|
last_corrected_right_edge = new_right |
|
|
else: |
|
|
if new_right > next_corrected_left_edge: |
|
|
new_right = next_corrected_left_edge |
|
|
|
|
|
if new_left >= new_right: |
|
|
new_left = new_right - 1 |
|
|
next_corrected_left_edge = new_left |
|
|
|
|
|
box["left"] = new_left |
|
|
box["width"] = max(1, new_right - new_left) |
|
|
|
|
|
return refined_boxes |
|
|
|
|
|
def refine_words_bidirectional( |
|
|
self, |
|
|
line_data: Dict[str, List], |
|
|
line_image: np.ndarray, |
|
|
) -> Dict[str, List]: |
|
|
""" |
|
|
Refines boxes using a more robust bidirectional scan and averaging. |
|
|
Includes ADAPTIVE NOISE REMOVAL to filter specks based on font size. |
|
|
""" |
|
|
if line_image is None: |
|
|
return line_data |
|
|
|
|
|
|
|
|
        # Bail out early when there is nothing to refine; this also guarantees
        # that `words` is defined for the character-count estimate below.
        if not line_data or not line_data.get("text"):
            return line_data

        words = line_data["text"][0].split()
        if len(words) <= 1:
            img_h, img_w = line_image.shape[:2]
            return self.convert_line_to_word_level(line_data, img_w, img_h)
|
|
|
|
|
|
|
|
        # Convert to grayscale only when the crop is colour; cvtColor would
        # raise on a single-channel input.
        if len(line_image.shape) == 3:
            gray = cv2.cvtColor(line_image, cv2.COLOR_BGR2GRAY)
        else:
            gray = line_image.copy()
|
|
|
|
|
|
|
|
otsu_thresh_val, _ = cv2.threshold( |
|
|
gray, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU |
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
strict_thresh_val = otsu_thresh_val * 0.75 |
|
|
_, binary = cv2.threshold(gray, strict_thresh_val, 255, cv2.THRESH_BINARY_INV) |
|
|
|
|
|
img_h, img_w = binary.shape |
|
|
|
|
|
|
|
|
|
|
|
kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (2, 2)) |
|
|
binary_clean = cv2.morphologyEx(binary, cv2.MORPH_OPEN, kernel) |
|
|
|
|
|
|
|
|
|
|
|
num_labels, labels, stats, _ = cv2.connectedComponentsWithStats( |
|
|
binary_clean, 8, cv2.CV_32S |
|
|
) |
|
|
|
|
|
|
|
|
heights = stats[1:, cv2.CC_STAT_HEIGHT] |
|
|
|
|
|
if len(heights) > 0: |
|
|
|
|
|
|
|
|
significant_heights = heights[heights > img_h * 0.2] |
|
|
if len(significant_heights) > 0: |
|
|
median_h = np.median(significant_heights) |
|
|
else: |
|
|
median_h = np.median(heights) |
|
|
|
|
|
|
|
|
|
|
|
min_height_thresh = median_h * 0.30 |
|
|
|
|
|
clean_binary = np.zeros_like(binary) |
|
|
for i in range(1, num_labels): |
|
|
h = stats[i, cv2.CC_STAT_HEIGHT] |
|
|
w = stats[i, cv2.CC_STAT_WIDTH] |
|
|
area = stats[i, cv2.CC_STAT_AREA] |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
is_tall_enough = h > min_height_thresh |
|
|
is_dot = ( |
|
|
(h <= min_height_thresh) and (w <= min_height_thresh) and (area > 2) |
|
|
) |
|
|
|
|
|
if is_tall_enough or is_dot: |
|
|
clean_binary[labels == i] = 255 |
|
|
|
|
|
|
|
|
vertical_projection = np.sum(clean_binary, axis=0) |
|
|
else: |
|
|
|
|
|
vertical_projection = np.sum(binary, axis=0) |
|
|
|
|
|
|
|
|
char_blobs = [] |
|
|
in_blob = False |
|
|
blob_start = 0 |
|
|
for x, col_sum in enumerate(vertical_projection): |
|
|
if col_sum > 0 and not in_blob: |
|
|
blob_start = x |
|
|
in_blob = True |
|
|
elif col_sum == 0 and in_blob: |
|
|
char_blobs.append((blob_start, x)) |
|
|
in_blob = False |
|
|
if in_blob: |
|
|
char_blobs.append((blob_start, img_w)) |
|
|
|
|
|
if not char_blobs: |
|
|
return self.convert_line_to_word_level(line_data, img_w, img_h) |
|
|
|
|
|
|
|
|
total_chars = len("".join(words)) |
|
|
if total_chars > 0: |
|
|
geom_avg_char_width = img_w / total_chars |
|
|
else: |
|
|
geom_avg_char_width = 10 |
|
|
|
|
|
blob_avg_char_width = np.mean([end - start for start, end in char_blobs]) |
|
|
safe_avg_char_width = min(blob_avg_char_width, geom_avg_char_width * 1.5) |
|
|
max_scan_distance = int(safe_avg_char_width * 2.0) |
|
|
|
|
|
|
|
|
min_safe_box_width = max(4, int(safe_avg_char_width * 0.5)) |
|
|
|
|
|
estimated_data = self.convert_line_to_word_level(line_data, img_w, img_h) |
|
|
if not estimated_data["text"]: |
|
|
return estimated_data |
|
|
|
|
|
initial_boxes = [] |
|
|
for i in range(len(estimated_data["text"])): |
|
|
initial_boxes.append( |
|
|
{ |
|
|
"text": estimated_data["text"][i], |
|
|
"left": estimated_data["left"][i], |
|
|
"top": estimated_data["top"][i], |
|
|
"width": estimated_data["width"][i], |
|
|
"height": estimated_data["height"][i], |
|
|
"conf": estimated_data["conf"][i], |
|
|
} |
|
|
) |
|
|
|
|
|
|
|
|
ltr_boxes = self._run_single_pass( |
|
|
initial_boxes, vertical_projection, max_scan_distance, img_w, "ltr" |
|
|
) |
|
|
rtl_boxes = self._run_single_pass( |
|
|
initial_boxes, vertical_projection, max_scan_distance, img_w, "rtl" |
|
|
) |
|
|
|
|
|
|
|
|
combined_boxes = [box.copy() for box in initial_boxes] |
|
|
for i in range(len(combined_boxes)): |
|
|
final_left = ltr_boxes[i]["left"] |
|
|
rtl_right = rtl_boxes[i]["left"] + rtl_boxes[i]["width"] |
|
|
|
|
|
combined_boxes[i]["left"] = final_left |
|
|
combined_boxes[i]["width"] = max(min_safe_box_width, rtl_right - final_left) |
|
|
|
|
|
|
|
|
for i in range(len(combined_boxes) - 1): |
|
|
if combined_boxes[i + 1]["left"] <= combined_boxes[i]["left"]: |
|
|
combined_boxes[i + 1]["left"] = ( |
|
|
combined_boxes[i]["left"] + min_safe_box_width |
|
|
) |
|
|
|
|
|
for i in range(len(combined_boxes) - 1): |
|
|
curr = combined_boxes[i] |
|
|
nxt = combined_boxes[i + 1] |
|
|
gap_width = nxt["left"] - curr["left"] |
|
|
curr["width"] = max(min_safe_box_width, gap_width) |
|
|
|
|
|
|
|
|
final_output = {k: [] for k in estimated_data.keys()} |
|
|
for box in combined_boxes: |
|
|
if box["width"] >= min_safe_box_width: |
|
|
for key in final_output.keys(): |
|
|
final_output[key].append(box[key]) |
|
|
|
|
|
return final_output |
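

if __name__ == "__main__":
    # Minimal smoke test (an illustrative sketch, not part of the pipeline):
    # build a synthetic white line image containing two black "words" and
    # check that segmentation returns one box per word. Uses only the imports
    # already present in this module; output folders come from tools.config.
    demo_image = np.full((40, 220), 255, np.uint8)
    cv2.rectangle(demo_image, (10, 10), (90, 30), 0, -1)  # first "word"
    cv2.rectangle(demo_image, (130, 10), (210, 30), 0, -1)  # second "word"
    demo_line_data = {
        "text": ["hello world"],
        "conf": [95.0],
        "left": [0],
        "top": [0],
        "width": [220],
        "height": [40],
        "line": [0],
    }
    result, used_fallback = AdaptiveSegmenter().segment(
        demo_line_data, demo_image, image_name="demo"
    )
    print(result.get("text"), "used_fallback:", used_fallback)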
|
|
|