import os
from typing import Dict, List, Tuple

import cv2
import numpy as np

from tools.config import OUTPUT_FOLDER, SAVE_WORD_SEGMENTER_OUTPUT_IMAGES

# Adaptive thresholding parameters
BLOCK_SIZE_FACTOR = 1.5  # Multiplier for adaptive threshold block size
C_VALUE = 2  # Constant subtracted from mean in adaptive thresholding

# Word segmentation search parameters
INITIAL_KERNEL_WIDTH_FACTOR = 0.0  # Starting kernel width factor for Stage 2 search
INITIAL_VALLEY_THRESHOLD_FACTOR = 0.0  # Starting valley threshold factor for Stage 1 search
MAIN_VALLEY_THRESHOLD_FACTOR = 0.15  # Primary valley threshold factor for word separation
MIN_SPACE_FACTOR = 0.2  # Minimum space width relative to character width
MATCH_TOLERANCE = 0  # Tolerance for word count matching

# Noise removal parameters
MIN_AREA_THRESHOLD = 6  # Minimum component area to be considered valid text
DEFAULT_TRIM_PERCENTAGE = 0.2  # Percentage to trim from top/bottom for vertical cropping

# Skew detection parameters
MIN_SKEW_THRESHOLD = 0.5  # Ignore angles smaller than this (likely noise)
MAX_SKEW_THRESHOLD = 15.0  # Angles larger than this are extreme and likely errors
def _sanitize_filename(filename: str, max_length: int = 100) -> str:
    """
    Sanitizes a string to be used as a valid filename.

    Removes or replaces characters that are invalid on Windows/Linux
    file systems.

    Args:
        filename: The string to sanitize.
        max_length: Maximum length of the sanitized filename.

    Returns:
        A sanitized string safe for use in file names.
    """
    if not filename:
        return "unnamed"
    # Replace spaces with underscores
    sanitized = filename.replace(" ", "_")
    # Remove or replace invalid characters for Windows/Linux
    # Invalid: < > : " / \ | ? *
    invalid_chars = '<>:"/\\|?*'
    for char in invalid_chars:
        sanitized = sanitized.replace(char, "_")
    # Remove control characters
    sanitized = "".join(
        char for char in sanitized if ord(char) >= 32 or char in "\n\r\t"
    )
    # Remove leading/trailing dots and spaces (Windows doesn't allow these)
    sanitized = sanitized.strip(". ")
    # Collapse multiple consecutive underscores into one
    while "__" in sanitized:
        sanitized = sanitized.replace("__", "_")
    # Truncate if too long
    if len(sanitized) > max_length:
        sanitized = sanitized[:max_length]
    # Ensure the result is not empty after sanitization
    if not sanitized:
        sanitized = "unnamed"
    return sanitized
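
# Illustrative example (hypothetical input):
#   _sanitize_filename('line 3: a/b')  ->  'line_3_a_b'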

class AdaptiveSegmenter:
    """
    Line-to-word segmentation pipeline. It features:

    1. Adaptive thresholding.
    2. Targeted noise removal using connected component analysis (CCA).
    3. A robust two-stage adaptive search (valley -> kernel).
    4. CCA for final pixel-perfect refinement.
    """

    def __init__(self, output_folder: str = OUTPUT_FOLDER):
        self.output_folder = output_folder
        self.fallback_segmenter = HybridWordSegmenter()
    def _correct_orientation(
        self, gray_image: np.ndarray
    ) -> Tuple[np.ndarray, np.ndarray]:
        """
        Detects and corrects 90-degree orientation issues.
        """
        h, w = gray_image.shape
        center = (w // 2, h // 2)
        block_size = 21
        if h < block_size:
            block_size = h if h % 2 != 0 else h - 1
        if block_size > 3:
            binary = cv2.adaptiveThreshold(
                gray_image,
                255,
                cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
                cv2.THRESH_BINARY_INV,
                block_size,
                4,
            )
        else:
            _, binary = cv2.threshold(
                gray_image, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU
            )
        opening_kernel = np.ones((2, 2), np.uint8)
        binary = cv2.morphologyEx(binary, cv2.MORPH_OPEN, opening_kernel)
        coords = np.column_stack(np.where(binary > 0))
        if len(coords) < 50:
            M_orient = cv2.getRotationMatrix2D(center, 0, 1.0)
            return gray_image, M_orient
        ymin, xmin = coords.min(axis=0)
        ymax, xmax = coords.max(axis=0)
        box_height = ymax - ymin
        box_width = xmax - xmin
        orientation_angle = 0.0
        if box_height > box_width:
            orientation_angle = 90.0
        else:
            M_orient = cv2.getRotationMatrix2D(center, 0, 1.0)
            return gray_image, M_orient
        M_orient = cv2.getRotationMatrix2D(center, orientation_angle, 1.0)
        # The output canvas swaps width and height for a 90-degree rotation
        new_w, new_h = h, w
        M_orient[0, 2] += (new_w - w) / 2
        M_orient[1, 2] += (new_h - h) / 2
        oriented_gray = cv2.warpAffine(
            gray_image,
            M_orient,
            (new_w, new_h),
            flags=cv2.INTER_CUBIC,
            borderMode=cv2.BORDER_REPLICATE,
        )
        return oriented_gray, M_orient
    def _deskew_image(self, gray_image: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
        """
        Detects skew using a robust method that normalizes the minAreaRect angle.
        """
        h, w = gray_image.shape
        block_size = 21
        if h < block_size:
            block_size = h if h % 2 != 0 else h - 1
        if block_size > 3:
            binary = cv2.adaptiveThreshold(
                gray_image,
                255,
                cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
                cv2.THRESH_BINARY_INV,
                block_size,
                4,
            )
        else:
            _, binary = cv2.threshold(
                gray_image, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU
            )
        opening_kernel = np.ones((2, 2), np.uint8)
        binary = cv2.morphologyEx(binary, cv2.MORPH_OPEN, opening_kernel)
        coords = np.column_stack(np.where(binary > 0))
        if len(coords) < 50:
            M = cv2.getRotationMatrix2D((w // 2, h // 2), 0, 1.0)
            return gray_image, M
        # Reverse (y, x) -> (x, y) and cast to float32 so the point set is in a
        # layout OpenCV's geometry functions accept
        rect = cv2.minAreaRect(coords[:, ::-1].astype(np.float32))
        rect_width, rect_height = rect[1]
        angle = rect[2]
        # Normalize: make the long side the width, then fold the angle into (-45, 45]
        if rect_width < rect_height:
            rect_width, rect_height = rect_height, rect_width
            angle += 90
        if angle > 45:
            angle -= 90
        elif angle < -45:
            angle += 90
        correction_angle = angle
        # Ignore tiny angles (noise) and extreme angles (likely detection errors)
        if abs(correction_angle) < MIN_SKEW_THRESHOLD:
            correction_angle = 0.0
        elif abs(correction_angle) > MAX_SKEW_THRESHOLD:
            correction_angle = 0.0
        center = (w // 2, h // 2)
        M = cv2.getRotationMatrix2D(center, correction_angle, 1.0)
        deskewed_gray = cv2.warpAffine(
            gray_image,
            M,
            (w, h),
            flags=cv2.INTER_CUBIC,
            borderMode=cv2.BORDER_REPLICATE,
        )
        return deskewed_gray, M
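
    # Worked example of the angle normalization above: a minAreaRect reported
    # as (width < height, angle = -88.0) is swapped and shifted to +2.0, which
    # survives both skew thresholds and yields a +2-degree correction.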
    def _get_boxes_from_profile(
        self,
        binary_image: np.ndarray,
        stable_avg_char_width: float,
        min_space_factor: float,
        valley_threshold_factor: float,
    ) -> List:
        """
        Extracts word bounding boxes from the vertical projection profile.
        """
        img_h, img_w = binary_image.shape
        vertical_projection = np.sum(binary_image, axis=0)
        peaks = vertical_projection[vertical_projection > 0]
        if len(peaks) == 0:
            return []
        avg_peak_height = np.mean(peaks)
        valley_threshold = int(avg_peak_height * valley_threshold_factor)
        min_space_width = int(stable_avg_char_width * min_space_factor)
        patched_projection = vertical_projection.copy()
        in_gap = False
        gap_start = 0
        for x, col_sum in enumerate(patched_projection):
            if col_sum <= valley_threshold and not in_gap:
                in_gap = True
                gap_start = x
            elif col_sum > valley_threshold and in_gap:
                in_gap = False
                if (x - gap_start) < min_space_width:
                    patched_projection[gap_start:x] = int(avg_peak_height)
        unlabeled_boxes = []
        in_word = False
        start_x = 0
        for x, col_sum in enumerate(patched_projection):
            if col_sum > valley_threshold and not in_word:
                start_x = x
                in_word = True
            elif col_sum <= valley_threshold and in_word:
                # [NOTE] Returns a full-height stripe
                unlabeled_boxes.append((start_x, 0, x - start_x, img_h))
                in_word = False
        if in_word:
            unlabeled_boxes.append((start_x, 0, img_w - start_x, img_h))
        return unlabeled_boxes
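
    # Illustrative behaviour: with an average char width of 20 and
    # min_space_factor of 0.2, min_space_width is 4, so a 3-px valley inside a
    # word is patched over while an 8-px valley still splits two words.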
    def _enforce_logical_constraints(
        self, output: Dict[str, List], image_width: int, image_height: int
    ) -> Dict[str, List]:
        """
        Enforces geometric sanity checks with 2D awareness.
        """
        if not output or not output["text"]:
            return output
        num_items = len(output["text"])
        boxes = []
        for i in range(num_items):
            boxes.append(
                {
                    "text": output["text"][i],
                    "left": int(output["left"][i]),
                    "top": int(output["top"][i]),
                    "width": int(output["width"][i]),
                    "height": int(output["height"][i]),
                    "conf": output["conf"][i],
                }
            )
        # Clip boxes to the image and drop any that collapse to zero size
        valid_boxes = []
        for box in boxes:
            x0 = max(0, box["left"])
            y0 = max(0, box["top"])
            x1 = min(image_width, box["left"] + box["width"])
            y1 = min(image_height, box["top"] + box["height"])
            w = x1 - x0
            h = y1 - y0
            if w > 0 and h > 0:
                box["left"] = x0
                box["top"] = y0
                box["width"] = w
                box["height"] = h
                valid_boxes.append(box)
        boxes = valid_boxes
        is_vertical = image_height > (image_width * 1.2)
        if is_vertical:
            boxes.sort(key=lambda b: (b["top"], b["left"]))
        else:
            boxes.sort(key=lambda b: (b["left"], -b["width"]))
        # Deduplicate nested boxes that carry the same text
        final_pass_boxes = []
        if boxes:
            keep_indices = [True] * len(boxes)
            for i in range(len(boxes)):
                for j in range(len(boxes)):
                    if i == j:
                        continue
                    b1 = boxes[i]
                    b2 = boxes[j]
                    x_nested = (b1["left"] >= b2["left"] - 2) and (
                        b1["left"] + b1["width"] <= b2["left"] + b2["width"] + 2
                    )
                    y_nested = (b1["top"] >= b2["top"] - 2) and (
                        b1["top"] + b1["height"] <= b2["top"] + b2["height"] + 2
                    )
                    if x_nested and y_nested:
                        if b1["text"] == b2["text"]:
                            if b1["width"] * b1["height"] <= b2["width"] * b2["height"]:
                                keep_indices[i] = False
            for i, keep in enumerate(keep_indices):
                if keep:
                    final_pass_boxes.append(boxes[i])
            boxes = final_pass_boxes
        if is_vertical:
            boxes.sort(key=lambda b: (b["top"], b["left"]))
        else:
            boxes.sort(key=lambda b: (b["left"], -b["width"]))
        # Resolve remaining overlaps between successive boxes
        for i in range(len(boxes)):
            for j in range(i + 1, len(boxes)):
                b1 = boxes[i]
                b2 = boxes[j]
                x_overlap = min(
                    b1["left"] + b1["width"], b2["left"] + b2["width"]
                ) - max(b1["left"], b2["left"])
                y_overlap = min(
                    b1["top"] + b1["height"], b2["top"] + b2["height"]
                ) - max(b1["top"], b2["top"])
                if x_overlap > 0 and y_overlap > 0:
                    if is_vertical:
                        if b1["top"] < b2["top"]:
                            new_h = max(1, b2["top"] - b1["top"])
                            b1["height"] = new_h
                    else:
                        if b1["left"] < b2["left"]:
                            b1_right = b1["left"] + b1["width"]
                            b2_right = b2["left"] + b2["width"]
                            left_slice_width = max(0, b2["left"] - b1["left"])
                            right_slice_width = max(0, b1_right - b2_right)
                            if (
                                b1_right > b2_right
                                and right_slice_width > left_slice_width
                            ):
                                b1["left"] = b2_right
                                b1["width"] = right_slice_width
                            else:
                                b1["width"] = max(1, left_slice_width)
        cleaned_output = {
            k: [] for k in ["text", "left", "top", "width", "height", "conf"]
        }
        if is_vertical:
            boxes.sort(key=lambda b: (b["top"], b["left"]))
        else:
            boxes.sort(key=lambda b: (b["left"], -b["width"]))
        for box in boxes:
            for key in cleaned_output.keys():
                cleaned_output[key].append(box[key])
        return cleaned_output
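
    # Illustrative behaviour: two boxes for the same word where one nests fully
    # inside the other (within a 2-px tolerance) are deduplicated by dropping
    # the smaller-area one; a remaining horizontal overlap is resolved by
    # trimming the earlier box to end at the later box's left edge, unless its
    # slice to the right of the later box is wider, in which case it keeps
    # that right-hand slice instead.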
    def _is_geometry_valid(
        self,
        boxes: List[Tuple[int, int, int, int]],
        words: List[str],
        expected_height: float = 0,
    ) -> bool:
        """
        Validates whether the detected boxes are physically plausible.

        [FIX] Improved robustness for punctuation and mixed-case text.
        """
        if len(boxes) != len(words):
            return False
        baseline = expected_height
        # Use the median height only if the provided expected height is unreliable
        if baseline < 5:
            heights = [b[3] for b in boxes]
            if heights:
                baseline = np.median(heights)
        if baseline < 5:
            return True
        for i, box in enumerate(boxes):
            word = words[i]
            # [FIX] Punctuation/symbols are allowed to be small.
            # If the word is pure punctuation, skip the geometry checks.
            is_punctuation = not any(c.isalnum() for c in word)
            if is_punctuation:
                continue
            # Standard checks for alphanumeric words
            num_chars = len(word)
            if num_chars < 1:
                continue
            width = box[2]
            height = box[3]
            # [FIX] Only reject the height if it is REALLY small compared to the
            # baseline. A period might be small, but that case was skipped above;
            # this check ensures a real word like "The" isn't 2 pixels tall.
            if height < (baseline * 0.20):
                return False
            avg_char_width = width / num_chars
            min_expected = baseline * 0.20
            # Only reject if it fails BOTH the absolute (4 px) and relative checks
            if avg_char_width < min_expected and avg_char_width < 4:
                # Exception: a 1-char word (e.g. "I", "l", "1") may be skinny.
                if num_chars == 1 and avg_char_width >= 2:
                    continue
                return False
        return True
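
    # Illustrative behaviour: with a 40-px baseline, a box for "The" must be at
    # least 8 px tall (40 * 0.20), while a box containing only "." skips the
    # geometry checks entirely.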
    def segment(
        self,
        line_data: Dict[str, List],
        line_image: np.ndarray,
        min_space_factor=MIN_SPACE_FACTOR,
        match_tolerance=MATCH_TOLERANCE,
        image_name: str = None,
    ) -> Tuple[Dict[str, List], bool]:
        if (
            line_image is None
            or not isinstance(line_image, np.ndarray)
            or line_image.size == 0
        ):
            return ({}, False)
        # Allow grayscale (2 dims) or color (3 dims)
        if len(line_image.shape) < 2:
            return ({}, False)
        if not line_data or not line_data.get("text") or len(line_data["text"]) == 0:
            return ({}, False)
        line_text = line_data["text"][0]
        words = line_text.split()
        # Early return if there is one word or fewer
        if len(words) <= 1:
            img_h, img_w = line_image.shape[:2]
            one_word_result = self.fallback_segmenter.convert_line_to_word_level(
                line_data, img_w, img_h
            )
            return (one_word_result, False)
        line_number = line_data.get("line", [0])[0]
        safe_image_name = _sanitize_filename(image_name or "image", max_length=50)
        safe_line_number = _sanitize_filename(str(line_number), max_length=10)
        safe_shortened_line_text = _sanitize_filename(line_text, max_length=10)
        if SAVE_WORD_SEGMENTER_OUTPUT_IMAGES:
            os.makedirs(f"{self.output_folder}/word_segmentation", exist_ok=True)
            output_path = (
                f"{self.output_folder}/word_segmentation/"
                f"{safe_image_name}_{safe_line_number}_{safe_shortened_line_text}_original.png"
            )
            cv2.imwrite(output_path, line_image)
        if len(line_image.shape) == 3:
            gray = cv2.cvtColor(line_image, cv2.COLOR_BGR2GRAY)
        else:
            gray = line_image.copy()
        # ========================================================================
        # IMAGE PREPROCESSING (Deskew / Rotate)
        # ========================================================================
        oriented_gray, M_orient = self._correct_orientation(gray)
        deskewed_gray, M_skew = self._deskew_image(oriented_gray)
        # Combine matrices: M_total = M_skew * M_orient
        M_orient_3x3 = np.vstack([M_orient, [0, 0, 1]])
        M_skew_3x3 = np.vstack([M_skew, [0, 0, 1]])
        M_total_3x3 = M_skew_3x3 @ M_orient_3x3
        M = M_total_3x3[0:2, :]  # Extract the 2x3 affine matrix
        # Apply the combined transformation to the original color image
        h, w = deskewed_gray.shape
        deskewed_line_image = cv2.warpAffine(
            line_image,
            M,
            (w, h),
            flags=cv2.INTER_CUBIC,
            borderMode=cv2.BORDER_REPLICATE,
        )
        # [FIX] Create local line data that matches the deskewed/rotated image
        # dimensions. This prevents the fallback segmenter from using vertical
        # dimensions on a horizontal image.
        local_line_data = {
            "text": line_data["text"],
            "conf": line_data["conf"],
            "left": [0],  # Local coordinate system starts at 0
            "top": [0],
            "width": [w],  # Use the ROTATED width
            "height": [h],  # Use the ROTATED height
            "line": line_data.get("line", [0]),
        }
        if SAVE_WORD_SEGMENTER_OUTPUT_IMAGES:
            output_path = (
                f"{self.output_folder}/word_segmentation/"
                f"{safe_image_name}_{safe_line_number}_{safe_shortened_line_text}_deskewed.png"
            )
            cv2.imwrite(output_path, deskewed_line_image)
        # ========================================================================
        # MAIN SEGMENTATION PIPELINE
        # ========================================================================
        approx_char_count = len(line_data["text"][0].replace(" ", ""))
        if approx_char_count == 0:
            return ({}, False)
        img_h, img_w = deskewed_gray.shape
        estimated_char_height = img_h * 0.6
        avg_char_width_approx = img_w / approx_char_count
        block_size = int(avg_char_width_approx * BLOCK_SIZE_FACTOR)
        if block_size % 2 == 0:
            block_size += 1
        if block_size < 3:
            block_size = 3
        # --- Binarization ---
        binary_adaptive = cv2.adaptiveThreshold(
            deskewed_gray,
            255,
            cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
            cv2.THRESH_BINARY_INV,
            block_size,
            C_VALUE,
        )
        otsu_thresh_val, _ = cv2.threshold(
            deskewed_gray, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU
        )
        strict_thresh_val = otsu_thresh_val * 0.75
        _, binary_strict = cv2.threshold(
            deskewed_gray, strict_thresh_val, 255, cv2.THRESH_BINARY_INV
        )
        binary = cv2.bitwise_and(binary_adaptive, binary_strict)
        if SAVE_WORD_SEGMENTER_OUTPUT_IMAGES:
            output_path = (
                f"{self.output_folder}/word_segmentation/"
                f"{safe_image_name}_{safe_line_number}_{safe_shortened_line_text}_binary.png"
            )
            cv2.imwrite(output_path, binary)
        # --- Morphological Closing ---
        morph_width = max(3, int(avg_char_width_approx * 0.40))
        morph_height = max(2, int(avg_char_width_approx * 0.1))
        kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (morph_width, morph_height))
        closed_binary = cv2.morphologyEx(binary, cv2.MORPH_CLOSE, kernel, iterations=1)
        # --- Noise Removal ---
        num_labels, labels, stats, _ = cv2.connectedComponentsWithStats(
            closed_binary, 8, cv2.CV_32S
        )
        clean_binary = np.zeros_like(binary)
        force_fallback = False
        significant_labels = 0
        if num_labels > 1:
            # Only count components with an area > 3 pixels
            significant_labels = np.sum(stats[1:, cv2.CC_STAT_AREA] > 3)
        if approx_char_count > 0 and significant_labels > (approx_char_count * 12):
            force_fallback = True
        if num_labels > 1:
            areas = stats[1:, cv2.CC_STAT_AREA]
            if len(areas) == 0:
                clean_binary = binary
                areas = np.array([0])
            else:
                p1 = np.percentile(areas, 1)
                img_h, img_w = binary.shape
                estimated_char_height = img_h * 0.7
                estimated_min_letter_area = max(
                    2, int(estimated_char_height * 0.2 * estimated_char_height * 0.15)
                )
                area_threshold = max(
                    MIN_AREA_THRESHOLD, min(p1, estimated_min_letter_area)
                )
                # Gap detection: look for a large jump in the sorted area
                # distribution that separates noise specks from real glyphs
                sorted_areas = np.sort(areas)
                area_diffs = np.diff(sorted_areas)
                if len(sorted_areas) > 10 and len(area_diffs) > 0:
                    jump_threshold = np.percentile(area_diffs, 95)
                    significant_jump_thresh = max(10, jump_threshold * 3)
                    jump_indices = np.where(area_diffs > significant_jump_thresh)[0]
                    if len(jump_indices) > 0:
                        gap_idx = jump_indices[0]
                        area_before_gap = sorted_areas[gap_idx]
                        final_threshold = max(area_before_gap + 1, area_threshold)
                        final_threshold = min(final_threshold, 15)
                        area_threshold = final_threshold
                for i in range(1, num_labels):
                    if stats[i, cv2.CC_STAT_AREA] >= area_threshold:
                        clean_binary[labels == i] = 255
        else:
            clean_binary = binary
        # --- Vertical Cropping ---
        horizontal_projection = np.sum(clean_binary, axis=1)
        y_start = 0
        non_zero_rows = np.where(horizontal_projection > 0)[0]
        if len(non_zero_rows) > 0:
            p_top = int(np.percentile(non_zero_rows, 5))
            p_bottom = int(np.percentile(non_zero_rows, 95))
            core_height = p_bottom - p_top
            trim_pixels = int(core_height * 0.1)
            y_start = max(0, p_top + trim_pixels)
            y_end = min(clean_binary.shape[0], p_bottom - trim_pixels)
            if y_end - y_start < 5:
                y_start = p_top
                y_end = p_bottom
            analysis_image = clean_binary[y_start:y_end, :]
        else:
            analysis_image = clean_binary
        if SAVE_WORD_SEGMENTER_OUTPUT_IMAGES:
            output_path = (
                f"{self.output_folder}/word_segmentation/"
                f"{safe_image_name}_{safe_line_number}_{safe_shortened_line_text}_clean_binary.png"
            )
            cv2.imwrite(output_path, analysis_image)
        # --- Adaptive Search ---
        best_boxes = None
        successful_binary_image = None
        if not force_fallback:
            words = line_data["text"][0].split()
            target = len(words)
            backup_boxes_s1 = None
            # STAGE 1: sweep the valley threshold factor
            for v_factor in np.arange(INITIAL_VALLEY_THRESHOLD_FACTOR, 0.60, 0.02):
                curr_boxes = self._get_boxes_from_profile(
                    analysis_image, avg_char_width_approx, min_space_factor, v_factor
                )
                diff = abs(target - len(curr_boxes))
                is_geom_valid = self._is_geometry_valid(
                    curr_boxes, words, estimated_char_height
                )
                if diff == 0:
                    if is_geom_valid:
                        best_boxes = curr_boxes
                        successful_binary_image = analysis_image
                        break
                    else:
                        if backup_boxes_s1 is None:
                            backup_boxes_s1 = curr_boxes
                if diff == 1 and backup_boxes_s1 is None and is_geom_valid:
                    backup_boxes_s1 = curr_boxes
            # STAGE 2 (if needed): sweep the closing-kernel width factor
            if best_boxes is None:
                backup_boxes_s2 = None
                for k_factor in np.arange(INITIAL_KERNEL_WIDTH_FACTOR, 0.5, 0.02):
                    k_w = max(1, int(avg_char_width_approx * k_factor))
                    s2_bin = cv2.morphologyEx(
                        clean_binary, cv2.MORPH_CLOSE, np.ones((1, k_w), np.uint8)
                    )
                    s2_img = (
                        s2_bin[y_start:y_end, :] if len(non_zero_rows) > 0 else s2_bin
                    )
                    if s2_img is None or s2_img.size == 0:
                        continue
                    curr_boxes = self._get_boxes_from_profile(
                        s2_img,
                        avg_char_width_approx,
                        min_space_factor,
                        MAIN_VALLEY_THRESHOLD_FACTOR,
                    )
                    diff = abs(target - len(curr_boxes))
                    is_geom_valid = self._is_geometry_valid(
                        curr_boxes, words, estimated_char_height
                    )
                    if diff == 0 and is_geom_valid:
                        best_boxes = curr_boxes
                        successful_binary_image = s2_bin
                        break
                    if diff == 1 and backup_boxes_s2 is None and is_geom_valid:
                        backup_boxes_s2 = curr_boxes
                if best_boxes is None:
                    if backup_boxes_s1 is not None:
                        best_boxes = backup_boxes_s1
                        successful_binary_image = analysis_image
                    elif backup_boxes_s2 is not None:
                        best_boxes = backup_boxes_s2
                        successful_binary_image = clean_binary
        final_output = None
        used_fallback = False
        if best_boxes is None:
            # --- FALLBACK WITH ROTATED DATA ---
            used_fallback = True
            # [FIX] Use local_line_data (rotated dims) instead of line_data (original dims)
            final_output = self.fallback_segmenter.refine_words_bidirectional(
                local_line_data, deskewed_line_image
            )
        else:
            # --- CCA Refinement ---
            unlabeled_boxes = best_boxes
            if successful_binary_image is analysis_image:
                cca_source_image = clean_binary
            else:
                cca_source_image = successful_binary_image
            num_labels, _, stats, _ = cv2.connectedComponentsWithStats(
                cca_source_image, 8, cv2.CV_32S
            )
            cca_img_h, cca_img_w = cca_source_image.shape[:2]
            component_assignments = {}
            num_proc = min(len(words), len(unlabeled_boxes))
            min_valid_component_area = estimated_char_height * 2
            for j in range(1, num_labels):
                comp_x = stats[j, cv2.CC_STAT_LEFT]
                comp_w = stats[j, cv2.CC_STAT_WIDTH]
                comp_area = stats[j, cv2.CC_STAT_AREA]
                comp_r = comp_x + comp_w
                comp_center_x = comp_x + comp_w / 2
                comp_y = stats[j, cv2.CC_STAT_TOP]
                comp_h = stats[j, cv2.CC_STAT_HEIGHT]
                comp_center_y = comp_y + comp_h / 2
                if comp_center_y < cca_img_h * 0.1 or comp_center_y > cca_img_h * 0.9:
                    continue
                if comp_area < min_valid_component_area:
                    continue
                best_box_idx = None
                max_overlap = 0
                best_center_distance = float("inf")
                component_center_in_box = False
                # Assign each component to the best-matching word box
                for i in range(num_proc):
                    box_x, box_y, box_w, box_h = unlabeled_boxes[i]
                    box_r = box_x + box_w
                    box_center_x = box_x + box_w / 2
                    if comp_w > box_w * 1.5:
                        continue
                    if comp_x < box_r and box_x < comp_r:
                        overlap_start = max(comp_x, box_x)
                        overlap_end = min(comp_r, box_r)
                        overlap = overlap_end - overlap_start
                        if overlap > 0:
                            center_in_box = box_x <= comp_center_x < box_r
                            center_distance = abs(comp_center_x - box_center_x)
                            if center_in_box:
                                if not component_center_in_box or overlap > max_overlap:
                                    component_center_in_box = True
                                    best_center_distance = center_distance
                                    max_overlap = overlap
                                    best_box_idx = i
                            elif not component_center_in_box:
                                if center_distance < best_center_distance or (
                                    center_distance == best_center_distance
                                    and overlap > max_overlap
                                ):
                                    best_center_distance = center_distance
                                    max_overlap = overlap
                                    best_box_idx = i
                if best_box_idx is not None:
                    component_assignments[j] = best_box_idx
            refined_boxes_list = []
            for i in range(num_proc):
                word_label = words[i]
                components_in_box = [
                    stats[j] for j, b in component_assignments.items() if b == i
                ]
                use_original_box = False
                if not components_in_box:
                    use_original_box = True
                else:
                    min_x = min(c[cv2.CC_STAT_LEFT] for c in components_in_box)
                    min_y = min(c[cv2.CC_STAT_TOP] for c in components_in_box)
                    max_r = max(
                        c[cv2.CC_STAT_LEFT] + c[cv2.CC_STAT_WIDTH]
                        for c in components_in_box
                    )
                    max_b = max(
                        c[cv2.CC_STAT_TOP] + c[cv2.CC_STAT_HEIGHT]
                        for c in components_in_box
                    )
                    cca_h = max(1, max_b - min_y)
                    if cca_h < (estimated_char_height * 0.35):
                        use_original_box = True
                if use_original_box:
                    box_x, box_y, box_w, box_h = unlabeled_boxes[i]
                    adjusted_box_y = y_start + box_y
                    refined_boxes_list.append(
                        {
                            "text": word_label,
                            "left": box_x,
                            "top": adjusted_box_y,
                            "width": box_w,
                            "height": box_h,
                            "conf": line_data["conf"][0],
                        }
                    )
                else:
                    refined_boxes_list.append(
                        {
                            "text": word_label,
                            "left": min_x,
                            "top": min_y,
                            "width": max(1, max_r - min_x),
                            "height": cca_h,
                            "conf": line_data["conf"][0],
                        }
                    )
            # Check validity: fall back only if the box count is badly off,
            # otherwise keep the refined boxes
            cca_check_list = [
                (b["left"], b["top"], b["width"], b["height"])
                for b in refined_boxes_list
            ]
            geometry_ok = self._is_geometry_valid(
                cca_check_list, words, estimated_char_height
            )
            if not geometry_ok and abs(len(refined_boxes_list) - len(words)) > 1:
                best_boxes = None  # Trigger fallback
            else:
                final_output = {
                    k: [] for k in ["text", "left", "top", "width", "height", "conf"]
                }
                for box in refined_boxes_list:
                    for key in final_output.keys():
                        final_output[key].append(box[key])
        # --- REPEAT FALLBACK IF VALIDATION FAILED ---
        if best_boxes is None and not used_fallback:
            used_fallback = True
            # [FIX] Use local_line_data here too
            final_output = self.fallback_segmenter.refine_words_bidirectional(
                local_line_data, deskewed_line_image
            )
        # ========================================================================
        # COORDINATE TRANSFORMATION (Map back to Original)
        # ========================================================================
        M_inv = cv2.invertAffineTransform(M)
        remapped_boxes_list = []
        for i in range(len(final_output["text"])):
            left, top = final_output["left"][i], final_output["top"][i]
            width, height = final_output["width"][i], final_output["height"][i]
            # Map the 4 corners
            corners = np.array(
                [
                    [left, top],
                    [left + width, top],
                    [left + width, top + height],
                    [left, top + height],
                ],
                dtype="float32",
            )
            corners_expanded = np.expand_dims(corners, axis=1)
            original_corners = cv2.transform(corners_expanded, M_inv)
            squeezed_corners = original_corners.squeeze(axis=1)
            # Get the axis-aligned bounding box in original space
            min_x = int(np.min(squeezed_corners[:, 0]))
            max_x = int(np.max(squeezed_corners[:, 0]))
            min_y = int(np.min(squeezed_corners[:, 1]))
            max_y = int(np.max(squeezed_corners[:, 1]))
            remapped_boxes_list.append(
                {
                    "text": final_output["text"][i],
                    "left": min_x,
                    "top": min_y,
                    "width": max_x - min_x,
                    "height": max_y - min_y,
                    "conf": final_output["conf"][i],
                }
            )
        remapped_output = {k: [] for k in final_output.keys()}
        for box in remapped_boxes_list:
            for key in remapped_output.keys():
                remapped_output[key].append(box[key])
        img_h, img_w = line_image.shape[:2]
        remapped_output = self._enforce_logical_constraints(
            remapped_output, img_w, img_h
        )
        # ========================================================================
        # FINAL SAFETY NET
        # ========================================================================
        words = line_data["text"][0].split()
        target_count = len(words)
        current_count = len(remapped_output["text"])
        has_collapsed_boxes = any(w < 3 for w in remapped_output["width"])
        if current_count > 0:
            total_text_len = sum(len(t) for t in remapped_output["text"])
            total_box_width = sum(remapped_output["width"])
            avg_width_pixels = total_box_width / max(1, total_text_len)
        else:
            avg_width_pixels = 0
        is_suspiciously_thin = avg_width_pixels < 4
        if current_count != target_count or is_suspiciously_thin or has_collapsed_boxes:
            used_fallback = True
            # [FIX] Do NOT use the original line_image/line_data here.
            # Use the local_line_data + deskewed_line_image pipeline,
            # then transform back using M_inv (same as above).
            # 1. Run the fallback on the rotated data
            temp_local_output = self.fallback_segmenter.refine_words_bidirectional(
                local_line_data, deskewed_line_image
            )
            # 2. If the bidirectional pass failed to split correctly, use a purely
            #    mathematical split on the rotated data
            if len(temp_local_output["text"]) != target_count:
                h, w = deskewed_line_image.shape[:2]
                temp_local_output = self.fallback_segmenter.convert_line_to_word_level(
                    local_line_data, w, h
                )
            # 3. Transform the result back to original coordinates (M_inv),
            #    repeating the transformation logic for the safety-net result
            remapped_boxes_list = []
            for i in range(len(temp_local_output["text"])):
                left, top = temp_local_output["left"][i], temp_local_output["top"][i]
                width, height = (
                    temp_local_output["width"][i],
                    temp_local_output["height"][i],
                )
                corners = np.array(
                    [
                        [left, top],
                        [left + width, top],
                        [left + width, top + height],
                        [left, top + height],
                    ],
                    dtype="float32",
                )
                corners_expanded = np.expand_dims(corners, axis=1)
                original_corners = cv2.transform(corners_expanded, M_inv)
                squeezed_corners = original_corners.squeeze(axis=1)
                min_x = int(np.min(squeezed_corners[:, 0]))
                max_x = int(np.max(squeezed_corners[:, 0]))
                min_y = int(np.min(squeezed_corners[:, 1]))
                max_y = int(np.max(squeezed_corners[:, 1]))
                remapped_boxes_list.append(
                    {
                        "text": temp_local_output["text"][i],
                        "left": min_x,
                        "top": min_y,
                        "width": max_x - min_x,
                        "height": max_y - min_y,
                        "conf": temp_local_output["conf"][i],
                    }
                )
            remapped_output = {k: [] for k in temp_local_output.keys()}
            for box in remapped_boxes_list:
                for key in remapped_output.keys():
                    remapped_output[key].append(box[key])
        if SAVE_WORD_SEGMENTER_OUTPUT_IMAGES:
            os.makedirs(f"{self.output_folder}/word_segmentation", exist_ok=True)
            output_path = (
                f"{self.output_folder}/word_segmentation/"
                f"{safe_image_name}_{safe_line_number}_{safe_shortened_line_text}_final_boxes.png"
            )
            output_image_vis = line_image.copy()
            for i in range(len(remapped_output["text"])):
                x, y, w, h = (
                    int(remapped_output["left"][i]),
                    int(remapped_output["top"][i]),
                    int(remapped_output["width"][i]),
                    int(remapped_output["height"][i]),
                )
                cv2.rectangle(output_image_vis, (x, y), (x + w, y + h), (0, 255, 0), 2)
            cv2.imwrite(output_path, output_image_vis)
        return remapped_output, used_fallback

class HybridWordSegmenter:
    """
    Implements a two-step approach to word segmentation:

    1. Proportional estimation based on the text content.
    2. Image-based refinement with a "Bounded Scan" to prevent
       over-correction.
    """

    def convert_line_to_word_level(
        self, line_data: Dict[str, List], image_width: int, image_height: int
    ) -> Dict[str, List]:
        """
        Step 1: Converts line-level OCR results to word level using a
        robust proportional estimation method.

        Guarantees that the output box count equals the input word count.
        """
        output = {
            "text": [],
            "left": [],
            "top": [],
            "width": [],
            "height": [],
            "conf": [],
        }
        if not line_data or not line_data.get("text"):
            return output
        i = 0  # Assuming a single line
        line_text = line_data["text"][i]
        line_left = float(line_data["left"][i])
        line_top = float(line_data["top"][i])
        line_width = float(line_data["width"][i])
        line_height = float(line_data["height"][i])
        line_conf = line_data["conf"][i]
        if not line_text.strip():
            return output
        words = line_text.split()
        if not words:
            return output
        num_chars = len("".join(words))
        num_spaces = len(words) - 1
        if num_chars == 0:
            return output
        if (num_chars * 2 + num_spaces) > 0:
            # Assume each character is roughly twice as wide as a space
            char_space_ratio = 2.0
            estimated_space_width = line_width / (
                num_chars * char_space_ratio + num_spaces
            )
            avg_char_width = estimated_space_width * char_space_ratio
        else:
            avg_char_width = line_width / (num_chars if num_chars > 0 else 1)
            estimated_space_width = avg_char_width
        # [SAFETY CHECK] Ensure we never estimate a character width of ~0
        avg_char_width = max(3.0, avg_char_width)
        min_word_width = max(5.0, avg_char_width * 0.5)
        current_left = line_left
        for word in words:
            raw_word_width = len(word) * avg_char_width
            # Force the box to have a legible size
            word_width = max(min_word_width, raw_word_width)
            clamped_left = max(0, min(current_left, image_width))
            # We do NOT clamp the width against image_width here because that
            # causes the "0 width" bug if current_left is at the edge.
            # It is better to have a box go off-screen than be 0-width.
            output["text"].append(word)
            output["left"].append(clamped_left)
            output["top"].append(line_top)
            output["width"].append(word_width)
            output["height"].append(line_height)
            output["conf"].append(line_conf)
            current_left += word_width + estimated_space_width
        return output
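
    # Worked example (illustrative numbers): for line_width=300 and the text
    # "hello world" (10 chars, 1 space), estimated_space_width = 300 / (10*2 + 1)
    # ~= 14.3 and avg_char_width ~= 28.6, so "hello" gets a ~143-px box and
    # "world" starts roughly 157 px after the line's left edge.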
    def _run_single_pass(
        self,
        initial_boxes: List[Dict],
        vertical_projection: np.ndarray,
        max_scan_distance: int,
        img_w: int,
        direction: str = "ltr",
    ) -> List[Dict]:
        """
        Helper function that runs one pass of refinement.

        IMPROVED: Uses local-minima detection for cursive script, where
        perfect zero-gaps (white space) might not exist.
        """
        refined_boxes = [box.copy() for box in initial_boxes]
        if direction == "ltr":
            last_corrected_right_edge = 0
            indices = range(len(refined_boxes))
        else:  # rtl
            next_corrected_left_edge = img_w
            indices = range(len(refined_boxes) - 1, -1, -1)
        for i in indices:
            box = refined_boxes[i]
            left = int(box["left"])
            right = int(box["left"] + box["width"])
            left = max(0, min(left, img_w - 1))
            right = max(0, min(right, img_w - 1))
            new_left, new_right = left, right
            # --- Boundary search with improved gap detection ---
            # Priority 1: a true gap (zero projection)
            # Priority 2: the valley with the lowest ink density (thinnest connection)
            if direction == "ltr" or direction == "both":  # Scan right
                if right < img_w:
                    scan_limit = min(img_w, right + max_scan_distance)
                    search_range = range(right, scan_limit)
                    best_x = right
                    min_density = float("inf")
                    found_zero = False
                    # Look for the best cut in the window
                    for x in search_range:
                        density = vertical_projection[x]
                        if density == 0:
                            new_right = x
                            found_zero = True
                            break
                        if density < min_density:
                            min_density = density
                            best_x = x
                    if not found_zero:
                        # No clear gap found; cut at the thinnest point (minimum density)
                        new_right = best_x
            if direction == "rtl" or direction == "both":  # Scan left
                if left > 0:
                    scan_limit = max(0, left - max_scan_distance)
                    search_range = range(left, scan_limit, -1)
                    best_x = left
                    min_density = float("inf")
                    found_zero = False
                    for x in search_range:
                        density = vertical_projection[x]
                        if density == 0:
                            new_left = x
                            found_zero = True
                            break
                        if density < min_density:
                            min_density = density
                            best_x = x
                    if not found_zero:
                        new_left = best_x
            # --- Directional de-overlapping (strict stitching) ---
            if direction == "ltr":
                if new_left < last_corrected_right_edge:
                    new_left = last_corrected_right_edge
                # Ensure a valid width
                if new_right <= new_left:
                    new_right = new_left + 1
                last_corrected_right_edge = new_right
            else:  # rtl
                if new_right > next_corrected_left_edge:
                    new_right = next_corrected_left_edge
                # Ensure a valid width
                if new_left >= new_right:
                    new_left = new_right - 1
                next_corrected_left_edge = new_left
            box["left"] = new_left
            box["width"] = max(1, new_right - new_left)
        return refined_boxes
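
    # Illustrative behaviour: scanning right from x=40 over a projection of
    # [5, 3, 0, ...] stops at the first zero column (a true gap); if no zero
    # appears within max_scan_distance, the cut falls at the lowest-density
    # column seen in the window instead.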
    def refine_words_bidirectional(
        self,
        line_data: Dict[str, List],
        line_image: np.ndarray,
    ) -> Dict[str, List]:
        """
        Refines boxes using a more robust bidirectional scan and averaging.

        Includes ADAPTIVE NOISE REMOVAL to filter specks based on font size.
        """
        if line_image is None:
            return line_data
        # Early return if there is one word or fewer
        words = []
        if line_data and line_data.get("text"):
            words = line_data["text"][0].split()
            if len(words) <= 1:
                img_h, img_w = line_image.shape[:2]
                return self.convert_line_to_word_level(line_data, img_w, img_h)
        # --- PRE-PROCESSING: Stricter Binarization ---
        if len(line_image.shape) == 3:
            gray = cv2.cvtColor(line_image, cv2.COLOR_BGR2GRAY)
        else:
            gray = line_image
        # 1. Calculate the standard Otsu threshold first
        otsu_thresh_val, _ = cv2.threshold(
            gray, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU
        )
        # 2. Apply a "strictness factor" to remove dark noise.
        # 0.75 means "only keep pixels in the darkest 75% of what Otsu considered
        # foreground", which effectively filters out light-gray noise shadows.
        strict_thresh_val = otsu_thresh_val * 0.75
        _, binary = cv2.threshold(gray, strict_thresh_val, 255, cv2.THRESH_BINARY_INV)
        img_h, img_w = binary.shape
        # [NEW STEP 1] Morphological opening:
        # physically erodes small protrusions and dust (2x2 pixels or smaller)
        kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (2, 2))
        binary_clean = cv2.morphologyEx(binary, cv2.MORPH_OPEN, kernel)
        # [NEW STEP 2] Adaptive component filtering:
        # instead of hardcoded pixel counts, filter relative to the line's text size
        num_labels, labels, stats, _ = cv2.connectedComponentsWithStats(
            binary_clean, 8, cv2.CV_32S
        )
        # Get the heights of all components (excluding the background)
        heights = stats[1:, cv2.CC_STAT_HEIGHT]
        if len(heights) > 0:
            # Compute the median height of "significant" parts, ignoring tiny
            # noise for the median calculation. We assume valid text is at
            # least 20% of the image height.
            significant_heights = heights[heights > img_h * 0.2]
            if len(significant_heights) > 0:
                median_h = np.median(significant_heights)
            else:
                median_h = np.median(heights)
            # Main threshold: keep parts taller than 30% of the median letter height
            min_height_thresh = median_h * 0.30
            clean_binary = np.zeros_like(binary)
            for i in range(1, num_labels):
                h = stats[i, cv2.CC_STAT_HEIGHT]
                w = stats[i, cv2.CC_STAT_WIDTH]
                area = stats[i, cv2.CC_STAT_AREA]
                # Keep the component IF:
                # A. it is tall enough to be a letter part (h > threshold), OR
                # B. it is a "dot" (period / i-dot): height AND width are small
                #    (roughly square, which rejects flat dash/scratch noise)
                #    and the area is reasonable (> 2 px)
                is_tall_enough = h > min_height_thresh
                is_dot = (
                    (h <= min_height_thresh) and (w <= min_height_thresh) and (area > 2)
                )
                if is_tall_enough or is_dot:
                    clean_binary[labels == i] = 255
            # Use the adaptively cleaned image for the projection
            vertical_projection = np.sum(clean_binary, axis=0)
        else:
            # Fallback if no components were found (unlikely)
            vertical_projection = np.sum(binary, axis=0)
        # --- The rest of the logic is unchanged ---
        char_blobs = []
        in_blob = False
        blob_start = 0
        for x, col_sum in enumerate(vertical_projection):
            if col_sum > 0 and not in_blob:
                blob_start = x
                in_blob = True
            elif col_sum == 0 and in_blob:
                char_blobs.append((blob_start, x))
                in_blob = False
        if in_blob:
            char_blobs.append((blob_start, img_w))
        if not char_blobs:
            return self.convert_line_to_word_level(line_data, img_w, img_h)
        # [PREVIOUS FIX] Bounded scan distance
        total_chars = len("".join(words))
        if total_chars > 0:
            geom_avg_char_width = img_w / total_chars
        else:
            geom_avg_char_width = 10
        blob_avg_char_width = np.mean([end - start for start, end in char_blobs])
        safe_avg_char_width = min(blob_avg_char_width, geom_avg_char_width * 1.5)
        max_scan_distance = int(safe_avg_char_width * 2.0)
        # [PREVIOUS FIX] Safety floor
        min_safe_box_width = max(4, int(safe_avg_char_width * 0.5))
        estimated_data = self.convert_line_to_word_level(line_data, img_w, img_h)
        if not estimated_data["text"]:
            return estimated_data
        initial_boxes = []
        for i in range(len(estimated_data["text"])):
            initial_boxes.append(
                {
                    "text": estimated_data["text"][i],
                    "left": estimated_data["left"][i],
                    "top": estimated_data["top"][i],
                    "width": estimated_data["width"][i],
                    "height": estimated_data["height"][i],
                    "conf": estimated_data["conf"][i],
                }
            )
        # --- STEPS 1 & 2: Bidirectional refinement passes ---
        ltr_boxes = self._run_single_pass(
            initial_boxes, vertical_projection, max_scan_distance, img_w, "ltr"
        )
        rtl_boxes = self._run_single_pass(
            initial_boxes, vertical_projection, max_scan_distance, img_w, "rtl"
        )
        # --- STEP 3: Combine results using the best edge from each pass ---
        combined_boxes = [box.copy() for box in initial_boxes]
        for i in range(len(combined_boxes)):
            final_left = ltr_boxes[i]["left"]
            rtl_right = rtl_boxes[i]["left"] + rtl_boxes[i]["width"]
            combined_boxes[i]["left"] = final_left
            combined_boxes[i]["width"] = max(min_safe_box_width, rtl_right - final_left)
        # --- STEP 4: Contiguous stitching to eliminate gaps ---
        for i in range(len(combined_boxes) - 1):
            if combined_boxes[i + 1]["left"] <= combined_boxes[i]["left"]:
                combined_boxes[i + 1]["left"] = (
                    combined_boxes[i]["left"] + min_safe_box_width
                )
        for i in range(len(combined_boxes) - 1):
            curr = combined_boxes[i]
            nxt = combined_boxes[i + 1]
            gap_width = nxt["left"] - curr["left"]
            curr["width"] = max(min_safe_box_width, gap_width)
        # Convert back to the output dict format
        final_output = {k: [] for k in estimated_data.keys()}
        for box in combined_boxes:
            if box["width"] >= min_safe_box_width:
                for key in final_output.keys():
                    final_output[key].append(box[key])
        return final_output
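
# Minimal usage sketch. Assumptions (not part of this module): "line.png" is a
# BGR crop of a single text line, and the line-level OCR dict below merely
# illustrates the format the segmenter consumes.
if __name__ == "__main__":
    image = cv2.imread("line.png")  # hypothetical input path
    if image is not None:
        img_h, img_w = image.shape[:2]
        line_data = {
            "text": ["hello world"],
            "conf": [0.9],
            "left": [0],
            "top": [0],
            "width": [img_w],
            "height": [img_h],
            "line": [0],
        }
        segmenter = AdaptiveSegmenter()
        word_data, used_fallback = segmenter.segment(line_data, image, image_name="demo")
        for text, left in zip(word_data["text"], word_data["left"]):
            print(text, left)
        print("fallback used:", used_fallback)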