""" PLOBIN """ import difflib import streamlit as st import streamlit.components.v1 as components import fitz # PyMuPDF import chromadb from sentence_transformers import SentenceTransformer, util import requests import os import re import shutil from collections import Counter import numpy as np from typing import List, Dict, Tuple import base64 from dotenv import load_dotenv import json from difflib import SequenceMatcher import pdfplumber def get_svg_content(svg_path): with open(svg_path, "r", encoding="utf-8") as f: return f.read() plobin_logo_svg = get_svg_content("img/plobin.svg") load_dotenv() GROK_API_KEY = os.getenv("GROK_API_KEY") GROK_API_BASE = "https://api.x.ai/v1" OPENAI_API_KEY = os.getenv("OPENAI_API_KEY") OPENAI_API_BASE = "https://api.openai.com/v1" CHROMA_DIR = "./chroma_db" EMBEDDING_MODEL = 'jhgan/ko-sroberta-multitask' class HighlightConfig: def __init__(self): self.color = [1.0, 1.0, 0.0] st.set_page_config( page_title="PLOBIN", page_icon="img/plobin-left-only.png", layout="wide", initial_sidebar_state="expanded" ) st.markdown(""" """, unsafe_allow_html=True) SPACE_RE = re.compile(r'\s+') def normalize_for_search(text: str) -> str: """ 검색/매칭용 텍스트 정규화: - 양끝 공백 제거 - 소문자 변환 - 모든 공백 문자 제거 (띄어쓰기 차이 무시) """ text = text.strip().lower() text = SPACE_RE.sub('', text) # 모든 공백 날리기 return text def init_session(): if 'processed' not in st.session_state: st.session_state.processed = False if 'vector_db' not in st.session_state: st.session_state.vector_db = None if 'embedder' not in st.session_state: st.session_state.embedder = None if 'chat_history' not in st.session_state: st.session_state.chat_history = [] if 'doc_metadata' not in st.session_state: st.session_state.doc_metadata = {} if 'pdf_bytes' not in st.session_state: st.session_state.pdf_bytes = None if 'pdf_pages_text' not in st.session_state: st.session_state.pdf_pages_text = {} if 'current_highlights' not in st.session_state: st.session_state.current_highlights = [] if 'zoom_level' not in st.session_state: st.session_state.zoom_level = 2.0 if 'highlight_config' not in st.session_state: st.session_state.highlight_config = HighlightConfig() if 'processing_query' not in st.session_state: st.session_state.processing_query = None if 'scroll_to_page' not in st.session_state: st.session_state.scroll_to_page = None def extract_table_image_as_base64(pdf_bytes: bytes, page_num: int, bbox: tuple) -> str: """ PDF 페이지에서 표 영역을 이미지로 추출하여 base64로 인코딩 Args: pdf_bytes: PDF 바이트 데이터 page_num: 페이지 번호 (0부터 시작) bbox: (x0, y0, x1, y1) 표 영역 좌표 Returns: base64 인코딩된 이미지 문자열 """ doc = fitz.open(stream=pdf_bytes, filetype="pdf") page = doc[page_num] # bbox 영역을 이미지로 렌더링 (고해상도) rect = fitz.Rect(bbox) pix = page.get_pixmap(matrix=fitz.Matrix(2.0, 2.0), clip=rect) img_bytes = pix.tobytes("png") doc.close() # base64 인코딩 img_base64 = base64.b64encode(img_bytes).decode('utf-8') return img_base64 def convert_table_to_markdown_with_vision( pdf_bytes: bytes, page_num: int, bbox: tuple, api_key: str ) -> str: """ OpenAI Vision API를 사용하여 표 이미지를 마크다운으로 변환 Args: pdf_bytes: PDF 바이트 데이터 page_num: 페이지 번호 bbox: 표 영역 좌표 api_key: OpenAI API 키 Returns: 마크다운 형식의 표 """ # 표 영역 이미지 추출 img_base64 = extract_table_image_as_base64(pdf_bytes, page_num, bbox) # OpenAI Vision API 호출 prompt = """이 이미지는 PDF 문서의 표입니다. 표의 내용을 정확하게 마크다운 표 형식으로 변환해주세요. 규칙: 1. 셀 병합이 있으면 적절히 처리 2. 중첩된 표가 있으면 텍스트로 표현 3. 빈 셀은 빈 칸으로 유지 4. 표 형식만 반환 (추가 설명 없이) 마크다운 표 형식: | 열1 | 열2 | 열3 | | --- | --- | --- | | 데이터1 | 데이터2 | 데이터3 |""" try: response = requests.post( f"{OPENAI_API_BASE}/chat/completions", headers={ "Authorization": f"Bearer {api_key}", "Content-Type": "application/json" }, json={ "model": "gpt-4o", # gpt-4o 또는 gpt-4o-mini "messages": [ { "role": "user", "content": [ { "type": "text", "text": prompt }, { "type": "image_url", "image_url": { "url": f"data:image/png;base64,{img_base64}", "detail": "high" # "low", "high", "auto" } } ] } ], "temperature": 0.1, "max_tokens": 2000 }, timeout=120 ) if response.status_code == 200: result = response.json() markdown_table = result['choices'][0]['message']['content'] # 코드블록 제거 markdown_table = re.sub(r'```markdown\s*|\s*```', '', markdown_table) markdown_table = re.sub(r'```\s*|\s*```', '', markdown_table) return markdown_table.strip() else: # 에러 상세 출력 error_detail = response.text print(f"OpenAI API 오류: {response.status_code}") print(f"상세: {error_detail}") return f"[표 변환 실패: {response.status_code} - {error_detail[:200]}]" except Exception as e: return f"[표 변환 실패: {str(e)}]" def extract_text_from_pdf(pdf_file) -> Tuple[List[str], List[Dict], bytes, Dict]: """ PDF에서 텍스트와 표를 추출 (표는 Grok Vision API로 처리) """ pdf_bytes = pdf_file.read() doc = fitz.open(stream=pdf_bytes, filetype="pdf") chunks = [] metadata_list = [] pages_text = {} CHUNK_SIZE = 800 OVERLAP_SIZE = 150 # pdfplumber로 PDF 열기 pdf_file.seek(0) with pdfplumber.open(pdf_file) as pdf_plumber: for page_num in range(len(doc)): # PyMuPDF로 텍스트 추출 fitz_page = doc[page_num] text = fitz_page.get_text("text") # pdfplumber로 표 탐지 tables_markdown = [] if page_num < len(pdf_plumber.pages): plumber_page = pdf_plumber.pages[page_num] # 표 탐지 table_settings = { "vertical_strategy": "lines", "horizontal_strategy": "lines", "snap_tolerance": 3, "join_tolerance": 3, } tables = plumber_page.find_tables(table_settings=table_settings) # 각 표를 Vision API로 처리 for idx, table in enumerate(tables): bbox = table.bbox # (x0, y0, x1, y1) # Grok Vision API로 마크다운 변환 markdown_table = convert_table_to_markdown_with_vision( pdf_bytes, page_num, bbox, OPENAI_API_KEY ) tables_markdown.append(f"\n\n**[표 {idx + 1}]**\n{markdown_table}\n") # 텍스트와 표를 결합 combined_content = text if tables_markdown: combined_content += "\n\n" + "\n".join(tables_markdown) pages_text[page_num + 1] = combined_content if not combined_content.strip(): continue # 청크로 분할 lines = [line.strip() for line in combined_content.split('\n') if line.strip()] cleaned_text = '\n'.join(lines) # 표 마커를 기준으로 분할 우선 처리 if "**[표" in cleaned_text: # 표 단위로 분할 table_pattern = r'\*\*\[표 \d+\]\*\*' parts = re.split(f'({table_pattern})', cleaned_text) current_chunk = "" for part in parts: part = part.strip() if not part: continue # 표 섹션인 경우 if re.match(table_pattern, part): if current_chunk: chunks.append(current_chunk.strip()) metadata_list.append({ "page": page_num + 1, "source": pdf_file.name, "chunk_type": "text" }) current_chunk = "" current_chunk = part else: # 표 내용이거나 일반 텍스트 if current_chunk and re.match(table_pattern, current_chunk): # 이전이 표 마커였다면 표 내용 추가 current_chunk += "\n" + part chunks.append(current_chunk.strip()) metadata_list.append({ "page": page_num + 1, "source": pdf_file.name, "chunk_type": "table" }) current_chunk = "" else: # 일반 텍스트 처리 if len(current_chunk) + len(part) > CHUNK_SIZE: if current_chunk: chunks.append(current_chunk.strip()) metadata_list.append({ "page": page_num + 1, "source": pdf_file.name, "chunk_type": "text" }) current_chunk = part else: current_chunk += "\n" + part if current_chunk else part if current_chunk: chunk_type = "table" if re.match(table_pattern, current_chunk) else "text" chunks.append(current_chunk.strip()) metadata_list.append({ "page": page_num + 1, "source": pdf_file.name, "chunk_type": chunk_type }) else: # 표가 없는 경우 일반 텍스트 처리 sentences = re.split(r'([.!?]\s+|\n{2,})', cleaned_text) sentences = [s for s in sentences if s.strip()] current_chunk = "" current_length = 0 for sentence in sentences: sentence_length = len(sentence) if current_length + sentence_length > CHUNK_SIZE and current_chunk: chunks.append(current_chunk.strip()) metadata_list.append({ "page": page_num + 1, "source": pdf_file.name, "chunk_type": "text" }) overlap_text = current_chunk[-OVERLAP_SIZE:] if len(current_chunk) > OVERLAP_SIZE else current_chunk current_chunk = overlap_text + sentence current_length = len(current_chunk) else: current_chunk += sentence current_length += sentence_length if current_chunk.strip(): chunks.append(current_chunk.strip()) metadata_list.append({ "page": page_num + 1, "source": pdf_file.name, "chunk_type": "text" }) doc.close() return chunks, metadata_list, pdf_bytes, pages_text def save_extracted_text_to_file(chunks: List[str], metadata_list: List[Dict], filename: str): """ 추출한 텍스트를 로컬 파일로 저장 """ import os from datetime import datetime # 저장 디렉토리 생성 output_dir = "extracted_text" os.makedirs(output_dir, exist_ok=True) # 파일명 생성 (타임스탬프 포함) timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") base_name = os.path.splitext(filename)[0] output_file = os.path.join(output_dir, f"{base_name}_{timestamp}.txt") # 텍스트 저장 with open(output_file, 'w', encoding='utf-8') as f: f.write(f"=" * 80 + "\n") f.write(f"문서명: {filename}\n") f.write(f"추출 시간: {timestamp}\n") f.write(f"총 청크 수: {len(chunks)}\n") f.write(f"=" * 80 + "\n\n") for idx, (chunk, meta) in enumerate(zip(chunks, metadata_list), 1): f.write(f"\n{'='*80}\n") f.write(f"청크 #{idx}\n") f.write(f"페이지: {meta.get('page', 'N/A')}\n") f.write(f"타입: {meta.get('chunk_type', 'text')}\n") f.write(f"{'-'*80}\n") f.write(chunk) f.write(f"\n{'='*80}\n") return output_file @st.cache_resource(show_spinner=False) def load_embedding_model(): return SentenceTransformer(EMBEDDING_MODEL) def create_vector_db(chunks: List[str], metadata_list: List[Dict]): embedder = load_embedding_model() client = chromadb.EphemeralClient( settings=chromadb.Settings( anonymized_telemetry=False, allow_reset=True ) ) try: client.delete_collection("rfx_docs") except Exception: pass collection = client.create_collection( name="rfx_docs", metadata={"hnsw:space": "cosine"} ) batch_size = 32 all_embeddings = [] for i in range(0, len(chunks), batch_size): batch = chunks[i:i + batch_size] embeddings = embedder.encode(batch, show_progress_bar=False, convert_to_numpy=True) all_embeddings.extend(embeddings) ids = [f"doc_{i}" for i in range(len(chunks))] collection.add( embeddings=[emb.tolist() for emb in all_embeddings], documents=chunks, metadatas=metadata_list, ids=ids ) return collection, embedder def extract_keywords_semantic(text: str, embedder, top_n: int = 5) -> List[str]: words_with_numbers = re.findall(r'[가-힣]*\d+[가-힣]*', text) candidate_words = re.findall(r'[가-힣]{2,}', text) if not candidate_words: return words_with_numbers[:top_n] word_freq = Counter(candidate_words) text_embedding = embedder.encode([text], convert_to_numpy=True)[0] word_embeddings = embedder.encode(list(word_freq.keys()), convert_to_numpy=True) similarities = util.cos_sim(text_embedding, word_embeddings)[0].numpy() scored_words = [] for idx, (word, freq) in enumerate(word_freq.items()): semantic_score = similarities[idx] frequency_score = np.log1p(freq) / 10.0 combined_score = 0.7 * semantic_score + 0.3 * frequency_score scored_words.append((word, combined_score)) scored_words.sort(key=lambda x: x[1], reverse=True) result = [] for word in words_with_numbers[:3]: if word and word not in result: result.append(word) for word, score in scored_words: if word not in result: result.append(word) if len(result) >= top_n: break return result[:top_n] def hybrid_search(query: str, collection, embedder, top_k: int = 3) -> Dict: query_embedding = embedder.encode([query], convert_to_numpy=True)[0] vector_results = collection.query( query_embeddings=[query_embedding.tolist()], n_results=20, include=["documents", "metadatas", "distances"] ) keywords = extract_keywords_semantic(query, embedder, top_n=5) hybrid_results = [] for i, doc_id in enumerate(vector_results['ids'][0]): doc = vector_results['documents'][0][i] metadata = vector_results['metadatas'][0][i] vector_score = 1 - vector_results['distances'][0][i] keyword_score = 0 # 원문/정규화 둘 다 준비 doc_lower = doc.lower() doc_norm = normalize_for_search(doc) # 공백 제거 버전 for keyword in keywords: kw_lower = keyword.lower() kw_norm = normalize_for_search(keyword) # 1) 원래 방식: 그대로 포함 여부 # 2) 공백 제거 버전: 붙어 있거나 이상하게 띄어져도 매칭 가능 if kw_lower in doc_lower or kw_norm in doc_norm: keyword_score += 1 keyword_score = keyword_score / len(keywords) if keywords else 0 hybrid_score = 0.7 * vector_score + 0.3 * keyword_score hybrid_results.append({ 'id': doc_id, 'document': doc, 'metadata': metadata, 'hybrid_score': hybrid_score, 'vector_score': vector_score, 'keyword_score': keyword_score }) hybrid_results.sort(key=lambda x: x['hybrid_score'], reverse=True) top_results = hybrid_results[:top_k] return { 'documents': [[r['document'] for r in top_results]], 'metadatas': [[r['metadata'] for r in top_results]], 'scores': [r['hybrid_score'] for r in top_results], 'keywords': keywords } def grok_verify_and_extract(query: str, search_results: Dict, api_key: str) -> Dict: docs = search_results['documents'][0] metas = search_results['metadatas'][0] formatted_docs = [] for i, (doc, meta) in enumerate(zip(docs, metas), 1): formatted_docs.append(f"[문서 {i}] (페이지 {meta['page']})\n{doc}") context = "\n\n".join(formatted_docs) system_prompt = """당신은 RFx 문서 분석 전문가입니다. 주어진 3개의 문서 중에서 사용자 질문과 **가장 관련 있는 단 1개의 핵심 정보**만 선택하세요. **중요 규칙:** 1. 반드시 **1개의 텍스트**만 추출 2. 가장 직접적으로 질문에 답하는 정보 선택 3. 금액, 날짜, 수량 등 구체적인 숫자 정보 우선 4. 추출된 텍스트는 원문 그대로 유지 (150자 이내) 5. JSON 형식으로만 응답 **응답 형식:** { "selected_text": "선택된 텍스트 (원문 그대로)", "page": 페이지번호, "relevance_reason": "이 텍스트를 선택한 이유" }""" user_prompt = f"""<질문> {query} <검색된 문서들> {context} 위 3개 문서에서 질문에 가장 정확하게 답하는 **단 1개의 핵심 정보**를 JSON 형식으로 선택하세요. 선택한 텍스트는 150자 이내로 하세요.""" headers = { "Content-Type": "application/json", "Authorization": f"Bearer {api_key}" } payload = { "model": "grok-3", "messages": [ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt} ], "temperature": 0.1, "max_tokens": 1000, "stream": False } try: response = requests.post( f"{GROK_API_BASE}/chat/completions", headers=headers, json=payload, timeout=120 ) if response.status_code != 200: return {"error": f"API 오류: {response.status_code}"} result = response.json() content = result["choices"][0]["message"]["content"] content = content.replace("```json", "").replace("```", "").strip() extracted_data = json.loads(content) return extracted_data except Exception as e: return {"error": f"오류: {str(e)}"} def build_context(search_results: Dict, max_length: int = 3000) -> str: context_parts = [] current_length = 0 docs = search_results['documents'][0] metas = search_results['metadatas'][0] for i, (doc, meta) in enumerate(zip(docs, metas), 1): part = f"[문서 {i}] (페이지 {meta['page']})\n{doc}\n" part_length = len(part) if current_length + part_length > max_length: remaining = max_length - current_length if remaining > 200: part = f"[문서 {i}] (페이지 {meta['page']})\n{doc[:remaining-50]}...\n" context_parts.append(part) break context_parts.append(part) current_length += part_length return "\n".join(context_parts) def generate_answer(query: str, search_results: Dict, api_key: str) -> str: context = build_context(search_results, max_length=4000) system_prompt = """당신은 자동차 제조업 RFx 문서 전문 분석가입니다. **산업 특화 지침:** 1. **자동차 제조업 은어·약어 해석**: 사용자의 질문에는 자동차 제조업 특유의 은어·약어·전문용어가 포함될 수 있으므로 산업 문맥에 맞게 정확히 해석하라. 2. **언어 혼용 및 비문 대응**: 사용자의 문장은 한국어와 영어가 섞이거나 문법 오류가 있을 수 있으므로 의도를 추론하여 정확히 이해하라. 3. **모호한 질문 자동 보정**: 사용자의 질문이 불완전하거나 모호해도 질문 의도를 추론하여 적절하게 재구성하라. **문서 기반 응답 원칙 (절대 추측 금지):** 1. 제공된 문서를 **매우 꼼꼼히** 읽고 정확한 정보를 찾으세요 2. **반드시 문서에서 근거를 찾아 답변**하고, 문서에 없는 내용은 임의로 추측하지 말고 **"문서에서 관련 정보를 찾을 수 없습니다"**라고 명시하라 3. **문서와 전혀 무관한 질문**(예: 점심 추천, 날씨, 일상 대화 등)은 **"죄송하지만, 제공된 문서에는 해당 질문과 관련된 정보가 포함되어 있지 않습니다."**라고만 답변하고 추가 설명 없이 종료하라 4. 문서에 정보가 있는데도 "없다"고 하지 마세요 **핵심 정보 우선 추출:** - 금액, 수량, 규격, 일정, 요구조건 등 **수치 기반 정보를 최우선**으로 식별하고 정확하게 반환하라 - 숫자, 금액, 날짜 등 구체적인 정보를 우선적으로 찾으세요 **답변 형식:** - 답변 시 반드시 **[페이지 X]** 형태로 출처를 명시하세요 - **절대 중요**: "문서 1", "문서 2" 같은 표기는 절대 사용하지 마세요 - 핵심 답변을 먼저 명확하게 제시 - 마크다운 형식으로만 답변하세요 - 질문에 따라 가장 적절한 구조로 답변하세요 (단계별, 카테고리별, 시간순 등) **원문 인용 규칙 (하이라이트용):** - 핵심 내용을 설명할 때는 큰따옴표("")로 PDF 원문을 그대로 인용하세요 - 큰따옴표 안의 내용은 PDF 원문을 **한 글자도 바꾸지 말고** 그대로 복사 - 문장 종결어("~함", "~임", "~요청함" 등)도 원문 그대로 유지 - 인용 예시: "기술평가 점수가 배점한도(100점)의 85% 이상인 자를 기술평가 적격자로 선정" [페이지 9] - 원문 인용 후 필요하면 부연 설명 추가 가능""" user_prompt = f"""다음 문서들을 매우 꼼꼼히 읽고 질문에 답변하세요. <문서> {context} <질문> {query} **답변 작성 가이드:** 1. **구조화**: 질문 유형에 맞는 가장 읽기 쉬운 구조 선택 - 절차/프로세스 질문 → 단계별 번호 (1, 2, 3...) - 항목 나열 질문 → 불릿 포인트 (• 또는 *) - 비교/선택 질문 → 카테고리별 구분 2. **원문 인용**: 핵심 내용은 큰따옴표로 PDF 원문 그대로 인용 - 예: "기술평가 적격자를 대상으로 가격 입찰을 실시하여, 한국자동차연구원의 예정가격이하 최저가격 투찰자를 낙찰자로 선정" [페이지 9] - 큰따옴표 안 = 원문 그대로 (절대 의역 금지) 3. **출처 표기**: 모든 정보에 [페이지 X] 표기 4. **형식**: 마크다운만 사용, "문서 1" 같은 표기 금지""" headers = { "Content-Type": "application/json", "Authorization": f"Bearer {api_key}" } payload = { "model": "grok-3", "messages": [ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt} ], "temperature": 0.1, "max_tokens": 2000, "stream": False } try: response = requests.post( f"{GROK_API_BASE}/chat/completions", headers=headers, json=payload, timeout=120 ) if response.status_code != 200: error_detail = "" try: error_data = response.json() error_detail = error_data.get('error', {}).get('message', '') except Exception: error_detail = response.text return f"❌ API 오류 (코드: {response.status_code})\n상세: {error_detail}" result = response.json() return result["choices"][0]["message"]["content"] except Exception as e: return f"❌ 오류: {str(e)}" def highlight_text_in_pdf(pdf_bytes: bytes, highlight_info: List[Dict]) -> bytes: """ PyMuPDF 기반의 하이라이트 함수 - 전체 우선, 실패시에만 분할 """ doc = fitz.open(stream=pdf_bytes, filetype="pdf") yellow_color = [1.0, 1.0, 0.0] def normalize_text(text): """텍스트 정규화""" return re.sub(r'\s+', ' ', text.strip()) def merge_rects(rects, threshold=5): """겹치거나 인접한 사각형들을 병합""" if not rects: return [] # 사각형들을 y좌표로 정렬 sorted_rects = sorted(rects, key=lambda r: (r.y0, r.x0)) merged = [sorted_rects[0]] for rect in sorted_rects[1:]: last = merged[-1] # 같은 라인이고 x가 겹치거나 인접하면 병합 if abs(rect.y0 - last.y0) < threshold: if rect.x0 <= last.x1 + threshold: merged[-1] = fitz.Rect( min(last.x0, rect.x0), min(last.y0, rect.y0), max(last.x1, rect.x1), max(last.y1, rect.y1) ) else: merged.append(rect) # 다른 라인이지만 y가 연속되면 (줄바꿈) elif rect.y0 <= last.y1 + 20: merged.append(rect) else: merged.append(rect) return merged def find_text_across_lines(page, search_text): """줄바꿈을 넘어서 텍스트 찾기 - 공백 무시 비교""" found_rects = [] # 페이지 텍스트 구조 가져오기 blocks = page.get_text("dict")["blocks"] # 모든 라인의 텍스트와 bbox를 수집 lines_info = [] # [(text, bbox), ...] for block in blocks: if "lines" not in block: continue for line in block["lines"]: line_text = "" for span in line["spans"]: line_text += span["text"] if line_text.strip(): lines_info.append((line_text, fitz.Rect(line["bbox"]))) if not lines_info: return [] # 검색어 정규화 - 공백 완전 제거 버전 search_no_space = search_text.lower().replace(" ", "").replace("\n", "") # 연속된 라인들을 합쳐서 검색 for start_idx in range(len(lines_info)): combined_text = "" combined_bboxes = [] for end_idx in range(start_idx, min(start_idx + 5, len(lines_info))): # 최대 5줄 line_text, line_bbox = lines_info[end_idx] combined_text += line_text combined_bboxes.append(line_bbox) # 공백 제거 후 비교 (핵심!) combined_no_space = combined_text.lower().replace(" ", "").replace("\n", "") # 검색어가 포함되어 있는지 확인 if search_no_space in combined_no_space: # 매칭됨 - 해당 라인들의 bbox 반환 for bbox in combined_bboxes: found_rects.append(bbox) print(f" ✅ 라인 매칭 ({start_idx+1}~{end_idx+1}줄): {len(combined_bboxes)}개 영역") return merge_rects(found_rects) return [] def find_text_with_pymupdf(page, search_text): """PyMuPDF로 텍스트 찾기 - 정확하고 깔끔하게""" found_rects = [] search_text = search_text.strip() print(f" 검색 중...") # === 우선순위 1: PyMuPDF 기본 검색 === instances = page.search_for(search_text) if instances: print(f" ✅ 성공 [원본]: {len(instances)}개") return merge_rects(instances) # === 우선순위 2: 정규화 후 검색 === normalized = normalize_text(search_text) if normalized != search_text: instances = page.search_for(normalized) if instances: print(f" ✅ 성공 [정규화]: {len(instances)}개") return merge_rects(instances) # === 우선순위 3: 줄바꿈 넘어서 검색 (라인 매칭) === line_results = find_text_across_lines(page, search_text) if line_results: return line_results print(f" ⚠️ 라인 매칭 실패 → 핵심 구문") # === 우선순위 4: 핵심 구문만 검색 (처음 30자 + 마지막 20자) === if len(search_text) > 50: # 앞부분 front = search_text[:30] front_inst = page.search_for(front) if front_inst: print(f" ✅ 앞부분 매칭: {front[:20]}...") found_rects.extend(front_inst[:1]) # 첫 번째만 # 뒷부분 back = search_text[-20:] back_inst = page.search_for(back) if back_inst: print(f" ✅ 뒷부분 매칭: ...{back[:15]}") found_rects.extend(back_inst[:1]) # 첫 번째만 if found_rects: return merge_rects(found_rects) print(f" ⚠️ 핵심 구문 실패 → 키워드") # === 우선순위 5: 키워드 (최대 2개만) === keywords = re.findall(r'[가-힣]{10,}', search_text) if not keywords: keywords = re.findall(r'[가-힣]{7,}', search_text) if keywords: for kw in keywords[:2]: # 최대 2개만 inst = page.search_for(kw) if inst: print(f" ✅ 키워드: {kw}") found_rects.extend(inst[:1]) # 첫 번째만 if found_rects: return merge_rects(found_rects) # === 우선순위 6: 블록 === print(f" 최후: 블록") blocks = page.get_text("dict")["blocks"] search_norm = normalize_text(search_text.lower()) for block in blocks: if "lines" not in block: continue block_text = "" for line in block["lines"]: for span in line["spans"]: block_text += span["text"] + " " block_norm = normalize_text(block_text.lower()) if search_norm in block_norm: found_rects.append(fitz.Rect(block["bbox"])) print(f" ✅ 블록 일치") break return merge_rects(found_rects) if found_rects else [] print(f"\n{'='*80}") print(f"하이라이트 시작 - 총 {len(highlight_info)}개 항목") print(f"{'='*80}") total_success = 0 total_failed = 0 for idx, item in enumerate(highlight_info, 1): page_num = item['page'] - 1 text_to_highlight = item['text'].strip() if page_num >= len(doc): print(f"\n[{idx}] ❌ 페이지 오류: {page_num + 1}") total_failed += 1 continue page = doc[page_num] print(f"\n[{idx}/{len(highlight_info)}]") print(f" 📄 페이지: {page_num + 1}") print(f" 📝 길이: {len(text_to_highlight)}자") print(f" 💬 내용: {text_to_highlight[:70]}...") # 텍스트 찾기 found_rects = find_text_with_pymupdf(page, text_to_highlight) # 중복 제거 (같은 위치의 사각형) unique_rects = [] for rect in found_rects: is_duplicate = False for existing in unique_rects: # 좌표가 거의 같으면 중복으로 간주 if (abs(rect.x0 - existing.x0) < 5 and abs(rect.y0 - existing.y0) < 5 and abs(rect.x1 - existing.x1) < 5 and abs(rect.y1 - existing.y1) < 5): is_duplicate = True break if not is_duplicate: unique_rects.append(rect) # 하이라이트 추가 highlighted_count = 0 for rect in unique_rects: try: highlight = page.add_highlight_annot(rect) highlight.set_colors(stroke=yellow_color) highlight.update() highlighted_count += 1 except Exception as e: print(f" ✗ 하이라이트 실패: {e}") if highlighted_count > 0: print(f" ✅ 완료: {highlighted_count}개 영역") total_success += 1 else: print(f" ❌ 실패: 텍스트를 찾을 수 없음") total_failed += 1 print(f"\n{'='*80}") print(f"📊 최종 결과: ✅ 성공 {total_success}개 / ❌ 실패 {total_failed}개") print(f"{'='*80}\n") output_bytes = doc.tobytes() doc.close() return output_bytes def extract_highlights_from_grok(grok_result: Dict) -> List[Dict]: if "error" in grok_result: return [] highlights = [] selected_text = grok_result.get("selected_text", "") page = grok_result.get("page", 1) if selected_text and len(selected_text) <= 150: highlights.append({ 'text': selected_text, 'page': page }) return highlights def extract_highlights_from_answer(answer: str) -> List[Dict]: """ 답변에서 하이라이트할 텍스트 추출 [페이지 X] 앞뒤 모두 해당 페이지로 간주 """ highlights = [] print(f"\n{'='*80}") print(f"답변 텍스트 분석 중...") print(f"{'='*80}\n") # [페이지 X] 패턴 찾기 page_pattern = r'\[\s*페이지\s*(\d+)\s*\]' page_matches = list(re.finditer(page_pattern, answer)) print(f"📍 [페이지] 태그 {len(page_matches)}개 발견\n") quoted_matches = [] list_matches = [] # 각 [페이지 X]에 대해 앞뒤 섹션 분석 for i, match in enumerate(page_matches): page_num = match.group(1) tag_start = match.start() tag_end = match.end() # === 섹션 1: [페이지 X] 앞 부분 (같은 단락 내) === # 이전 [페이지] 또는 줄바꿈 2개까지 section_start = 0 if i > 0: section_start = page_matches[i-1].end() # [페이지 X] 앞의 같은 단락 (줄바꿈 2개 전까지) before_section = answer[section_start:tag_start] # 마지막 불릿 포인트나 인용문 찾기 last_para_match = re.search(r'([-*○]\s+.+)$', before_section, re.DOTALL) if last_para_match: before_text = last_para_match.group(1) print(f"--- 페이지 {page_num} 앞부분 (길이: {len(before_text)}자) ---") print(f"{before_text[:150]}...\n") # 큰따옴표 인용문 추출 quotes = re.findall(r'"([^"]+)"', before_text) for quote in quotes: quote_clean = quote.strip() if len(quote_clean) > 10: quoted_matches.append((quote_clean, int(page_num))) print(f" ✓ [앞-인용문] \"{quote_clean[:60]}...\"") # === 섹션 2: [페이지 X] 뒤 부분 (기존 로직) === next_page_pos = len(answer) if i + 1 < len(page_matches): next_page_pos = page_matches[i + 1].start() section = answer[tag_end:next_page_pos] print(f"--- 페이지 {page_num} 뒷부분 (길이: {len(section)}자) ---") print(f"{section[:150]}...\n") # 큰따옴표 인용문 quotes = re.findall(r'"([^"]+)"', section) for quote in quotes: quote_clean = quote.strip() if len(quote_clean) > 10: quoted_matches.append((quote_clean, int(page_num))) print(f" ✓ [뒤-인용문] \"{quote_clean[:60]}...\"") # 리스트 항목 lines = section.split('\n') for line in lines: line_stripped = line.strip() if len(line_stripped) < 3: continue if line_stripped.startswith('**') or line_stripped.startswith('#'): continue item = None if line_stripped.startswith('○'): item = line_stripped[1:].strip() elif line_stripped.startswith('- ') or line_stripped.startswith('* '): item = line_stripped[2:].strip() elif re.match(r'^\d+\.\s+', line_stripped): match_obj = re.match(r'^\d+\.\s+(.+)$', line_stripped) if match_obj: item = match_obj.group(1).strip() if item: item = re.sub(r'\[\s*페이지\s*\d+\s*\]', '', item).strip() item = re.sub(r'\*\*([^*]+)\*\*', r'\1', item).strip() item = re.sub(r'\([""""][^)]+[""""\)]+', '', item).strip() item = re.sub(r'\s*\([^)]{0,50}\)\s*$', '', item).strip() if 3 <= len(item) <= 200: list_matches.append((item, int(page_num))) print(f" ✓ [리스트] {item[:50]}...") print(f"\n{'='*40}") print(f"📝 인용문: {len(quoted_matches)}개") print(f"📋 리스트: {len(list_matches)}개") print(f"{'='*40}\n") # 우선순위 all_matches = [] if quoted_matches and list_matches: all_short = all(len(q[0]) <= 30 for q in quoted_matches) if all_short: print(f"✓ 짧은 인용문 + 리스트 모두") all_matches = quoted_matches + list_matches else: print(f"✓ 인용문만") all_matches = quoted_matches elif quoted_matches: print(f"✓ 인용문만") all_matches = quoted_matches elif list_matches: print(f"✓ 리스트만") all_matches = list_matches # 중복 제거 seen = set() for text, page in all_matches: if text and (text, page) not in seen: highlights.append({ 'text': text, 'page': page }) seen.add((text, page)) print(f"\n{'='*80}") print(f"✅ 최종 추출: {len(highlights)}개") for i, h in enumerate(highlights, 1): print(f" [{i}] 페이지 {h['page']}: {h['text'][:60]}...") print(f"{'='*80}\n") return highlights def render_pdf_with_highlights(pdf_bytes: bytes, highlight_info: List[Dict], zoom_level: float = 2.0): highlighted_pdf = highlight_text_in_pdf(pdf_bytes, highlight_info) doc = fitz.open(stream=highlighted_pdf, filetype="pdf") highlighted_pages = set(h['page'] for h in highlight_info) pdf_html = '
' for page_num in range(len(doc)): page = doc[page_num] pix = page.get_pixmap(matrix=fitz.Matrix(zoom_level, zoom_level)) img_data = pix.tobytes("png") img_base64 = base64.b64encode(img_data).decode() zoom_percentage = int(zoom_level * 50) page_id = f'page-{page_num + 1}' pdf_html += f'
' if (page_num + 1) in highlighted_pages: pdf_html += f'
⭐ 페이지 {page_num + 1}
' else: pdf_html += f'
페이지 {page_num + 1}
' pdf_html += f'' pdf_html += '
' pdf_html += '
' doc.close() return pdf_html def main(): init_session() if not st.session_state.processed: col1, col2, col3 = st.columns([1, 1, 1]) with col2: st.markdown("
", unsafe_allow_html=True) st.image("img/plobin-grey.png", use_container_width=True) st.text(' ') with st.sidebar: st.image("img/plobin-right-only.png", width=85) uploaded_file = st.file_uploader( "드래그하여 파일을 업로드 또는 클릭하여 선택하세요.", type=['pdf'], label_visibility="visible", help="PDF 파일만 업로드 가능합니다 (최대 200MB)" ) if uploaded_file: if st.button("문서 처리 시작", type="primary", use_container_width=True): if not GROK_API_KEY or not OPENAI_API_KEY: st.error("⚠️ GROK_API_KEY 또는 OPENAI_API_KEY가 .env 파일에 설정되지 않았습니다!") st.stop() st.session_state.vector_db = None st.session_state.embedder = None st.session_state.chat_history = [] st.session_state.current_highlights = [] with st.spinner("문서 분석을 시작합니다..."): try: chunks, metadata_list, pdf_bytes, pages_text = extract_text_from_pdf(uploaded_file) with st.spinner("핵심 내용을 파악하고 있습니다..."): collection, embedder = create_vector_db(chunks, metadata_list) st.session_state.vector_db = collection st.session_state.embedder = embedder st.session_state.pdf_bytes = pdf_bytes st.session_state.pdf_pages_text = pages_text st.session_state.processed = True st.session_state.doc_metadata = { "filename": uploaded_file.name, "chunks": len(chunks), "pages": len(set(m['page'] for m in metadata_list)) } # 텍스트 로컬 저장 saved_file = save_extracted_text_to_file( chunks, metadata_list, uploaded_file.name ) st.success(f"문서 처리 완료!") st.rerun() except Exception as e: st.error(f"오류: {str(e)}") if st.session_state.processed: st.markdown("#### 문서 정보") st.info(f"**{st.session_state.doc_metadata['filename']}**") st.info(f"페이지: {st.session_state.doc_metadata['pages']}") # if not st.session_state.processed: # st.markdown(""" #
#

