Spaces:
Sleeping
Sleeping
| import re | |
| import json | |
| import numpy as np | |
| import os | |
| from typing import Dict, List, Optional, Tuple | |
| from dataclasses import dataclass, field | |
| from pathlib import Path | |
| import pickle | |
| from sentence_transformers import SentenceTransformer | |
| import faiss | |
| from docx import Document | |
| import logging | |
| from langchain_openai import ChatOpenAI | |
| from langchain.prompts import ChatPromptTemplate | |
| from docx import Document | |
| from docx.shared import RGBColor | |
| import glob | |
| import logging | |
| from datetime import datetime | |
| EMBEDDING_MODEL = os.getenv("EMBEDDING_MODEL", "sentence-transformers/all-MiniLM-L6-v2") | |
| GPT_MODEL = os.getenv("GPT_MODEL", "gpt-5") | |
| OPENAI_API_KEY = os.getenv("OPENAI_API_KEY") | |
| # Configuration du logging | |
| def setup_logging(log_file: str = None): | |
| """Configure le système de logging""" | |
| if log_file is None: | |
| log_file = f"medical_parser_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log" | |
| # Créer le dossier logs s'il n'existe pas | |
| log_dir = "logs" | |
| if not os.path.exists(log_dir): | |
| os.makedirs(log_dir) | |
| log_path = os.path.join(log_dir, log_file) | |
| # Configuration du logging | |
| logging.basicConfig( | |
| level=logging.INFO, | |
| format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', | |
| handlers=[ | |
| logging.FileHandler(log_path, encoding='utf-8'), | |
| logging.StreamHandler() # Pour afficher aussi dans la console | |
| ] | |
| ) | |
| return log_path | |
| # Initialiser le logging | |
| log_path = setup_logging() | |
| logger = logging.getLogger(__name__) | |
| # Configuration du logging | |
| #logging.basicConfig(level=logging.INFO) | |
| #logger = logging.getLogger(__name__) | |
| class TemplateInfo: | |
| """Structure pour stocker les informations d'un template""" | |
| id: str | |
| type: str | |
| has_asr_zone: bool | |
| asr_tag_position: int | |
| detected_sections: List[str] | |
| medecin: str | |
| embedding: np.ndarray | |
| filepath: str | |
| content: str | |
| asr_context: str = "" | |
| sections_data: Dict = field(default_factory=dict) | |
| user_fields: List[str] = field(default_factory=list) | |
| class MedicalTemplateParser: | |
| """Parser pour templates médicaux avec base vectorielle et GPT""" | |
| def __init__(self, model_name: str = EMBEDDING_MODEL): | |
| """ | |
| Initialise le parser avec un modèle d'embedding et GPT | |
| Args: | |
| model_name: Nom du modèle SentenceTransformer à utiliser | |
| """ | |
| self.model = SentenceTransformer(model_name) | |
| self.templates: Dict[str, TemplateInfo] = {} | |
| self.vector_index = None | |
| self.template_ids = [] | |
| # Initialiser GPT pour l'analyse des sections | |
| self.llm = None | |
| self.section_classifier = None | |
| self._initialize_gpt() | |
| # Types de documents médicaux | |
| self.document_types = { | |
| "compte_rendu_imagerie": ["imagerie", "scanner", "IRM", "échographie", "radiologie", "TECHNIQUE", "RESULTATS"], | |
| "lettre_confrere": ["confrère", "cher", "collègue", "salutations", "cordialement"], | |
| "resultats_laboratoire": ["laboratoire", "analyses", "biologie", "résultats", "valeurs"], | |
| "demande_examen": ["demande", "prescription", "examen", "bilan"], | |
| "ordonnance": ["ordonnance", "prescription", "posologie", "traitement"] | |
| } | |
| def _initialize_gpt(self): | |
| """Initialise le modèle GPT pour l'analyse des sections""" | |
| api_key = OPENAI_API_KEY | |
| if not api_key: | |
| logger.warning("OPENAI_API_KEY non définie. L'analyse GPT ne sera pas disponible.") | |
| return | |
| try: | |
| self.llm = ChatOpenAI( | |
| model=GPT_MODEL, | |
| temperature=0, | |
| max_tokens=4000, | |
| api_key=api_key | |
| ) | |
| # Définir le prompt pour l'analyse des sections | |
| section_prompt = ChatPromptTemplate.from_messages([ | |
| ("system", """ | |
| Vous êtes un expert en analyse de documents médicaux. Je vais vous fournir le texte complet d'un rapport médical. | |
| IMPORTANT : Réponds UNIQUEMENT avec un JSON valide. Aucun texte supplémentaire, aucune explication, aucune balise markdown. | |
| Votre tâche est de : | |
| 1. **Identifier le type de document** : | |
| - Si le document contient "Destinataire :" → c'est une "lettre médicale" | |
| - Si le document contient des mots-clés d'examen (SCANNER, IRM, ECHOGRAPHIE, RADIO, etc.) → c'est un "rapport médical" | |
| - Si le document contient "TITRE" seul → c'est un "rapport médical" (pas type "TITRE") | |
| - Sinon → "rapport médical" par défaut | |
| 2. **Extraire les informations du centre médical** (si elles existent) : | |
| - Chercher le nom du centre/association/hôpital | |
| - Chercher l'adresse si présente | |
| - Chercher le téléphone si présent | |
| - Chercher le service médical (ex: "Service D'IMAGERIE MEDICALE") | |
| - Chercher l'équipement médical mentionné | |
| - Ces informations doivent être stockées dans un champ "center_info" séparé | |
| - Si aucune information de centre n'est trouvée, laisser center_info vide ou null | |
| 3. **Identifier le médecin** (si mentionné) : | |
| - Chercher les signatures en fin de document (ex: "Dr Eric AUBERTON") | |
| - Chercher les mentions "PRESCRIPTEUR :" suivi du nom | |
| - Chercher les formules comme "DR M NOM Prénom" où NOM/Prénom sont des champs à remplir | |
| - Si aucun médecin n'est identifié, retourner "Non spécifié" | |
| 4. **Identifier les sections à remplir** : | |
| Une section est définie par : | |
| - Un champ d'information suivi de deux-points : "Patient:", "Initiales:", "Date:", "Médecin:", etc. | |
| - Un titre de section suivi de deux-points : "Indication:", "Technique:", "Résultats:", "Conclusion:", "Compte-rendu:", etc. | |
| - Une ligne seule en majuscules représentant un champ à remplir : "TITRE", "CLINIQUE", "TECHNIQUE", "RÉSULTATS", "CONCLUSION" | |
| - Pour les lettres : "Destinataire:" et le contenu principal de la lettre (zone ASR_VOX) | |
| **EXCLUSIONS importantes** : | |
| - Les titres encadrés de tirets (ex: "- EXAMEN TOMODENSITOMETRIQUE THORACIQUE -") → document_type uniquement | |
| - Les informations d'en-tête du centre médical → center_info uniquement | |
| - Les informations administratives fixes → ne pas traiter comme sections | |
| 5. **Pour chaque section identifiée** : | |
| - Extraire son contenu en collectant toutes les lignes suivantes jusqu'à la prochaine section | |
| - Identifier les champs à remplir par l'utilisateur : | |
| * Les balises <ASR_VOX> indiquent des champs à remplir | |
| * Les balises <ASR> indiquent des champs à remplir | |
| * Les textes génériques comme "xxx", "xxxx", "XXX" indiquent des champs à remplir | |
| * Les formules conditionnelles comme "SI(Civilité Nom usuel médecin..." indiquent des champs à remplir | |
| * Les balises [NOM_PATIENT], [DATE], [MEDECIN], etc. indiquent des champs à remplir | |
| * Les champs vides après ":" indiquent des champs à remplir | |
| * Les mots "NOM", "Prénom" dans le contexte médical indiquent des champs à remplir | |
| 6. **Gestion spéciale des lettres médicales** : | |
| - "Destinataire:" est une section à remplir | |
| - Le contenu principal (zone ASR_VOX) doit être identifié comme "Contenu" ou "Corps de lettre" | |
| 7. **Identifier les zones ASR** et leur position dans le document | |
| 8. **Retourner un objet JSON valide** avec cette structure exacte : | |
| {{ | |
| "document_type": "rapport médical|lettre médicale|autre", | |
| "center_info": {{ | |
| "name": "nom du centre/association si trouvé, sinon null", | |
| "address": "adresse complète si trouvée, sinon null", | |
| "phone": "téléphone si trouvé, sinon null", | |
| "service": "service médical si trouvé, sinon null", | |
| "equipment": "équipement mentionné si trouvé, sinon null" | |
| }}, | |
| "physician": "nom du médecin identifié ou 'Non spécifié'", | |
| "asr_zones": [ | |
| {{ | |
| "tag": "balise ASR trouvée", | |
| "position": "position approximative dans le texte", | |
| "context": "contexte autour de la balise" | |
| }} | |
| ], | |
| "sections": {{ | |
| "nom_section": {{ | |
| "content": "contenu brut de la section", | |
| "has_user_fields": true, | |
| "user_fields": ["liste des champs à remplir"] | |
| }} | |
| }} | |
| }} | |
| **Règles importantes** : | |
| - Extraire UNIQUEMENT les informations qui existent réellement dans le document | |
| - Les informations administratives du centre ne sont PAS des sections à remplir (si elles existent) | |
| - Si aucune information de centre n'est trouvée, laisser les champs center_info à null | |
| - "TITRE" seul indique un rapport médical, pas un type "TITRE" | |
| - Les lettres ont un traitement spécial avec Destinataire + Contenu principal | |
| - Identifier le médecin quand c'est possible, sinon "Non spécifié" | |
| - Distinguer clairement les champs à remplir des informations fixes | |
| - Adaptation flexible : tous les documents n'ont pas la même structure | |
| Répondez UNIQUEMENT avec le JSON—aucun commentaire supplémentaire. | |
| """), | |
| ("human", "Voici le texte complet du rapport médical :\n\n{document_text}\n\nExtrayez toutes les sections, identifiez les champs à remplir et les zones ASR.") | |
| ]) | |
| self.section_classifier = section_prompt | self.llm | |
| print("✅ GPT initialisé avec succès") | |
| logger.info(f"✅ GPT initialisé avec succès") | |
| except Exception as e: | |
| logger.info(f"❌ Erreur lors de l'initialisation de GPT: {e}") | |
| self.llm = None | |
| self.section_classifier = None | |
| def extract_text_from_docx(self, filepath: str) -> Tuple[str, Dict]: | |
| """ | |
| Extrait le texte d'un fichier Word en préservant la structure | |
| Args: | |
| filepath: Chemin vers le fichier DOCX | |
| Returns: | |
| Tuple[str, Dict]: (texte_complet, informations_structure) | |
| """ | |
| logger.info(f"📄 Extraction du texte DOCX: {os.path.basename(filepath)}") | |
| try: | |
| doc = Document(filepath) | |
| text_content = [] | |
| structure_info = { | |
| "paragraphs": [], | |
| "tables": [], | |
| "headers": [], | |
| "footers": [], | |
| "styles": [], | |
| "formatting": [] | |
| } | |
| # Traiter les paragraphes | |
| for i, paragraph in enumerate(doc.paragraphs): | |
| para_text = paragraph.text.strip() | |
| if para_text: # Ignorer les paragraphes vides | |
| text_content.append(para_text) | |
| # Collecter les informations de structure | |
| para_info = { | |
| "index": i, | |
| "text": para_text, | |
| "style": paragraph.style.name if paragraph.style else "Normal", | |
| "alignment": str(paragraph.alignment) if paragraph.alignment else "None", | |
| "runs": [] | |
| } | |
| # Analyser les runs (formatage) | |
| for run in paragraph.runs: | |
| if run.text.strip(): | |
| run_info = { | |
| "text": run.text, | |
| "bold": run.bold, | |
| "italic": run.italic, | |
| "underline": run.underline, | |
| "font_name": run.font.name if run.font.name else "Default", | |
| "font_size": run.font.size.pt if run.font.size else None, | |
| "color": self._get_color_info(run.font.color) if run.font.color else None | |
| } | |
| para_info["runs"].append(run_info) | |
| structure_info["paragraphs"].append(para_info) | |
| structure_info["styles"].append(paragraph.style.name if paragraph.style else "Normal") | |
| # Traiter les tableaux | |
| for table_idx, table in enumerate(doc.tables): | |
| table_info = { | |
| "index": table_idx, | |
| "rows": len(table.rows), | |
| "cols": len(table.columns), | |
| "content": [] | |
| } | |
| table_text = [] | |
| for row_idx, row in enumerate(table.rows): | |
| row_data = [] | |
| row_text = [] | |
| for cell_idx, cell in enumerate(row.cells): | |
| cell_text = cell.text.strip() | |
| row_data.append(cell_text) | |
| row_text.append(cell_text) | |
| if cell_text: | |
| text_content.append(cell_text) | |
| table_info["content"].append(row_data) | |
| table_text.append(" | ".join(row_text)) | |
| structure_info["tables"].append(table_info) | |
| # Ajouter le contenu du tableau au texte principal | |
| text_content.extend(table_text) | |
| # Traiter les en-têtes et pieds de page | |
| for section in doc.sections: | |
| # En-têtes | |
| if section.header: | |
| header_text = [] | |
| for paragraph in section.header.paragraphs: | |
| if paragraph.text.strip(): | |
| header_text.append(paragraph.text.strip()) | |
| text_content.append(paragraph.text.strip()) | |
| if header_text: | |
| structure_info["headers"].append({ | |
| "content": header_text, | |
| "section_index": doc.sections.index(section) | |
| }) | |
| # Pieds de page | |
| if section.footer: | |
| footer_text = [] | |
| for paragraph in section.footer.paragraphs: | |
| if paragraph.text.strip(): | |
| footer_text.append(paragraph.text.strip()) | |
| text_content.append(paragraph.text.strip()) | |
| if footer_text: | |
| structure_info["footers"].append({ | |
| "content": footer_text, | |
| "section_index": doc.sections.index(section) | |
| }) | |
| # Nettoyer les styles dupliqués | |
| structure_info["styles"] = list(set(structure_info["styles"])) | |
| # Créer le texte final | |
| final_text = "\n".join(text_content) | |
| logger.info(f"✅ Texte extrait avec succès:") | |
| logger.info(f" - Paragraphes: {len(structure_info['paragraphs'])}") | |
| logger.info(f" - Tableaux: {len(structure_info['tables'])}") | |
| logger.info(f" - En-têtes: {len(structure_info['headers'])}") | |
| logger.info(f" - Pieds de page: {len(structure_info['footers'])}") | |
| logger.info(f" - Styles utilisés: {len(structure_info['styles'])}") | |
| return final_text, structure_info | |
| except Exception as e: | |
| logger.info(f"❌ Erreur lors de l'extraction du texte DOCX de {filepath}: {e}") | |
| return "", {} | |
| def _get_color_info(self, color): | |
| """Extrait les informations de couleur d'un run""" | |
| try: | |
| if color.rgb: | |
| return f"rgb({color.rgb.red}, {color.rgb.green}, {color.rgb.blue})" | |
| elif color.theme_color: | |
| return f"theme_{color.theme_color}" | |
| else: | |
| return "default" | |
| except: | |
| return "default" | |
| def analyze_document_with_gpt(self, text: str) -> Dict: | |
| """ | |
| Analyse le document avec GPT pour extraire sections et zones ASR | |
| Args: | |
| text: Texte complet du document | |
| Returns: | |
| Dict: Résultats de l'analyse GPT | |
| """ | |
| if not self.section_classifier: | |
| logger.info("⚠️ GPT non disponible, utilisation des méthodes classiques") | |
| return self._fallback_analysis(text) | |
| try: | |
| logger.info("🔍 Analyse du document avec GPT...") | |
| response = self.section_classifier.invoke({"document_text": text}) | |
| result = response.content.strip() | |
| # Vérifier si la réponse est vide | |
| if not result: | |
| logger.info("❌ Réponse GPT vide") | |
| return self._fallback_analysis(text) | |
| logger.info(f"📝 Réponse GPT (premiers 200 caractères): {result[:200]}...") | |
| if result.startswith("```json"): | |
| result = result[7:] # Supprimer ```json | |
| if result.endswith("```"): | |
| result = result[:-3] # Supprimer ``` | |
| # Supprimer les espaces en début et fin | |
| result = result.strip() | |
| # Vérifier que ça commence par { et finit par } | |
| if not result.startswith('{') or not result.endswith('}'): | |
| logger.info(f"❌ Format JSON invalide. Début: '{result[:50]}...' Fin: '...{result[-50:]}'") | |
| return self._fallback_analysis(text) | |
| # Parser la réponse JSON | |
| analysis_data = json.loads(result) | |
| logger.info("✅ Analyse GPT terminée avec succès") | |
| return analysis_data | |
| except json.JSONDecodeError as e: | |
| logger.info(f"❌ Erreur de parsing JSON GPT: {e}") | |
| return self._fallback_analysis(text) | |
| except Exception as e: | |
| logger.info(f"❌ Erreur lors de l'analyse GPT: {e}") | |
| return self._fallback_analysis(text) | |
| def _fallback_analysis(self, text: str) -> Dict: | |
| """Analyse de fallback sans GPT""" | |
| logger.info("📊 Utilisation de l'analyse classique...") | |
| # Détection ASR classique | |
| has_asr, asr_pos, asr_context = self.detect_asr_zone_classic(text) | |
| # Extraction sections classique | |
| sections = self.extract_sections_classic(text) | |
| # Classification type classique | |
| doc_type = self.classify_document_type(text, sections) | |
| return { | |
| "document_type": doc_type, | |
| "asr_zones": [{"tag": "<ASR_VOX>", "position": asr_pos, "context": asr_context}] if has_asr else [], | |
| "sections": {section: {"content": "", "has_user_fields": False, "user_fields": []} for section in sections} | |
| } | |
| def detect_asr_zone_classic(self, text: str) -> Tuple[bool, int, str]: | |
| """ | |
| Détection ASR classique (fallback) | |
| Returns: | |
| (has_asr_zone, position, context_around_asr) | |
| """ | |
| asr_patterns = [ | |
| r"<ASR_VOX>", | |
| r"<ASR>", | |
| r"\[DICTEE\]", | |
| r"\[ASR\]", | |
| r"<!-- ASR -->" | |
| ] | |
| for pattern in asr_patterns: | |
| match = re.search(pattern, text, re.IGNORECASE) | |
| if match: | |
| position = match.start() | |
| start_context = max(0, position - 200) | |
| end_context = min(len(text), position + 200) | |
| context = text[start_context:end_context] | |
| return True, position, context | |
| return False, -1, "" | |
| def extract_sections_classic(self, text: str) -> List[str]: | |
| """Extraction de sections classique (fallback)""" | |
| sections = set() | |
| section_patterns = [ | |
| r"([A-ZÉÈÀÇÊ][A-ZÉÈÀÇÊ\s]{2,}):", | |
| r"([A-ZÉÈÀÇÊ][a-zéèàçê\s]{3,}):", | |
| r"(\d+\.\s*[A-ZÉÈÀÇÊ][a-zéèàçê\s]{3,}):", | |
| ] | |
| for pattern in section_patterns: | |
| matches = re.findall(pattern, text, re.MULTILINE) | |
| for match in matches: | |
| section = match.strip().rstrip(':').strip() | |
| if len(section) > 2 and len(section) < 50: | |
| sections.add(section) | |
| return sorted(list(sections)) | |
| def classify_document_type(self, text: str, sections: List[str]) -> str: | |
| """Classifie le type de document basé sur le contenu et les sections""" | |
| text_lower = text.lower() | |
| sections_lower = [s.lower() for s in sections] | |
| all_text = text_lower + " " + " ".join(sections_lower) | |
| max_score = 0 | |
| best_type = "autre" | |
| for doc_type, keywords in self.document_types.items(): | |
| score = 0 | |
| for keyword in keywords: | |
| if keyword.lower() in all_text: | |
| score += 1 | |
| if doc_type == "compte_rendu_imagerie" and any("technique" in s for s in sections_lower): | |
| score += 2 | |
| if score > max_score: | |
| max_score = score | |
| best_type = doc_type | |
| return best_type | |
| def extract_doctor_name(self, text: str) -> str: | |
| """Extrait le nom du médecin du template""" | |
| doctor_patterns = [ | |
| r"Dr\.?\s+([A-ZÉÈÀÇÊ][a-zéèàçê]+\s+[A-ZÉÈÀÇÊ][a-zéèàçê]+)", | |
| r"Docteur\s+([A-ZÉÈÀÇÊ][a-zéèàçê]+\s+[A-ZÉÈÀÇÊ][a-zéèàçê]+)", | |
| r"Praticien\s*:\s*([A-ZÉÈÀÇÊ][a-zéèàçê]+\s+[A-ZÉÈÀÇÊ][a-zéèàçê]+)", | |
| ] | |
| for pattern in doctor_patterns: | |
| match = re.search(pattern, text) | |
| if match: | |
| return match.group(1).strip() | |
| return "Non spécifié" | |
| def parse_template(self, filepath: str, template_id: str = None) -> TemplateInfo: | |
| """ | |
| Parse un template et extrait toutes les informations avec GPT | |
| Args: | |
| filepath: Chemin vers le fichier template | |
| template_id: ID unique du template (optionnel) | |
| Returns: | |
| TemplateInfo: Informations structurées du template | |
| """ | |
| if template_id is None: | |
| template_id = Path(filepath).stem | |
| logger.info(f"\n📄 Traitement du fichier: {os.path.basename(filepath)}") | |
| # Extraire le texte selon le type de fichier | |
| if filepath.endswith('.docx'): | |
| text, _ = self.extract_text_from_docx(filepath) # Utiliser la méthode existante | |
| else: | |
| with open(filepath, 'r', encoding='utf-8') as f: | |
| text = f.read() | |
| if not text.strip(): | |
| logger.info("❌ Aucun texte extrait du fichier") | |
| return None | |
| # Analyser avec GPT | |
| analysis_data = self.analyze_document_with_gpt(text) | |
| # Extraire les informations de l'analyse GPT | |
| doc_type = analysis_data.get("document_type", "autre") | |
| sections_data = analysis_data.get("sections", {}) | |
| asr_zones = analysis_data.get("asr_zones", []) | |
| logger.info(f"📋 Type de document détecté: {doc_type}") | |
| logger.info(f"🔍 Zones ASR trouvées: {len(asr_zones)}") | |
| logger.info(f"📑 Sections détectées: {len(sections_data)}") | |
| # Déterminer les informations ASR | |
| has_asr = len(asr_zones) > 0 | |
| asr_pos = asr_zones[0]["position"] if asr_zones else -1 | |
| asr_context = asr_zones[0]["context"] if asr_zones else "" | |
| # Extraire les sections détectées | |
| detected_sections = list(sections_data.keys()) | |
| # Collecter tous les champs utilisateur | |
| user_fields = [] | |
| for section_data in sections_data.values(): | |
| if isinstance(section_data, dict) and section_data.get("has_user_fields"): | |
| user_fields.extend(section_data.get("user_fields", [])) | |
| # Extraire le nom du médecin | |
| medecin = self.extract_doctor_name(text) | |
| logger.info(f"👨⚕️ Médecin détecté: {medecin}") | |
| # Créer le texte pour l'embedding | |
| embedding_text = self.create_embedding_text(text, asr_context, detected_sections, doc_type) | |
| # Générer l'embedding | |
| embedding = self.model.encode([embedding_text])[0] | |
| # Créer l'objet TemplateInfo | |
| template_info = TemplateInfo( | |
| id=template_id, | |
| type=doc_type, | |
| has_asr_zone=has_asr, | |
| asr_tag_position=asr_pos, | |
| detected_sections=detected_sections, | |
| medecin=medecin, | |
| embedding=embedding, | |
| filepath=filepath, | |
| content=text, | |
| asr_context=asr_context, | |
| sections_data=sections_data, | |
| user_fields=user_fields | |
| ) | |
| logger.info(f"✅ Template {template_id} traité avec succès") | |
| return template_info | |
| def create_embedding_text(self, text: str, asr_context: str, sections: List[str], doc_type: str) -> str: | |
| """ | |
| Crée le texte optimisé pour l'embedding | |
| Args: | |
| text: Texte complet du template | |
| asr_context: Contexte autour de la zone ASR | |
| sections: Sections détectées | |
| doc_type: Type de document | |
| Returns: | |
| str: Texte optimisé pour l'embedding | |
| """ | |
| lines = text.split('\n') | |
| header = ' '.join(lines[:5]) | |
| embedding_parts = [ | |
| f"Type: {doc_type}", | |
| f"Sections: {', '.join(sections[:5])}", | |
| f"Contexte: {header[:200]}", | |
| ] | |
| if asr_context: | |
| embedding_parts.append(f"Zone ASR: {asr_context[:100]}") | |
| return ' | '.join(embedding_parts) | |
| def process_docx_folder(self, folder_path: str) -> List[TemplateInfo]: | |
| """ | |
| Traite tous les fichiers DOCX dans un dossier | |
| Args: | |
| folder_path: Chemin vers le dossier contenant les fichiers DOCX | |
| Returns: | |
| List[TemplateInfo]: Liste des templates traités | |
| """ | |
| logger.info(f"🗂️ Traitement du dossier: {folder_path}") | |
| # Chercher tous les fichiers DOCX | |
| docx_files = glob.glob(os.path.join(folder_path, "*.docx")) | |
| if not docx_files: | |
| logger.info("❌ Aucun fichier DOCX trouvé dans le dossier") | |
| return [] | |
| logger.info(f"📁 {len(docx_files)} fichiers DOCX trouvés") | |
| templates = [] | |
| for i, filepath in enumerate(docx_files, 1): | |
| logger.info(f"\n{'='*60}") | |
| logger.info(f"📄 Fichier {i}/{len(docx_files)}: {os.path.basename(filepath)}") | |
| logger.info(f"{'='*60}") | |
| try: | |
| template_info = self.parse_template(filepath) | |
| if template_info: | |
| templates.append(template_info) | |
| self.templates[template_info.id] = template_info | |
| except Exception as e: | |
| logger.info(f"❌ Erreur lors du traitement de {filepath}: {e}") | |
| continue | |
| logger.info(f"\n🎉 Traitement terminé: {len(templates)} templates traités avec succès") | |
| return templates | |
| def build_vector_database(self, templates: List[TemplateInfo]): | |
| """ | |
| Construit la base vectorielle avec FAISS | |
| Args: | |
| templates: Liste des templates parsés | |
| """ | |
| if not templates: | |
| logger.info("❌ Aucun template fourni pour construire la base vectorielle") | |
| return | |
| logger.info(f"🔧 Construction de la base vectorielle avec {len(templates)} templates...") | |
| embeddings = np.array([template.embedding for template in templates]) | |
| dimension = embeddings.shape[1] | |
| self.vector_index = faiss.IndexFlatIP(dimension) | |
| faiss.normalize_L2(embeddings) | |
| self.vector_index.add(embeddings) | |
| self.template_ids = [template.id for template in templates] | |
| logger.info(f"✅ Base vectorielle construite avec succès") | |
| def search_similar_templates(self, query_text: str, k: int = 5) -> List[Tuple[str, float]]: | |
| """ | |
| Recherche les templates similaires à une requête | |
| Args: | |
| query_text: Texte de la requête | |
| k: Nombre de résultats à retourner | |
| Returns: | |
| List[Tuple[str, float]]: Liste des (template_id, score) les plus similaires | |
| """ | |
| if self.vector_index is None: | |
| logger.info("❌ Base vectorielle non construite") | |
| return [] | |
| logger.info(f"🔍 Recherche pour: '{query_text}'") | |
| query_embedding = self.model.encode([query_text]) | |
| faiss.normalize_L2(query_embedding) | |
| scores, indices = self.vector_index.search(query_embedding, k) | |
| results = [] | |
| for i, (score, idx) in enumerate(zip(scores[0], indices[0])): | |
| if idx < len(self.template_ids): | |
| template_id = self.template_ids[idx] | |
| results.append((template_id, float(score))) | |
| return results | |
| def save_database(self, filepath: str): | |
| """Sauvegarde la base vectorielle et les templates""" | |
| logger.info(f"💾 Sauvegarde de la base de données...") | |
| database_data = { | |
| 'templates': self.templates, | |
| 'template_ids': self.template_ids, | |
| 'model_name': self.model.get_sentence_embedding_dimension() | |
| } | |
| with open(filepath, 'wb') as f: | |
| pickle.dump(database_data, f) | |
| if self.vector_index is not None: | |
| faiss.write_index(self.vector_index, filepath.replace('.pkl', '.faiss')) | |
| logger.info(f"✅ Base de données sauvegardée dans {filepath}") | |
| def load_database(self, filepath: str): | |
| """Charge la base vectorielle et les templates""" | |
| logger.info(f"📂 Chargement de la base de données depuis {filepath}...") | |
| with open(filepath, 'rb') as f: | |
| database_data = pickle.load(f) | |
| self.templates = database_data['templates'] | |
| self.template_ids = database_data['template_ids'] | |
| faiss_path = filepath.replace('.pkl', '.faiss') | |
| if Path(faiss_path).exists(): | |
| self.vector_index = faiss.read_index(faiss_path) | |
| logger.info(f"✅ Base de données chargée avec succès") | |
| def get_template_info(self, template_id: str) -> Optional[TemplateInfo]: | |
| """Récupère les informations d'un template par son ID""" | |
| return self.templates.get(template_id) | |
| def print_template_summary(self, template_id: str): | |
| """Affiche un résumé des informations d'un template""" | |
| template = self.get_template_info(template_id) | |
| if template: | |
| logger.info(f"\n{'='*60}") | |
| logger.info(f"📋 Template: {template.id}") | |
| logger.info(f"{'='*60}") | |
| logger.info(f"📄 Type: {template.type}") | |
| logger.info(f"👨⚕️ Médecin: {template.medecin}") | |
| logger.info(f"🎤 Zone ASR: {'✅ Oui' if template.has_asr_zone else '❌ Non'}") | |
| logger.info(f"📑 Sections détectées ({len(template.detected_sections)}): {', '.join(template.detected_sections)}") | |
| logger.info(f"⚠️ Champs utilisateur ({len(template.user_fields)}): {', '.join(template.user_fields[:3])}{'...' if len(template.user_fields) > 3 else ''}") | |
| logger.info(f"📁 Fichier: {os.path.basename(template.filepath)}") | |
| logger.info(f"{'='*60}") | |
| # Afficher les détails des sections | |
| for section_name, section_data in template.sections_data.items(): | |
| if isinstance(section_data, dict): | |
| logger.info(f"📋 Section: {section_name}") | |
| if section_data.get("has_user_fields"): | |
| fields = section_data.get('user_fields', []) | |
| logger.info(f" ⚠️ Champs à remplir: {', '.join(fields)}") | |
| else: | |
| logger.info(f" ✅ Section complète") | |
| def print_global_summary(self): | |
| """Affiche un résumé global de tous les templates""" | |
| logger.info(f"\n{'='*80}") | |
| logger.info(f"📊 RÉSUMÉ GLOBAL - {len(self.templates)} TEMPLATES TRAITÉS") | |
| logger.info(f"{'='*80}") | |
| # Statistiques par type | |
| types_count = {} | |
| asr_count = 0 | |
| total_sections = 0 | |
| total_user_fields = 0 | |
| for template in self.templates.values(): | |
| types_count[template.type] = types_count.get(template.type, 0) + 1 | |
| if template.has_asr_zone: | |
| asr_count += 1 | |
| total_sections += len(template.detected_sections) | |
| total_user_fields += len(template.user_fields) | |
| logger.info(f"📈 Statistiques générales:") | |
| logger.info(f" - Total templates: {len(self.templates)}") | |
| logger.info(f" - Templates avec ASR: {asr_count}") | |
| logger.info(f" - Total sections: {total_sections}") | |
| logger.info(f" - Total champs utilisateur: {total_user_fields}") | |
| logger.info(f"\n📊 Répartition par type:") | |
| for doc_type, count in types_count.items(): | |
| logger.info(f" - {doc_type}: {count}") | |
| logger.info(f"\n📋 Templates individuels:") | |
| for template_id in sorted(self.templates.keys()): | |
| template = self.templates[template_id] | |
| asr_icon = "🎤" if template.has_asr_zone else "❌" | |
| logger.info(f" {asr_icon} {template_id} ({template.type}) - {len(template.detected_sections)} sections - {template.medecin}") | |
| def main(): | |
| """Fonction principale pour traiter un dossier de fichiers docx""" | |
| # Chemin vers le dossier contenant les fichiers docx | |
| docx_folder = input("Entrez le chemin vers le dossier contenant les fichiers docx: ").strip() | |
| if not os.path.exists(docx_folder): | |
| logger.info(f"❌ Le dossier {docx_folder} n'existe pas") | |
| return | |
| logger.info(f"\n🚀 Démarrage du traitement des fichiers docx...") | |
| logger.info(f"📁 Dossier source: {docx_folder}") | |
| # Initialiser le parser | |
| parser = MedicalTemplateParser() | |
| # CORRECTION: Utiliser process_docx_folder au lieu de extract_text_from_docx | |
| templates = parser.process_docx_folder(docx_folder) | |
| if not templates: | |
| logger.info("❌ Aucun template traité avec succès") | |
| return | |
| # Construire la base vectorielle | |
| parser.build_vector_database(templates) | |
| # Afficher le résumé global | |
| parser.print_global_summary() | |
| # Afficher les détails de chaque template | |
| logger.info(f"\n{'='*80}") | |
| logger.info(f"📄 DÉTAILS DES TEMPLATES") | |
| logger.info(f"{'='*80}") | |
| for template_id in sorted(parser.templates.keys()): | |
| parser.print_template_summary(template_id) | |
| # Tester la recherche | |
| logger.info(f"\n{'='*80}") | |
| logger.info(f"🔍 TEST DE RECHERCHE") | |
| logger.info(f"{'='*80}") | |
| test_queries = [ | |
| "échographie abdominale", | |
| "scanner thoracique", | |
| "compte rendu imagerie", | |
| "résultats laboratoire" | |
| ] | |
| for query in test_queries: | |
| logger.info(f"\n🔍 Recherche pour: '{query}'") | |
| results = parser.search_similar_templates(query, k=3) | |
| if results: | |
| logger.info("📊 Résultats:") | |
| for i, (template_id, score) in enumerate(results, 1): | |
| template = parser.get_template_info(template_id) | |
| logger.info(f" {i}. {template_id} (score: {score:.3f}) - {template.type} - {template.medecin}") | |
| else: | |
| logger.info("❌ Aucun résultat trouvé") | |
| # Sauvegarder la base | |
| save_path = os.path.join(docx_folder, 'medical_templates.pkl') | |
| parser.save_database(save_path) | |
| logger.info(f"\n✅ Traitement terminé avec succès!") | |
| logger.info(f"💾 Base de données sauvegardée: {save_path}") | |
| if __name__ == "__main__": | |
| main() |