import os
import re
import glob
import time
from collections import defaultdict
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_core.documents import Document
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_community.vectorstores import FAISS

# PyMuPDF library
try:
    import fitz  # PyMuPDF
    PYMUPDF_AVAILABLE = True
    print("PyMuPDF library available")
except ImportError:
    PYMUPDF_AVAILABLE = False
    print("PyMuPDF is not installed. Install it with `pip install PyMuPDF`.")

# PDF processing
import pytesseract
from PIL import Image
from pdf2image import convert_from_path
import pdfplumber
from pymupdf4llm import LlamaMarkdownReader  # also needs the llama-index package

# --------------------------------
# Logging
# --------------------------------
def log(msg):
    print(f"[{time.strftime('%H:%M:%S')}] {msg}")

# --------------------------------
# Text cleanup helpers
# --------------------------------
def clean_text(text):
    # Keep Hangul (syllables and jamo), word characters, whitespace and basic
    # punctuation; strip everything else (stray symbols, control characters)
    return re.sub(r"[^\uAC00-\uD7A3\u1100-\u11FF\u3130-\u318F\w\s.,!?\"'()$:\-]", "", text)

def apply_corrections(text):
    # Map frequent mojibake sequences back to the intended characters
    # (Korean tokens plus the usual UTF-8-as-CP1252 quotation-mark artifacts)
    corrections = {
        'º©': '정보', 'Ã': '의', '½': '이상', '©': '',
        'â€™': "'", 'â€œ': '"', 'â€': '"'
    }
    for k, v in corrections.items():
        text = text.replace(k, v)
    return text

# --------------------------------
# HWPX handling (section-by-section processing only)
# --------------------------------
def load_hwpx(file_path):
    """Load an HWPX file (XML parsing only)."""
    import zipfile
    import xml.etree.ElementTree as ET
    import chardet

    log(f"Starting section-by-section HWPX processing: {file_path}")
    start = time.time()
    documents = []

    try:
        with zipfile.ZipFile(file_path, 'r') as zip_ref:
            file_list = zip_ref.namelist()
            section_files = [f for f in file_list
                             if f.startswith('Contents/section') and f.endswith('.xml')]
            section_files.sort()  # sort in section0.xml, section1.xml, ... order
            log(f"Section files found: {len(section_files)}")

            for section_idx, section_file in enumerate(section_files):
                with zip_ref.open(section_file) as xml_file:
                    raw = xml_file.read()
                    encoding = chardet.detect(raw)['encoding'] or 'utf-8'
                    try:
                        text = raw.decode(encoding)
                    except UnicodeDecodeError:
                        text = raw.decode("cp949", errors="replace")

                    tree = ET.ElementTree(ET.fromstring(text))
                    root = tree.getroot()

                    # Collect text nodes with or without an XML namespace prefix
                    t_elements = [elem for elem in root.iter() if elem.tag.endswith('}t') or elem.tag == 't']
                    body_text = ""
                    for elem in t_elements:
                        if elem.text:
                            body_text += clean_text(elem.text) + " "

                    # Page metadata is left empty (sections are not mapped to pages)
                    page_value = ""

                    if body_text.strip():
                        documents.append(Document(
                            page_content=apply_corrections(body_text),
                            metadata={
                                "source": file_path,
                                "filename": os.path.basename(file_path),
                                "type": "hwpx_body",
                                "page": page_value,
                                "total_sections": len(section_files)
                            }
                        ))
                        log(f"Section text extracted (chars: {len(body_text)})")

                    # Tables
                    table_elements = [elem for elem in root.iter() if elem.tag.endswith('}table') or elem.tag == 'table']
                    if table_elements:
                        table_text = ""
                        for table_idx, table in enumerate(table_elements):
                            table_text += f"[Table {table_idx + 1}]\n"
                            rows = [elem for elem in table.iter() if elem.tag.endswith('}tr') or elem.tag == 'tr']
                            for row in rows:
                                row_text = []
                                cells = [elem for elem in row.iter() if elem.tag.endswith('}tc') or elem.tag == 'tc']
                                for cell in cells:
                                    cell_texts = []
                                    for t_elem in cell.iter():
                                        if (t_elem.tag.endswith('}t') or t_elem.tag == 't') and t_elem.text:
                                            cell_texts.append(clean_text(t_elem.text))
                                    row_text.append(" ".join(cell_texts))
                                if row_text:
                                    table_text += "\t".join(row_text) + "\n"
                        if table_text.strip():
                            documents.append(Document(
                                page_content=apply_corrections(table_text),
                                metadata={
                                    "source": file_path,
                                    "filename": os.path.basename(file_path),
                                    "type": "hwpx_table",
                                    "page": page_value,
                                    "total_sections": len(section_files)
                                }
                            ))
                            log("Table extraction complete")

                    # Images
                    if [elem for elem in root.iter() if elem.tag.endswith('}picture') or elem.tag == 'picture']:
                        documents.append(Document(
                            page_content="[Image included]",
                            metadata={
                                "source": file_path,
                                "filename": os.path.basename(file_path),
                                "type": "hwpx_image",
                                "page": page_value,
                                "total_sections": len(section_files)
                            }
                        ))
                        log("Image(s) found")

    except Exception as e:
        log(f"HWPX processing error: {e}")

    duration = time.time() - start

    # Summary of the extracted documents
    if documents:
        log(f"Documents extracted: {len(documents)}")
    log(f"HWPX processing complete: {file_path} ({duration:.2f}s, {len(documents)} documents)")
    return documents

# --------------------------------
# PDF processing functions (unchanged from the previous version)
# --------------------------------
def run_ocr_on_image(image: Image.Image, lang='kor+eng'):
    return pytesseract.image_to_string(image, lang=lang)
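
# Note: pytesseract only wraps the Tesseract CLI, so the tesseract binary and the
# Korean language data (kor.traineddata) must be installed for lang='kor+eng' to
# work; convert_from_path() below likewise requires the poppler utilities on PATH.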
def extract_images_with_ocr(pdf_path, lang='kor+eng'):
    try:
        images = convert_from_path(pdf_path)
        page_ocr_data = {}
        for idx, img in enumerate(images):
            page_num = idx + 1
            text = run_ocr_on_image(img, lang=lang)
            if text.strip():
                page_ocr_data[page_num] = text.strip()
        return page_ocr_data
    except Exception as e:
        print(f"Image OCR failed: {e}")
        return {}

def extract_tables_with_pdfplumber(pdf_path):
    page_table_data = {}
    try:
        with pdfplumber.open(pdf_path) as pdf:
            for i, page in enumerate(pdf.pages):
                page_num = i + 1
                tables = page.extract_tables()
                table_text = ""
                for t_index, table in enumerate(tables):
                    if table:
                        table_text += f"[Table {t_index+1}]\n"
                        for row in table:
                            row_text = "\t".join(cell if cell else "" for cell in row)
                            table_text += row_text + "\n"
                if table_text.strip():
                    page_table_data[page_num] = table_text.strip()
        return page_table_data
    except Exception as e:
        print(f"Table extraction failed: {e}")
        return {}

def extract_body_text_with_pages(pdf_path):
    page_body_data = {}
    try:
        pdf_processor = LlamaMarkdownReader()
        docs = pdf_processor.load_data(file_path=pdf_path)
        combined_text = ""
        for d in docs:
            if isinstance(d, dict) and "text" in d:
                combined_text += d["text"]
            elif hasattr(d, "text"):
                combined_text += d.text

        if combined_text.strip():
            # Split the combined text into pseudo-pages of ~2000 characters with a
            # 100-character overlap (real PDF page boundaries are not tracked here)
            chars_per_page = 2000
            start = 0
            page_num = 1
            while start < len(combined_text):
                end = start + chars_per_page
                if end > len(combined_text):
                    end = len(combined_text)
                page_text = combined_text[start:end]
                if page_text.strip():
                    page_body_data[page_num] = page_text.strip()
                    page_num += 1
                if end == len(combined_text):
                    break
                start = end - 100
    except Exception as e:
        print(f"Body text extraction failed: {e}")
    return page_body_data

def load_pdf_with_metadata(pdf_path):
    """Extract per-page information from a PDF file."""
    log(f"Starting page-by-page PDF processing: {pdf_path}")
    start = time.time()

    # First, check the actual page count with PyPDFLoader
    try:
        from langchain_community.document_loaders import PyPDFLoader
        loader = PyPDFLoader(pdf_path)
        pdf_pages = loader.load()
        actual_total_pages = len(pdf_pages)
        log(f"Actual page count from PyPDFLoader: {actual_total_pages}")
    except Exception as e:
        log(f"Failed to get page count from PyPDFLoader: {e}")
        actual_total_pages = 1

    try:
        page_tables = extract_tables_with_pdfplumber(pdf_path)
    except Exception as e:
        page_tables = {}
        print(f"Table extraction failed: {e}")

    try:
        page_ocr = extract_images_with_ocr(pdf_path)
    except Exception as e:
        page_ocr = {}
        print(f"Image OCR failed: {e}")

    try:
        page_body = extract_body_text_with_pages(pdf_path)
    except Exception as e:
        page_body = {}
        print(f"Body text extraction failed: {e}")

    duration = time.time() - start
    log(f"Page-by-page PDF processing complete: {pdf_path} ({duration:.2f}s)")

    # Determine the total page count
    all_pages = set(page_tables.keys()) | set(page_ocr.keys()) | set(page_body.keys())
    if all_pages:
        max_extracted_page = max(all_pages)
        # Use the larger of the actual page count and the highest extracted page number
        total_pages = max(actual_total_pages, max_extracted_page)
    else:
        total_pages = actual_total_pages
    log(f"Final total page count: {total_pages}")
    docs = []
    for page_num in sorted(all_pages):
        if page_num in page_tables and page_tables[page_num].strip():
            docs.append(Document(
                page_content=clean_text(apply_corrections(page_tables[page_num])),
                metadata={
                    "source": pdf_path,
                    "filename": os.path.basename(pdf_path),
                    "type": "table",
                    "page": page_num,
                    "total_pages": total_pages
                }
            ))
            log(f"Page {page_num}: table extracted")

        if page_num in page_body and page_body[page_num].strip():
            docs.append(Document(
                page_content=clean_text(apply_corrections(page_body[page_num])),
                metadata={
                    "source": pdf_path,
                    "filename": os.path.basename(pdf_path),
                    "type": "body",
                    "page": page_num,
                    "total_pages": total_pages
                }
            ))
            log(f"Page {page_num}: body text extracted")

        if page_num in page_ocr and page_ocr[page_num].strip():
            docs.append(Document(
                page_content=clean_text(apply_corrections(page_ocr[page_num])),
                metadata={
                    "source": pdf_path,
                    "filename": os.path.basename(pdf_path),
                    "type": "ocr",
                    "page": page_num,
                    "total_pages": total_pages
                }
            ))
            log(f"Page {page_num}: OCR text extracted")

    if not docs:
        docs.append(Document(
            page_content="[Content extraction failed]",
            metadata={
                "source": pdf_path,
                "filename": os.path.basename(pdf_path),
                "type": "error",
                "page": 1,
                "total_pages": total_pages
            }
        ))

    # Page info summary
    if docs:
        page_numbers = [doc.metadata.get('page', 0) for doc in docs if doc.metadata.get('page')]
        if page_numbers:
            log(f"Extracted page range: {min(page_numbers)} ~ {max(page_numbers)}")
    log(f"Per-page PDF documents extracted: {len(docs)} (total {total_pages} pages)")
    return docs

# --------------------------------
# Document loading and splitting
# --------------------------------
def load_documents(folder_path):
    documents = []

    for file in glob.glob(os.path.join(folder_path, "*.hwpx")):
        log(f"Found HWPX file: {file}")
        docs = load_hwpx(file)
        documents.extend(docs)

    for file in glob.glob(os.path.join(folder_path, "*.pdf")):
        log(f"Found PDF file: {file}")
        documents.extend(load_pdf_with_metadata(file))

    log(f"Document loading complete. Total documents: {len(documents)}")
    return documents

def split_documents(documents, chunk_size=800, chunk_overlap=100):
    log("Starting chunk splitting")
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        length_function=len
    )
    chunks = []
    for doc in documents:
        split = splitter.split_text(doc.page_content)
        for i, chunk in enumerate(split):
            enriched_chunk = f"passage: {chunk}"
            chunks.append(Document(
                page_content=enriched_chunk,
                metadata={**doc.metadata, "chunk_index": i}
            ))
    log(f"Chunk splitting complete: {len(chunks)} chunks created")
    return chunks
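
# Each chunk produced by split_documents() looks roughly like:
#   page_content: "passage: <up to ~chunk_size characters of source text>"
#   metadata:     {"source": ..., "page": ..., "type": "body" / "table" / "ocr" / "hwpx_body" / ...,
#                  "chunk_index": 0, ...}
# The "passage: " prefix follows the E5 embedding convention; the matching
# "query: " prefix is expected on search queries at retrieval time.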
# --------------------------------
# Main
# --------------------------------
if __name__ == "__main__":
    folder = "dataset_test"

    log("Starting PyMuPDF-based document processing")
    docs = load_documents(folder)
    log("Document loading complete")

    # Page info summary
    log("Page info summary:")
    page_info = {}
    for doc in docs:
        source = doc.metadata.get('source', 'unknown')
        page = doc.metadata.get('page', 'unknown')
        doc_type = doc.metadata.get('type', 'unknown')
        if source not in page_info:
            page_info[source] = {'pages': set(), 'types': set()}
        page_info[source]['pages'].add(page)
        page_info[source]['types'].add(doc_type)

    for source, info in page_info.items():
        max_page = max(info['pages']) if info['pages'] and isinstance(max(info['pages']), int) else 'unknown'
        log(f"  {os.path.basename(source)}: {max_page} page(s), types: {info['types']}")

    chunks = split_documents(docs)

    log("Preparing e5-large-v2 embeddings")
    embedding_model = HuggingFaceEmbeddings(
        model_name="intfloat/e5-large-v2",
        model_kwargs={"device": "cuda"}
    )
    vectorstore = FAISS.from_documents(chunks, embedding_model)
    vectorstore.save_local("vector_db")

    log(f"Total documents: {len(docs)}")
    log(f"Total chunks: {len(chunks)}")
    log("FAISS index saved to: vector_db")

    # Print a few sample chunks with their page info
    log("\nSample chunks with page info:")
    for i, chunk in enumerate(chunks[:5]):
        meta = chunk.metadata
        log(f"  Chunk {i+1}: {meta.get('type')} | page {meta.get('page')} | {os.path.basename(meta.get('source', 'unknown'))}")
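
    # Example query (illustrative sketch): run one similarity search against the
    # index that was just built. The "query: " prefix mirrors the "passage: "
    # prefix added in split_documents(), which is the convention E5-family models
    # expect. The query text and k below are arbitrary placeholders.
    sample_query = "query: What are these documents about?"
    results = vectorstore.similarity_search(sample_query, k=3)
    for rank, result in enumerate(results, 1):
        log(f"  Result {rank}: page {result.metadata.get('page')} | {result.page_content[:80]}")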