# document_redaction/tools/find_duplicate_tabular.py
import os
import time
from pathlib import Path
from typing import Dict, List, Tuple
import gradio as gr
import pandas as pd
from gradio import Progress
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from tools.config import (
DO_INITIAL_TABULAR_DATA_CLEAN,
MAX_SIMULTANEOUS_FILES,
MAX_TABLE_ROWS,
REMOVE_DUPLICATE_ROWS,
)
from tools.data_anonymise import initial_clean
from tools.helper_functions import OUTPUT_FOLDER, read_file
from tools.load_spacy_model_custom_recognisers import nlp
from tools.secure_path_utils import secure_join
from tools.secure_regex_utils import safe_split_filename

def clean_and_stem_text_series(
df: pd.DataFrame,
column: str,
do_initial_clean_dup: bool = DO_INITIAL_TABULAR_DATA_CLEAN,
):
"""
Clean and stem text columns in a data frame for tabular data
"""
# Function to apply lemmatisation and remove stopwords
def _apply_lemmatization(text):
doc = nlp(text)
# Keep only alphabetic tokens and remove stopwords
lemmatized_words = [
token.lemma_ for token in doc if token.is_alpha and not token.is_stop
]
return " ".join(lemmatized_words)
# Always create text_clean column first
if do_initial_clean_dup:
df["text_clean"] = initial_clean(df[column])
else:
df["text_clean"] = df[column]
df["text_clean"] = df["text_clean"].apply(_apply_lemmatization)
df["text_clean"] = df[
"text_clean"
].str.lower() # .str.replace(r'[^\w\s]', '', regex=True)
return df
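
# Illustrative usage (a minimal sketch; the exact output depends on the spaCy
# model loaded by tools.load_spacy_model_custom_recognisers):
#   >>> frame = pd.DataFrame({"text": ["The cats were running", "A cat runs"]})
#   >>> clean_and_stem_text_series(frame, "text", do_initial_clean_dup=False)["text_clean"]
#   0    cat run
#   1    cat run
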
def convert_tabular_data_to_analysis_format(
df: pd.DataFrame, file_name: str, text_columns: List[str] = None
) -> List[Tuple[str, pd.DataFrame]]:
"""
Convert tabular data (CSV/XLSX) to the format needed for duplicate analysis.
Args:
df (pd.DataFrame): The input DataFrame
file_name (str): Name of the file
text_columns (List[str], optional): Columns to analyze for duplicates.
If None, uses all string columns.
Returns:
List[Tuple[str, pd.DataFrame]]: List containing (file_name, processed_df) tuple
"""
    if text_columns is None:
        # Auto-detect text columns (string-type columns), per the docstring
        text_columns = df.select_dtypes(include=["object", "string"]).columns.tolist()
    # Keep only the requested columns that actually exist in the DataFrame
    text_columns = [col for col in text_columns if col in df.columns]
if not text_columns:
print(f"No text columns found in {file_name}")
return list()
# Create a copy to avoid modifying original
df_copy = df.copy()
# Create a combined text column from all text columns
df_copy["combined_text"] = (
df_copy[text_columns].fillna("").astype(str).agg(" ".join, axis=1)
)
# Add row identifier
df_copy["row_id"] = df_copy.index
# Create the format expected by the duplicate detection system
# Using 'row_number' as row number and 'text' as the combined text
processed_df = pd.DataFrame(
{
"row_number": df_copy["row_id"],
"text": df_copy["combined_text"],
"file": file_name,
}
)
# Add original row data for reference
for col in text_columns:
processed_df[f"original_{col}"] = df_copy[col]
return [(file_name, processed_df)]
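
# Expected output shape (illustrative values):
#   >>> frame = pd.DataFrame({"name": ["Alice", "Bob"], "notes": ["call back", "done"]})
#   >>> [(name, out)] = convert_tabular_data_to_analysis_format(frame, "people.csv", ["notes"])
#   >>> out.columns.tolist()
#   ['row_number', 'text', 'file', 'original_notes']
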
def find_duplicate_cells_in_tabular_data(
input_files: List[str],
similarity_threshold: float = 0.95,
min_word_count: int = 3,
text_columns: List[str] = [],
output_folder: str = OUTPUT_FOLDER,
do_initial_clean_dup: bool = DO_INITIAL_TABULAR_DATA_CLEAN,
remove_duplicate_rows: bool = REMOVE_DUPLICATE_ROWS,
    in_excel_tabular_sheets: List[str] = [],
progress: Progress = Progress(track_tqdm=True),
) -> Tuple[pd.DataFrame, List[str], Dict[str, pd.DataFrame]]:
"""
Find duplicate cells/text in tabular data files (CSV, XLSX, Parquet).
Args:
input_files (List[str]): List of file paths to analyze
similarity_threshold (float): Minimum similarity score to consider duplicates
min_word_count (int): Minimum word count for text to be considered
        text_columns (List[str], optional): Specific columns to analyze
        output_folder (str, optional): Output folder for results
        do_initial_clean_dup (bool, optional): Whether to do an initial clean of the text
        remove_duplicate_rows (bool, optional): Whether to remove duplicate rows from the output files
        in_excel_tabular_sheets (List[str], optional): Excel sheet names to analyze
        progress (Progress): Progress tracking object
Returns:
Tuple containing:
- results_df: DataFrame with duplicate matches
- output_paths: List of output file paths
- full_data_by_file: Dictionary of processed data by file
"""
if not input_files:
raise gr.Error("Please upload files to analyze.")
progress(0.1, desc="Loading and processing files...")
all_data_to_process = list()
full_data_by_file = dict()
file_paths = list()
# Process each file
for file_path in input_files:
try:
if file_path.endswith(".xlsx") or file_path.endswith(".xls"):
temp_df = pd.DataFrame()
                # Process each sheet in the given list that loads successfully
                for sheet_name in in_excel_tabular_sheets:
                    temp_df = read_file(file_path, excel_sheet_name=sheet_name)
                    # If the sheet was successfully loaded
                    if not temp_df.empty:
if temp_df.shape[0] > MAX_TABLE_ROWS:
out_message = f"Number of rows in {file_path} for sheet {sheet_name} is greater than {MAX_TABLE_ROWS}. Please submit a smaller file."
print(out_message)
raise Exception(out_message)
file_name = os.path.basename(file_path) + "_" + sheet_name
file_paths.append(file_path)
# Convert to analysis format
processed_data = convert_tabular_data_to_analysis_format(
temp_df, file_name, text_columns
)
if processed_data:
all_data_to_process.extend(processed_data)
full_data_by_file[file_name] = processed_data[0][1]
temp_df = pd.DataFrame()
else:
temp_df = read_file(file_path)
if temp_df.shape[0] > MAX_TABLE_ROWS:
out_message = f"Number of rows in {file_path} is greater than {MAX_TABLE_ROWS}. Please submit a smaller file."
print(out_message)
raise Exception(out_message)
file_name = os.path.basename(file_path)
file_paths.append(file_path)
# Convert to analysis format
processed_data = convert_tabular_data_to_analysis_format(
temp_df, file_name, text_columns
)
if processed_data:
all_data_to_process.extend(processed_data)
full_data_by_file[file_name] = processed_data[0][1]
except Exception as e:
print(f"Error processing {file_path}: {e}")
continue
if not all_data_to_process:
raise gr.Error("No valid data found in uploaded files.")
progress(0.2, desc="Combining data...")
# Combine all data
combined_df = pd.concat(
[data[1] for data in all_data_to_process], ignore_index=True
)
combined_df = combined_df.drop_duplicates(subset=["row_number", "file"])
progress(0.3, desc="Cleaning and preparing text...")
# Clean and prepare text
combined_df = clean_and_stem_text_series(
combined_df, "text", do_initial_clean_dup=do_initial_clean_dup
)
# Filter by minimum word count
combined_df["word_count"] = (
combined_df["text_clean"].str.split().str.len().fillna(0)
)
combined_df = combined_df[combined_df["word_count"] >= min_word_count].copy()
if len(combined_df) < 2:
return pd.DataFrame(), [], full_data_by_file
progress(0.4, desc="Calculating similarities...")
# Calculate similarities
vectorizer = TfidfVectorizer()
tfidf_matrix = vectorizer.fit_transform(combined_df["text_clean"])
similarity_matrix = cosine_similarity(tfidf_matrix, dense_output=False)
# Find similar pairs
coo_matrix = similarity_matrix.tocoo()
similar_pairs = [
(r, c, v)
for r, c, v in zip(coo_matrix.row, coo_matrix.col, coo_matrix.data)
if r < c and v >= similarity_threshold
]
if not similar_pairs:
gr.Info("No duplicate cells found.")
return pd.DataFrame(), [], full_data_by_file
progress(0.7, desc="Processing results...")
# Create results DataFrame
results_data = []
for row1, row2, similarity in similar_pairs:
row1_data = combined_df.iloc[row1]
row2_data = combined_df.iloc[row2]
results_data.append(
{
"File1": row1_data["file"],
"Row1": int(row1_data["row_number"]),
"File2": row2_data["file"],
"Row2": int(row2_data["row_number"]),
"Similarity_Score": round(similarity, 3),
"Text1": (
row1_data["text"][:200] + "..."
if len(row1_data["text"]) > 200
else row1_data["text"]
),
"Text2": (
row2_data["text"][:200] + "..."
if len(row2_data["text"]) > 200
else row2_data["text"]
),
"Original_Index1": row1,
"Original_Index2": row2,
}
)
results_df = pd.DataFrame(results_data)
results_df = results_df.sort_values(["File1", "Row1", "File2", "Row2"])
progress(0.9, desc="Saving results...")
# Save results
output_paths = save_tabular_duplicate_results(
results_df,
output_folder,
file_paths,
remove_duplicate_rows=remove_duplicate_rows,
in_excel_tabular_sheets=in_excel_tabular_sheets,
)
gr.Info(f"Found {len(results_df)} duplicate cell matches")
return results_df, output_paths, full_data_by_file
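
# The core matching step above, in isolation (a standalone sketch; the texts
# and threshold are invented for illustration):
#   >>> texts = ["the quick brown fox", "the quick brown foxes", "unrelated words"]
#   >>> tfidf = TfidfVectorizer().fit_transform(texts)
#   >>> sim = cosine_similarity(tfidf, dense_output=False).tocoo()
#   >>> [(r, c) for r, c, v in zip(sim.row, sim.col, sim.data) if r < c and v >= 0.5]
#   [(0, 1)]
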
def save_tabular_duplicate_results(
results_df: pd.DataFrame,
output_folder: str,
file_paths: List[str],
remove_duplicate_rows: bool = REMOVE_DUPLICATE_ROWS,
in_excel_tabular_sheets: List[str] = [],
) -> List[str]:
"""
Save tabular duplicate detection results to files.
Args:
results_df (pd.DataFrame): Results DataFrame
output_folder (str): Output folder path
file_paths (List[str]): List of file paths
remove_duplicate_rows (bool): Whether to remove duplicate rows
        in_excel_tabular_sheets (List[str]): Excel sheet names supplied by the caller
            (currently unused here; sheet names are recovered from the result file names)
Returns:
List[str]: List of output file paths
"""
output_paths = list()
output_folder_path = Path(output_folder)
    output_folder_path.mkdir(parents=True, exist_ok=True)
if results_df.empty:
print("No duplicate matches to save.")
return list()
# Save main results
results_file = output_folder_path / "tabular_duplicate_results.csv"
results_df.to_csv(results_file, index=False, encoding="utf-8-sig")
output_paths.append(str(results_file))
# Group results by original file to handle Excel files properly
excel_files_processed = dict() # Track which Excel files have been processed
# Save per-file duplicate lists
for file_name, group in results_df.groupby("File2"):
# Check for matches with original file names
for original_file in file_paths:
original_file_name = os.path.basename(original_file)
if original_file_name in file_name:
original_file_extension = os.path.splitext(original_file)[-1]
if original_file_extension in [".xlsx", ".xls"]:
                    # Split on ".xlsx_"/".xls_" to recover the sheet name appended during analysis
                    parts = safe_split_filename(
                        os.path.basename(file_name), [".xlsx_", ".xls_"]
                    )
# The sheet name is the last part after splitting
file_sheet_name = parts[-1]
file_path = original_file
# Initialize Excel file tracking if not already done
if file_path not in excel_files_processed:
excel_files_processed[file_path] = {
"sheets_data": dict(),
"all_sheets": list(),
"processed_sheets": set(),
}
# Read the original Excel file to get all sheet names
if not excel_files_processed[file_path]["all_sheets"]:
try:
excel_file = pd.ExcelFile(file_path)
excel_files_processed[file_path][
"all_sheets"
] = excel_file.sheet_names
except Exception as e:
print(f"Error reading Excel file {file_path}: {e}")
continue
# Read the current sheet
df = read_file(file_path, excel_sheet_name=file_sheet_name)
# Create duplicate rows file for this sheet
file_stem = Path(file_name).stem
duplicate_rows_file = (
output_folder_path
/ f"{file_stem}_{file_sheet_name}_duplicate_rows.csv"
)
# Get unique row numbers to remove
rows_to_remove = sorted(group["Row2"].unique())
duplicate_df = pd.DataFrame({"Row_to_Remove": rows_to_remove})
duplicate_df.to_csv(duplicate_rows_file, index=False)
output_paths.append(str(duplicate_rows_file))
# Process the sheet data
df_cleaned = df.copy()
df_cleaned["duplicated"] = False
df_cleaned.loc[rows_to_remove, "duplicated"] = True
if remove_duplicate_rows:
df_cleaned = df_cleaned.drop(index=rows_to_remove)
# Store the processed sheet data
excel_files_processed[file_path]["sheets_data"][
file_sheet_name
] = df_cleaned
excel_files_processed[file_path]["processed_sheets"].add(
file_sheet_name
)
                else:
                    file_sheet_name = ""
                    file_path = original_file
                    file_base_name = os.path.basename(file_path)
                    df = read_file(file_path)
file_stem = Path(file_name).stem
duplicate_rows_file = (
output_folder_path / f"{file_stem}_duplicate_rows.csv"
)
# Get unique row numbers to remove
rows_to_remove = sorted(group["Row2"].unique())
duplicate_df = pd.DataFrame({"Row_to_Remove": rows_to_remove})
duplicate_df.to_csv(duplicate_rows_file, index=False)
output_paths.append(str(duplicate_rows_file))
df_cleaned = df.copy()
df_cleaned["duplicated"] = False
df_cleaned.loc[rows_to_remove, "duplicated"] = True
if remove_duplicate_rows:
df_cleaned = df_cleaned.drop(index=rows_to_remove)
file_ext = os.path.splitext(file_name)[-1]
if file_ext in [".parquet"]:
output_path = secure_join(
output_folder, f"{file_base_name}_deduplicated.parquet"
)
df_cleaned.to_parquet(output_path, index=False)
else:
output_path = secure_join(
output_folder, f"{file_base_name}_deduplicated.csv"
)
df_cleaned.to_csv(
output_path, index=False, encoding="utf-8-sig"
)
output_paths.append(str(output_path))
break
# Process Excel files to create complete deduplicated files
for file_path, file_data in excel_files_processed.items():
try:
# Create output filename
file_base_name = os.path.splitext(os.path.basename(file_path))[0]
file_ext = os.path.splitext(file_path)[-1]
output_path = secure_join(
output_folder, f"{file_base_name}_deduplicated{file_ext}"
)
# Create Excel writer
with pd.ExcelWriter(output_path, engine="openpyxl") as writer:
# Write all sheets
for sheet_name in file_data["all_sheets"]:
if sheet_name in file_data["processed_sheets"]:
# Use the processed (deduplicated) version
file_data["sheets_data"][sheet_name].to_excel(
writer, sheet_name=sheet_name, index=False
)
else:
# Use the original sheet (no duplicates found)
original_df = read_file(file_path, excel_sheet_name=sheet_name)
original_df.to_excel(writer, sheet_name=sheet_name, index=False)
output_paths.append(str(output_path))
print(f"Created deduplicated Excel file: {output_path}")
except Exception as e:
print(f"Error creating deduplicated Excel file for {file_path}: {e}")
continue
return output_paths
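
# Files written by a typical run (illustrative names for an input
# "survey.xlsx" whose sheet "Sheet1" contained duplicates):
#   <output_folder>/tabular_duplicate_results.csv
#   <output_folder>/survey_Sheet1_duplicate_rows.csv
#   <output_folder>/survey_deduplicated.xlsx
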
def remove_duplicate_rows_from_tabular_data(
file_path: str,
duplicate_rows: List[int],
output_folder: str = OUTPUT_FOLDER,
    in_excel_tabular_sheets: str = "",
remove_duplicate_rows: bool = REMOVE_DUPLICATE_ROWS,
) -> str:
"""
Remove duplicate rows from a tabular data file.
Args:
file_path (str): Path to the input file
duplicate_rows (List[int]): List of row indices to remove
output_folder (str): Output folder for cleaned file
in_excel_tabular_sheets (str): Name of the Excel sheet to save the results to
remove_duplicate_rows (bool): Whether to remove duplicate rows
Returns:
str: Path to the cleaned file
"""
try:
# Load the file
df = read_file(
file_path,
excel_sheet_name=in_excel_tabular_sheets if in_excel_tabular_sheets else "",
)
        # Mark duplicate rows (0-indexed), and drop them if requested
        df_cleaned = df.copy()
        df_cleaned["duplicated"] = False
        df_cleaned.loc[duplicate_rows, "duplicated"] = True
        if remove_duplicate_rows:
            df_cleaned = df_cleaned.drop(index=duplicate_rows).reset_index(drop=True)
# Save cleaned file
file_name = os.path.basename(file_path)
file_stem = os.path.splitext(file_name)[0]
file_ext = os.path.splitext(file_name)[-1]
output_path = secure_join(output_folder, f"{file_stem}_deduplicated{file_ext}")
if file_ext in [".xlsx", ".xls"]:
df_cleaned.to_excel(
output_path,
index=False,
                sheet_name=in_excel_tabular_sheets if in_excel_tabular_sheets else "Sheet1",
)
elif file_ext in [".parquet"]:
df_cleaned.to_parquet(output_path, index=False)
else:
df_cleaned.to_csv(output_path, index=False, encoding="utf-8-sig")
return output_path
except Exception as e:
print(f"Error removing duplicates from {file_path}: {e}")
raise
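
# Illustrative call (hypothetical paths; row indices are the 0-indexed
# values reported in the results table):
#   >>> remove_duplicate_rows_from_tabular_data(
#   ...     "input/people.csv", duplicate_rows=[3, 7], output_folder="output/")
#   'output/people_deduplicated.csv'
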
def run_tabular_duplicate_analysis(
files: List[str],
threshold: float,
min_words: int,
text_columns: List[str] = [],
output_folder: str = OUTPUT_FOLDER,
do_initial_clean_dup: bool = DO_INITIAL_TABULAR_DATA_CLEAN,
remove_duplicate_rows: bool = REMOVE_DUPLICATE_ROWS,
in_excel_tabular_sheets: List[str] = [],
progress: Progress = Progress(track_tqdm=True),
) -> Tuple[pd.DataFrame, List[str], Dict[str, pd.DataFrame]]:
"""
Main function to run tabular duplicate analysis.
Args:
files (List[str]): List of file paths
threshold (float): Similarity threshold
min_words (int): Minimum word count
        text_columns (List[str], optional): Specific columns to analyze
        output_folder (str, optional): Output folder for results
        do_initial_clean_dup (bool, optional): Whether to do an initial clean of the text
        remove_duplicate_rows (bool, optional): Whether to remove duplicate rows from the output files
        in_excel_tabular_sheets (List[str], optional): Excel sheet names to analyze
        progress (Progress): Progress tracking
Returns:
Tuple containing results DataFrame, output paths, and full data by file
"""
return find_duplicate_cells_in_tabular_data(
input_files=files,
similarity_threshold=threshold,
min_word_count=min_words,
text_columns=text_columns if text_columns else [],
output_folder=output_folder,
do_initial_clean_dup=do_initial_clean_dup,
in_excel_tabular_sheets=(
in_excel_tabular_sheets if in_excel_tabular_sheets else []
),
remove_duplicate_rows=remove_duplicate_rows,
)
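
# Illustrative invocation (hypothetical files and column names):
#   >>> results_df, paths, full_data = run_tabular_duplicate_analysis(
#   ...     files=["reviews_a.csv", "reviews_b.csv"],
#   ...     threshold=0.95, min_words=3, text_columns=["review_text"])
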
# Function to update column choices when files are uploaded
def update_tabular_column_choices(files, in_excel_tabular_sheets: List[str] = []):
if not files:
return gr.update(choices=[])
all_columns = set()
for file in files:
try:
file_extension = os.path.splitext(file.name)[-1]
if file_extension in [".xlsx", ".xls"]:
for sheet_name in in_excel_tabular_sheets:
df = read_file(file.name, excel_sheet_name=sheet_name)
text_cols = df.select_dtypes(
include=["object", "string"]
).columns.tolist()
all_columns.update(text_cols)
else:
df = read_file(file.name)
text_cols = df.select_dtypes(
include=["object", "string"]
).columns.tolist()
all_columns.update(text_cols)
except Exception as e:
print(f"Error reading {file.name}: {e}")
continue
return gr.Dropdown(choices=sorted(list(all_columns)))

# Function to handle tabular duplicate detection
def run_tabular_duplicate_detection(
files,
threshold,
min_words,
text_columns,
output_folder: str = OUTPUT_FOLDER,
do_initial_clean_dup: bool = DO_INITIAL_TABULAR_DATA_CLEAN,
in_excel_tabular_sheets: List[str] = [],
remove_duplicate_rows: bool = REMOVE_DUPLICATE_ROWS,
):
if not files:
print("No files uploaded")
return pd.DataFrame(), [], gr.Dropdown(choices=[]), 0, "deduplicate"
start_time = time.time()
task_textbox = "deduplicate"
# If output folder doesn't end with a forward slash, add one
if not output_folder.endswith("/"):
output_folder = output_folder + "/"
file_paths = list()
if isinstance(files, str):
# If 'files' is a single string, treat it as a list with one element
file_paths.append(files)
elif isinstance(files, list):
# If 'files' is a list, iterate through its elements
for f_item in files:
if isinstance(f_item, str):
# If an element is a string, it's a direct file path
file_paths.append(f_item)
elif hasattr(f_item, "name"):
# If an element has a '.name' attribute (e.g., a Gradio File object), use its name
file_paths.append(f_item.name)
else:
# Log a warning for unexpected element types within the list
print(
f"Warning: Skipping an element in 'files' list that is neither a string nor has a '.name' attribute: {type(f_item)}"
)
elif hasattr(files, "name"):
# Handle the case where a single file object (e.g., gr.File) is passed directly, not in a list
file_paths.append(files.name)
else:
# Raise an error for any other unexpected type of the 'files' argument itself
raise TypeError(
f"Unexpected type for 'files' argument: {type(files)}. Expected str, list of str/file objects, or a single file object."
)
if len(file_paths) > MAX_SIMULTANEOUS_FILES:
out_message = f"Number of files to deduplicate is greater than {MAX_SIMULTANEOUS_FILES}. Please submit a smaller number of files."
print(out_message)
raise Exception(out_message)
results_df, output_paths, full_data = run_tabular_duplicate_analysis(
files=file_paths,
threshold=threshold,
min_words=min_words,
text_columns=text_columns if text_columns else [],
output_folder=output_folder,
do_initial_clean_dup=do_initial_clean_dup,
        in_excel_tabular_sheets=(
            in_excel_tabular_sheets if in_excel_tabular_sheets else []
        ),
remove_duplicate_rows=remove_duplicate_rows,
)
# Update file choices for cleaning
    file_choices = sorted(set(file_paths))
end_time = time.time()
processing_time = round(end_time - start_time, 2)
return (
results_df,
output_paths,
gr.Dropdown(choices=file_choices),
processing_time,
task_textbox,
)
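
# Hypothetical Gradio wiring (component names invented for illustration; the
# real UI is defined elsewhere in the app):
#   run_btn.click(
#       run_tabular_duplicate_detection,
#       inputs=[in_files, threshold_slider, min_words_box, columns_dropdown],
#       outputs=[results_table, output_files, file_choice_dropdown, time_box, task_box],
#   )
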
# Function to handle row selection for preview
def handle_tabular_row_selection(results_df, evt: gr.SelectData):
    if not evt:
        return None, "", ""
    if not isinstance(results_df, pd.DataFrame) or results_df.empty:
        return None, "", ""
selected_index = evt.index[0]
if selected_index >= len(results_df):
return None, "", ""
row = results_df.iloc[selected_index]
return selected_index, row["Text1"], row["Text2"]

# Function to clean duplicates from selected file
def clean_tabular_duplicates(
file_name,
results_df,
output_folder,
in_excel_tabular_sheets: str = "",
remove_duplicate_rows: bool = REMOVE_DUPLICATE_ROWS,
):
if not file_name or results_df.empty:
return None
# Get duplicate rows for this file
file_duplicates = results_df[results_df["File2"] == file_name]["Row2"].tolist()
if not file_duplicates:
return None
try:
# Find the original file path
# This is a simplified approach - in practice you might want to store file paths
cleaned_file = remove_duplicate_rows_from_tabular_data(
file_path=file_name,
duplicate_rows=file_duplicates,
output_folder=output_folder,
in_excel_tabular_sheets=in_excel_tabular_sheets,
remove_duplicate_rows=remove_duplicate_rows,
)
return cleaned_file
except Exception as e:
print(f"Error cleaning duplicates: {e}")
return None
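
# Illustrative use after a detection run (hypothetical values; `results_df`
# comes from run_tabular_duplicate_detection above):
#   >>> clean_tabular_duplicates("survey.csv", results_df, "output/")
#   'output/survey_deduplicated.csv'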