사용 방법

#
#
1
#
PDF 파일을 올려주세요
#
#
#
2
#
문서 처리가 완료될 때까지 잠시만 기다려주세요
#
#
#
3
#
문서 내 궁금한 내용을 물어보세요
#
#
#
4
#
AI가 정확한 답변과 출처를 함께 알려드려요
#
#
# """, unsafe_allow_html=True) if st.session_state.processed: col1, col2 = st.columns([1, 1]) with col1: header_cols = st.columns([7, 1, 1.5, 1]) with header_cols[0]: st.markdown("### ") if st.session_state.pdf_bytes: pdf_html = render_pdf_with_highlights( st.session_state.pdf_bytes, st.session_state.current_highlights, st.session_state.zoom_level ) st.markdown(pdf_html, unsafe_allow_html=True) if st.session_state.scroll_to_page: scroll_js = f""" """ components.html(scroll_js, height=0) st.session_state.scroll_to_page = None with col2: st.markdown('### ', unsafe_allow_html=True) chat_container = st.container(height=650) with chat_container: for msg_idx, msg in enumerate(st.session_state.chat_history): with st.chat_message(msg["role"]): st.markdown(msg["content"]) prompt = st.chat_input("질문을 입력하세요...", key="chat_input") if prompt: st.session_state.chat_history.append({"role": "user", "content": prompt}) st.session_state.processing_query = prompt st.rerun() # main() 함수 내부의 질문 처리 부분 if st.session_state.processing_query: query = st.session_state.processing_query st.session_state.processing_query = None with st.spinner("PLOBIN이 최적의 답변을 찾고 있습니다..."): try: search_results = hybrid_search( query, st.session_state.vector_db, st.session_state.embedder, top_k=3 ) grok_result = grok_verify_and_extract( query, search_results, GROK_API_KEY ) answer = generate_answer( query, search_results, GROK_API_KEY ) # ⭐ 중요: 큰따옴표 안의 텍스트만 추출 print("\n" + "="*80) print("답변에서 인용문 추출 중...") print("="*80) highlights = extract_highlights_from_answer(answer) # grok_result에서 추출한 것은 사용하지 않음 (필요시 주석 해제) # grok_highlights = extract_highlights_from_grok(grok_result) # highlights.extend(grok_highlights) st.session_state.current_highlights = highlights if grok_result and "page" in grok_result and "error" not in grok_result: st.session_state.scroll_to_page = grok_result["page"] chat_data = { "role": "assistant", "content": answer } st.session_state.chat_history.append(chat_data) st.rerun() except Exception as e: error_msg = f"❌ 오류: {str(e)}" st.session_state.chat_history.append({ "role": "assistant", "content": error_msg }) st.rerun() if __name__ == "__main__": main()