# open-webui-rag-system / document_processor_image_test.py
import os
import re
import glob
import time
from collections import defaultdict
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_core.documents import Document
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_community.vectorstores import FAISS
# PyMuPDF library
try:
    import fitz  # PyMuPDF
    PYMUPDF_AVAILABLE = True
    print("✅ PyMuPDF library available")
except ImportError:
    PYMUPDF_AVAILABLE = False
    print("⚠️ PyMuPDF library is not installed. Install it with: pip install PyMuPDF")
# For PDF processing
import pytesseract
from PIL import Image
from pdf2image import convert_from_path
import pdfplumber
from pymupdf4llm import LlamaMarkdownReader
# --------------------------------
# Logging
# --------------------------------
def log(msg):
    print(f"[{time.strftime('%H:%M:%S')}] {msg}")
# --------------------------------
# Text cleaning helpers
# --------------------------------
def clean_text(text):
    # Keep Hangul, word characters, whitespace, and common punctuation; drop everything else
    return re.sub(r"[^\uAC00-\uD7A3\u1100-\u11FF\u3130-\u318F\w\s.,!?\"'()$:\-]", "", text)

def apply_corrections(text):
    # Map frequently mis-decoded sequences back to their intended characters
    corrections = {
        'º©': '정보', 'Ì': '의', '½': '운영', 'Ã': '', '©': '',
        'â€™': "'", 'â€œ': '"', 'â€': '"'
    }
    for k, v in corrections.items():
        text = text.replace(k, v)
    return text
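# Illustrative sanity check (the input string is hypothetical, not taken from a
# real document): apply_corrections() repairs sequences that PDF/HWPX extraction
# sometimes mangles, e.g.
#   apply_corrections("â€œhelloâ€")  ->  '"hello"'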
# --------------------------------
# HWPX processing (section-based parsing only)
# --------------------------------
def load_hwpx(file_path):
    """Load an HWPX file (XML parsing only)."""
    import zipfile
    import xml.etree.ElementTree as ET
    import chardet

    log(f"📥 HWPX section-based processing started: {file_path}")
    start = time.time()
    documents = []

    try:
        with zipfile.ZipFile(file_path, 'r') as zip_ref:
            file_list = zip_ref.namelist()
            section_files = [f for f in file_list
                             if f.startswith('Contents/section') and f.endswith('.xml')]
            section_files.sort()  # sort as section0.xml, section1.xml, ...
            log(f"📄 Section files found: {len(section_files)}")
            for section_idx, section_file in enumerate(section_files):
                with zip_ref.open(section_file) as xml_file:
                    raw = xml_file.read()
                    encoding = chardet.detect(raw)['encoding'] or 'utf-8'
                    try:
                        text = raw.decode(encoding)
                    except UnicodeDecodeError:
                        text = raw.decode("cp949", errors="replace")

                    tree = ET.ElementTree(ET.fromstring(text))
                    root = tree.getroot()

                    # Find text elements regardless of namespace
                    t_elements = [elem for elem in root.iter() if elem.tag.endswith('}t') or elem.tag == 't']
                    body_text = ""
                    for elem in t_elements:
                        if elem.text:
                            body_text += clean_text(elem.text) + " "

                    # Page metadata is left empty (HWPX sections carry no page numbers)
                    page_value = ""

                    if body_text.strip():
                        documents.append(Document(
                            page_content=apply_corrections(body_text),
                            metadata={
                                "source": file_path,
                                "filename": os.path.basename(file_path),
                                "type": "hwpx_body",
                                "page": page_value,
                                "total_sections": len(section_files)
                            }
                        ))
                        log(f"✅ Section text extracted (chars: {len(body_text)})")
                    # Find tables
                    table_elements = [elem for elem in root.iter() if elem.tag.endswith('}table') or elem.tag == 'table']
                    if table_elements:
                        table_text = ""
                        for table_idx, table in enumerate(table_elements):
                            table_text += f"[Table {table_idx + 1}]\n"
                            rows = [elem for elem in table.iter() if elem.tag.endswith('}tr') or elem.tag == 'tr']
                            for row in rows:
                                row_text = []
                                cells = [elem for elem in row.iter() if elem.tag.endswith('}tc') or elem.tag == 'tc']
                                for cell in cells:
                                    cell_texts = []
                                    for t_elem in cell.iter():
                                        if (t_elem.tag.endswith('}t') or t_elem.tag == 't') and t_elem.text:
                                            cell_texts.append(clean_text(t_elem.text))
                                    row_text.append(" ".join(cell_texts))
                                if row_text:
                                    table_text += "\t".join(row_text) + "\n"

                        if table_text.strip():
                            documents.append(Document(
                                page_content=apply_corrections(table_text),
                                metadata={
                                    "source": file_path,
                                    "filename": os.path.basename(file_path),
                                    "type": "hwpx_table",
                                    "page": page_value,
                                    "total_sections": len(section_files)
                                }
                            ))
                            log("📊 Table extraction complete")
                    # Find images
                    if [elem for elem in root.iter() if elem.tag.endswith('}picture') or elem.tag == 'picture']:
                        documents.append(Document(
                            page_content="[Image included]",
                            metadata={
                                "source": file_path,
                                "filename": os.path.basename(file_path),
                                "type": "hwpx_image",
                                "page": page_value,
                                "total_sections": len(section_files)
                            }
                        ))
                        log("🖼️ Image found")

    except Exception as e:
        log(f"❌ HWPX processing error: {e}")

    duration = time.time() - start

    # Print a summary of the extracted documents
    if documents:
        log(f"📋 Documents extracted: {len(documents)}")

    log(f"✅ HWPX processing complete: {file_path} ⏱️ {duration:.2f}s, {len(documents)} documents total")
    return documents
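# Example call (path is hypothetical):
#   docs = load_hwpx("dataset_test/sample.hwpx")
# Returns a list of Document objects with metadata["type"] set to "hwpx_body",
# "hwpx_table", or "hwpx_image"; "page" stays empty because HWPX sections do not
# carry page numbers.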
# --------------------------------
# PDF processing functions (unchanged from the original pipeline)
# --------------------------------
def run_ocr_on_image(image: Image.Image, lang='kor+eng'):
    return pytesseract.image_to_string(image, lang=lang)
def extract_images_with_ocr(pdf_path, lang='kor+eng'):
    try:
        # convert_from_path renders each page to an image (requires poppler), then OCR it
        images = convert_from_path(pdf_path)
        page_ocr_data = {}
        for idx, img in enumerate(images):
            page_num = idx + 1
            text = run_ocr_on_image(img, lang=lang)
            if text.strip():
                page_ocr_data[page_num] = text.strip()
        return page_ocr_data
    except Exception as e:
        print(f"❌ Image OCR failed: {e}")
        return {}
def extract_tables_with_pdfplumber(pdf_path):
    page_table_data = {}
    try:
        with pdfplumber.open(pdf_path) as pdf:
            for i, page in enumerate(pdf.pages):
                page_num = i + 1
                tables = page.extract_tables()
                table_text = ""
                for t_index, table in enumerate(tables):
                    if table:
                        table_text += f"[Table {t_index+1}]\n"
                        for row in table:
                            row_text = "\t".join(cell if cell else "" for cell in row)
                            table_text += row_text + "\n"
                if table_text.strip():
                    page_table_data[page_num] = table_text.strip()
        return page_table_data
    except Exception as e:
        print(f"❌ Table extraction failed: {e}")
        return {}
def extract_body_text_with_pages(pdf_path):
    page_body_data = {}
    try:
        pdf_processor = LlamaMarkdownReader()
        docs = pdf_processor.load_data(file_path=pdf_path)

        combined_text = ""
        for d in docs:
            if isinstance(d, dict) and "text" in d:
                combined_text += d["text"]
            elif hasattr(d, "text"):
                combined_text += d.text

        if combined_text.strip():
            # The extracted text is one continuous stream, so split it into
            # pseudo-pages of ~2000 characters with a 100-character overlap
            chars_per_page = 2000
            start = 0
            page_num = 1
            while start < len(combined_text):
                end = start + chars_per_page
                if end > len(combined_text):
                    end = len(combined_text)
                page_text = combined_text[start:end]
                if page_text.strip():
                    page_body_data[page_num] = page_text.strip()
                    page_num += 1
                if end == len(combined_text):
                    break
                start = end - 100
    except Exception as e:
        print(f"❌ Body text extraction failed: {e}")
    return page_body_data
def load_pdf_with_metadata(pdf_path):
    """Extract page-level information from a PDF file."""
    log(f"📑 PDF per-page processing started: {pdf_path}")
    start = time.time()

    # First check the actual page count with PyPDFLoader
    try:
        from langchain_community.document_loaders import PyPDFLoader
        loader = PyPDFLoader(pdf_path)
        pdf_pages = loader.load()
        actual_total_pages = len(pdf_pages)
        log(f"📄 Actual page count reported by PyPDFLoader: {actual_total_pages}")
    except Exception as e:
        log(f"❌ PyPDFLoader page count check failed: {e}")
        actual_total_pages = 1

    try:
        page_tables = extract_tables_with_pdfplumber(pdf_path)
    except Exception as e:
        page_tables = {}
        print(f"❌ Table extraction failed: {e}")

    try:
        page_ocr = extract_images_with_ocr(pdf_path)
    except Exception as e:
        page_ocr = {}
        print(f"❌ Image OCR failed: {e}")

    try:
        page_body = extract_body_text_with_pages(pdf_path)
    except Exception as e:
        page_body = {}
        print(f"❌ Body text extraction failed: {e}")

    duration = time.time() - start
    log(f"✅ PDF per-page processing complete: {pdf_path} ⏱️ {duration:.2f}s")

    # Determine the total page count
    all_pages = set(page_tables.keys()) | set(page_ocr.keys()) | set(page_body.keys())
    if all_pages:
        max_extracted_page = max(all_pages)
        # Use the larger of the actual page count and the highest extracted page number
        total_pages = max(actual_total_pages, max_extracted_page)
    else:
        total_pages = actual_total_pages

    log(f"📊 Final total page count: {total_pages}")
    docs = []
    for page_num in sorted(all_pages):
        if page_num in page_tables and page_tables[page_num].strip():
            docs.append(Document(
                page_content=clean_text(apply_corrections(page_tables[page_num])),
                metadata={
                    "source": pdf_path,
                    "filename": os.path.basename(pdf_path),
                    "type": "table",
                    "page": page_num,
                    "total_pages": total_pages
                }
            ))
            log(f"📊 Page {page_num}: table extracted")

        if page_num in page_body and page_body[page_num].strip():
            docs.append(Document(
                page_content=clean_text(apply_corrections(page_body[page_num])),
                metadata={
                    "source": pdf_path,
                    "filename": os.path.basename(pdf_path),
                    "type": "body",
                    "page": page_num,
                    "total_pages": total_pages
                }
            ))
            log(f"📄 Page {page_num}: body text extracted")

        if page_num in page_ocr and page_ocr[page_num].strip():
            docs.append(Document(
                page_content=clean_text(apply_corrections(page_ocr[page_num])),
                metadata={
                    "source": pdf_path,
                    "filename": os.path.basename(pdf_path),
                    "type": "ocr",
                    "page": page_num,
                    "total_pages": total_pages
                }
            ))
            log(f"🖼️ Page {page_num}: OCR text extracted")

    if not docs:
        docs.append(Document(
            page_content="[Content extraction failed]",
            metadata={
                "source": pdf_path,
                "filename": os.path.basename(pdf_path),
                "type": "error",
                "page": 1,
                "total_pages": total_pages
            }
        ))

    # Print a summary of the extracted pages
    if docs:
        page_numbers = [doc.metadata.get('page', 0) for doc in docs if doc.metadata.get('page')]
        if page_numbers:
            log(f"📋 Extracted page range: {min(page_numbers)} ~ {max(page_numbers)}")

    log(f"📊 Page-level PDF documents extracted: {len(docs)} (total {total_pages} pages)")
    return docs
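# Example call (path is hypothetical):
#   docs = load_pdf_with_metadata("dataset_test/sample.pdf")
# Each Document carries "type" ("table" / "body" / "ocr"), "page", and
# "total_pages" in its metadata, so retrieval hits can be traced back to a page.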
# --------------------------------
# Document loading and splitting
# --------------------------------
def load_documents(folder_path):
    documents = []

    for file in glob.glob(os.path.join(folder_path, "*.hwpx")):
        log(f"📄 HWPX file found: {file}")
        docs = load_hwpx(file)
        documents.extend(docs)

    for file in glob.glob(os.path.join(folder_path, "*.pdf")):
        log(f"📄 PDF file found: {file}")
        documents.extend(load_pdf_with_metadata(file))

    log(f"📚 Document loading complete! Total documents: {len(documents)}")
    return documents
def split_documents(documents, chunk_size=800, chunk_overlap=100):
    log("🔪 Chunk splitting started")
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        length_function=len
    )

    chunks = []
    for doc in documents:
        split = splitter.split_text(doc.page_content)
        for i, chunk in enumerate(split):
            # Prepend "passage: " so the chunk matches the E5 embedding input format
            enriched_chunk = f"passage: {chunk}"
            chunks.append(Document(
                page_content=enriched_chunk,
                metadata={**doc.metadata, "chunk_index": i}
            ))

    log(f"✅ Chunk splitting complete: {len(chunks)} chunks created")
    return chunks
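# Note (assumption based on the E5 model family's documented usage): because
# passages are embedded with a "passage: " prefix, retrieval queries should be
# embedded with a matching "query: " prefix (see the sketch at the end of this file).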
# --------------------------------
# Main
# --------------------------------
if __name__ == "__main__":
    folder = "dataset_test"
    log("🚀 PyMuPDF-based document processing started")
    docs = load_documents(folder)
    log("📦 Document loading complete")

    # Summarize page information
    log("📄 Page information summary:")
    page_info = {}
    for doc in docs:
        source = doc.metadata.get('source', 'unknown')
        page = doc.metadata.get('page', 'unknown')
        doc_type = doc.metadata.get('type', 'unknown')
        if source not in page_info:
            page_info[source] = {'pages': set(), 'types': set()}
        page_info[source]['pages'].add(page)
        page_info[source]['types'].add(doc_type)

    for source, info in page_info.items():
        max_page = max(info['pages']) if info['pages'] and isinstance(max(info['pages']), int) else 'unknown'
        log(f"  📄 {os.path.basename(source)}: {max_page} pages, types: {info['types']}")

    chunks = split_documents(docs)

    log("💡 Preparing E5-Large embeddings")
    embedding_model = HuggingFaceEmbeddings(
        model_name="intfloat/e5-large-v2",
        model_kwargs={"device": "cuda"}
    )

    vectorstore = FAISS.from_documents(chunks, embedding_model)
    vectorstore.save_local("vector_db")

    log(f"📊 Total documents: {len(docs)}")
    log(f"🔗 Total chunks: {len(chunks)}")
    log("✅ FAISS index saved: vector_db")

    # Print sample chunks with page information
    log("\n📋 Sample chunks with page information:")
    for i, chunk in enumerate(chunks[:5]):
        meta = chunk.metadata
        log(f"  Chunk {i+1}: {meta.get('type')} | page {meta.get('page')} | {os.path.basename(meta.get('source', 'unknown'))}")