|
|
import os |
|
|
import uuid |
|
|
from datetime import datetime, timedelta, timezone |
|
|
from typing import Dict, List, Tuple |
|
|
from xml.etree.ElementTree import Element, SubElement, tostring |
|
|
|
|
|
import defusedxml |
|
|
import defusedxml.ElementTree as defused_etree |
|
|
import defusedxml.minidom as defused_minidom |
|
|
|
|
|
|
|
|
defusedxml.defuse_stdlib() |
|
|
|
|
|
import gradio as gr |
|
|
import numpy as np |
|
|
import pandas as pd |
|
|
import pymupdf |
|
|
from gradio_image_annotation import image_annotator |
|
|
from gradio_image_annotation.image_annotator import AnnotatedImageData |
|
|
from PIL import Image, ImageDraw |
|
|
from pymupdf import Document, Rect |
|
|
|
|
|
from tools.config import ( |
|
|
COMPRESS_REDACTED_PDF, |
|
|
CUSTOM_BOX_COLOUR, |
|
|
INPUT_FOLDER, |
|
|
MAX_IMAGE_PIXELS, |
|
|
OUTPUT_FOLDER, |
|
|
RETURN_PDF_FOR_REVIEW, |
|
|
) |
|
|
from tools.file_conversion import ( |
|
|
convert_annotation_data_to_dataframe, |
|
|
convert_annotation_json_to_review_df, |
|
|
convert_review_df_to_annotation_json, |
|
|
divide_coordinates_by_page_sizes, |
|
|
fill_missing_ids, |
|
|
is_pdf, |
|
|
multiply_coordinates_by_page_sizes, |
|
|
process_single_page_for_image_conversion, |
|
|
remove_duplicate_images_with_blank_boxes, |
|
|
save_pdf_with_or_without_compression, |
|
|
) |
|
|
from tools.file_redaction import redact_page_with_pymupdf |
|
|
from tools.helper_functions import ( |
|
|
_generate_unique_ids, |
|
|
detect_file_type, |
|
|
get_file_name_without_type, |
|
|
) |
|
|
from tools.secure_path_utils import ( |
|
|
secure_file_write, |
|
|
) |
|
|
|
|
|
if not MAX_IMAGE_PIXELS: |
|
|
Image.MAX_IMAGE_PIXELS = None |
|
|
|
|
|
|
|
|
def decrease_page(number: int, all_annotations: dict): |
|
|
""" |
|
|
Decrease page number for review redactions page. |
|
|
""" |
|
|
if not all_annotations: |
|
|
raise Warning("No annotator object loaded") |
|
|
|
|
|
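    # The new page number is returned twice so that two bound Gradio outputs
    # (e.g. the page-number display and the annotator page state) stay in sync.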
if number > 1: |
|
|
return number - 1, number - 1 |
|
|
    else:


        raise Warning("At first page")
|
|
|
|
|
|
|
|
def increase_page(number: int, all_annotations: dict): |
|
|
""" |
|
|
Increase page number for review redactions page. |
|
|
""" |
|
|
|
|
|
if not all_annotations: |
|
|
raise Warning("No annotator object loaded") |
|
|
|
|
|
|
|
|
max_pages = len(all_annotations) |
|
|
|
|
|
if number < max_pages: |
|
|
return number + 1, number + 1 |
|
|
|
|
|
|
|
|
else: |
|
|
raise Warning("At last page") |
|
|
|
|
|
|
|
|
def update_zoom( |
|
|
current_zoom_level: int, annotate_current_page: int, decrease: bool = True |
|
|
): |
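    """
    Step the zoom level for the review annotator in 10-unit increments.

    Note that `decrease=False` lowers the numeric zoom level and
    `decrease=True` raises it; the bounds keep the level between 60 and 110,
    assuming the level only ever moves in steps of 10.
    """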
|
|
if decrease is False: |
|
|
if current_zoom_level >= 70: |
|
|
current_zoom_level -= 10 |
|
|
else: |
|
|
if current_zoom_level < 110: |
|
|
current_zoom_level += 10 |
|
|
|
|
|
return current_zoom_level, annotate_current_page |
|
|
|
|
|
|
|
|
def update_dropdown_list_based_on_dataframe( |
|
|
df: pd.DataFrame, column: str |
|
|
) -> List[str]:
|
|
""" |
|
|
    Gather the unique elements of a string pandas Series, prepend 'ALL', and return the list.
|
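    Example (hypothetical data):
        >>> update_dropdown_list_based_on_dataframe(
        ...     pd.DataFrame({"label": ["NAME", "EMAIL", "NAME"]}), "label"
        ... )
        ['ALL', 'EMAIL', 'NAME']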
|
""" |
|
|
if isinstance(df, pd.DataFrame): |
|
|
|
|
|
if column not in df.columns or df[column].empty or df[column].isna().all(): |
|
|
return ["ALL"] |
|
|
elif column != "page": |
|
|
entities = df[column].astype(str).unique().tolist() |
|
|
entities_for_drop = sorted(entities) |
|
|
entities_for_drop.insert(0, "ALL") |
|
|
else: |
|
|
|
|
|
try: |
|
|
entities = df[column].astype(int).unique() |
|
|
entities_for_drop = sorted(entities) |
|
|
entities_for_drop = [ |
|
|
str(e) for e in entities_for_drop |
|
|
] |
|
|
entities_for_drop.insert(0, "ALL") |
|
|
except ValueError: |
|
|
return ["ALL"] |
|
|
|
|
|
return entities_for_drop |
|
|
else: |
|
|
return ["ALL"] |
|
|
|
|
|
|
|
|
def get_filtered_recogniser_dataframe_and_dropdowns( |
|
|
page_image_annotator_object: AnnotatedImageData, |
|
|
recogniser_dataframe_base: pd.DataFrame, |
|
|
recogniser_dropdown_value: str, |
|
|
text_dropdown_value: str, |
|
|
page_dropdown_value: str, |
|
|
review_df: pd.DataFrame = list(), |
|
|
    page_sizes: List[dict] = list(),
|
|
): |
|
|
""" |
|
|
Create a filtered recogniser dataframe and associated dropdowns based on current information in the image annotator and review data frame. |
|
|
""" |
|
|
|
|
|
recogniser_entities_list = ["Redaction"] |
|
|
recogniser_dataframe_out = recogniser_dataframe_base |
|
|
recogniser_dataframe_out_gr = gr.Dataframe() |
|
|
review_dataframe = review_df |
|
|
|
|
|
try: |
|
|
|
|
|
|
|
|
review_dataframe = convert_annotation_json_to_review_df( |
|
|
page_image_annotator_object, review_df, page_sizes |
|
|
) |
|
|
|
|
|
recogniser_entities_for_drop = update_dropdown_list_based_on_dataframe( |
|
|
review_dataframe, "label" |
|
|
) |
|
|
recogniser_entities_drop = gr.Dropdown( |
|
|
value=recogniser_dropdown_value, |
|
|
choices=recogniser_entities_for_drop, |
|
|
allow_custom_value=True, |
|
|
interactive=True, |
|
|
) |
|
|
|
|
|
|
|
|
recogniser_entities_list = [ |
|
|
entity |
|
|
for entity in recogniser_entities_for_drop.copy() |
|
|
if entity != "Redaction" and entity != "ALL" |
|
|
] |
|
|
recogniser_entities_list.insert( |
|
|
0, "Redaction" |
|
|
) |
|
|
|
|
|
text_entities_for_drop = update_dropdown_list_based_on_dataframe( |
|
|
review_dataframe, "text" |
|
|
) |
|
|
text_entities_drop = gr.Dropdown( |
|
|
value=text_dropdown_value, |
|
|
choices=text_entities_for_drop, |
|
|
allow_custom_value=True, |
|
|
interactive=True, |
|
|
) |
|
|
|
|
|
page_entities_for_drop = update_dropdown_list_based_on_dataframe( |
|
|
review_dataframe, "page" |
|
|
) |
|
|
page_entities_drop = gr.Dropdown( |
|
|
value=page_dropdown_value, |
|
|
choices=page_entities_for_drop, |
|
|
allow_custom_value=True, |
|
|
interactive=True, |
|
|
) |
|
|
|
|
|
recogniser_dataframe_out_gr = gr.Dataframe( |
|
|
review_dataframe[["page", "label", "text", "id"]], |
|
|
show_search="filter", |
|
|
type="pandas", |
|
|
headers=["page", "label", "text", "id"], |
|
|
wrap=True, |
|
|
max_height=400, |
|
|
) |
|
|
|
|
|
recogniser_dataframe_out = review_dataframe[["page", "label", "text", "id"]] |
|
|
|
|
|
except Exception as e: |
|
|
print("Could not extract recogniser information:", e) |
|
|
recogniser_dataframe_out = recogniser_dataframe_base[ |
|
|
["page", "label", "text", "id"] |
|
|
] |
|
|
|
|
|
        # Fall back to the base dataframe for dropdown choices: review_dataframe
        # may not be a valid DataFrame if the conversion above failed.
        label_choices = recogniser_dataframe_out["label"].astype(str).unique().tolist()


        text_choices = recogniser_dataframe_out["text"].astype(str).unique().tolist()


        page_choices = recogniser_dataframe_out["page"].astype(str).unique().tolist()
|
|
|
|
|
recogniser_entities_drop = gr.Dropdown( |
|
|
value=recogniser_dropdown_value, |
|
|
choices=label_choices, |
|
|
allow_custom_value=True, |
|
|
interactive=True, |
|
|
) |
|
|
recogniser_entities_list = ["Redaction"] |
|
|
text_entities_drop = gr.Dropdown( |
|
|
value=text_dropdown_value, |
|
|
choices=text_choices, |
|
|
allow_custom_value=True, |
|
|
interactive=True, |
|
|
) |
|
|
page_entities_drop = gr.Dropdown( |
|
|
value=page_dropdown_value, |
|
|
choices=page_choices, |
|
|
allow_custom_value=True, |
|
|
interactive=True, |
|
|
) |
|
|
|
|
|
return ( |
|
|
recogniser_dataframe_out_gr, |
|
|
recogniser_dataframe_out, |
|
|
recogniser_entities_drop, |
|
|
recogniser_entities_list, |
|
|
text_entities_drop, |
|
|
page_entities_drop, |
|
|
) |
|
|
|
|
|
|
|
|
def update_recogniser_dataframes( |
|
|
page_image_annotator_object: AnnotatedImageData, |
|
|
recogniser_dataframe_base: pd.DataFrame, |
|
|
recogniser_entities_dropdown_value: str = "ALL", |
|
|
text_dropdown_value: str = "ALL", |
|
|
page_dropdown_value: str = "ALL", |
|
|
review_df: pd.DataFrame = list(), |
|
|
    page_sizes: List[dict] = list(),
|
|
): |
|
|
""" |
|
|
Update recogniser dataframe information that appears alongside the pdf pages on the review screen. |
|
|
""" |
|
|
recogniser_entities_list = ["Redaction"] |
|
|
recogniser_dataframe_out = pd.DataFrame() |
|
|
recogniser_dataframe_out_gr = gr.Dataframe() |
|
|
|
|
|
|
|
|
if recogniser_dataframe_base.empty: |
|
|
( |
|
|
recogniser_dataframe_out_gr, |
|
|
recogniser_dataframe_out, |
|
|
recogniser_entities_drop, |
|
|
recogniser_entities_list, |
|
|
text_entities_drop, |
|
|
page_entities_drop, |
|
|
) = get_filtered_recogniser_dataframe_and_dropdowns( |
|
|
page_image_annotator_object, |
|
|
recogniser_dataframe_base, |
|
|
recogniser_entities_dropdown_value, |
|
|
text_dropdown_value, |
|
|
page_dropdown_value, |
|
|
review_df, |
|
|
page_sizes, |
|
|
) |
|
|
elif recogniser_dataframe_base.iloc[0, 0] == "": |
|
|
( |
|
|
recogniser_dataframe_out_gr, |
|
|
recogniser_dataframe_out, |
|
|
            recogniser_entities_drop,
|
|
recogniser_entities_list, |
|
|
text_entities_drop, |
|
|
page_entities_drop, |
|
|
) = get_filtered_recogniser_dataframe_and_dropdowns( |
|
|
page_image_annotator_object, |
|
|
recogniser_dataframe_base, |
|
|
recogniser_entities_dropdown_value, |
|
|
text_dropdown_value, |
|
|
page_dropdown_value, |
|
|
review_df, |
|
|
page_sizes, |
|
|
) |
|
|
else: |
|
|
( |
|
|
recogniser_dataframe_out_gr, |
|
|
recogniser_dataframe_out, |
|
|
            recogniser_entities_drop,


            recogniser_entities_list,


            text_entities_drop,


            page_entities_drop,
|
|
) = get_filtered_recogniser_dataframe_and_dropdowns( |
|
|
page_image_annotator_object, |
|
|
recogniser_dataframe_base, |
|
|
recogniser_entities_dropdown_value, |
|
|
text_dropdown_value, |
|
|
page_dropdown_value, |
|
|
review_df, |
|
|
page_sizes, |
|
|
) |
|
|
|
|
|
review_dataframe, text_entities_drop, page_entities_drop = ( |
|
|
update_entities_df_recogniser_entities( |
|
|
recogniser_entities_dropdown_value, |
|
|
recogniser_dataframe_out, |
|
|
page_dropdown_value, |
|
|
text_dropdown_value, |
|
|
) |
|
|
) |
|
|
|
|
|
recogniser_dataframe_out_gr = gr.Dataframe( |
|
|
review_dataframe[["page", "label", "text", "id"]], |
|
|
show_search="filter", |
|
|
type="pandas", |
|
|
headers=["page", "label", "text", "id"], |
|
|
wrap=True, |
|
|
max_height=400, |
|
|
) |
|
|
|
|
|
recogniser_entities_for_drop = update_dropdown_list_based_on_dataframe( |
|
|
recogniser_dataframe_out, "label" |
|
|
) |
|
|
recogniser_entities_drop = gr.Dropdown( |
|
|
value=recogniser_entities_dropdown_value, |
|
|
choices=recogniser_entities_for_drop, |
|
|
allow_custom_value=True, |
|
|
interactive=True, |
|
|
) |
|
|
|
|
|
recogniser_entities_list_base = ( |
|
|
recogniser_dataframe_out["label"].astype(str).unique().tolist() |
|
|
) |
|
|
|
|
|
|
|
|
recogniser_entities_list = [ |
|
|
entity for entity in recogniser_entities_list_base if entity != "Redaction" |
|
|
] |
|
|
recogniser_entities_list.insert(0, "Redaction") |
|
|
|
|
|
return ( |
|
|
recogniser_entities_list, |
|
|
recogniser_dataframe_out_gr, |
|
|
recogniser_dataframe_out, |
|
|
recogniser_entities_drop, |
|
|
text_entities_drop, |
|
|
page_entities_drop, |
|
|
) |
|
|
|
|
|
|
|
|
def undo_last_removal( |
|
|
backup_review_state: pd.DataFrame, |
|
|
backup_image_annotations_state: list[dict], |
|
|
backup_recogniser_entity_dataframe_base: pd.DataFrame, |
|
|
): |
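    """
    Restore the review state captured before the last removal action.
    Raises a Warning if there is no backup to restore.
    """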
|
|
|
|
|
if backup_image_annotations_state: |
|
|
return ( |
|
|
backup_review_state, |
|
|
backup_image_annotations_state, |
|
|
backup_recogniser_entity_dataframe_base, |
|
|
) |
|
|
else: |
|
|
raise Warning("No actions have been taken to undo") |
|
|
|
|
|
|
|
|
def update_annotator_page_from_review_df( |
|
|
review_df: pd.DataFrame, |
|
|
image_file_paths: List[ |
|
|
str |
|
|
], |
|
|
page_sizes: List[dict], |
|
|
current_image_annotations_state: List[ |
|
|
str |
|
|
], |
|
|
current_page_annotator: object, |
|
|
selected_recogniser_entity_df_row: pd.DataFrame, |
|
|
input_folder: str, |
|
|
doc_full_file_name_textbox: str, |
|
|
) -> Tuple[ |
|
|
object, List[dict], int, List[dict], pd.DataFrame, int |
|
|
]: |
|
|
""" |
|
|
Update the visible annotation object and related objects with the latest review file information, |
|
|
optimising by processing only the current page's data. |
|
|
""" |
|
|
|
|
|
out_image_annotations_state: List[dict] = list( |
|
|
current_image_annotations_state |
|
|
) |
|
|
out_current_page_annotator: dict = current_page_annotator |
|
|
|
|
|
|
|
|
|
|
|
gradio_annotator_current_page_number: int = 1 |
|
|
annotate_previous_page: int = ( |
|
|
0 |
|
|
) |
|
|
|
|
|
if ( |
|
|
not selected_recogniser_entity_df_row.empty |
|
|
and "page" in selected_recogniser_entity_df_row.columns |
|
|
): |
|
|
try: |
|
|
selected_page = selected_recogniser_entity_df_row["page"].iloc[0] |
|
|
gradio_annotator_current_page_number = int(selected_page) |
|
|
annotate_previous_page = ( |
|
|
gradio_annotator_current_page_number |
|
|
) |
|
|
except (IndexError, ValueError, TypeError): |
|
|
print( |
|
|
"Warning: Could not extract valid page number from selected_recogniser_entity_df_row. Defaulting to page 1." |
|
|
) |
|
|
gradio_annotator_current_page_number = ( |
|
|
1 |
|
|
) |
|
|
|
|
|
|
|
|
if gradio_annotator_current_page_number <= 0: |
|
|
gradio_annotator_current_page_number = 1 |
|
|
|
|
|
page_max_reported = len(page_sizes) |
|
|
if gradio_annotator_current_page_number > page_max_reported: |
|
|
print("current page is greater than highest page:", page_max_reported) |
|
|
gradio_annotator_current_page_number = page_max_reported |
|
|
|
|
|
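    # Pages are reported 1-based by the UI; the annotations list is 0-based.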
page_num_reported_zero_indexed = gradio_annotator_current_page_number - 1 |
|
|
|
|
|
|
|
|
page_sizes_df = pd.DataFrame(page_sizes) |
|
|
if not page_sizes_df.empty: |
|
|
|
|
|
page_sizes_df["page"] = pd.to_numeric(page_sizes_df["page"], errors="coerce") |
|
|
page_sizes_df.dropna(subset=["page"], inplace=True) |
|
|
if not page_sizes_df.empty: |
|
|
page_sizes_df["page"] = page_sizes_df["page"].astype(int) |
|
|
else: |
|
|
print("Warning: Page sizes DataFrame became empty after processing.") |
|
|
|
|
|
if not review_df.empty: |
|
|
|
|
|
|
|
|
if "page" in review_df.columns: |
|
|
review_df["page"] = ( |
|
|
pd.to_numeric(review_df["page"], errors="coerce").fillna(-1).astype(int) |
|
|
) |
|
|
|
|
|
current_image_path = out_image_annotations_state[ |
|
|
page_num_reported_zero_indexed |
|
|
]["image"] |
|
|
|
|
|
replaced_image_path, page_sizes_df = ( |
|
|
replace_placeholder_image_with_real_image( |
|
|
doc_full_file_name_textbox, |
|
|
current_image_path, |
|
|
page_sizes_df, |
|
|
gradio_annotator_current_page_number, |
|
|
input_folder, |
|
|
) |
|
|
) |
|
|
|
|
|
|
|
|
page_sizes = page_sizes_df.to_dict(orient="records") |
|
|
review_df.loc[ |
|
|
review_df["page"] == gradio_annotator_current_page_number, "image" |
|
|
] = replaced_image_path |
|
|
|
|
out_image_annotations_state[page_num_reported_zero_indexed][ |
|
|
"image" |
|
|
] = replaced_image_path |
|
|
|
|
|
current_page_review_df = review_df[ |
|
|
review_df["page"] == gradio_annotator_current_page_number |
|
|
].copy() |
|
|
current_page_review_df = multiply_coordinates_by_page_sizes( |
|
|
current_page_review_df, page_sizes_df |
|
|
) |
|
|
|
|
|
else: |
|
|
print( |
|
|
f"Warning: 'page' column not found in review_df. Cannot filter for page {gradio_annotator_current_page_number}. Skipping update from review_df." |
|
|
) |
|
|
current_page_review_df = pd.DataFrame() |
|
|
|
|
|
if not current_page_review_df.empty: |
|
|
|
|
|
|
|
|
current_page_annotations_list = list() |
|
|
|
|
|
|
|
|
expected_annotation_keys = [ |
|
|
"label", |
|
|
"color", |
|
|
"xmin", |
|
|
"ymin", |
|
|
"xmax", |
|
|
"ymax", |
|
|
"text", |
|
|
"id", |
|
|
] |
|
|
|
|
|
|
|
|
for key in expected_annotation_keys: |
|
|
if key not in current_page_review_df.columns: |
|
|
|
|
|
|
|
|
default_value = ( |
|
|
np.nan if key in ["xmin", "ymin", "xmax", "ymax"] else "" |
|
|
) |
|
|
current_page_review_df[key] = default_value |
|
|
|
|
|
|
|
|
|
|
|
current_page_annotations_list_raw = current_page_review_df[ |
|
|
expected_annotation_keys |
|
|
].to_dict(orient="records") |
|
|
|
|
|
current_page_annotations_list = current_page_annotations_list_raw |
|
|
|
|
|
|
|
|
page_state_entry_found = False |
|
|
        from tools.secure_regex_utils import (
            safe_extract_page_number_from_filename,
        )


        for i, page_state_entry in enumerate(out_image_annotations_state):


            # Guard against entries without an image path before extracting
            # the page number from the filename.
            if "image" not in page_state_entry:
                continue


            page_no = safe_extract_page_number_from_filename(
                page_state_entry["image"]
            )
            if page_no is None:
                page_no = 0


            if page_no == page_num_reported_zero_indexed:
|
|
|
|
|
out_image_annotations_state[i][ |
|
|
"boxes" |
|
|
] = current_page_annotations_list |
|
|
|
|
|
|
|
|
|
|
|
if ( |
|
|
"image" in current_page_review_df.columns |
|
|
and not current_page_review_df.empty |
|
|
): |
|
|
|
|
|
out_image_annotations_state[i]["image"] = ( |
|
|
current_page_review_df["image"].iloc[0] |
|
|
) |
|
|
page_state_entry_found = True |
|
|
break |
|
|
|
|
|
if not page_state_entry_found: |
|
|
print( |
|
|
f"Warning: Entry for page {gradio_annotator_current_page_number} not found in current_image_annotations_state. Cannot update page annotations." |
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
current_image_path = None |
|
|
if ( |
|
|
len(out_image_annotations_state) > page_num_reported_zero_indexed |
|
|
and "image" in out_image_annotations_state[page_num_reported_zero_indexed] |
|
|
): |
|
|
current_image_path = out_image_annotations_state[ |
|
|
page_num_reported_zero_indexed |
|
|
]["image"] |
|
|
else: |
|
|
print( |
|
|
f"Warning: Could not get image path from state for page index {page_num_reported_zero_indexed}." |
|
|
) |
|
|
|
|
|
|
|
|
if current_image_path and not page_sizes_df.empty: |
|
|
try: |
|
|
replaced_image_path, page_sizes_df = ( |
|
|
replace_placeholder_image_with_real_image( |
|
|
doc_full_file_name_textbox, |
|
|
current_image_path, |
|
|
page_sizes_df, |
|
|
gradio_annotator_current_page_number, |
|
|
input_folder, |
|
|
) |
|
|
) |
|
|
|
|
|
|
|
|
if len(out_image_annotations_state) > page_num_reported_zero_indexed: |
|
|
out_image_annotations_state[page_num_reported_zero_indexed][ |
|
|
"image" |
|
|
] = replaced_image_path |
|
|
|
|
|
if "page" in review_df.columns and "image" in review_df.columns: |
|
|
review_df.loc[ |
|
|
review_df["page"] == gradio_annotator_current_page_number, "image" |
|
|
] = replaced_image_path |
|
|
|
|
|
except Exception as e: |
|
|
print( |
|
|
f"Error during image path replacement for page {gradio_annotator_current_page_number}: {e}" |
|
|
) |
|
|
else: |
|
|
print( |
|
|
f"Warning: Page index {page_num_reported_zero_indexed} out of bounds for all_image_annotations list." |
|
|
) |
|
|
|
|
|
|
|
|
if not page_sizes_df.empty: |
|
|
page_sizes = page_sizes_df.to_dict(orient="records") |
|
|
else: |
|
|
page_sizes = list() |
|
|
|
|
|
|
|
|
|
|
|
try: |
|
|
out_image_annotations_state = remove_duplicate_images_with_blank_boxes( |
|
|
out_image_annotations_state |
|
|
) |
|
|
except Exception as e: |
|
|
print( |
|
|
f"Error during duplicate removal: {e}. Proceeding without duplicate removal." |
|
|
) |
|
|
|
|
|
|
|
|
if len(out_image_annotations_state) > page_num_reported_zero_indexed: |
|
|
out_current_page_annotator = out_image_annotations_state[ |
|
|
page_num_reported_zero_indexed |
|
|
] |
|
|
else: |
|
|
print( |
|
|
f"Warning: Cannot select current page annotator object for index {page_num_reported_zero_indexed}." |
|
|
) |
|
|
out_current_page_annotator = {} |
|
|
|
|
|
|
|
|
final_page_number_returned = gradio_annotator_current_page_number |
|
|
|
|
|
return ( |
|
|
out_current_page_annotator, |
|
|
out_image_annotations_state, |
|
|
final_page_number_returned, |
|
|
page_sizes, |
|
|
review_df, |
|
|
annotate_previous_page, |
|
|
) |
|
|
|
|
|
|
|
|
def _merge_horizontally_adjacent_boxes( |
|
|
    df: pd.DataFrame, x_merge_threshold: float = 0.02
|
|
) -> pd.DataFrame: |
|
|
""" |
|
|
Merges horizontally adjacent bounding boxes within the same line. |
|
|
|
|
|
Args: |
|
|
df (pd.DataFrame): DataFrame containing annotation boxes with columns |
|
|
like 'page', 'line', 'xmin', 'xmax', etc. |
|
|
        x_merge_threshold (float): The maximum gap on the x-axis (in the same


            coordinate units as the boxes) to consider two boxes as adjacent.
|
|
|
|
|
Returns: |
|
|
pd.DataFrame: A new DataFrame with adjacent boxes merged. |
|
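    Example (hypothetical data):
        >>> df = pd.DataFrame({
        ...     "page": [1, 1], "line": [1, 1],
        ...     "xmin": [0.10, 0.31], "xmax": [0.30, 0.45],
        ...     "ymin": [0.50, 0.50], "ymax": [0.52, 0.52],
        ...     "text": ["John", "Smith"], "image": ["p1.png", "p1.png"],
        ...     "label": ["NAME", "NAME"], "color": ["(0, 0, 0)", "(0, 0, 0)"],
        ... })
        >>> _merge_horizontally_adjacent_boxes(df)["text"].tolist()
        ['John Smith']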
|
""" |
|
|
if df.empty: |
|
|
return df |
|
|
|
|
|
|
|
|
df_sorted = df.sort_values(by=["page", "line", "xmin"]).copy() |
|
|
|
|
|
|
|
|
|
|
|
prev_xmax = df_sorted["xmax"].shift(1) |
|
|
prev_page = df_sorted["page"].shift(1) |
|
|
prev_line = df_sorted["line"].shift(1) |
|
|
|
|
|
|
|
|
|
|
|
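    # A box counts as adjacent when it shares page and line with the previous
    # (x-sorted) box and starts within the threshold of where that box ends.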
is_adjacent = ( |
|
|
(df_sorted["page"] == prev_page) |
|
|
& (df_sorted["line"] == prev_line) |
|
|
& (df_sorted["xmin"] - prev_xmax <= x_merge_threshold) |
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
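    # Run-grouping idiom: every non-adjacent box starts a new group, so the
    # cumulative sum of ~is_adjacent assigns a shared id to each run of
    # adjacent boxes.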
df_sorted["merge_group"] = (~is_adjacent).cumsum() |
|
|
|
|
|
|
|
|
|
|
|
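    # Collapse each merge group to one box: the union of the coordinates,
    # the texts joined with spaces, and the first value of the other columns.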
agg_funcs = { |
|
|
"xmin": "min", |
|
|
"ymin": "min", |
|
|
"xmax": "max", |
|
|
"ymax": "max", |
|
|
"text": lambda s: " ".join(s.astype(str)), |
|
|
|
|
|
"page": "first", |
|
|
"line": "first", |
|
|
"image": "first", |
|
|
"label": "first", |
|
|
"color": "first", |
|
|
} |
|
|
|
|
|
merged_df = df_sorted.groupby("merge_group").agg(agg_funcs).reset_index(drop=True) |
|
|
|
|
|
|
|
|
|
|
|
return merged_df |
|
|
|
|
|
|
|
|
def get_and_merge_current_page_annotations( |
|
|
page_sizes: List[Dict], |
|
|
annotate_current_page: int, |
|
|
existing_annotations_list: List[Dict], |
|
|
existing_annotations_df: pd.DataFrame, |
|
|
) -> pd.DataFrame: |
|
|
""" |
|
|
Function to extract and merge annotations for the current page |
|
|
into the main existing_annotations_df. |
|
|
""" |
|
|
current_page_image = page_sizes[annotate_current_page - 1]["image_path"] |
|
|
|
|
|
existing_annotations_current_page = [ |
|
|
item |
|
|
for item in existing_annotations_list |
|
|
if item["image"] == current_page_image |
|
|
] |
|
|
|
|
|
current_page_annotations_df = convert_annotation_data_to_dataframe( |
|
|
existing_annotations_current_page |
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
dfs_to_concat = [ |
|
|
df |
|
|
for df in [existing_annotations_df, current_page_annotations_df] |
|
|
if not df.empty |
|
|
] |
|
|
if dfs_to_concat: |
|
|
updated_df = ( |
|
|
pd.concat(dfs_to_concat, ignore_index=True) |
|
|
.sort_values(by=["page", "xmin", "ymin"]) |
|
|
.drop_duplicates(subset=["id"], keep="first") |
|
|
) |
|
|
else: |
|
|
|
|
|
updated_df = pd.DataFrame( |
|
|
columns=[ |
|
|
"image", |
|
|
"page", |
|
|
"label", |
|
|
"color", |
|
|
"xmin", |
|
|
"xmax", |
|
|
"ymin", |
|
|
"ymax", |
|
|
"text", |
|
|
"id", |
|
|
] |
|
|
) |
|
|
|
|
|
return updated_df |
|
|
|
|
|
|
|
|
def create_annotation_objects_from_filtered_ocr_results_with_words( |
|
|
filtered_ocr_results_with_words_df: pd.DataFrame, |
|
|
ocr_results_with_words_df_base: pd.DataFrame, |
|
|
page_sizes: List[Dict], |
|
|
existing_annotations_df: pd.DataFrame, |
|
|
existing_annotations_list: List[Dict], |
|
|
existing_recogniser_entity_df: pd.DataFrame, |
|
|
redaction_label: str = "Redaction", |
|
|
colour_label: str = "(0, 0, 0)", |
|
|
annotate_current_page: int = 1, |
|
|
progress: gr.Progress = gr.Progress(), |
|
|
) -> Tuple[ |
|
|
List[Dict], List[Dict], pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame |
|
|
]: |
|
|
""" |
|
|
This function processes filtered OCR results with words to create new annotation objects. It merges these new annotations with existing ones, ensuring that horizontally adjacent boxes are combined for cleaner redactions. The function also updates the existing recogniser entity DataFrame and returns the updated annotations in both DataFrame and list-of-dicts formats. |
|
|
|
|
|
Args: |
|
|
filtered_ocr_results_with_words_df (pd.DataFrame): A DataFrame containing filtered OCR results with words. |
|
|
ocr_results_with_words_df_base (pd.DataFrame): The base DataFrame of OCR results with words. |
|
|
page_sizes (List[Dict]): A list of dictionaries containing page sizes. |
|
|
existing_annotations_df (pd.DataFrame): A DataFrame of existing annotations. |
|
|
existing_annotations_list (List[Dict]): A list of dictionaries representing existing annotations. |
|
|
existing_recogniser_entity_df (pd.DataFrame): A DataFrame of existing recogniser entities. |
|
|
        redaction_label (str, optional): The label applied to newly created redaction boxes. Defaults to "Redaction".


        colour_label (str, optional): The colour of the new boxes, as an "(R, G, B)" string. Defaults to "(0, 0, 0)".


        annotate_current_page (int, optional): The 1-based page currently shown in the annotator. Defaults to 1.


        progress (gr.Progress, optional): A progress tracker. Defaults to gr.Progress().
|
|
|
|
|
Returns: |
|
|
Tuple[List[Dict], List[Dict], pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame]: A tuple containing the updated annotations list, updated existing annotations list, updated annotations DataFrame, updated existing annotations DataFrame, updated recogniser entity DataFrame, and the original existing recogniser entity DataFrame. |
|
|
""" |
|
|
|
|
|
|
|
|
|
|
|
fallback_colour = "(0, 0, 0)" |
|
|
|
|
|
existing_annotations_df = get_and_merge_current_page_annotations( |
|
|
page_sizes, |
|
|
annotate_current_page, |
|
|
existing_annotations_list, |
|
|
existing_annotations_df, |
|
|
) |
|
|
|
|
|
try: |
|
|
valid = False |
|
|
if isinstance(colour_label, str): |
|
|
label_str = colour_label.strip() |
|
|
from tools.secure_regex_utils import safe_extract_rgb_values |
|
|
|
|
|
rgb_values = safe_extract_rgb_values(label_str) |
|
|
if rgb_values: |
|
|
r_val, g_val, b_val = rgb_values |
|
|
if 0 <= r_val <= 255 and 0 <= g_val <= 255 and 0 <= b_val <= 255: |
|
|
valid = True |
|
|
elif isinstance(colour_label, (tuple, list)) and len(colour_label) == 3: |
|
|
r_val, g_val, b_val = colour_label |
|
|
if all(isinstance(v, int) for v in (r_val, g_val, b_val)) and all( |
|
|
0 <= v <= 255 for v in (r_val, g_val, b_val) |
|
|
): |
|
|
colour_label = f"({r_val}, {g_val}, {b_val})" |
|
|
valid = True |
|
|
if not valid: |
|
|
colour_label = fallback_colour |
|
|
except Exception: |
|
|
colour_label = fallback_colour |
|
|
|
|
|
progress(0.2, desc="Identifying new redactions to add") |
|
|
print("Identifying new redactions to add") |
|
|
if filtered_ocr_results_with_words_df.empty: |
|
|
print("No new annotations to add.") |
|
|
updated_annotations_df = existing_annotations_df.copy() |
|
|
else: |
|
|
|
|
|
filtered_ocr_results_with_words_df.index = filtered_ocr_results_with_words_df[ |
|
|
"index" |
|
|
] |
|
|
new_annotations_df = ocr_results_with_words_df_base.loc[ |
|
|
filtered_ocr_results_with_words_df.index |
|
|
].copy() |
|
|
|
|
|
if new_annotations_df.empty: |
|
|
print("No new annotations to add.") |
|
|
updated_annotations_df = existing_annotations_df.copy() |
|
|
else: |
|
|
page_to_image_map = { |
|
|
item["page"]: item["image_path"] for item in page_sizes |
|
|
} |
|
|
|
|
|
|
|
|
new_annotations_df = new_annotations_df.assign( |
|
|
image=lambda df: df["page"].map(page_to_image_map), |
|
|
label=redaction_label, |
|
|
color=colour_label, |
|
|
).rename( |
|
|
columns={ |
|
|
"word_x0": "xmin", |
|
|
"word_y0": "ymin", |
|
|
"word_x1": "xmax", |
|
|
"word_y1": "ymax", |
|
|
"word_text": "text", |
|
|
} |
|
|
) |
|
|
|
|
|
progress(0.3, desc="Checking for adjacent annotations to merge...") |
|
|
|
|
|
new_annotations_df = _merge_horizontally_adjacent_boxes(new_annotations_df) |
|
|
|
|
|
progress(0.4, desc="Creating new redaction IDs...") |
|
|
|
|
|
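            # Collect the ids already in use so newly generated ids cannot collide.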
existing_ids = ( |
|
|
set(existing_annotations_df["id"].dropna()) |
|
|
if "id" in existing_annotations_df.columns |
|
|
else set() |
|
|
) |
|
|
num_new_ids = len(new_annotations_df) |
|
|
new_id_list = _generate_unique_ids(num_new_ids, existing_ids) |
|
|
new_annotations_df["id"] = new_id_list |
|
|
|
|
|
annotation_cols = [ |
|
|
"image", |
|
|
"page", |
|
|
"label", |
|
|
"color", |
|
|
"xmin", |
|
|
"ymin", |
|
|
"xmax", |
|
|
"ymax", |
|
|
"text", |
|
|
"id", |
|
|
] |
|
|
new_annotations_df = new_annotations_df[annotation_cols] |
|
|
|
|
|
key_cols = ["page", "label", "xmin", "ymin", "xmax", "ymax", "text"] |
|
|
|
|
|
progress(0.5, desc="Checking for duplicate redactions") |
|
|
|
|
|
if existing_annotations_df.empty or not all( |
|
|
col in existing_annotations_df.columns for col in key_cols |
|
|
): |
|
|
unique_new_df = new_annotations_df |
|
|
else: |
|
|
|
|
|
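                # Indicator merge as an anti-join: rows flagged "left_only"
                # are genuinely new and not already in the existing set.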
merged = pd.merge( |
|
|
new_annotations_df, |
|
|
existing_annotations_df[key_cols].drop_duplicates(), |
|
|
on=key_cols, |
|
|
how="left", |
|
|
indicator=True, |
|
|
) |
|
|
unique_new_df = merged[merged["_merge"] == "left_only"].drop( |
|
|
columns=["_merge"] |
|
|
) |
|
|
|
|
|
print(f"Found {len(unique_new_df)} new unique annotations to add.") |
|
|
gr.Info(f"Found {len(unique_new_df)} new unique annotations to add.") |
|
|
|
|
|
dfs_to_concat = [ |
|
|
df for df in [existing_annotations_df, unique_new_df] if not df.empty |
|
|
] |
|
|
if dfs_to_concat: |
|
|
updated_annotations_df = pd.concat(dfs_to_concat, ignore_index=True) |
|
|
else: |
|
|
|
|
|
updated_annotations_df = pd.DataFrame( |
|
|
columns=[ |
|
|
"image", |
|
|
"page", |
|
|
"label", |
|
|
"color", |
|
|
"xmin", |
|
|
"xmax", |
|
|
"ymin", |
|
|
"ymax", |
|
|
"text", |
|
|
"id", |
|
|
] |
|
|
) |
|
|
|
|
|
|
|
|
updated_recogniser_entity_df = pd.DataFrame() |
|
|
if not updated_annotations_df.empty: |
|
|
updated_recogniser_entity_df = updated_annotations_df[ |
|
|
["page", "label", "text", "id"] |
|
|
] |
|
|
|
|
|
if not page_sizes: |
|
|
print("Warning: page_sizes is empty. No pages to process.") |
|
|
return ( |
|
|
[], |
|
|
existing_annotations_list, |
|
|
pd.DataFrame(), |
|
|
existing_annotations_df, |
|
|
pd.DataFrame(), |
|
|
existing_recogniser_entity_df, |
|
|
) |
|
|
|
|
|
all_pages_df = pd.DataFrame(page_sizes).rename(columns={"image_path": "image"}) |
|
|
|
|
|
if not updated_annotations_df.empty: |
|
|
page_to_image_map = {item["page"]: item["image_path"] for item in page_sizes} |
|
|
updated_annotations_df["image"] = updated_annotations_df["page"].map( |
|
|
page_to_image_map |
|
|
) |
|
|
merged_df = pd.merge( |
|
|
all_pages_df[["image"]], updated_annotations_df, on="image", how="left" |
|
|
) |
|
|
else: |
|
|
merged_df = all_pages_df[["image"]] |
|
|
|
|
|
|
|
|
|
|
|
image_order = all_pages_df["image"].tolist() |
|
|
|
|
|
|
|
|
|
|
|
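    # An ordered Categorical sorts annotations back into document page order,
    # even though the sort key is the image path rather than the page number.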
merged_df["image"] = pd.Categorical( |
|
|
merged_df["image"], categories=image_order, ordered=True |
|
|
) |
|
|
|
|
|
|
|
|
merged_df = merged_df.sort_values("image") |
|
|
|
|
|
final_annotations_list = list() |
|
|
box_cols = ["label", "color", "xmin", "ymin", "xmax", "ymax", "text", "id"] |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
for image_path, group in merged_df.groupby("image", sort=False, observed=False): |
|
|
|
|
|
|
|
|
if pd.isna(group.iloc[0].get("id")): |
|
|
boxes = list() |
|
|
else: |
|
|
valid_box_cols = [col for col in box_cols if col in group.columns] |
|
|
|
|
|
sorted_group = group.sort_values(by=["ymin", "xmin"]) |
|
|
boxes = sorted_group[valid_box_cols].to_dict("records") |
|
|
|
|
|
final_annotations_list.append({"image": image_path, "boxes": boxes}) |
|
|
|
|
|
progress(1.0, desc="Completed annotation processing") |
|
|
|
|
|
return ( |
|
|
final_annotations_list, |
|
|
existing_annotations_list, |
|
|
updated_annotations_df, |
|
|
existing_annotations_df, |
|
|
updated_recogniser_entity_df, |
|
|
existing_recogniser_entity_df, |
|
|
) |
|
|
|
|
|
|
|
|
def exclude_selected_items_from_redaction( |
|
|
review_df: pd.DataFrame, |
|
|
selected_rows_df: pd.DataFrame, |
|
|
image_file_paths: List[str], |
|
|
page_sizes: List[dict], |
|
|
image_annotations_state: dict, |
|
|
recogniser_entity_dataframe_base: pd.DataFrame, |
|
|
): |
|
|
""" |
|
|
Remove selected items from the review dataframe from the annotation object and review dataframe. |
|
|
""" |
|
|
|
|
|
backup_review_state = review_df |
|
|
backup_image_annotations_state = image_annotations_state |
|
|
backup_recogniser_entity_dataframe_base = recogniser_entity_dataframe_base |
|
|
|
|
|
if not selected_rows_df.empty and not review_df.empty: |
|
|
use_id = ( |
|
|
"id" in selected_rows_df.columns |
|
|
and "id" in review_df.columns |
|
|
and not selected_rows_df["id"].isnull().all() |
|
|
and not review_df["id"].isnull().all() |
|
|
) |
|
|
|
|
|
selected_merge_cols = ["id"] if use_id else ["label", "page", "text"] |
|
|
|
|
|
|
|
|
selected_subset = selected_rows_df[selected_merge_cols].drop_duplicates( |
|
|
subset=selected_merge_cols |
|
|
) |
|
|
|
|
|
|
|
|
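        # Anti-join the selected rows out of the review dataframe: keep only
        # rows that did not match a selected row ("left_only").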
merged_df = review_df.merge( |
|
|
selected_subset, on=selected_merge_cols, how="left", indicator=True |
|
|
) |
|
|
out_review_df = merged_df[merged_df["_merge"] == "left_only"].drop( |
|
|
columns=["_merge"] |
|
|
) |
|
|
|
|
|
out_image_annotations_state = convert_review_df_to_annotation_json( |
|
|
out_review_df, image_file_paths, page_sizes |
|
|
) |
|
|
|
|
|
out_recogniser_entity_dataframe_base = out_review_df[ |
|
|
["page", "label", "text", "id"] |
|
|
] |
|
|
|
|
|
|
|
|
else: |
|
|
out_review_df = review_df |
|
|
out_recogniser_entity_dataframe_base = recogniser_entity_dataframe_base |
|
|
out_image_annotations_state = image_annotations_state |
|
|
|
|
|
return ( |
|
|
out_review_df, |
|
|
out_image_annotations_state, |
|
|
out_recogniser_entity_dataframe_base, |
|
|
backup_review_state, |
|
|
backup_image_annotations_state, |
|
|
backup_recogniser_entity_dataframe_base, |
|
|
) |
|
|
|
|
|
|
|
|
def replace_annotator_object_img_np_array_with_page_sizes_image_path( |
|
|
all_image_annotations: List[dict], |
|
|
page_image_annotator_object: AnnotatedImageData, |
|
|
page_sizes: List[dict], |
|
|
page: int, |
|
|
): |
|
|
""" |
|
|
    Check if the image value in an AnnotatedImageData dict is a placeholder or np.array. If so, replace the value with the file path of the image already loaded into the app for this page.
|
|
""" |
|
|
|
|
|
page_zero_index = page - 1 |
|
|
|
|
|
if ( |
|
|
isinstance(all_image_annotations[page_zero_index]["image"], np.ndarray) |
|
|
or "placeholder_image" in all_image_annotations[page_zero_index]["image"] |
|
|
or isinstance(page_image_annotator_object["image"], np.ndarray) |
|
|
): |
|
|
page_sizes_df = pd.DataFrame(page_sizes) |
|
|
page_sizes_df[["page"]] = page_sizes_df[["page"]].apply( |
|
|
pd.to_numeric, errors="coerce" |
|
|
) |
|
|
|
|
|
|
|
|
matching_paths = page_sizes_df.loc[ |
|
|
page_sizes_df["page"] == page, "image_path" |
|
|
].unique() |
|
|
|
|
|
if matching_paths.size > 0: |
|
|
image_path = matching_paths[0] |
|
|
page_image_annotator_object["image"] = image_path |
|
|
all_image_annotations[page_zero_index]["image"] = image_path |
|
|
else: |
|
|
print(f"No image path found for page {page}.") |
|
|
|
|
|
return page_image_annotator_object, all_image_annotations |
|
|
|
|
|
|
|
|
def replace_placeholder_image_with_real_image( |
|
|
doc_full_file_name_textbox: str, |
|
|
current_image_path: str, |
|
|
page_sizes_df: pd.DataFrame, |
|
|
page_num_reported: int, |
|
|
input_folder: str, |
|
|
): |
|
|
"""If image path is still not valid, load in a new image an overwrite it. Then replace all items in the image annotation object for all pages based on the updated information.""" |
|
|
|
|
|
if page_num_reported <= 0: |
|
|
page_num_reported = 1 |
|
|
|
|
|
page_num_reported_zero_indexed = page_num_reported - 1 |
|
|
|
|
|
if not os.path.exists(current_image_path): |
|
|
|
|
|
page_num, replaced_image_path, width, height = ( |
|
|
process_single_page_for_image_conversion( |
|
|
doc_full_file_name_textbox, |
|
|
page_num_reported_zero_indexed, |
|
|
input_folder=input_folder, |
|
|
) |
|
|
) |
|
|
|
|
|
|
|
|
page_sizes_df.loc[page_sizes_df["page"] == page_num_reported, "image_width"] = ( |
|
|
width |
|
|
) |
|
|
page_sizes_df.loc[ |
|
|
page_sizes_df["page"] == page_num_reported, "image_height" |
|
|
] = height |
|
|
page_sizes_df.loc[page_sizes_df["page"] == page_num_reported, "image_path"] = ( |
|
|
replaced_image_path |
|
|
) |
|
|
|
|
|
else: |
|
|
if ( |
|
|
not page_sizes_df.loc[ |
|
|
page_sizes_df["page"] == page_num_reported, "image_width" |
|
|
] |
|
|
.isnull() |
|
|
.all() |
|
|
): |
|
|
width = page_sizes_df.loc[ |
|
|
page_sizes_df["page"] == page_num_reported, "image_width" |
|
|
].max() |
|
|
height = page_sizes_df.loc[ |
|
|
page_sizes_df["page"] == page_num_reported, "image_height" |
|
|
].max() |
|
|
else: |
|
|
image = Image.open(current_image_path) |
|
|
width = image.width |
|
|
height = image.height |
|
|
|
|
|
page_sizes_df.loc[ |
|
|
page_sizes_df["page"] == page_num_reported, "image_width" |
|
|
] = width |
|
|
page_sizes_df.loc[ |
|
|
page_sizes_df["page"] == page_num_reported, "image_height" |
|
|
] = height |
|
|
|
|
|
page_sizes_df.loc[page_sizes_df["page"] == page_num_reported, "image_path"] = ( |
|
|
current_image_path |
|
|
) |
|
|
|
|
|
replaced_image_path = current_image_path |
|
|
|
|
|
return replaced_image_path, page_sizes_df |
|
|
|
|
|
|
|
|
def update_annotator_object_and_filter_df( |
|
|
all_image_annotations: List[AnnotatedImageData], |
|
|
gradio_annotator_current_page_number: int, |
|
|
recogniser_entities_dropdown_value: str = "ALL", |
|
|
page_dropdown_value: str = "ALL", |
|
|
page_dropdown_redaction_value: str = "1", |
|
|
text_dropdown_value: str = "ALL", |
|
|
recogniser_dataframe_base: pd.DataFrame = None, |
|
|
zoom: int = 100, |
|
|
review_df: pd.DataFrame = None, |
|
|
page_sizes: List[dict] = list(), |
|
|
doc_full_file_name_textbox: str = "", |
|
|
input_folder: str = INPUT_FOLDER, |
|
|
) -> Tuple[ |
|
|
image_annotator, |
|
|
gr.Number, |
|
|
gr.Number, |
|
|
int, |
|
|
str, |
|
|
gr.Dataframe, |
|
|
pd.DataFrame, |
|
|
    gr.Dropdown,


    gr.Dropdown,


    gr.Dropdown,
|
|
List[dict], |
|
|
List[AnnotatedImageData], |
|
|
]: |
|
|
""" |
|
|
Update a gradio_image_annotation object with new annotation data for the current page |
|
|
and update filter dataframes, optimizing by processing only the current page's data for display. |
|
|
""" |
|
|
|
|
|
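    # The zoom level is applied to the annotator as a percentage string for
    # both its height and width.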
zoom_str = str(zoom) + "%" |
|
|
|
|
|
|
|
|
    if not isinstance(review_df, pd.DataFrame):
|
|
review_df = pd.DataFrame( |
|
|
columns=[ |
|
|
"image", |
|
|
"page", |
|
|
"label", |
|
|
"color", |
|
|
"xmin", |
|
|
"ymin", |
|
|
"xmax", |
|
|
"ymax", |
|
|
"text", |
|
|
"id", |
|
|
] |
|
|
) |
|
|
if recogniser_dataframe_base is None: |
|
|
recogniser_dataframe_base = gr.Dataframe( |
|
|
pd.DataFrame(data={"page": [], "label": [], "text": [], "id": []}) |
|
|
) |
|
|
|
|
|
|
|
|
if not all_image_annotations: |
|
|
print("No all_image_annotation object found") |
|
|
|
|
|
|
|
|
blank_annotator = image_annotator( |
|
|
value=None, |
|
|
boxes_alpha=0.1, |
|
|
box_thickness=1, |
|
|
label_list=list(), |
|
|
label_colors=list(), |
|
|
show_label=False, |
|
|
height=zoom_str, |
|
|
width=zoom_str, |
|
|
box_min_size=1, |
|
|
box_selected_thickness=2, |
|
|
handle_size=4, |
|
|
sources=None, |
|
|
show_clear_button=False, |
|
|
show_share_button=False, |
|
|
show_remove_button=False, |
|
|
handles_cursor=True, |
|
|
interactive=True, |
|
|
use_default_label=True, |
|
|
) |
|
|
blank_df_out_gr = gr.Dataframe( |
|
|
pd.DataFrame(columns=["page", "label", "text", "id"]) |
|
|
) |
|
|
blank_df_modified = pd.DataFrame(columns=["page", "label", "text", "id"]) |
|
|
|
|
|
return ( |
|
|
blank_annotator, |
|
|
gr.Number(value=1), |
|
|
gr.Number(value=1), |
|
|
1, |
|
|
recogniser_entities_dropdown_value, |
|
|
blank_df_out_gr, |
|
|
blank_df_modified, |
|
|
[], |
|
|
[], |
|
|
[], |
|
|
[], |
|
|
[], |
|
|
) |
|
|
|
|
|
|
|
|
page_num_reported = max( |
|
|
1, gradio_annotator_current_page_number |
|
|
) |
|
|
page_max_reported = len(all_image_annotations) |
|
|
if page_num_reported > page_max_reported: |
|
|
page_num_reported = page_max_reported |
|
|
|
|
|
page_num_reported_zero_indexed = page_num_reported - 1 |
|
|
|
|
|
if not page_sizes: |
|
|
page_num_reported = 0 |
|
|
|
|
|
|
|
|
page_sizes_df = pd.DataFrame(page_sizes) |
|
|
if not page_sizes_df.empty: |
|
|
page_sizes_df["page"] = pd.to_numeric(page_sizes_df["page"], errors="coerce") |
|
|
page_sizes_df.dropna(subset=["page"], inplace=True) |
|
|
if not page_sizes_df.empty: |
|
|
page_sizes_df["page"] = page_sizes_df["page"].astype(int) |
|
|
else: |
|
|
print("Warning: Page sizes DataFrame became empty after processing.") |
|
|
|
|
|
|
|
|
|
|
|
if len(all_image_annotations) > page_num_reported_zero_indexed: |
|
|
|
|
|
page_object_to_update = all_image_annotations[page_num_reported_zero_indexed] |
|
|
|
|
|
|
|
|
updated_page_object, all_image_annotations_after_img_replace = ( |
|
|
replace_annotator_object_img_np_array_with_page_sizes_image_path( |
|
|
all_image_annotations, |
|
|
page_object_to_update, |
|
|
page_sizes, |
|
|
page_num_reported, |
|
|
) |
|
|
) |
|
|
|
|
|
all_image_annotations = all_image_annotations_after_img_replace |
|
|
|
|
|
|
|
|
current_image_path = updated_page_object.get( |
|
|
"image" |
|
|
) |
|
|
|
|
|
if current_image_path and not page_sizes_df.empty: |
|
|
try: |
|
|
replaced_image_path, page_sizes_df = ( |
|
|
replace_placeholder_image_with_real_image( |
|
|
doc_full_file_name_textbox, |
|
|
current_image_path, |
|
|
page_sizes_df, |
|
|
page_num_reported, |
|
|
input_folder=input_folder, |
|
|
) |
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
if len(all_image_annotations) > page_num_reported_zero_indexed: |
|
|
all_image_annotations[page_num_reported_zero_indexed][ |
|
|
"image" |
|
|
] = replaced_image_path |
|
|
|
|
|
|
|
|
if "page" in review_df.columns and "image" in review_df.columns: |
|
|
|
|
|
review_df["page"] = ( |
|
|
pd.to_numeric(review_df["page"], errors="coerce") |
|
|
.fillna(-1) |
|
|
.astype(int) |
|
|
) |
|
|
review_df.loc[review_df["page"] == page_num_reported, "image"] = ( |
|
|
replaced_image_path |
|
|
) |
|
|
|
|
|
except Exception as e: |
|
|
print( |
|
|
f"Error during image path replacement for page {page_num_reported}: {e}" |
|
|
) |
|
|
else: |
|
|
print( |
|
|
f"Warning: Page index {page_num_reported_zero_indexed} out of bounds for all_image_annotations list." |
|
|
) |
|
|
|
|
|
|
|
|
if not page_sizes_df.empty: |
|
|
page_sizes = page_sizes_df.to_dict(orient="records") |
|
|
else: |
|
|
page_sizes = list() |
|
|
|
|
|
|
|
|
current_page_image_annotator_object = None |
|
|
if len(all_image_annotations) > page_num_reported_zero_indexed: |
|
|
page_data_for_display = all_image_annotations[page_num_reported_zero_indexed] |
|
|
|
|
|
|
|
|
|
|
|
current_page_annotations_df = convert_annotation_data_to_dataframe( |
|
|
[page_data_for_display] |
|
|
) |
|
|
|
|
|
if not current_page_annotations_df.empty and not page_sizes_df.empty: |
|
|
|
|
|
try: |
|
|
|
|
|
page_size_row = page_sizes_df[ |
|
|
page_sizes_df["page"] == page_num_reported |
|
|
] |
|
|
if not page_size_row.empty: |
|
|
current_page_annotations_df = multiply_coordinates_by_page_sizes( |
|
|
current_page_annotations_df, |
|
|
page_size_row, |
|
|
xmin="xmin", |
|
|
xmax="xmax", |
|
|
ymin="ymin", |
|
|
ymax="ymax", |
|
|
) |
|
|
|
|
|
except Exception as e: |
|
|
print( |
|
|
f"Warning: Error during coordinate multiplication for page {page_num_reported}: {e}. Using original coordinates." |
|
|
) |
|
|
|
|
|
|
|
|
if "color" not in current_page_annotations_df.columns: |
|
|
current_page_annotations_df["color"] = "(0, 0, 0)" |
|
|
|
|
|
|
|
|
processed_current_page_annotations_list = current_page_annotations_df[ |
|
|
["xmin", "xmax", "ymin", "ymax", "label", "color", "text", "id"] |
|
|
].to_dict(orient="records") |
|
|
|
|
|
|
|
|
current_page_image_annotator_object: AnnotatedImageData = { |
|
|
"image": page_data_for_display.get( |
|
|
"image" |
|
|
), |
|
|
"boxes": processed_current_page_annotations_list, |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
try: |
|
|
( |
|
|
recogniser_entities_list, |
|
|
recogniser_dataframe_out_gr, |
|
|
recogniser_dataframe_modified, |
|
|
recogniser_entities_dropdown_value, |
|
|
text_entities_drop, |
|
|
page_entities_drop, |
|
|
) = update_recogniser_dataframes( |
|
|
all_image_annotations, |
|
|
recogniser_dataframe_base, |
|
|
recogniser_entities_dropdown_value, |
|
|
text_dropdown_value, |
|
|
page_dropdown_value, |
|
|
review_df.copy(), |
|
|
page_sizes, |
|
|
) |
|
|
|
|
|
recogniser_colour_list = [ |
|
|
(0, 0, 0) for _ in range(len(recogniser_entities_list)) |
|
|
] |
|
|
|
|
|
except Exception as e: |
|
|
print( |
|
|
f"Error calling update_recogniser_dataframes: {e}. Returning empty/default filter data." |
|
|
) |
|
|
recogniser_entities_list = list() |
|
|
recogniser_colour_list = list() |
|
|
recogniser_dataframe_out_gr = gr.Dataframe( |
|
|
pd.DataFrame(columns=["page", "label", "text", "id"]) |
|
|
) |
|
|
recogniser_dataframe_modified = pd.DataFrame( |
|
|
columns=["page", "label", "text", "id"] |
|
|
) |
|
|
text_entities_drop = list() |
|
|
page_entities_drop = list() |
|
|
|
|
|
|
|
|
if page_sizes: |
|
|
page_number_reported_gradio_comp = gr.Number( |
|
|
label="Current page", |
|
|
value=page_num_reported, |
|
|
precision=0, |
|
|
maximum=len(page_sizes), |
|
|
minimum=1, |
|
|
) |
|
|
else: |
|
|
page_number_reported_gradio_comp = gr.Number( |
|
|
label="Current page", value=0, precision=0, maximum=9999, minimum=0 |
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
if current_page_image_annotator_object is None: |
|
|
|
|
|
|
|
|
print("Warning: Could not prepare annotator object for the current page.") |
|
|
out_image_annotator = image_annotator( |
|
|
value=None, interactive=False |
|
|
) |
|
|
else: |
|
|
        if isinstance(
            current_page_image_annotator_object["image"], str
        ) and current_page_image_annotator_object["image"].startswith(
            "placeholder_image"
        ):
|
|
current_page_image_annotator_object["image"], page_sizes_df = ( |
|
|
replace_placeholder_image_with_real_image( |
|
|
doc_full_file_name_textbox, |
|
|
current_page_image_annotator_object["image"], |
|
|
page_sizes_df, |
|
|
gradio_annotator_current_page_number, |
|
|
input_folder, |
|
|
) |
|
|
) |
|
|
|
|
|
out_image_annotator = image_annotator( |
|
|
value=current_page_image_annotator_object, |
|
|
boxes_alpha=0.1, |
|
|
box_thickness=1, |
|
|
label_list=recogniser_entities_list, |
|
|
label_colors=recogniser_colour_list, |
|
|
show_label=False, |
|
|
height=zoom_str, |
|
|
width=zoom_str, |
|
|
box_min_size=1, |
|
|
box_selected_thickness=2, |
|
|
handle_size=4, |
|
|
sources=None, |
|
|
show_clear_button=False, |
|
|
show_share_button=False, |
|
|
show_remove_button=False, |
|
|
handles_cursor=True, |
|
|
interactive=True, |
|
|
) |
|
|
|
|
|
page_entities_drop_redaction_list = list() |
|
|
all_pages_in_doc_list = [str(i) for i in range(1, len(page_sizes) + 1)] |
|
|
page_entities_drop_redaction_list.extend(all_pages_in_doc_list) |
|
|
|
|
|
page_entities_drop_redaction = gr.Dropdown( |
|
|
value=page_dropdown_redaction_value, |
|
|
choices=page_entities_drop_redaction_list, |
|
|
label="Page", |
|
|
allow_custom_value=True, |
|
|
) |
|
|
|
|
|
return ( |
|
|
out_image_annotator, |
|
|
page_number_reported_gradio_comp, |
|
|
page_number_reported_gradio_comp, |
|
|
page_num_reported, |
|
|
recogniser_entities_dropdown_value, |
|
|
recogniser_dataframe_out_gr, |
|
|
recogniser_dataframe_modified, |
|
|
text_entities_drop, |
|
|
page_entities_drop, |
|
|
page_entities_drop_redaction, |
|
|
page_sizes, |
|
|
all_image_annotations, |
|
|
) |
|
|
|
|
|
|
|
|
def update_all_page_annotation_object_based_on_previous_page( |
|
|
page_image_annotator_object: AnnotatedImageData, |
|
|
current_page: int, |
|
|
previous_page: int, |
|
|
all_image_annotations: List[AnnotatedImageData], |
|
|
page_sizes: List[dict] = list(), |
|
|
clear_all: bool = False, |
|
|
): |
|
|
""" |
|
|
Overwrite image annotations on the page we are moving from with modifications. |
|
|
""" |
|
|
|
|
|
if current_page > len(page_sizes): |
|
|
raise Warning("Selected page is higher than last page number") |
|
|
elif current_page <= 0: |
|
|
raise Warning("Selected page is lower than first page") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
previous_page_zero_index = previous_page - 1 |
|
|
|
|
|
if not current_page: |
|
|
current_page = 1 |
|
|
|
|
|
|
|
|
page_image_annotator_object, all_image_annotations = ( |
|
|
replace_annotator_object_img_np_array_with_page_sizes_image_path( |
|
|
all_image_annotations, |
|
|
page_image_annotator_object, |
|
|
page_sizes, |
|
|
previous_page, |
|
|
) |
|
|
) |
|
|
|
|
|
if clear_all is False: |
|
|
all_image_annotations[previous_page_zero_index] = page_image_annotator_object |
|
|
else: |
|
|
all_image_annotations[previous_page_zero_index]["boxes"] = list() |
|
|
|
|
|
|
|
|
|
|
|
return all_image_annotations, current_page, current_page |
|
|
|
|
|
|
|
|
def apply_redactions_to_review_df_and_files( |
|
|
page_image_annotator_object: AnnotatedImageData, |
|
|
file_paths: List[str], |
|
|
doc: Document, |
|
|
all_image_annotations: List[AnnotatedImageData], |
|
|
current_page: int, |
|
|
review_file_state: pd.DataFrame, |
|
|
output_folder: str = OUTPUT_FOLDER, |
|
|
save_pdf: bool = True, |
|
|
page_sizes: List[dict] = list(), |
|
|
COMPRESS_REDACTED_PDF: bool = COMPRESS_REDACTED_PDF, |
|
|
input_folder: str = INPUT_FOLDER, |
|
|
progress=gr.Progress(track_tqdm=True), |
|
|
): |
|
|
""" |
|
|
Applies the modified redaction annotations from the UI to the PyMuPDF document |
|
|
and exports the updated review files, including the redacted PDF and associated logs. |
|
|
|
|
|
Args: |
|
|
page_image_annotator_object (AnnotatedImageData): The annotation data for the current page, |
|
|
potentially including user modifications. |
|
|
file_paths (List[str]): A list of file paths associated with the document, typically |
|
|
including the original PDF and any generated image paths. |
|
|
doc (Document): The PyMuPDF Document object representing the PDF file. |
|
|
all_image_annotations (List[AnnotatedImageData]): A list containing annotation data |
|
|
for all pages of the document. |
|
|
current_page (int): The 1-based index of the page currently being processed or viewed. |
|
|
review_file_state (pd.DataFrame): A Pandas DataFrame holding the current state of |
|
|
redaction reviews, reflecting user selections. |
|
|
output_folder (str, optional): The directory where output files (redacted PDFs, |
|
|
log files) will be saved. Defaults to OUTPUT_FOLDER. |
|
|
save_pdf (bool, optional): If True, the redacted PDF will be saved. Defaults to True. |
|
|
page_sizes (List[dict], optional): A list of dictionaries, each containing size |
|
|
information (e.g., width, height) for a page. |
|
|
Defaults to an empty list. |
|
|
COMPRESS_REDACTED_PDF (bool, optional): If True, the output PDF will be compressed. |
|
|
Defaults to COMPRESS_REDACTED_PDF. |
|
|
input_folder (str, optional): The directory where input files are located and where |
|
|
page images should be saved. Defaults to INPUT_FOLDER. |
|
|
progress (gr.Progress, optional): Gradio progress object for tracking task progress. |
|
|
Defaults to gr.Progress(track_tqdm=True). |
|
|
|
|
|
Returns: |
|
|
Tuple[Document, List[AnnotatedImageData], List[str], List[str], pd.DataFrame]: |
|
|
- doc: The updated PyMuPDF Document object (potentially redacted). |
|
|
- all_image_annotations: The updated list of all image annotations. |
|
|
- output_files: A list of paths to the generated output files (e.g., redacted PDF). |
|
|
- output_log_files: A list of paths to any generated log files. |
|
|
- review_df: The final Pandas DataFrame representing the review state. |
|
|
""" |
|
|
|
|
|
output_files = list() |
|
|
output_log_files = list() |
|
|
pdf_doc = list() |
|
|
review_df = review_file_state |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
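    # Note: the incoming page_image_annotator_object is replaced by the stored
    # annotation state for the current page before redactions are applied.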
    page_image_annotator_object = all_image_annotations[current_page - 1]


    if not page_image_annotator_object:


        print("No image annotations object found for page")


        return doc, all_image_annotations, output_files, output_log_files, review_df
|
|
|
|
|
|
|
|
page_image_annotator_object, all_image_annotations = ( |
|
|
replace_annotator_object_img_np_array_with_page_sizes_image_path( |
|
|
all_image_annotations, page_image_annotator_object, page_sizes, current_page |
|
|
) |
|
|
) |
|
|
page_image_annotator_object["image"] = all_image_annotations[current_page - 1][ |
|
|
"image" |
|
|
] |
|
|
|
|
|
|
|
|
|
|
if isinstance(file_paths, str): |
|
|
file_paths = [file_paths] |
|
|
|
|
|
for file_path in file_paths: |
|
|
file_name_without_ext = get_file_name_without_type(file_path) |
|
|
file_name_with_ext = os.path.basename(file_path) |
|
|
|
|
|
file_extension = os.path.splitext(file_path)[1].lower() |
|
|
|
|
|
if save_pdf is True: |
|
|
|
|
|
            if is_pdf(file_path) is False and file_extension != ".csv":
|
|
image = Image.open(file_paths[-1]) |
|
|
|
|
|
draw = ImageDraw.Draw(image) |
|
|
|
|
|
for img_annotation_box in page_image_annotator_object["boxes"]: |
|
|
coords = [ |
|
|
img_annotation_box["xmin"], |
|
|
img_annotation_box["ymin"], |
|
|
img_annotation_box["xmax"], |
|
|
img_annotation_box["ymax"], |
|
|
] |
|
|
|
|
|
fill = img_annotation_box["color"] |
|
|
|
|
|
|
|
|
|
|
|
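                    # Normalise the colour to an integer RGB tuple: floats in
                    # [0, 1] are treated as fractional channel values and
                    # scaled to 0-255.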
if isinstance(fill, (list, tuple)) and len(fill) == 3: |
|
|
|
|
|
if isinstance(fill, list): |
|
|
fill = tuple(fill) |
|
|
|
|
|
|
|
|
valid_rgb = True |
|
|
converted_fill = [] |
|
|
|
|
|
for c in fill: |
|
|
if isinstance(c, (int, float)): |
|
|
|
|
|
if isinstance(c, float) and 0 <= c <= 1: |
|
|
converted_fill.append(int(c * 255)) |
|
|
|
|
|
elif isinstance(c, int) and 0 <= c <= 255: |
|
|
converted_fill.append(c) |
|
|
|
|
|
elif isinstance(c, float) and c > 1: |
|
|
converted_fill.append(int(c)) |
|
|
else: |
|
|
valid_rgb = False |
|
|
break |
|
|
else: |
|
|
valid_rgb = False |
|
|
break |
|
|
|
|
|
if valid_rgb: |
|
|
fill = tuple(converted_fill) |
|
|
else: |
|
|
print( |
|
|
f"Invalid color values: {fill}. Defaulting to CUSTOM_BOX_COLOUR." |
|
|
) |
|
|
fill = CUSTOM_BOX_COLOUR |
|
|
else: |
|
|
print( |
|
|
f"Invalid fill format: {fill}. Defaulting to CUSTOM_BOX_COLOUR." |
|
|
) |
|
|
fill = CUSTOM_BOX_COLOUR |
|
|
|
|
|
|
|
|
if image.mode not in ("RGB", "RGBA"): |
|
|
image = image.convert("RGB") |
|
|
|
|
|
draw = ImageDraw.Draw(image) |
|
|
|
|
|
draw.rectangle(coords, fill=fill) |
|
|
|
|
|
output_image_path = ( |
|
|
output_folder + file_name_without_ext + "_redacted.png" |
|
|
) |
|
|
                image.save(output_image_path)
|
|
|
|
|
output_files.append(output_image_path) |
|
|
|
|
|
doc = [image] |
|
|
|
|
|
            elif file_extension == ".csv":
|
|
pdf_doc = list() |
|
|
|
|
|
|
|
|
elif is_pdf(file_path) is True: |
|
|
pdf_doc = pymupdf.open(file_path) |
|
|
orig_pdf_file_path = file_path |
|
|
|
|
|
output_files.append(orig_pdf_file_path) |
|
|
|
|
|
number_of_pages = pdf_doc.page_count |
|
|
original_cropboxes = list() |
|
|
|
|
|
|
|
|
review_pdf_doc = None |
|
|
if RETURN_PDF_FOR_REVIEW: |
|
|
review_pdf_doc = pymupdf.open(file_path) |
|
|
|
|
|
page_sizes_df = pd.DataFrame(page_sizes) |
|
|
page_sizes_df[["page"]] = page_sizes_df[["page"]].apply( |
|
|
pd.to_numeric, errors="coerce" |
|
|
) |
|
|
|
|
|
for i in progress.tqdm( |
|
|
range(0, number_of_pages), |
|
|
desc="Saving redacted pages to file", |
|
|
unit="pages", |
|
|
): |
|
|
|
|
|
image_loc = all_image_annotations[i]["image"] |
|
|
|
|
|
|
|
|
if isinstance(image_loc, np.ndarray): |
|
|
image = Image.fromarray(image_loc.astype("uint8")) |
|
|
elif isinstance(image_loc, Image.Image): |
|
|
image = image_loc |
|
|
elif isinstance(image_loc, str): |
|
|
if not os.path.exists(image_loc): |
|
|
                            # Page numbers in page_sizes_df are 1-based while i is 0-based
                            matching_paths = page_sizes_df.loc[
                                page_sizes_df["page"] == i + 1, "image_path"
                            ]
                            if not matching_paths.empty:
                                image_loc = matching_paths.iloc[0]
|
|
try: |
|
|
image = Image.open(image_loc) |
|
|
except Exception: |
|
|
image = None |
|
|
|
|
|
pymupdf_page = pdf_doc.load_page(i) |
|
|
original_cropboxes.append(pymupdf_page.cropbox) |
|
|
pymupdf_page.set_cropbox(pymupdf_page.mediabox) |
|
|
|
|
|
|
|
|
if RETURN_PDF_FOR_REVIEW and review_pdf_doc: |
|
|
review_pymupdf_page = review_pdf_doc.load_page(i) |
|
|
review_pymupdf_page.set_cropbox(review_pymupdf_page.mediabox) |
|
|
|
|
|
|
|
|
review_pymupdf_page = redact_page_with_pymupdf( |
|
|
page=review_pymupdf_page, |
|
|
page_annotations=all_image_annotations[i], |
|
|
image=image, |
|
|
original_cropbox=original_cropboxes[-1], |
|
|
page_sizes_df=page_sizes_df, |
|
|
return_pdf_for_review=True, |
|
|
return_pdf_end_of_redaction=False, |
|
|
input_folder=input_folder, |
|
|
) |
|
|
|
|
|
|
|
|
pymupdf_page = redact_page_with_pymupdf( |
|
|
page=pymupdf_page, |
|
|
page_annotations=all_image_annotations[i], |
|
|
image=image, |
|
|
original_cropbox=original_cropboxes[-1], |
|
|
page_sizes_df=page_sizes_df, |
|
|
return_pdf_for_review=False, |
|
|
return_pdf_end_of_redaction=False, |
|
|
input_folder=input_folder, |
|
|
) |
|
|
else: |
|
|
print("File type not recognised.") |
|
|
|
|
|
progress(0.9, "Saving output files") |
|
|
|
|
|
if pdf_doc: |
|
|
|
|
|
out_pdf_file_path = ( |
|
|
output_folder + file_name_without_ext + "_redacted.pdf" |
|
|
) |
|
|
save_pdf_with_or_without_compression( |
|
|
pdf_doc, out_pdf_file_path, COMPRESS_REDACTED_PDF |
|
|
) |
|
|
output_files.append(out_pdf_file_path) |
|
|
|
|
|
|
|
|
if RETURN_PDF_FOR_REVIEW and review_pdf_doc: |
|
|
out_review_pdf_file_path = ( |
|
|
output_folder |
|
|
+ file_name_without_ext |
|
|
+ "_redactions_for_review.pdf" |
|
|
) |
|
|
print("Saving PDF file for review:", out_review_pdf_file_path) |
|
|
save_pdf_with_or_without_compression( |
|
|
review_pdf_doc, out_review_pdf_file_path, COMPRESS_REDACTED_PDF |
|
|
) |
|
|
output_files.append(out_review_pdf_file_path) |
|
|
|
|
|
else: |
|
|
print("PDF input not found. Outputs not saved to PDF.") |
|
|
|
|
|
|
|
|
else: |
|
|
        if is_pdf(file_path):
|
|
orig_pdf_file_path = file_path |
|
|
output_files.append(orig_pdf_file_path) |
|
|
|
|
|
try: |
|
|
|
|
|
review_df = convert_annotation_json_to_review_df( |
|
|
all_image_annotations, review_file_state.copy(), page_sizes=page_sizes |
|
|
) |
|
|
|
|
|
page_sizes_df = pd.DataFrame(page_sizes) |
|
|
page_sizes_df.loc[:, "page"] = pd.to_numeric( |
|
|
page_sizes_df["page"], errors="coerce" |
|
|
) |
|
|
review_df = divide_coordinates_by_page_sizes(review_df, page_sizes_df) |
|
|
|
|
|
review_df = review_df[ |
|
|
[ |
|
|
"image", |
|
|
"page", |
|
|
"label", |
|
|
"color", |
|
|
"xmin", |
|
|
"ymin", |
|
|
"xmax", |
|
|
"ymax", |
|
|
"text", |
|
|
"id", |
|
|
] |
|
|
] |
|
|
|
|
|
out_review_file_file_path = ( |
|
|
output_folder + file_name_with_ext + "_review_file.csv" |
|
|
) |
|
|
|
|
|
review_df.to_csv(out_review_file_file_path, index=None) |
|
|
output_files.append(out_review_file_file_path) |
|
|
|
|
|
except Exception as e: |
|
|
print( |
|
|
"In apply redactions function, could not save annotations to csv file:", |
|
|
e, |
|
|
) |
|
|
|
|
|
return doc, all_image_annotations, output_files, output_log_files, review_df |
|
|
|
|
|
|
|
|
def get_boxes_json(annotations: AnnotatedImageData): |
|
|
return annotations["boxes"] |
|
|
|
|
|
|
|
|
def update_all_entity_df_dropdowns( |
|
|
df: pd.DataFrame, |
|
|
label_dropdown_value: str, |
|
|
page_dropdown_value: str, |
|
|
text_dropdown_value: str, |
|
|
): |
|
|
""" |
|
|
Update all dropdowns based on rows that exist in a dataframe |
|
|
""" |
|
|
|
|
|
if isinstance(label_dropdown_value, str): |
|
|
label_dropdown_value = [label_dropdown_value] |
|
|
if isinstance(page_dropdown_value, str): |
|
|
page_dropdown_value = [page_dropdown_value] |
|
|
if isinstance(text_dropdown_value, str): |
|
|
text_dropdown_value = [text_dropdown_value] |
|
|
|
|
|
filtered_df = df.copy() |
|
|
|
|
|
if not label_dropdown_value[0]: |
|
|
label_dropdown_value[0] = "ALL" |
|
|
if not text_dropdown_value[0]: |
|
|
text_dropdown_value[0] = "ALL" |
|
|
if not page_dropdown_value[0]: |
|
|
page_dropdown_value[0] = "1" |
|
|
|
|
|
recogniser_entities_for_drop = update_dropdown_list_based_on_dataframe( |
|
|
filtered_df, "label" |
|
|
) |
|
|
recogniser_entities_drop = gr.Dropdown( |
|
|
value=label_dropdown_value[0], |
|
|
choices=recogniser_entities_for_drop, |
|
|
allow_custom_value=True, |
|
|
interactive=True, |
|
|
) |
|
|
|
|
|
text_entities_for_drop = update_dropdown_list_based_on_dataframe( |
|
|
filtered_df, "text" |
|
|
) |
|
|
text_entities_drop = gr.Dropdown( |
|
|
value=text_dropdown_value[0], |
|
|
choices=text_entities_for_drop, |
|
|
allow_custom_value=True, |
|
|
interactive=True, |
|
|
) |
|
|
|
|
|
page_entities_for_drop = update_dropdown_list_based_on_dataframe( |
|
|
filtered_df, "page" |
|
|
) |
|
|
page_entities_drop = gr.Dropdown( |
|
|
value=page_dropdown_value[0], |
|
|
choices=page_entities_for_drop, |
|
|
allow_custom_value=True, |
|
|
interactive=True, |
|
|
) |
|
|
|
|
|
return recogniser_entities_drop, text_entities_drop, page_entities_drop |
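
# Illustrative wiring sketch for the dropdown-update helper above. The
# component names (review_entities_df_state, label_dropdown, page_dropdown,
# text_dropdown) are hypothetical stand-ins for whatever the calling Gradio
# app defines:
#
#   label_dropdown.change(
#       update_all_entity_df_dropdowns,
#       inputs=[review_entities_df_state, label_dropdown, page_dropdown, text_dropdown],
#       outputs=[label_dropdown, text_dropdown, page_dropdown],
#   )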
|
|
|
|
|
|
|
|
def update_entities_df_recogniser_entities( |
|
|
choice: str, df: pd.DataFrame, page_dropdown_value: str, text_dropdown_value: str |
|
|
): |
|
|
""" |
|
|
Update the rows in a dataframe depending on the user choice from a dropdown |
|
|
""" |
|
|
|
|
|
if isinstance(choice, str): |
|
|
choice = [choice] |
|
|
if isinstance(page_dropdown_value, str): |
|
|
page_dropdown_value = [page_dropdown_value] |
|
|
if isinstance(text_dropdown_value, str): |
|
|
text_dropdown_value = [text_dropdown_value] |
|
|
|
|
|
filtered_df = df.copy() |
|
|
|
|
|
|
|
|
if "ALL" not in page_dropdown_value: |
|
|
filtered_df = filtered_df[ |
|
|
filtered_df["page"].astype(str).isin(page_dropdown_value) |
|
|
] |
|
|
|
|
|
if "ALL" not in text_dropdown_value: |
|
|
filtered_df = filtered_df[ |
|
|
filtered_df["text"].astype(str).isin(text_dropdown_value) |
|
|
] |
|
|
|
|
|
if "ALL" not in choice: |
|
|
filtered_df = filtered_df[filtered_df["label"].astype(str).isin(choice)] |
|
|
|
|
|
if not choice[0]: |
|
|
choice[0] = "ALL" |
|
|
if not text_dropdown_value[0]: |
|
|
text_dropdown_value[0] = "ALL" |
|
|
if not page_dropdown_value[0]: |
|
|
page_dropdown_value[0] = "1" |
|
|
|
|
|
    # The label dropdown triggered this callback, so it is not re-emitted here;
    # only the filtered dataframe and the text and page dropdowns are returned.
|
|
|
|
|
text_entities_for_drop = update_dropdown_list_based_on_dataframe( |
|
|
filtered_df, "text" |
|
|
) |
|
|
text_entities_drop = gr.Dropdown( |
|
|
value=text_dropdown_value[0], |
|
|
choices=text_entities_for_drop, |
|
|
allow_custom_value=True, |
|
|
interactive=True, |
|
|
) |
|
|
|
|
|
page_entities_for_drop = update_dropdown_list_based_on_dataframe( |
|
|
filtered_df, "page" |
|
|
) |
|
|
page_entities_drop = gr.Dropdown( |
|
|
value=page_dropdown_value[0], |
|
|
choices=page_entities_for_drop, |
|
|
allow_custom_value=True, |
|
|
interactive=True, |
|
|
) |
|
|
|
|
|
return filtered_df, text_entities_drop, page_entities_drop |
|
|
|
|
|
|
|
|
def update_entities_df_page( |
|
|
choice: str, df: pd.DataFrame, label_dropdown_value: str, text_dropdown_value: str |
|
|
): |
|
|
""" |
|
|
Update the rows in a dataframe depending on the user choice from a dropdown |
|
|
""" |
|
|
if isinstance(choice, str): |
|
|
choice = [choice] |
|
|
elif not isinstance(choice, list): |
|
|
choice = [str(choice)] |
|
|
if isinstance(label_dropdown_value, str): |
|
|
label_dropdown_value = [label_dropdown_value] |
|
|
elif not isinstance(label_dropdown_value, list): |
|
|
label_dropdown_value = [str(label_dropdown_value)] |
|
|
if isinstance(text_dropdown_value, str): |
|
|
text_dropdown_value = [text_dropdown_value] |
|
|
elif not isinstance(text_dropdown_value, list): |
|
|
text_dropdown_value = [str(text_dropdown_value)] |
|
|
|
|
|
filtered_df = df.copy() |
|
|
|
|
|
|
|
|
if "ALL" not in text_dropdown_value: |
|
|
filtered_df = filtered_df[ |
|
|
filtered_df["text"].astype(str).isin(text_dropdown_value) |
|
|
] |
|
|
|
|
|
if "ALL" not in label_dropdown_value: |
|
|
filtered_df = filtered_df[ |
|
|
filtered_df["label"].astype(str).isin(label_dropdown_value) |
|
|
] |
|
|
|
|
|
if "ALL" not in choice: |
|
|
filtered_df = filtered_df[filtered_df["page"].astype(str).isin(choice)] |
|
|
|
|
|
recogniser_entities_for_drop = update_dropdown_list_based_on_dataframe( |
|
|
filtered_df, "label" |
|
|
) |
|
|
recogniser_entities_drop = gr.Dropdown( |
|
|
value=label_dropdown_value[0], |
|
|
choices=recogniser_entities_for_drop, |
|
|
allow_custom_value=True, |
|
|
interactive=True, |
|
|
) |
|
|
|
|
|
text_entities_for_drop = update_dropdown_list_based_on_dataframe( |
|
|
filtered_df, "text" |
|
|
) |
|
|
text_entities_drop = gr.Dropdown( |
|
|
value=text_dropdown_value[0], |
|
|
choices=text_entities_for_drop, |
|
|
allow_custom_value=True, |
|
|
interactive=True, |
|
|
) |
|
|
|
|
|
    # The page dropdown triggered this callback, so it is not re-emitted here;
    # only the filtered dataframe and the label and text dropdowns are returned.
|
|
|
|
|
return filtered_df, recogniser_entities_drop, text_entities_drop |
|
|
|
|
|
|
|
|
def update_redact_choice_df_from_page_dropdown(choice: str, df: pd.DataFrame): |
|
|
""" |
|
|
Update the rows in a dataframe depending on the user choice from a dropdown |
|
|
""" |
|
|
if isinstance(choice, str): |
|
|
choice = [choice] |
|
|
elif not isinstance(choice, list): |
|
|
choice = [str(choice)] |
|
|
|
|
|
if "index" not in df.columns: |
|
|
df["index"] = df.index |
|
|
|
|
|
filtered_df = df[ |
|
|
[ |
|
|
"page", |
|
|
"line", |
|
|
"word_text", |
|
|
"word_x0", |
|
|
"word_y0", |
|
|
"word_x1", |
|
|
"word_y1", |
|
|
"index", |
|
|
] |
|
|
].copy() |
|
|
|
|
|
|
|
|
if "ALL" not in choice: |
|
|
filtered_df = filtered_df.loc[filtered_df["page"].astype(str).isin(choice)] |
|
|
|
|
|
    # The page dropdown triggered this callback and is not re-emitted here;
    # only the filtered dataframe is returned.
|
|
|
|
|
return filtered_df |
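
# A minimal sketch of what the page filter above does, with hypothetical data:
#
#   ocr_words_df = pd.DataFrame({
#       "page": [1, 1, 2], "line": [1, 2, 1], "word_text": ["a", "b", "c"],
#       "word_x0": [0.0] * 3, "word_y0": [0.0] * 3,
#       "word_x1": [1.0] * 3, "word_y1": [1.0] * 3,
#   })
#   update_redact_choice_df_from_page_dropdown("2", ocr_words_df)
#   # -> only the page-2 row, with an "index" column added
#   update_redact_choice_df_from_page_dropdown("ALL", ocr_words_df)
#   # -> all rows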
|
|
|
|
|
|
|
|
def update_entities_df_text( |
|
|
choice: str, df: pd.DataFrame, label_dropdown_value: str, page_dropdown_value: str |
|
|
): |
|
|
""" |
|
|
Update the rows in a dataframe depending on the user choice from a dropdown |
|
|
""" |
|
|
if isinstance(choice, str): |
|
|
choice = [choice] |
|
|
if isinstance(label_dropdown_value, str): |
|
|
label_dropdown_value = [label_dropdown_value] |
|
|
if isinstance(page_dropdown_value, str): |
|
|
page_dropdown_value = [page_dropdown_value] |
|
|
|
|
|
filtered_df = df.copy() |
|
|
|
|
|
|
|
|
if "ALL" not in page_dropdown_value: |
|
|
filtered_df = filtered_df[ |
|
|
filtered_df["page"].astype(str).isin(page_dropdown_value) |
|
|
] |
|
|
|
|
|
if "ALL" not in label_dropdown_value: |
|
|
filtered_df = filtered_df[ |
|
|
filtered_df["label"].astype(str).isin(label_dropdown_value) |
|
|
] |
|
|
|
|
|
if "ALL" not in choice: |
|
|
filtered_df = filtered_df[filtered_df["text"].astype(str).isin(choice)] |
|
|
|
|
|
recogniser_entities_for_drop = update_dropdown_list_based_on_dataframe( |
|
|
filtered_df, "label" |
|
|
) |
|
|
recogniser_entities_drop = gr.Dropdown( |
|
|
value=label_dropdown_value[0], |
|
|
choices=recogniser_entities_for_drop, |
|
|
allow_custom_value=True, |
|
|
interactive=True, |
|
|
) |
|
|
|
|
|
    # The text dropdown triggered this callback, so it is not re-emitted here;
    # only the filtered dataframe and the label and page dropdowns are returned.
|
|
|
|
|
page_entities_for_drop = update_dropdown_list_based_on_dataframe( |
|
|
filtered_df, "page" |
|
|
) |
|
|
page_entities_drop = gr.Dropdown( |
|
|
value=page_dropdown_value[0], |
|
|
choices=page_entities_for_drop, |
|
|
allow_custom_value=True, |
|
|
interactive=True, |
|
|
) |
|
|
|
|
|
return filtered_df, recogniser_entities_drop, page_entities_drop |
|
|
|
|
|
|
|
|
def reset_dropdowns(df: pd.DataFrame): |
|
|
""" |
|
|
Return Gradio dropdown objects with value 'ALL'. |
|
|
""" |
|
|
|
|
|
recogniser_entities_for_drop = update_dropdown_list_based_on_dataframe(df, "label") |
|
|
recogniser_entities_drop = gr.Dropdown( |
|
|
value="ALL", |
|
|
choices=recogniser_entities_for_drop, |
|
|
allow_custom_value=True, |
|
|
interactive=True, |
|
|
) |
|
|
|
|
|
text_entities_for_drop = update_dropdown_list_based_on_dataframe(df, "text") |
|
|
text_entities_drop = gr.Dropdown( |
|
|
value="ALL", |
|
|
choices=text_entities_for_drop, |
|
|
allow_custom_value=True, |
|
|
interactive=True, |
|
|
) |
|
|
|
|
|
page_entities_for_drop = update_dropdown_list_based_on_dataframe(df, "page") |
|
|
page_entities_drop = gr.Dropdown( |
|
|
value="ALL", |
|
|
choices=page_entities_for_drop, |
|
|
allow_custom_value=True, |
|
|
interactive=True, |
|
|
) |
|
|
|
|
|
return recogniser_entities_drop, text_entities_drop, page_entities_drop |
|
|
|
|
|
|
|
|
def increase_bottom_page_count_based_on_top(page_number: int): |
|
|
return int(page_number) |
|
|
|
|
|
|
|
|
def df_select_callback_dataframe_row_ocr_with_words( |
|
|
df: pd.DataFrame, evt: gr.SelectData |
|
|
): |
|
|
|
|
|
row_value_page = int(evt.row_value[0]) |
|
|
row_value_line = int(evt.row_value[1]) |
|
|
row_value_text = evt.row_value[2] |
|
|
|
|
|
row_value_x0 = evt.row_value[3] |
|
|
row_value_y0 = evt.row_value[4] |
|
|
row_value_x1 = evt.row_value[5] |
|
|
row_value_y1 = evt.row_value[6] |
|
|
row_value_index = evt.row_value[7] |
|
|
|
|
|
row_value_df = pd.DataFrame( |
|
|
data={ |
|
|
"page": [row_value_page], |
|
|
"line": [row_value_line], |
|
|
"word_text": [row_value_text], |
|
|
"word_x0": [row_value_x0], |
|
|
"word_y0": [row_value_y0], |
|
|
"word_x1": [row_value_x1], |
|
|
"word_y1": [row_value_y1], |
|
|
"index": row_value_index, |
|
|
} |
|
|
) |
|
|
|
|
|
return row_value_df, row_value_text |
|
|
|
|
|
|
|
|
def df_select_callback_dataframe_row(df: pd.DataFrame, evt: gr.SelectData): |
|
|
|
|
|
row_value_page = int(evt.row_value[0]) |
|
|
row_value_label = evt.row_value[1] |
|
|
row_value_text = evt.row_value[2] |
|
|
row_value_id = evt.row_value[3] |
|
|
|
|
|
row_value_df = pd.DataFrame( |
|
|
data={ |
|
|
"page": [row_value_page], |
|
|
"label": [row_value_label], |
|
|
"text": [row_value_text], |
|
|
"id": [row_value_id], |
|
|
} |
|
|
) |
|
|
|
|
|
return row_value_df, row_value_text |
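
# These select callbacks read positional row values from a Gradio SelectData
# event, so the dataframe component's column order must match the positions
# accessed above. An illustrative wiring sketch with hypothetical component
# names:
#
#   review_entities_df_component.select(
#       df_select_callback_dataframe_row,
#       inputs=[review_entities_df_component],
#       outputs=[selected_entity_df_state, selected_entity_text_box],
#   )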
|
|
|
|
|
|
|
|
def df_select_callback_textract_api(df: pd.DataFrame, evt: gr.SelectData): |
|
|
|
|
|
row_value_job_id = evt.row_value[0] |
|
|
|
|
|
row_value_job_type = evt.row_value[2] |
|
|
|
|
|
row_value_df = pd.DataFrame( |
|
|
data={"job_id": [row_value_job_id], "label": [row_value_job_type]} |
|
|
) |
|
|
|
|
|
return row_value_job_id, row_value_job_type, row_value_df |
|
|
|
|
|
|
|
|
def df_select_callback_cost(df: pd.DataFrame, evt: gr.SelectData): |
|
|
|
|
|
row_value_code = evt.row_value[0] |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
return row_value_code |
|
|
|
|
|
|
|
|
def df_select_callback_ocr(df: pd.DataFrame, evt: gr.SelectData): |
|
|
|
|
|
row_value_page = int(evt.row_value[0]) |
|
|
row_value_text = evt.row_value[1] |
|
|
|
|
|
row_value_df = pd.DataFrame( |
|
|
data={"page": [row_value_page], "text": [row_value_text]} |
|
|
) |
|
|
|
|
|
return row_value_page, row_value_df |
|
|
|
|
|
|
|
|
|
|
|
def store_duplicate_selection(evt: gr.SelectData): |
|
|
if not evt.empty: |
|
|
selected_index = evt.index[0] |
|
|
else: |
|
|
selected_index = None |
|
|
|
|
|
return selected_index |
|
|
|
|
|
|
|
|
def get_all_rows_with_same_text(df: pd.DataFrame, text: str): |
|
|
""" |
|
|
Get all rows with the same text as the selected row |
|
|
""" |
|
|
if text: |
|
|
|
|
|
return df.loc[df["text"] == text] |
|
|
else: |
|
|
return pd.DataFrame(columns=["page", "label", "text", "id"]) |
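
# Example behaviour (hypothetical data):
#
#   df = pd.DataFrame({"page": [1, 2], "label": ["PERSON", "PERSON"],
#                      "text": ["Jane", "Jane"], "id": ["a", "b"]})
#   get_all_rows_with_same_text(df, "Jane")  # -> both rows
#   get_all_rows_with_same_text(df, "")      # -> empty frame with the same columns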
|
|
|
|
|
|
|
|
def get_all_rows_with_same_text_redact(df: pd.DataFrame, text: str): |
|
|
""" |
|
|
Get all rows with the same text as the selected row for redaction tasks |
|
|
""" |
|
|
if "index" not in df.columns: |
|
|
df["index"] = df.index |
|
|
|
|
|
if text and not df.empty: |
|
|
|
|
|
return df.loc[df["word_text"] == text] |
|
|
else: |
|
|
return pd.DataFrame( |
|
|
columns=[ |
|
|
"page", |
|
|
"line", |
|
|
"label", |
|
|
"word_text", |
|
|
"word_x0", |
|
|
"word_y0", |
|
|
"word_x1", |
|
|
"word_y1", |
|
|
"index", |
|
|
] |
|
|
) |
|
|
|
|
|
|
|
|
def update_selected_review_df_row_colour( |
|
|
redaction_row_selection: pd.DataFrame, |
|
|
review_df: pd.DataFrame, |
|
|
previous_id: str = "", |
|
|
previous_colour: str = "(0, 0, 0)", |
|
|
colour: str = "(1, 0, 255)", |
|
|
) -> tuple[pd.DataFrame, str, str]: |
|
|
""" |
|
|
Update the colour of a single redaction box based on the values in a selection row |
|
|
(Optimized Version) |
|
|
""" |
|
|
|
|
|
|
|
|
if "color" not in review_df.columns: |
|
|
review_df["color"] = previous_colour if previous_id else "(0, 0, 0)" |
|
|
|
|
|
|
|
|
if "id" not in review_df.columns: |
|
|
|
|
|
|
|
|
|
|
|
print("Warning: 'id' column not found. Calling fill_missing_ids.") |
|
|
review_df = fill_missing_ids( |
|
|
review_df |
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
if previous_id and previous_id in review_df["id"].values: |
|
|
review_df.loc[review_df["id"] == previous_id, "color"] = previous_colour |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
review_df.loc[review_df["color"] == colour, "color"] = "(0, 0, 0)" |
|
|
|
|
|
if not redaction_row_selection.empty and not review_df.empty: |
|
|
use_id = ( |
|
|
"id" in redaction_row_selection.columns |
|
|
and "id" in review_df.columns |
|
|
and not redaction_row_selection["id"].isnull().all() |
|
|
and not review_df["id"].isnull().all() |
|
|
) |
|
|
|
|
|
selected_merge_cols = ["id"] if use_id else ["label", "page", "text"] |
|
|
|
|
|
|
|
|
|
|
|
merged_reviews = review_df.merge( |
|
|
redaction_row_selection[selected_merge_cols], |
|
|
on=selected_merge_cols, |
|
|
how="inner", |
|
|
) |
|
|
|
|
|
if not merged_reviews.empty: |
|
|
|
|
|
|
|
|
|
|
|
new_previous_colour = str(merged_reviews["color"].iloc[0]) |
|
|
new_previous_id = merged_reviews["id"].iloc[0] |
|
|
|
|
|
|
|
|
|
|
|
if use_id: |
|
|
|
|
|
review_df.loc[review_df["id"].isin(merged_reviews["id"]), "color"] = ( |
|
|
colour |
|
|
) |
|
|
else: |
|
|
|
|
|
|
|
|
def create_merge_key(df, cols): |
|
|
return df[cols].astype(str).agg("_".join, axis=1) |
|
|
|
|
|
review_df_key = create_merge_key(review_df, selected_merge_cols) |
|
|
merged_reviews_key = create_merge_key( |
|
|
merged_reviews, selected_merge_cols |
|
|
) |
|
|
|
|
|
review_df.loc[review_df_key.isin(merged_reviews_key), "color"] = colour |
|
|
|
|
|
previous_colour = new_previous_colour |
|
|
previous_id = new_previous_id |
|
|
else: |
|
|
|
|
|
print("No reviews found matching selection criteria") |
|
|
|
|
|
|
|
|
|
|
|
            previous_colour = "(0, 0, 0)"
|
|
previous_id = "" |
|
|
|
|
|
else: |
|
|
|
|
|
review_df.loc[review_df["color"] == colour, "color"] = "(0, 0, 0)" |
|
|
previous_colour = "(0, 0, 0)" |
|
|
previous_id = "" |
|
|
|
|
|
|
|
|
|
|
|
    expected_cols = [
        "image",
        "page",
        "label",
        "color",
        "xmin",
        "ymin",
        "xmax",
        "ymax",
        "text",
        "id",
    ]
    if set(expected_cols).issubset(review_df.columns):
        review_df = review_df[expected_cols]
    else:
        print(
            "Warning: Not all expected columns are present in review_df for reordering."
        )
|
|
|
|
|
return review_df, previous_id, previous_colour |
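
# Intended call pattern (a sketch; the state names are hypothetical): the
# function is threaded through successive selections so that each call restores
# the box highlighted by the previous call before colouring the new one.
#
#   review_df, prev_id, prev_colour = update_selected_review_df_row_colour(
#       selected_row_df, review_df, previous_id=prev_id, previous_colour=prev_colour
#   )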
|
|
|
|
|
|
|
|
def update_boxes_color( |
|
|
images: list, redaction_row_selection: pd.DataFrame, colour: tuple = (0, 255, 0) |
|
|
): |
|
|
""" |
|
|
Update the color of bounding boxes in the images list based on redaction_row_selection. |
|
|
|
|
|
Parameters: |
|
|
- images (list): List of dictionaries containing image paths and box metadata. |
|
|
- redaction_row_selection (pd.DataFrame): DataFrame with 'page', 'label', and optionally 'text' columns. |
|
|
- colour (tuple): RGB tuple for the new color. |
|
|
|
|
|
Returns: |
|
|
- Updated list with modified colors. |
|
|
""" |
|
|
|
|
|
selection_set = set( |
|
|
zip(redaction_row_selection["page"], redaction_row_selection["label"]) |
|
|
) |
|
|
|
|
|
for page_idx, image_obj in enumerate(images): |
|
|
if "boxes" in image_obj: |
|
|
for box in image_obj["boxes"]: |
|
|
if (page_idx, box["label"]) in selection_set: |
|
|
box["color"] = colour |
|
|
|
|
|
return images |
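
# Note that the match key above is (position of the image in the list, label),
# so callers must supply 'page' values that line up with enumerate(images).
# A sketch with hypothetical data:
#
#   selection = pd.DataFrame({"page": [0], "label": ["PERSON"]})
#   images = [{"image": "page_0.png", "boxes": [{"label": "PERSON", "color": (0, 0, 0)}]}]
#   update_boxes_color(images, selection)  # -> the PERSON box turns (0, 255, 0)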
|
|
|
|
|
|
|
|
def update_other_annotator_number_from_current(page_number_first_counter: int): |
|
|
return page_number_first_counter |
|
|
|
|
|
|
|
|
def convert_image_coords_to_adobe( |
|
|
pdf_page_width: float, |
|
|
pdf_page_height: float, |
|
|
image_width: float, |
|
|
image_height: float, |
|
|
x1: float, |
|
|
y1: float, |
|
|
x2: float, |
|
|
y2: float, |
|
|
): |
|
|
""" |
|
|
Converts coordinates from image space to Adobe PDF space. |
|
|
|
|
|
Parameters: |
|
|
- pdf_page_width: Width of the PDF page |
|
|
- pdf_page_height: Height of the PDF page |
|
|
- image_width: Width of the source image |
|
|
- image_height: Height of the source image |
|
|
- x1, y1, x2, y2: Coordinates in image space |
|
|
|
|
|
|
|
Returns: |
|
|
- Tuple of converted coordinates (x1, y1, x2, y2) in Adobe PDF space |
|
|
""" |
|
|
|
|
|
|
|
|
scale_width = pdf_page_width / image_width |
|
|
scale_height = pdf_page_height / image_height |
|
|
|
|
|
|
|
|
pdf_x1 = x1 * scale_width |
|
|
pdf_x2 = x2 * scale_width |
|
|
|
|
|
|
|
|
|
|
|
pdf_y1 = pdf_page_height - (y1 * scale_height) |
|
|
pdf_y2 = pdf_page_height - (y2 * scale_height) |
|
|
|
|
|
|
|
|
if pdf_y1 > pdf_y2: |
|
|
pdf_y1, pdf_y2 = pdf_y2, pdf_y1 |
|
|
|
|
|
return pdf_x1, pdf_y1, pdf_x2, pdf_y2 |
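
# Worked example: a 1190x1684px image of a 595x842pt page scales by 0.5 in both
# axes, and the y-axis is flipped about the page height (values illustrative):
#
#   convert_image_coords_to_adobe(595, 842, 1190, 1684, 100, 200, 300, 400)
#   # -> (50.0, 642.0, 150.0, 742.0)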
|
|
|
|
|
|
|
|
def convert_pymupdf_coords_to_adobe( |
|
|
x1: float, y1: float, x2: float, y2: float, pdf_page_height: float |
|
|
): |
|
|
""" |
|
|
Converts coordinates from PyMuPDF (fitz) space to Adobe PDF space. |
|
|
|
|
|
Parameters: |
|
|
- x1, y1, x2, y2: Coordinates in PyMuPDF space |
|
|
- pdf_page_height: Total height of the PDF page |
|
|
|
|
|
Returns: |
|
|
- Tuple of converted coordinates (x1, y1, x2, y2) in Adobe PDF space |
|
|
""" |
|
|
|
|
|
|
|
|
adobe_y1 = pdf_page_height - y2 |
|
|
adobe_y2 = pdf_page_height - y1 |
|
|
|
|
|
return x1, adobe_y1, x2, adobe_y2 |
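
# Worked example: PyMuPDF measures y from the top of the page, Adobe from the
# bottom, so only the y values change (page height 842pt, values illustrative):
#
#   convert_pymupdf_coords_to_adobe(72, 100, 200, 120, 842)
#   # -> (72, 722, 200, 742)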
|
|
|
|
|
|
|
|
def create_xfdf( |
|
|
review_file_df: pd.DataFrame, |
|
|
pdf_path: str, |
|
|
pymupdf_doc: object, |
|
|
image_paths: List[str] = list(), |
|
|
document_cropboxes: List = list(), |
|
|
page_sizes: List[dict] = list(), |
|
|
): |
|
|
""" |
|
|
Create an xfdf file from a review csv file and a pdf |
|
|
""" |
|
|
xfdf_root = Element( |
|
|
"xfdf", xmlns="http://ns.adobe.com/xfdf/", **{"xml:space": "preserve"} |
|
|
) |
|
|
annots = SubElement(xfdf_root, "annots") |
|
|
|
|
|
if page_sizes: |
|
|
page_sizes_df = pd.DataFrame(page_sizes) |
|
|
if not page_sizes_df.empty and "mediabox_width" not in review_file_df.columns: |
|
|
review_file_df = review_file_df.merge(page_sizes_df, how="left", on="page") |
|
|
if "xmin" in review_file_df.columns and review_file_df["xmin"].max() <= 1: |
|
|
if ( |
|
|
"mediabox_width" in review_file_df.columns |
|
|
and "mediabox_height" in review_file_df.columns |
|
|
): |
|
|
review_file_df["xmin"] = ( |
|
|
review_file_df["xmin"] * review_file_df["mediabox_width"] |
|
|
) |
|
|
review_file_df["xmax"] = ( |
|
|
review_file_df["xmax"] * review_file_df["mediabox_width"] |
|
|
) |
|
|
review_file_df["ymin"] = ( |
|
|
review_file_df["ymin"] * review_file_df["mediabox_height"] |
|
|
) |
|
|
review_file_df["ymax"] = ( |
|
|
review_file_df["ymax"] * review_file_df["mediabox_height"] |
|
|
) |
|
|
elif "image_width" in review_file_df.columns and not page_sizes_df.empty: |
|
|
review_file_df = multiply_coordinates_by_page_sizes( |
|
|
review_file_df, |
|
|
page_sizes_df, |
|
|
xmin="xmin", |
|
|
xmax="xmax", |
|
|
ymin="ymin", |
|
|
ymax="ymax", |
|
|
) |
|
|
|
|
|
for _, row in review_file_df.iterrows(): |
|
|
page_num_reported = int(row["page"]) |
|
|
page_python_format = page_num_reported - 1 |
|
|
pymupdf_page = pymupdf_doc.load_page(page_python_format) |
|
|
|
|
|
if document_cropboxes and page_python_format < len(document_cropboxes): |
|
|
from tools.secure_regex_utils import safe_extract_numbers |
|
|
|
|
|
match = safe_extract_numbers(document_cropboxes[page_python_format]) |
|
|
if match and len(match) == 4: |
|
|
rect_values = list(map(float, match)) |
|
|
pymupdf_page.set_cropbox(Rect(*rect_values)) |
|
|
|
|
|
pdf_page_height = pymupdf_page.mediabox.height |
|
|
redact_annot = SubElement(annots, "redact") |
|
|
redact_annot.set("opacity", "0.500000") |
|
|
redact_annot.set("interior-color", "#000000") |
|
|
|
|
|
        # Timestamp in the PDF date string format (D:YYYYMMDDHHMMSS+HH'mm').
        # Note that the UTC offset is hard-coded to +01:00 here.
        now = datetime.now(
            timezone(timedelta(hours=1))
        )
        date_str = (
            now.strftime("D:%Y%m%d%H%M%S")
            + now.strftime("%z")[:3]
            + "'"
            + now.strftime("%z")[3:]
            + "'"
        )
|
|
redact_annot.set("date", date_str) |
|
|
|
|
|
annot_id = str(uuid.uuid4()) |
|
|
redact_annot.set("name", annot_id) |
|
|
redact_annot.set("page", str(page_python_format)) |
|
|
redact_annot.set("mimetype", "Form") |
|
|
|
|
|
x1_pdf, y1_pdf, x2_pdf, y2_pdf = ( |
|
|
row["xmin"], |
|
|
row["ymin"], |
|
|
row["xmax"], |
|
|
row["ymax"], |
|
|
) |
|
|
adobe_x1, adobe_y1, adobe_x2, adobe_y2 = convert_pymupdf_coords_to_adobe( |
|
|
x1_pdf, y1_pdf, x2_pdf, y2_pdf, pdf_page_height |
|
|
) |
|
|
redact_annot.set( |
|
|
"rect", f"{adobe_x1:.6f},{adobe_y1:.6f},{adobe_x2:.6f},{adobe_y2:.6f}" |
|
|
) |
|
|
|
|
|
        redact_annot.set("subject", str(row["label"]))
        redact_annot.set("title", str(row.get("label", "Unknown")))
|
|
|
|
|
contents_richtext = SubElement(redact_annot, "contents-richtext") |
|
|
body_attrs = { |
|
|
"xmlns": "http://www.w3.org/1999/xhtml", |
|
|
"{http://www.xfa.org/schema/xfa-data/1.0/}APIVersion": "Acrobat:25.1.0", |
|
|
"{http://www.xfa.org/schema/xfa-data/1.0/}spec": "2.0.2", |
|
|
} |
|
|
body = SubElement(contents_richtext, "body", attrib=body_attrs) |
|
|
p_element = SubElement(body, "p", dir="ltr") |
|
|
span_attrs = { |
|
|
"dir": "ltr", |
|
|
"style": "font-size:10.0pt;text-align:left;color:#000000;font-weight:normal;font-style:normal", |
|
|
} |
|
|
span_element = SubElement(p_element, "span", attrib=span_attrs) |
|
|
span_element.text = str(row["text"]).strip() |
|
|
|
|
|
pdf_ops_for_black_fill_and_outline = [ |
|
|
"1 w", |
|
|
"0 g", |
|
|
"0 G", |
|
|
"1 0 0 1 0 0 cm", |
|
|
f"{adobe_x1:.2f} {adobe_y1:.2f} m", |
|
|
f"{adobe_x2:.2f} {adobe_y1:.2f} l", |
|
|
f"{adobe_x2:.2f} {adobe_y2:.2f} l", |
|
|
f"{adobe_x1:.2f} {adobe_y2:.2f} l", |
|
|
"h", |
|
|
"B", |
|
|
] |
|
|
data_content_string = "\n".join(pdf_ops_for_black_fill_and_outline) + "\n" |
|
|
data_element = SubElement(redact_annot, "data") |
|
|
data_element.set("MODE", "filtered") |
|
|
data_element.set("encoding", "ascii") |
|
|
data_element.set("length", str(len(data_content_string.encode("ascii")))) |
|
|
data_element.text = data_content_string |
|
|
|
|
|
rough_string = tostring(xfdf_root, encoding="unicode", method="xml") |
|
|
reparsed = defused_minidom.parseString(rough_string) |
|
|
return reparsed.toxml() |
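
# The XML produced above looks roughly like the following (a hand-written
# sketch with illustrative values, not verbatim output):
#
#   <xfdf xmlns="http://ns.adobe.com/xfdf/" xml:space="preserve">
#     <annots>
#       <redact opacity="0.500000" interior-color="#000000"
#               date="D:20250101120000+01'00'" name="<uuid4>" page="0"
#               mimetype="Form" rect="72.000000,722.000000,200.000000,742.000000"
#               subject="PERSON" title="PERSON">
#         <contents-richtext>...</contents-richtext>
#         <data MODE="filtered" encoding="ascii" length="...">...</data>
#       </redact>
#     </annots>
#   </xfdf>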
|
|
|
|
|
|
|
|
def convert_df_to_xfdf( |
|
|
input_files: List[str], |
|
|
pdf_doc: Document, |
|
|
image_paths: List[str], |
|
|
output_folder: str = OUTPUT_FOLDER, |
|
|
document_cropboxes: List = list(), |
|
|
page_sizes: List[dict] = list(), |
|
|
): |
|
|
""" |
|
|
    Convert a review file CSV (plus its original PDF) into an Adobe XFDF comment file.
|
|
""" |
|
|
output_paths = list() |
|
|
pdf_name = "" |
|
|
file_path_name = "" |
|
|
|
|
|
if isinstance(input_files, str): |
|
|
file_paths_list = [input_files] |
|
|
else: |
|
|
file_paths_list = input_files |
|
|
|
|
|
|
|
|
file_paths_list = sorted( |
|
|
file_paths_list, |
|
|
key=lambda x: ( |
|
|
os.path.splitext(x)[1] != ".pdf", |
|
|
os.path.splitext(x)[1] != ".json", |
|
|
), |
|
|
) |
|
|
|
|
|
for file in file_paths_list: |
|
|
|
|
|
if isinstance(file, str): |
|
|
file_path = file |
|
|
else: |
|
|
file_path = file.name |
|
|
|
|
|
file_path_name = get_file_name_without_type(file_path) |
|
|
file_path_end = detect_file_type(file_path) |
|
|
|
|
|
if file_path_end == "pdf": |
|
|
pdf_name = os.path.basename(file_path) |
|
|
|
|
|
if file_path_end == "csv" and "review_file" in file_path_name: |
|
|
|
|
|
if not pdf_name: |
|
|
pdf_name = file_path_name |
|
|
|
|
|
review_file_df = pd.read_csv(file_path) |
|
|
|
|
|
|
|
|
if "text" in review_file_df.columns: |
|
|
review_file_df["text"] = review_file_df["text"].fillna("") |
|
|
if "label" in review_file_df.columns: |
|
|
review_file_df["label"] = review_file_df["label"].fillna("") |
|
|
|
|
|
xfdf_content = create_xfdf( |
|
|
review_file_df, |
|
|
pdf_name, |
|
|
pdf_doc, |
|
|
image_paths, |
|
|
document_cropboxes, |
|
|
page_sizes, |
|
|
) |
|
|
|
|
|
|
|
|
            out_xfdf_file_name = file_path_name + "_adobe.xfdf"

            secure_file_write(
                output_folder,
                out_xfdf_file_name,
                xfdf_content,
                encoding="utf-8",
            )

            output_path = output_folder + out_xfdf_file_name
|
|
|
|
|
output_paths.append(output_path) |
|
|
|
|
|
return output_paths |
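
# Illustrative usage (file names hypothetical): the input list must contain the
# original PDF and a "..._review_file.csv" produced by this app.
#
#   output_paths = convert_df_to_xfdf(
#       ["example_doc.pdf", "example_doc_review_file.csv"],
#       pdf_doc=pymupdf.open("example_doc.pdf"),
#       image_paths=[],
#   )
#   # -> ["<OUTPUT_FOLDER>example_doc_review_file_adobe.xfdf"]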
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def convert_adobe_coords_to_image( |
|
|
pdf_page_width: float, |
|
|
pdf_page_height: float, |
|
|
image_width: float, |
|
|
image_height: float, |
|
|
x1: float, |
|
|
y1: float, |
|
|
x2: float, |
|
|
y2: float, |
|
|
): |
|
|
""" |
|
|
Converts coordinates from Adobe PDF space to image space. |
|
|
|
|
|
Parameters: |
|
|
- pdf_page_width: Width of the PDF page |
|
|
- pdf_page_height: Height of the PDF page |
|
|
- image_width: Width of the source image |
|
|
- image_height: Height of the source image |
|
|
- x1, y1, x2, y2: Coordinates in Adobe PDF space |
|
|
|
|
|
Returns: |
|
|
- Tuple of converted coordinates (x1, y1, x2, y2) in image space |
|
|
""" |
|
|
|
|
|
|
|
|
scale_width = image_width / pdf_page_width |
|
|
scale_height = image_height / pdf_page_height |
|
|
|
|
|
|
|
|
image_x1 = x1 * scale_width |
|
|
image_x2 = x2 * scale_width |
|
|
|
|
|
|
|
|
|
|
|
image_y1 = (pdf_page_height - y1) * scale_height |
|
|
image_y2 = (pdf_page_height - y2) * scale_height |
|
|
|
|
|
|
|
|
if image_y1 > image_y2: |
|
|
image_y1, image_y2 = image_y2, image_y1 |
|
|
|
|
|
return image_x1, image_y1, image_x2, image_y2 |
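
# Worked example: this is the inverse of convert_image_coords_to_adobe above,
# so the earlier example round-trips (values illustrative):
#
#   convert_adobe_coords_to_image(595, 842, 1190, 1684, 50, 642, 150, 742)
#   # -> (100.0, 200.0, 300.0, 400.0)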
|
|
|
|
|
|
|
|
def parse_xfdf(xfdf_path: str): |
|
|
""" |
|
|
Parse the XFDF file and extract redaction annotations. |
|
|
|
|
|
Parameters: |
|
|
- xfdf_path: Path to the XFDF file |
|
|
|
|
|
Returns: |
|
|
- List of dictionaries containing redaction information |
|
|
""" |
|
|
|
|
|
|
|
|
tree = defused_etree.parse(xfdf_path) |
|
|
root = tree.getroot() |
|
|
|
|
|
|
|
|
namespace = {"xfdf": "http://ns.adobe.com/xfdf/"} |
|
|
|
|
|
redactions = list() |
|
|
|
|
|
|
|
|
for redact in root.findall(".//xfdf:redact", namespaces=namespace): |
|
|
|
|
|
|
|
|
text_content = "" |
|
|
|
|
|
|
|
|
|
|
|
contents_richtext = redact.find( |
|
|
".//xfdf:contents-richtext", namespaces=namespace |
|
|
) |
|
|
|
|
|
if contents_richtext is not None: |
|
|
|
|
|
|
|
|
|
|
|
text_content = "".join(contents_richtext.itertext()).strip() |
|
|
|
|
|
|
|
|
if not text_content: |
|
|
text_content = redact.get("contents", "") |
|
|
|
|
|
        # Parse the rect attribute once instead of splitting it four times
        rect_values = redact.get("rect").split(",")

        redaction_info = {
            "image": "",
            "page": int(redact.get("page")) + 1,
            "xmin": float(rect_values[0]),
            "ymin": float(rect_values[1]),
            "xmax": float(rect_values[2]),
            "ymax": float(rect_values[3]),
            "label": redact.get("title"),
            "text": text_content,
            "color": redact.get("border-color", "(0, 0, 0)"),
        }
|
|
redactions.append(redaction_info) |
|
|
|
|
|
return redactions |
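
# Each returned dictionary has the shape below (illustrative values). Note that
# 'page' is converted back to one-based numbering and the rect stays in Adobe
# coordinate space until convert_adobe_coords_to_image is applied:
#
#   {"image": "", "page": 1, "xmin": 72.0, "ymin": 722.0, "xmax": 200.0,
#    "ymax": 742.0, "label": "PERSON", "text": "Jane Doe", "color": "(0, 0, 0)"}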
|
|
|
|
|
|
|
|
def convert_xfdf_to_dataframe( |
|
|
file_paths_list: List[str], |
|
|
pymupdf_doc: Document, |
|
|
image_paths: List[str], |
|
|
output_folder: str = OUTPUT_FOLDER, |
|
|
input_folder: str = INPUT_FOLDER, |
|
|
): |
|
|
""" |
|
|
Convert redaction annotations from XFDF and associated images into a DataFrame. |
|
|
|
|
|
Parameters: |
|
|
    - file_paths_list: List of input file paths (must include the original PDF and the XFDF file)
    - pymupdf_doc: PyMuPDF document object
    - image_paths: List of image file paths corresponding to PDF pages
|
|
- output_folder: Output folder for file save |
|
|
- input_folder: Input folder for image creation |
|
|
|
|
|
Returns: |
|
|
- DataFrame containing redaction information |
|
|
""" |
|
|
output_paths = list() |
|
|
df = pd.DataFrame() |
|
|
pdf_name = "" |
|
|
pdf_path = "" |
|
|
|
|
|
|
|
|
file_paths_list = sorted( |
|
|
file_paths_list, |
|
|
key=lambda x: ( |
|
|
os.path.splitext(x)[1] != ".pdf", |
|
|
os.path.splitext(x)[1] != ".json", |
|
|
), |
|
|
) |
|
|
|
|
|
for file in file_paths_list: |
|
|
|
|
|
if isinstance(file, str): |
|
|
file_path = file |
|
|
else: |
|
|
file_path = file.name |
|
|
|
|
|
file_path_name = get_file_name_without_type(file_path) |
|
|
file_path_end = detect_file_type(file_path) |
|
|
|
|
|
if file_path_end == "pdf": |
|
|
pdf_name = os.path.basename(file_path) |
|
|
pdf_path = file_path |
|
|
|
|
|
|
|
|
output_paths.append(file_path) |
|
|
|
|
|
if file_path_end == "xfdf": |
|
|
|
|
|
if not pdf_name: |
|
|
message = "Original PDF needed to convert from .xfdf format" |
|
|
print(message) |
|
|
raise ValueError(message) |
|
|
            # Use the normalised string path (file may be an uploaded-file object)
            xfdf_path = file_path
|
|
|
|
|
|
|
|
redactions = parse_xfdf(xfdf_path) |
|
|
|
|
|
|
|
|
df = pd.DataFrame(redactions) |
|
|
|
|
|
df.fillna("", inplace=True) |
|
|
|
|
|
            for idx, row in df.iterrows():
|
|
page_python_format = int(row["page"]) - 1 |
|
|
|
|
|
pymupdf_page = pymupdf_doc.load_page(page_python_format) |
|
|
|
|
|
pdf_page_height = pymupdf_page.rect.height |
|
|
pdf_page_width = pymupdf_page.rect.width |
|
|
|
|
|
image_path = image_paths[page_python_format] |
|
|
|
|
|
if isinstance(image_path, str): |
|
|
try: |
|
|
image = Image.open(image_path) |
|
|
except Exception: |
|
|
|
|
|
|
|
|
page_num, out_path, width, height = ( |
|
|
process_single_page_for_image_conversion( |
|
|
pdf_path, page_python_format, input_folder=input_folder |
|
|
) |
|
|
) |
|
|
|
|
|
image = Image.open(out_path) |
|
|
|
|
|
image_page_width, image_page_height = image.size |
|
|
|
|
|
|
|
|
image_x1, image_y1, image_x2, image_y2 = convert_adobe_coords_to_image( |
|
|
pdf_page_width, |
|
|
pdf_page_height, |
|
|
image_page_width, |
|
|
image_page_height, |
|
|
row["xmin"], |
|
|
row["ymin"], |
|
|
row["xmax"], |
|
|
row["ymax"], |
|
|
) |
|
|
|
|
|
df.loc[_, ["xmin", "ymin", "xmax", "ymax"]] = [ |
|
|
image_x1, |
|
|
image_y1, |
|
|
image_x2, |
|
|
image_y2, |
|
|
] |
|
|
|
|
|
|
|
|
df.loc[_, "image"] = image_path |
|
|
|
|
|
out_file_path = output_folder + file_path_name + "_review_file.csv" |
|
|
df.to_csv(out_file_path, index=None) |
|
|
|
|
|
output_paths.append(out_file_path) |
|
|
|
|
|
gr.Info( |
|
|
f"Review file saved to {out_file_path}. Now click on '1. Upload original pdf' to view the pdf with the annotations." |
|
|
) |
|
|
|
|
|
return output_paths |
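
# Illustrative usage (file names hypothetical): the list must contain the
# original PDF alongside the .xfdf file exported from Adobe.
#
#   output_paths = convert_xfdf_to_dataframe(
#       ["example_doc.pdf", "example_doc_adobe.xfdf"],
#       pymupdf_doc=pymupdf.open("example_doc.pdf"),
#       image_paths=["example_doc_page_0.png"],
#   )
#   # -> [..., "<OUTPUT_FOLDER>example_doc_adobe_review_file.csv"]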
|
|
|