Spaces:
Sleeping
Sleeping
| import os | |
| import json | |
| import logging | |
| import numpy as np | |
| from typing import Dict, List, Optional, Tuple, Set | |
| from dataclasses import dataclass | |
| from pathlib import Path | |
| import pickle | |
| import re | |
| from sentence_transformers import SentenceTransformer | |
| import faiss | |
| from langchain_openai import ChatOpenAI | |
| from langchain.prompts import ChatPromptTemplate | |
| # Réutiliser les classes du code existant | |
| from template_db_creation import MedicalTemplateParser, TemplateInfo | |
@dataclass
class SectionMatch:
    """Outcome of matching a single template section against a transcription.

    Declared as a dataclass so that it can be built with keyword arguments,
    which is how the matcher constructs it.
    """

    section_name: str  # name of the template section being matched
    confidence: float  # confidence attached to the extracted content
    extracted_content: str  # text taken from the transcription for this section
    can_fill: bool  # True when enough content was found to fill the section
    missing_info: List[str]  # messages describing what could not be found
@dataclass
class TemplateMatch:
    """Detailed result of matching one template against a transcription.

    Declared as a dataclass so that it can be built with keyword arguments,
    which is how the matcher constructs it.
    """

    template_id: str  # identifier of the template in the parser database
    template_info: TemplateInfo  # metadata object returned by the parser
    overall_score: float  # weighted combination of the individual scores
    type_match_score: float  # document-type agreement
    physician_match_score: float  # physician-name agreement
    center_match_score: float  # medical-centre agreement
    content_match_score: float  # content similarity score
    filename_match_score: float  # agreement derived from the two file names
    fillability_score: float  # share of template sections that can be filled
    section_matches: Dict[str, SectionMatch]  # per-section results, keyed by section name
    confidence_level: str  # label computed by the matcher from the scores
    can_be_filled: bool  # overall verdict on whether the template is fillable
    filling_percentage: float  # percentage of sections filled with high confidence
    missing_critical_info: List[str]  # names of sections that could not be filled
    extracted_data: Dict[str, str]  # data extracted for filling the template
    filename_indicators: List[str]  # file-name elements that supported the match
@dataclass
class FilenameAnalysis:
    """Medical information extracted from a document file name.

    Declared as a dataclass so that it can be built with keyword arguments,
    which is how the matcher constructs it.
    """

    original_filename: str  # file name exactly as received
    medical_keywords: List[str]  # medical keywords found in the name
    document_type_indicators: List[str]  # hints about the document type
    specialty_indicators: List[str]  # hints about the medical specialty
    center_indicators: List[str]  # hints about the medical centre
    anatomical_regions: List[str]  # anatomical regions mentioned in the name
    procedure_type: Optional[str]  # main procedure type, or None when unknown
    confidence_score: float  # confidence of the analysis
| class SmartTranscriptionMatcher: | |
| """Système intelligent de matching entre transcriptions et templates médicaux""" | |
def __init__(self, database_path: str = None):
    """Set up the matcher and load an existing database when one is given.

    Args:
        database_path: Path of a saved template database. When it is not
            provided or does not exist on disk, a warning is logged and
            no database is loaded.
    """
    self.parser = MedicalTemplateParser()

    # The GPT-backed components start out unset; _initialize_gpt fills
    # them in when it succeeds.
    for attribute in ("llm", "content_analyzer", "section_extractor", "filename_analyzer"):
        setattr(self, attribute, None)

    self._initialize_gpt()
    self._initialize_filename_keywords()

    database_available = bool(database_path) and os.path.exists(database_path)
    if not database_available:
        logging.warning("Base de données non trouvée ou non spécifiée")
        return
    self.load_database(database_path)
| def _initialize_filename_keywords(self): | |
| """Initialise les mots-clés pour l'analyse des noms de fichiers""" | |
| self.filename_keywords = { | |
| # Types d'examens d'imagerie | |
| "imagerie": { | |
| "irm": ["irm", "mri", "resonance"], | |
| "scanner": ["scanner", "tdm", "ct", "tomodensitometrie"], | |
| "echographie": ["echo", "echographie", "doppler", "ultrasound"], | |
| "radiologie": ["radio", "radiologie", "rx", "xray"], | |
| "pet": ["pet", "tep", "scintigraphie"], | |
| "mammographie": ["mammo", "mammographie", "breast"] | |
| }, | |
| # Spécialités médicales | |
| "specialites": { | |
| "cardiologie": ["cardio", "coeur", "heart", "ecg", "holter"], | |
| "neurologie": ["neuro", "brain", "cerveau", "eeg"], | |
| "orthopedic": ["ortho", "os", "bone", "fracture"], | |
| "gynecologie": ["gyneco", "utérus", "ovaire", "pelvien"], | |
| "urologie": ["uro", "vessie", "rein", "prostate"], | |
| "pneumologie": ["pneumo", "poumon", "thorax", "resp"], | |
| "gastro": ["gastro", "abdomen", "foie", "intestin"] | |
| }, | |
| # Régions anatomiques | |
| "anatomie": { | |
| "tete": ["tete", "crane", "cerebral", "encephale"], | |
| "thorax": ["thorax", "poumon", "coeur", "mediastin"], | |
| "abdomen": ["abdomen", "foie", "rate", "pancreas"], | |
| "pelvis": ["pelvis", "pelvien", "utérus", "ovaire", "vessie"], | |
| "membres": ["membre", "bras", "jambe", "genou", "epaule"], | |
| "rachis": ["rachis", "colonne", "vertebral", "lombaire"] | |
| }, | |
| # Types de procédures | |
| "procedures": { | |
| "arteriel": ["arteriel", "artere", "vasculaire"], | |
| "veineux": ["veineux", "veine", "phlebo"], | |
| "fonctionnel": ["fonctionnel", "dynamique", "stress"], | |
| "contraste": ["contraste", "injection", "gadolinium"] | |
| }, | |
| # Centres médicaux (à adapter selon votre contexte) | |
| "centres": { | |
| "roseraie": ["roseraie", "rose"], | |
| "4villes": ["4villes", "quatre"], | |
| "mstruk": ["mstruk", "struktur"], | |
| "radioroseraie": ["radioroseraie"] | |
| } | |
| } | |
def _initialize_gpt(self):
    """Create the GPT client and the three prompt pipelines used for analysis.

    Reads the API key from the ``OPENAI_API_KEY`` environment variable.
    When the key is absent, a warning is logged and nothing is created.
    On success, sets ``self.llm``, ``self.content_analyzer``,
    ``self.section_extractor`` and ``self.filename_analyzer``. Any exception
    raised while building them is logged and ``self.llm`` is reset to None.
    """
    api_key = os.getenv('OPENAI_API_KEY')
    if not api_key:
        logging.warning("OPENAI_API_KEY non définie. L'analyse GPT ne sera pas disponible.")
        return
    try:
        self.llm = ChatOpenAI(
            model="gpt-4o",
            temperature=0,
            max_tokens=4000,
            api_key=api_key
        )
        # Prompt used to analyse the content of a transcription.
        # NOTE(review): the doubled braces in the prompt text are presumably
        # escapes so that only {transcription} is treated as a template
        # variable - confirm against the prompt-template documentation.
        content_prompt = ChatPromptTemplate.from_messages([
            ("system", """Vous êtes un expert en analyse de transcriptions médicales. Analysez la transcription fournie et retournez UNIQUEMENT un JSON valide.
Votre tâche est de :
1. **Identifier le type de document précis** :
- "compte_rendu_imagerie" : IRM, scanner, échographie, radiologie
- "rapport_biologique" : analyses de laboratoire, résultats biologiques
- "lettre_medicale" : correspondance entre médecins, lettres de sortie
- "compte_rendu_consultation" : consultation médicale, examen clinique
- "rapport_operatoire" : comptes-rendus d'intervention chirurgicale
- "autre" : si aucun type ne correspond clairement
2. **Extraire les informations d'identification** :
- Médecin/praticien (nom complet si trouvé)
- Centre médical/hôpital/clinique
- Service médical
- Adresse et contacts si mentionnés
3. **Décomposer en sections structurées** :
- Identifier toutes les sections présentes (Technique, Résultats, Conclusion, etc.)
- Extraire le contenu complet de chaque section
- Identifier les sections manquantes mais attendues pour ce type de document
4. **Extraire les données médicales spécifiques** :
- Examens/procédures réalisés
- Mesures et valeurs numériques
- Diagnostics et observations
- Traitements ou recommandations
- Dates et références
5. **Évaluer la complétude** :
- Score de complétude (0-1)
- Informations manquantes importantes
- Qualité de la transcription
Retournez un JSON avec cette structure exacte :
{{
"document_type": "type identifié",
"identification": {{
"physician": "nom complet du médecin ou 'Non identifié'",
"center": "nom du centre médical ou 'Non identifié'",
"service": "service médical ou 'Non identifié'",
"address": "adresse complète si trouvée",
"phone": "numéro de téléphone si trouvé"
}},
"sections": {{
"nom_section": {{
"content": "contenu complet de la section",
"confidence": 0.9,
"keywords": ["mots", "clés", "identifiés"]
}}
}},
"medical_data": {{
"procedures": ["liste des procédures/examens"],
"measurements": ["mesures avec valeurs numériques"],
"diagnoses": ["diagnostics identifiés"],
"treatments": ["traitements mentionnés"],
"dates": ["dates importantes trouvées"],
"anatomical_regions": ["régions anatomiques concernées"]
}},
"completeness": {{
"score": 0.85,
"missing_sections": ["sections manquantes attendues"],
"missing_info": ["informations importantes manquantes"],
"transcription_quality": "excellent|good|fair|poor"
}},
"key_indicators": ["indicateurs clés pour le matching"]
}}"""),
            ("human", "Analysez cette transcription médicale :\n\n{transcription}")
        ])
        # Prompt used to extract the content of one specific section.
        section_prompt = ChatPromptTemplate.from_messages([
            ("system", """Vous êtes un expert en extraction d'informations médicales.
On vous donne :
1. Une transcription médicale complète
2. Le nom d'une section spécifique à remplir dans un template
3. La description de ce qui est attendu dans cette section
Votre tâche est d'extraire UNIQUEMENT le contenu pertinent de la transcription pour remplir cette section du template.
Retournez UNIQUEMENT un JSON avec cette structure :
{{
"extracted_content": "contenu extrait pertinent pour cette section",
"confidence": 0.85,
"can_fill": true/false,
"missing_elements": ["éléments manquants pour compléter la section"],
"source_indicators": ["mots/phrases de la transcription qui justifient l'extraction"]
}}
Si aucun contenu pertinent n'est trouvé, retournez can_fill: false."""),
            ("human", """Transcription complète :
{transcription}
Section à remplir : {section_name}
Description attendue : {section_description}
Extrayez le contenu pertinent :""")
        ])
        # Prompt used to analyse file names.
        filename_prompt = ChatPromptTemplate.from_messages([
            ("system", """Vous êtes un expert en analyse de noms de fichiers médicaux. Analysez le nom de fichier fourni et extrayez les informations médicales qu'il contient.
Retournez UNIQUEMENT un JSON avec cette structure :
{{
"medical_keywords": ["mots-clés médicaux identifiés"],
"document_type_indicators": ["indicateurs du type de document"],
"specialty_indicators": ["indicateurs de spécialité médicale"],
"center_indicators": ["indicateurs de centre médical"],
"anatomical_regions": ["régions anatomiques mentionnées"],
"procedure_type": "type de procédure principal ou null",
"confidence_score": 0.85
}}
Exemples d'analyse :
- "ECHOGRAPHIE" → document_type_indicators: ["echographie"]
- "ECHODOPPLER" → procedure_type: "echo-doppler"
- "ARTERIEL" → medical_keywords: ["arteriel"]
- "MEMBRES.SUPERIEURS" → anatomical_regions: ["membres supérieurs"]
- "radioroseraie" → center_indicators: ["roseraie"], specialty_indicators: ["radiologie"]"""),
            ("human", "Analysez ce nom de fichier médical : {filename}")
        ])
        # Pipe each prompt into the model. These attributes are assigned
        # last, so a failure earlier in this block leaves them untouched.
        self.content_analyzer = content_prompt | self.llm
        self.section_extractor = section_prompt | self.llm
        self.filename_analyzer = filename_prompt | self.llm
        logging.info("✅ GPT initialisé pour l'analyse intelligente avec noms de fichiers")
    except Exception as e:
        # Best effort: log the failure and clear the client.
        logging.error(f"❌ Erreur lors de l'initialisation GPT: {e}")
        self.llm = None
def analyze_filename(self, filename: str) -> FilenameAnalysis:
    """Extract medical information from a file name.

    The GPT file-name analyzer is used when it is available; when it is not,
    or when its call or the parsing of its reply fails, the keyword-based
    fallback analysis is used instead.

    Args:
        filename: File name or path of the document.

    Returns:
        A ``FilenameAnalysis`` whose ``original_filename`` is ``filename``.
    """
    # Drop the directory part and a trailing known extension only. Replacing
    # ".doc" anywhere in the name would corrupt names such as
    # "my.document.docx" and would miss upper-case extensions.
    base_name = os.path.basename(filename)
    stem, extension = os.path.splitext(base_name)
    clean_filename = stem if extension.lower() in (".docx", ".doc", ".rtf") else base_name

    if self.filename_analyzer:
        try:
            response = self.filename_analyzer.invoke({"filename": clean_filename})
            payload = response.content.strip()
            # Remove an optional Markdown code fence, with or without the
            # "json" language tag.
            if payload.startswith("```"):
                payload = payload[3:]
                if payload[:4].lower() == "json":
                    payload = payload[4:]
            if payload.endswith("```"):
                payload = payload[:-3]
            gpt_analysis = json.loads(payload)
            # "or" guards against JSON null values for the list fields.
            return FilenameAnalysis(
                original_filename=filename,
                medical_keywords=gpt_analysis.get("medical_keywords") or [],
                document_type_indicators=gpt_analysis.get("document_type_indicators") or [],
                specialty_indicators=gpt_analysis.get("specialty_indicators") or [],
                center_indicators=gpt_analysis.get("center_indicators") or [],
                anatomical_regions=gpt_analysis.get("anatomical_regions") or [],
                procedure_type=gpt_analysis.get("procedure_type"),
                confidence_score=gpt_analysis.get("confidence_score") or 0.0,
            )
        except Exception as e:
            # Deliberate best effort: any failure falls through to the
            # keyword-based analysis below.
            logging.warning(f"Erreur analyse GPT du nom de fichier: {e}")

    return self._analyze_filename_fallback(filename)
def _analyze_filename_fallback(self, filename: str) -> FilenameAnalysis:
    """Analyse a file name with keyword and regex matching only (no GPT).

    The lower-cased base name is searched for every keyword stored in
    ``self.filename_keywords`` and for a few regex patterns. Each indicator
    is recorded once, so overlapping keywords (for example "echo" and
    "echographie") do not inflate the confidence score.

    Args:
        filename: File name or path of the document.

    Returns:
        A ``FilenameAnalysis`` whose confidence is the number of distinct
        keywords, document-type and specialty indicators divided by 5,
        capped at 1.0.
    """
    # Strip only a trailing known extension; a blanket replace of ".doc"
    # would also remove it from the middle of the name.
    base_name = os.path.basename(filename).lower()
    stem, extension = os.path.splitext(base_name)
    clean_filename = stem if extension in (".docx", ".doc", ".rtf") else base_name

    medical_keywords = []
    document_type_indicators = []
    specialty_indicators = []
    center_indicators = []
    anatomical_regions = []
    procedure_type = None

    def add_unique(target, value):
        """Append value to target unless it is already present."""
        if value not in target:
            target.append(value)

    # Keyword search, category by category.
    for category, subcategories in self.filename_keywords.items():
        for subcat, keywords in subcategories.items():
            for keyword in keywords:
                if keyword not in clean_filename:
                    continue
                if category == "imagerie":
                    add_unique(document_type_indicators, subcat)
                    if subcat in ["echographie", "irm", "scanner"]:
                        procedure_type = subcat
                elif category == "specialites":
                    add_unique(specialty_indicators, subcat)
                elif category == "anatomie":
                    add_unique(anatomical_regions, subcat)
                elif category == "centres":
                    add_unique(center_indicators, subcat)
                add_unique(medical_keywords, keyword)

    # Search for specific patterns.
    patterns = {
        "doppler": r"doppler|echo.*doppler",
        "arteriel": r"arteriel|artere",
        "veineux": r"veineux|veine",
        "membres_superieurs": r"membre.*superieur|bras",
        "membres_inferieurs": r"membre.*inferieur|jambe",
        "pelvien": r"pelvi|utérus|ovaire",
        "radiologie": r"radio"
    }
    for pattern_name, pattern in patterns.items():
        if not re.search(pattern, clean_filename):
            continue
        if pattern_name == "doppler":
            # A doppler mention overrides the procedure found above.
            procedure_type = "echo-doppler"
        elif pattern_name in ["arteriel", "veineux"]:
            add_unique(medical_keywords, pattern_name)
        elif "membre" in pattern_name:
            add_unique(anatomical_regions, pattern_name.replace("_", " "))
        elif pattern_name == "pelvien":
            add_unique(anatomical_regions, "pelvis")
        elif pattern_name == "radiologie":
            add_unique(specialty_indicators, "radiologie")

    # Confidence grows with the number of distinct elements found,
    # normalised on 5 elements.
    total_elements = len(medical_keywords) + len(document_type_indicators) + len(specialty_indicators)
    confidence_score = min(1.0, total_elements / 5.0)

    return FilenameAnalysis(
        original_filename=filename,
        medical_keywords=medical_keywords,
        document_type_indicators=document_type_indicators,
        specialty_indicators=specialty_indicators,
        center_indicators=center_indicators,
        anatomical_regions=anatomical_regions,
        procedure_type=procedure_type,
        confidence_score=confidence_score
    )
def calculate_filename_match_score(self, transcription_filename: str, transcription_analysis: Dict,
                                   template_filename: str) -> Tuple[float, List[str]]:
    """Score how well two file names agree.

    Both names are analysed with ``analyze_filename``. Each indicator family
    contributes its overlap ratio (shared / union) times a weight, and small
    bonuses are added for an identical procedure type, shared keywords and a
    consistent "radiologie" mention.

    Args:
        transcription_filename: File name of the transcription.
        transcription_analysis: Not used by this method.
        template_filename: File name of the template.

    Returns:
        ``(score, indicators)`` where the score is capped at 1.0 and the
        indicators list the elements that matched.
    """
    source = self.analyze_filename(transcription_filename)
    target = self.analyze_filename(template_filename)

    contributions = []
    indicators = []

    # (attribute, weight): document types weigh most, centres least.
    weighted_fields = (
        ("document_type_indicators", 0.4),
        ("specialty_indicators", 0.25),
        ("anatomical_regions", 0.2),
        ("center_indicators", 0.1),
    )
    for field_name, weight in weighted_fields:
        left = set(getattr(source, field_name))
        right = set(getattr(target, field_name))
        shared = left & right
        if not shared:
            continue
        overlap = len(shared) / max(len(left | right), 1)
        contributions.append(overlap * weight)
        indicators.extend(list(shared))

    # Identical procedure type on both sides.
    procedure = source.procedure_type
    if procedure and target.procedure_type and procedure == target.procedure_type:
        contributions.append(0.05)
        indicators.append(f"procédure: {procedure}")

    # Bonus for shared general keywords, limited to 0.1.
    shared_keywords = set(source.medical_keywords) & set(target.medical_keywords)
    if shared_keywords:
        contributions.append(min(0.1, len(shared_keywords) * 0.02))
        indicators.extend(list(shared_keywords))

    total = sum(contributions)

    # Extra bonus when the transcription name says "radiologie" and a
    # radio-related indicator matched.
    mentions_radiology = "radiologie" in transcription_filename.lower()
    if mentions_radiology and any("radio" in item for item in indicators):
        total += 0.05
        indicators.append("cohérence radiologie")

    return min(1.0, total), indicators
def load_database(self, filepath: str):
    """Load the template database through the parser and log its size.

    Args:
        filepath: Path of the database file, passed to the parser as is.
    """
    self.parser.load_database(filepath)
    template_count = len(self.parser.templates)
    logging.info(f"✅ Base de données chargée: {template_count} templates")
def analyze_transcription_detailed(self, transcription: str, transcription_filename: str = "") -> Dict:
    """Analyse a transcription with GPT, using the file name as extra context.

    Falls back to ``_fallback_analysis`` when the GPT content analyzer is not
    available, when the call fails, or when the reply is not a JSON object.

    Args:
        transcription: Text of the transcription.
        transcription_filename: Optional file name. When given, it is shown
            to the model and its own analysis is stored under the
            ``"filename_analysis"`` key of the result.

    Returns:
        The analysis dictionary.
    """
    if not self.content_analyzer:
        return self._fallback_analysis(transcription, transcription_filename)

    try:
        logging.info("🔍 Analyse détaillée de la transcription...")

        # Put the file name in front of the text so the model can use it.
        enhanced_transcription = transcription
        if transcription_filename:
            enhanced_transcription = f"Nom de fichier: {transcription_filename}\n\nContenu:\n{transcription}"

        response = self.content_analyzer.invoke({"transcription": enhanced_transcription})
        payload = response.content.strip()

        # Remove an optional Markdown code fence, with or without the
        # "json" language tag.
        if payload.startswith("```"):
            payload = payload[3:]
            if payload[:4].lower() == "json":
                payload = payload[4:]
        if payload.endswith("```"):
            payload = payload[:-3]
        payload = payload.strip()

        analysis = json.loads(payload)
        if not isinstance(analysis, dict):
            # Callers read the analysis with .get(); anything else is
            # unusable, so treat it as a failure and fall back.
            raise ValueError("la réponse GPT n'est pas un objet JSON")

        # Attach the analysis of the file name itself.
        if transcription_filename:
            filename_analysis = self.analyze_filename(transcription_filename)
            analysis["filename_analysis"] = {
                "medical_keywords": filename_analysis.medical_keywords,
                "document_type_indicators": filename_analysis.document_type_indicators,
                "specialty_indicators": filename_analysis.specialty_indicators,
                "anatomical_regions": filename_analysis.anatomical_regions,
                "procedure_type": filename_analysis.procedure_type
            }

        logging.info("✅ Analyse détaillée terminée")
        return analysis
    except Exception as e:
        # Deliberate best effort: any failure degrades to the keyword
        # based analysis.
        logging.error(f"❌ Erreur analyse détaillée: {e}")
        return self._fallback_analysis(transcription, transcription_filename)
| def _fallback_analysis(self, transcription: str, transcription_filename: str = "") -> Dict: | |
| """Analyse de fallback sans GPT""" | |
| text_lower = transcription.lower() | |
| # Détecter le type de document | |
| document_types = { | |
| "compte_rendu_imagerie": ["irm", "scanner", "échographie", "radiologie", "t1", "t2", "doppler"], | |
| "rapport_biologique": ["laboratoire", "analyse", "biologie", "sang", "urine", "sérum"], | |
| "lettre_medicale": ["lettre", "courrier", "correspondance", "cher confrère"], | |
| "compte_rendu_consultation": ["consultation", "examen clinique", "patient", "antécédents"] | |
| } | |
| detected_type = "autre" | |
| # Vérifier d'abord dans le nom de fichier | |
| if transcription_filename: | |
| filename_lower = transcription_filename.lower() | |
| for doc_type, keywords in document_types.items(): | |
| if sum(1 for kw in keywords if kw in filename_lower) >= 1: | |
| detected_type = doc_type | |
| break | |
| # Sinon vérifier dans le contenu | |
| if detected_type == "autre": | |
| for doc_type, keywords in document_types.items(): | |
| if sum(1 for kw in keywords if kw in text_lower) >= 2: | |
| detected_type = doc_type | |
| break | |
| # Extraire les sections basiques | |
| sections = {} | |
| section_patterns = { | |
| "technique": ["technique", "méthode", "protocole"], | |
| "résultats": ["résultat", "observation", "constatation"], | |
| "conclusion": ["conclusion", "diagnostic", "synthèse"] | |
| } | |
| for section, keywords in section_patterns.items(): | |
| for keyword in keywords: | |
| if keyword in text_lower: | |
| start = text_lower.find(keyword) | |
| end = min(len(transcription), start + 500) | |
| content = transcription[start:end] | |
| sections[section] = { | |
| "content": content, | |
| "confidence": 0.6, | |
| "keywords": [keyword] | |
| } | |
| break | |
| analysis = { | |
| "document_type": detected_type, | |
| "identification": { | |
| "physician": "Non identifié", | |
| "center": "Non identifié", | |
| "service": "Non identifié" | |
| }, | |
| "sections": sections, | |
| "medical_data": { | |
| "procedures": [], | |
| "measurements": re.findall(r'\d+\s*(?:mm|cm|ml)', transcription), | |
| "diagnoses": [], | |
| "treatments": [] | |
| }, | |
| "completeness": { | |
| "score": 0.6, | |
| "transcription_quality": "fair" | |
| } | |
| } | |
| # Ajouter l'analyse du nom de fichier en fallback | |
| if transcription_filename: | |
| filename_analysis = self.analyze_filename(transcription_filename) | |
| analysis["filename_analysis"] = { | |
| "medical_keywords": filename_analysis.medical_keywords, | |
| "document_type_indicators": filename_analysis.document_type_indicators, | |
| "specialty_indicators": filename_analysis.specialty_indicators, | |
| "anatomical_regions": filename_analysis.anatomical_regions, | |
| "procedure_type": filename_analysis.procedure_type | |
| } | |
| return analysis | |
def calculate_type_match_score(self, transcription_analysis: Dict, template_info: TemplateInfo) -> float:
    """Score how well the detected document type fits the template type.

    Args:
        transcription_analysis: Analysis dictionary; its ``"document_type"``
            entry is used.
        template_info: Template metadata; its ``type`` attribute is used,
            lower-cased.

    Returns:
        Twice the share of the type's keywords found in the template type,
        capped at 1.0, or 0.3 when the detected type has no keyword list.
    """
    detected = transcription_analysis.get("document_type", "")
    template_label = template_info.type.lower()

    # Keywords expected in the template type for each document type.
    keywords_by_type = {
        "compte_rendu_imagerie": ["irm", "scanner", "échographie", "imagerie", "radiologie"],
        "rapport_biologique": ["laboratoire", "biologie", "analyse"],
        "lettre_medicale": ["lettre", "courrier", "correspondance"],
        "compte_rendu_consultation": ["consultation", "examen"]
    }

    if detected not in keywords_by_type:
        return 0.3

    expected = keywords_by_type[detected]
    hits = len([word for word in expected if word in template_label])
    return min(1.0, hits / len(expected) * 2)
def calculate_physician_match_score(self, transcription_analysis: Dict, template_info: TemplateInfo) -> float:
    """Score whether the physician of the transcription matches the template.

    Args:
        transcription_analysis: Analysis dictionary; the ``"physician"``
            entry of its ``"identification"`` section is used.
        template_info: Template metadata; its ``medecin`` attribute is used.

    Returns:
        0.5 when either name is unknown, 1.0 when the two names share at
        least one word (case-insensitive), 0.0 otherwise.
    """
    identification = transcription_analysis.get("identification", {})
    detected_name = identification.get("physician", "")
    template_name = template_info.medecin

    unknown_in_transcription = not detected_name or detected_name == "Non identifié"
    if unknown_in_transcription or not template_name:
        # Nothing to compare: neutral score.
        return 0.5

    detected_words = set(detected_name.lower().split())
    shared_words = detected_words.intersection(template_name.lower().split())
    return 1.0 if shared_words else 0.0
def calculate_center_match_score(self, transcription_analysis: Dict, template_info: TemplateInfo) -> float:
    """Score whether the medical centre of the transcription matches the template.

    Args:
        transcription_analysis: Analysis dictionary; the ``"center"`` entry
            of its ``"identification"`` section is used.
        template_info: Template metadata; ``centre_medical`` is read first,
            then ``center`` when the former is missing or empty.

    Returns:
        0.5 when either centre is unknown, 1.0 when one name contains the
        other (case-insensitive), 0.0 otherwise.
    """
    identification = transcription_analysis.get("identification", {})
    detected_center = identification.get("center", "")
    template_center = getattr(template_info, 'centre_medical', '') or getattr(template_info, 'center', '')

    unknown_in_transcription = not detected_center or detected_center == "Non identifié"
    if unknown_in_transcription or not template_center:
        # Nothing to compare: neutral score.
        return 0.5

    left = detected_center.lower()
    right = template_center.lower()
    return 1.0 if (left in right or right in left) else 0.0
def calculate_section_matches(self, transcription: str, transcription_analysis: Dict, template_info: TemplateInfo) -> Dict[str, SectionMatch]:
    """Match every section detected in the template against the transcription.

    Args:
        transcription: Text of the transcription.
        transcription_analysis: Analysis dictionary; its ``"sections"``
            entry is passed on to the per-section matcher.
        template_info: Template metadata; its ``detected_sections`` are
            iterated.

    Returns:
        A dictionary mapping each section name to its match result.
    """
    analyzed_sections = transcription_analysis.get("sections", {})
    return {
        name: self._match_single_section(name, transcription, analyzed_sections, template_info)
        for name in template_info.detected_sections
    }
def _match_single_section(self, section_name: str, transcription: str,
                          transcription_sections: Dict, template_info: TemplateInfo) -> SectionMatch:
    """Find the transcription content that best fits one template section.

    The already analysed sections are searched first: a section qualifies
    when more than 30% of the words of ``section_name`` appear in its name,
    and the one with the highest weighted score (confidence times word
    overlap) wins. When nothing qualifies and the GPT section extractor is
    available, it is asked to extract the content.

    Args:
        section_name: Name of the template section.
        transcription: Text of the transcription.
        transcription_sections: Analysed sections, keyed by section name.
        template_info: Not used by this method.

    Returns:
        A ``SectionMatch``; ``can_fill`` is True when the retained content
        is longer than 10 characters once stripped.
    """
    section_keywords = section_name.lower().split()
    section_word_set = set(section_keywords)
    denominator = max(len(section_keywords), 1)

    best_match_content = ""
    best_confidence = 0.0

    for analyzed_section, section_data in transcription_sections.items():
        if not isinstance(section_data, dict):
            continue
        content = section_data.get("content", "")
        confidence = section_data.get("confidence", 0.0)

        analyzed_words = set(analyzed_section.lower().split())
        keyword_match = len(section_word_set & analyzed_words) / denominator

        # Compare like with like: the stored best is a weighted score, so
        # the candidate must be weighted too before the comparison.
        weighted_confidence = confidence * keyword_match
        if keyword_match > 0.3 and weighted_confidence > best_confidence:
            best_match_content = content
            best_confidence = weighted_confidence

    # Ask GPT only when the analysed sections gave nothing.
    if self.section_extractor and not best_match_content:
        try:
            section_description = f"Section {section_name} d'un document médical"
            response = self.section_extractor.invoke({
                "transcription": transcription,
                "section_name": section_name,
                "section_description": section_description
            })
            payload = response.content.strip()
            # Remove an optional Markdown code fence, with or without the
            # "json" language tag.
            if payload.startswith("```"):
                payload = payload[3:]
                if payload[:4].lower() == "json":
                    payload = payload[4:]
            if payload.endswith("```"):
                payload = payload[:-3]
            extraction_result = json.loads(payload)
            if extraction_result.get("can_fill", False):
                best_match_content = extraction_result.get("extracted_content", "")
                best_confidence = extraction_result.get("confidence", 0.0)
        except Exception as e:
            # Best effort: the section is simply reported as not fillable.
            logging.warning(f"Erreur extraction section {section_name}: {e}")

    # The section is fillable only with more than 10 useful characters.
    can_fill = bool(best_match_content) and len(best_match_content.strip()) > 10
    missing_info = [] if can_fill else [f"Contenu manquant pour {section_name}"]

    return SectionMatch(
        section_name=section_name,
        confidence=best_confidence,
        extracted_content=best_match_content,
        can_fill=can_fill,
        missing_info=missing_info
    )
def calculate_fillability_score(self, section_matches: Dict[str, SectionMatch], template_info: TemplateInfo) -> Tuple[float, float, List[str]]:
    """Measure how much of the template can be filled.

    Args:
        section_matches: Per-section match results.
        template_info: Template metadata; the number of its
            ``detected_sections`` is the denominator.

    Returns:
        ``(fillability_score, filling_percentage, missing)`` where the score
        is the share of fillable sections, the percentage counts only
        fillable sections with a confidence above 0.7, and ``missing`` lists
        the names of the sections that cannot be filled. A template without
        sections yields ``(0.0, 0.0, ["Template sans sections"])``.
    """
    section_count = len(template_info.detected_sections)
    matches = list(section_matches.values())

    fillable = [match for match in matches if match.can_fill]
    confident = [match for match in fillable if match.confidence > 0.7]

    if section_count == 0:
        return 0.0, 0.0, ["Template sans sections"]

    unfilled_names = [match.section_name for match in matches if not match.can_fill]
    score = len(fillable) / section_count
    percentage = (len(confident) / section_count) * 100
    return score, percentage, unfilled_names
def smart_match_transcription(self, transcription: str, transcription_filename: str = "", k: int = 10) -> List[TemplateMatch]:
    """Rank the templates that best fit a transcription.

    The transcription is analysed once, candidate templates are pre-filtered
    by type and file name (all templates are used when the filter returns
    nothing), and every candidate receives a weighted overall score.

    Args:
        transcription: Text of the transcription.
        transcription_filename: Optional file name of the transcription.
        k: Maximum number of results returned.

    Returns:
        At most ``k`` ``TemplateMatch`` objects sorted by decreasing overall
        score; an empty list when no template is loaded.
    """
    if not self.parser.templates:
        logging.error("Aucun template chargé")
        return []

    logging.info("Analyse intelligente de la transcription...")

    # 1. Detailed analysis of the transcription (file name included).
    analysis = self.analyze_transcription_detailed(transcription, transcription_filename)

    # 2. Pre-filter the templates by type and file name.
    candidate_ids = self._filter_templates_by_type_and_filename(analysis, transcription_filename)
    if not candidate_ids:
        logging.warning("Aucun template candidat trouvé, utilisation de tous les templates")
        candidate_ids = list(self.parser.templates.keys())
    logging.info(f"{len(candidate_ids)} templates candidats retenus")

    # 3. Score every candidate.
    ranked = []
    for template_id in candidate_ids:
        template_info = self.parser.get_template_info(template_id)
        if not template_info:
            continue

        type_score = self.calculate_type_match_score(analysis, template_info)
        physician_score = self.calculate_physician_match_score(analysis, template_info)
        center_score = self.calculate_center_match_score(analysis, template_info)
        filename_score, filename_indicators = self.calculate_filename_match_score(
            transcription_filename, analysis, template_info.filepath
        )
        section_matches = self.calculate_section_matches(transcription, analysis, template_info)
        fillability_score, filling_percentage, missing_critical = self.calculate_fillability_score(
            section_matches, template_info
        )
        content_score = self._calculate_content_similarity(transcription, template_id)

        # Weighted overall score: type 25%, fillability 30%, file name 20%,
        # content 15%, physician 5%, centre 5%.
        overall_score = (
            type_score * 0.25 +
            fillability_score * 0.3 +
            filename_score * 0.2 +
            content_score * 0.15 +
            physician_score * 0.05 +
            center_score * 0.05
        )

        confidence_level = self._determine_confidence_level(overall_score, fillability_score, analysis)
        extracted_data = self._extract_template_data(section_matches)

        ranked.append(TemplateMatch(
            template_id=template_id,
            template_info=template_info,
            overall_score=overall_score,
            type_match_score=type_score,
            physician_match_score=physician_score,
            center_match_score=center_score,
            content_match_score=content_score,
            filename_match_score=filename_score,
            fillability_score=fillability_score,
            section_matches=section_matches,
            confidence_level=confidence_level,
            can_be_filled=fillability_score > 0.6,
            filling_percentage=filling_percentage,
            missing_critical_info=missing_critical,
            extracted_data=extracted_data,
            filename_indicators=filename_indicators
        ))

    # 4. Best overall score first.
    ranked.sort(key=lambda match: match.overall_score, reverse=True)
    logging.info(f"{len(ranked)} templates analysés")
    return ranked[:k]
def _filter_templates_by_type_and_filename(self, analysis: Dict, transcription_filename: str) -> List[str]:
    """Pre-select the templates worth scoring for a transcription.

    Keywords are gathered from the detected document type and from the
    filename analysis stored in ``analysis``. A template is kept when at
    least one keyword occurs in its file path or in its type.

    Args:
        analysis: Transcription analysis. Reads ``"document_type"`` and
            ``"filename_analysis"``; the latter may be a mapping or an
            object exposing ``medical_keywords``,
            ``document_type_indicators`` and ``specialty_indicators``.
        transcription_filename: Not used by this method; kept so existing
            callers keep working.

    Returns:
        IDs of the matching templates, or every template ID when no usable
        keyword exists or when nothing matches.
    """
    all_template_ids = list(self.parser.templates.keys())

    document_type = analysis.get("document_type", "")
    filename_analysis = analysis.get("filename_analysis") or {}

    type_keywords = {
        "compte_rendu_imagerie": ["irm", "scanner", "echo", "radio", "imagerie"],
        "rapport_biologique": ["labo", "biologie", "analyse", "sang"],
        "lettre_medicale": ["lettre", "courrier"],
        "compte_rendu_consultation": ["consultation", "examen", "clinique"],
    }

    def keywords_from(field_name):
        # Accept both a plain dict and an attribute-style analysis object.
        if isinstance(filename_analysis, dict):
            values = filename_analysis.get(field_name)
        else:
            values = getattr(filename_analysis, field_name, None)
        return values or []

    raw_keywords = []
    if document_type != "autre":
        raw_keywords.extend(type_keywords.get(document_type, []))
    for field_name in ("medical_keywords", "document_type_indicators", "specialty_indicators"):
        raw_keywords.extend(keywords_from(field_name))

    # Haystacks are lowercased below, so keywords must be too. Blank keywords
    # are dropped because "" is a substring of every string and would
    # silently disable the filter.
    filter_keywords = {
        keyword.strip().lower()
        for keyword in raw_keywords
        if isinstance(keyword, str) and keyword.strip()
    }

    # Without any specific criterion, every template stays a candidate.
    if not filter_keywords:
        return all_template_ids

    matching_templates = []
    for template_id, template_info in self.parser.templates.items():
        # Tolerate templates whose path or type was never filled in.
        template_filepath_lower = str(template_info.filepath or "").lower()
        template_type_lower = str(template_info.type or "").lower()
        if any(
            keyword in template_filepath_lower or keyword in template_type_lower
            for keyword in filter_keywords
        ):
            matching_templates.append(template_id)

    return matching_templates if matching_templates else all_template_ids
def _calculate_content_similarity(self, transcription: str, template_id: str) -> float:
    """Return the vector-search score of ``template_id`` for a transcription.

    Asks the parser for the 50 templates closest to ``transcription`` and
    returns the score attached to the first entry whose ID equals
    ``template_id``. Returns 0.0 when the template is not in that list or
    when the search raises; the error is logged as a warning.
    """
    try:
        ranked = self.parser.search_similar_templates(transcription, k=50)
        return next(
            (similarity for candidate_id, similarity in ranked if candidate_id == template_id),
            0.0,
        )
    except Exception as e:
        logging.warning(f"Erreur similarité vectorielle: {e}")
        return 0.0
def _determine_confidence_level(self, overall_score: float, fillability_score: float, analysis: Dict) -> str:
    """Map the scores of a match to a confidence label.

    The overall score is first scaled by a factor that depends on
    ``analysis["completeness"]["transcription_quality"]`` (0.8 when the
    quality is missing or unknown). The label is the highest tier whose
    threshold is strictly exceeded by both the scaled score and the
    fillability score; ``"poor"`` when no tier is reached.
    """
    quality_weights = {
        "excellent": 1.0,
        "good": 0.9,
        "fair": 0.8,
        "poor": 0.6
    }
    completeness = analysis.get("completeness", {})
    quality_label = completeness.get("transcription_quality", "fair")
    weighted_score = overall_score * quality_weights.get(quality_label, 0.8)

    # Tiers are checked from the most to the least demanding.
    tiers = ((0.8, "excellent"), (0.6, "good"), (0.4, "fair"))
    for floor, label in tiers:
        if weighted_score > floor and fillability_score > floor:
            return label
    return "poor"
def _extract_template_data(self, section_matches: Dict[str, SectionMatch]) -> Dict[str, str]:
    """Collect the section contents that are ready to be written into a template.

    A section is kept when its match is flagged ``can_fill`` and its
    ``extracted_content`` is still non-empty after stripping surrounding
    whitespace. The returned dict maps section names to stripped content.
    """
    stripped_contents = (
        (name, section.extracted_content.strip())
        for name, section in section_matches.items()
        if section.can_fill and section.extracted_content
    )
    # Whitespace-only content becomes empty once stripped and is left out.
    return {name: text for name, text in stripped_contents if text}
def print_smart_results(self, matches: List[TemplateMatch]):
    """Print a detailed console report for a list of template matches.

    Prints a single "no result" line and returns when ``matches`` is empty.
    Otherwise prints a header followed by one block per match, numbered
    from 1 in the order given: identification, per-criterion scores,
    filename indicators (when present), fillability summary and a preview
    of the extracted data (when present).

    Args:
        matches: Matches to display.
    """
    if not matches:
        print("Aucun résultat trouvé")
        return
    print(f"\n{'='*100}")
    print(f"RÉSULTATS DE MATCHING INTELLIGENT - {len(matches)} templates analysés")
    print(f"{'='*100}")
    for i, match in enumerate(matches, 1):
        # Identification of the template.
        print(f"\nTEMPLATE #{i}")
        print(f"{'='*60}")
        print(f"ID: {match.template_id}")
        print(f"Score global: {match.overall_score:.3f}")
        print(f"Confiance: {match.confidence_level}")
        print(f"Template: {os.path.basename(match.template_info.filepath)}")
        print(f"Médecin: {match.template_info.medecin}")
        # Individual scores, each printed with three decimals.
        print(f"\nSCORES DÉTAILLÉS:")
        print(f" • Type de document: {match.type_match_score:.3f}")
        print(f" • Nom de fichier: {match.filename_match_score:.3f}")  # New
        print(f" • Médecin: {match.physician_match_score:.3f}")
        print(f" • Centre: {match.center_match_score:.3f}")
        print(f" • Contenu: {match.content_match_score:.3f}")
        print(f" • Remplissage: {match.fillability_score:.3f}")
        # Show the filename indicators
        if match.filename_indicators:
            print(f"\nINDICATEURS NOM DE FICHIER:")
            print(f" • Correspondances: {', '.join(match.filename_indicators)}")
        print(f"\nCAPACITÉ DE REMPLISSAGE:")
        print(f" • Peut être rempli: {'OUI' if match.can_be_filled else 'NON'}")
        print(f" • Pourcentage: {match.filling_percentage:.1f}%")
        if match.section_matches:
            # Split the sections according to their can_fill flag.
            fillable = [s for s in match.section_matches.values() if s.can_fill]
            missing = [s for s in match.section_matches.values() if not s.can_fill]
            print(f" • Sections remplissables: {len(fillable)}/{len(match.section_matches)}")
            if fillable:
                print(f" • Remplissables: {', '.join([s.section_name for s in fillable])}")
            if missing:
                print(f" • Manquantes: {', '.join([s.section_name for s in missing])}")
        if match.extracted_data:
            print(f"\nDONNÉES EXTRAITES:")
            for section, content in match.extracted_data.items():
                # Content longer than 100 characters is cut and suffixed with "...".
                preview = content[:100] + "..." if len(content) > 100 else content
                print(f" • {section}: {preview}")
        print(f"{'='*60}")
def get_best_fillable_match(self, transcription: str, transcription_filename: str = "") -> Optional[TemplateMatch]:
    """Return the highest-ranked match that can actually be filled, if any.

    Runs ``smart_match_transcription`` with ``k=10`` and keeps the matches
    flagged ``can_be_filled`` whose ``fillability_score`` exceeds 0.6. The
    first one in the returned order is the result; ``None`` when none
    qualifies.
    """
    candidates = self.smart_match_transcription(transcription, transcription_filename, k=10)

    def is_fillable(candidate):
        return candidate.can_be_filled and candidate.fillability_score > 0.6

    fillable_candidates = list(filter(is_fillable, candidates))
    if not fillable_candidates:
        return None
    return fillable_candidates[0]
def test_with_provided_example(self):
    """Run the whole matching pipeline on a hard-coded sample and print the outcome.

    Uses a fixed transcription and filename, prints the filename analysis,
    the five best matches and finally the best fillable match when one
    exists. Returns nothing.
    """
    # Sample transcription and filename used for the demonstration
    transcription_filename = "default.73.931915433.rtf_3650535_radiologie.doc"
    transcription_content = """**Technique :** 3 plans T2, diffusion axiale, T2 grand champ et T1 Dixon.
**Résultats :**
* L'utérus est antéversé, antéfléchi, latéralisé à droite, de taille normale pour l'âge.
* L'endomètre est fin, mesurant moins de 2 mm.
* Pas d'adénomyose franche.
* Aspect normal du col utérin et du vagin.
* L'ovaire droit, en position postérieure, mesure 18 x 11 mm avec présence de 4 follicules.
* L'ovaire gauche, en position latéro-utérine, présente un volumineux endométriome de 45 mm, typique en hypersignal T1 Dixon.
* Deuxième endométriome accolé à l'ovaire droit, périphérique, mesurant 13 mm.
* Pas d'épaississement marqué du torus ni des ligaments utéro-sacrés.
* Pas d'autre localisation pelvienne.
* Pas d'épanchement pelvien.
* Pas d'anomalie de la vessie.
* Pas d'adénomégalie pelvienne, pas de dilatation des uretères.
**Conclusion :**
* Endométriome ovarien droit périphérique de 13 mm.
* Endométriome ovarien gauche centro-ovarien de 45 mm."""
    print("ANALYSE DE L'EXEMPLE FOURNI")
    print("="*80)
    print(f"Nom de fichier: {transcription_filename}")
    print(f"Contenu: {len(transcription_content.split())} mots")
    # Analyze the filename on its own and show every extracted field
    filename_analysis = self.analyze_filename(transcription_filename)
    print(f"\nANALYSE DU NOM DE FICHIER:")
    print(f"Mots-clés médicaux: {filename_analysis.medical_keywords}")
    print(f"Indicateurs de type: {filename_analysis.document_type_indicators}")
    print(f"Spécialités: {filename_analysis.specialty_indicators}")
    print(f"Centres: {filename_analysis.center_indicators}")
    print(f"Régions anatomiques: {filename_analysis.anatomical_regions}")
    print(f"Type de procédure: {filename_analysis.procedure_type}")
    print(f"Score de confiance: {filename_analysis.confidence_score:.3f}")
    # Run the matching and keep the five best templates
    print(f"\nMATCHING EN COURS...")
    results = self.smart_match_transcription(transcription_content, transcription_filename, k=5)
    # Display the results
    self.print_smart_results(results)
    # Display the best fillable match
    # NOTE(review): get_best_fillable_match calls smart_match_transcription
    # again (with k=10), so the sample is matched twice — confirm whether the
    # extra run is acceptable.
    best_match = self.get_best_fillable_match(transcription_content, transcription_filename)
    if best_match:
        print(f"\nMEILLEUR TEMPLATE REMPLISSABLE:")
        print(f"Template: {best_match.template_id}")
        print(f"Score global: {best_match.overall_score:.3f}")
        print(f"Score nom de fichier: {best_match.filename_match_score:.3f}")
        print(f"Indicateurs nom de fichier: {', '.join(best_match.filename_indicators)}")
        print(f"Capacité de remplissage: {best_match.filling_percentage:.1f}%")
def main():
    """Interactive console entry point for trying the filename-aware matcher.

    Asks for the template database path, builds a
    ``SmartTranscriptionMatcher`` and then, depending on the chosen option,
    runs the built-in example, reads a transcription typed on the console,
    or reads one from a UTF-8 text file. The five best matches are printed,
    followed by the best fillable template and optional follow-up actions.
    Every failure is reported on the console and ends the function early.
    """
    # The prompt shows the fallback that is really applied on empty input.
    default_db_path = "medical_templates.pkl"
    db_path = input(f"Chemin vers la base de données ({default_db_path}): ").strip()
    if not db_path:
        db_path = default_db_path
    if not os.path.exists(db_path):
        print(f"Fichier de base de données non trouvé: {db_path}")
        return

    print(f"\nInitialisation du système de matching intelligent...")
    matcher = SmartTranscriptionMatcher(db_path)

    # Test options
    print(f"\nOPTIONS DE TEST:")
    print("1. Utiliser l'exemple fourni (radiologie)")
    print("2. Saisie manuelle")
    print("3. Lecture depuis fichier")
    choice = input("\nChoisissez une option (1-3): ").strip()

    if choice == "1":
        # Built-in example
        matcher.test_with_provided_example()
        return
    elif choice == "2":
        # Manual entry, terminated by a line containing only FIN
        transcription_filename = input("Nom du fichier de transcription: ").strip()
        print("\nEntrez votre transcription (tapez 'FIN' sur une ligne vide pour terminer):")
        lines = []
        while True:
            try:
                line = input()
            except EOFError:
                # End of input (Ctrl-D or exhausted piped stdin) ends the
                # entry just like the FIN marker instead of crashing.
                break
            if line.strip() == 'FIN':
                break
            lines.append(line)
        transcription = '\n'.join(lines)
    elif choice == "3":
        # Read the transcription from a file
        filepath = input("Chemin vers le fichier de transcription: ").strip()
        try:
            with open(filepath, 'r', encoding='utf-8') as f:
                transcription = f.read()
        except (OSError, ValueError) as e:
            # OSError: missing or unreadable file; ValueError covers
            # UnicodeDecodeError for files that are not valid UTF-8.
            print(f"Erreur de lecture: {e}")
            return
        transcription_filename = os.path.basename(filepath)
        print(f"Fichier lu: {len(transcription.split())} mots")
    else:
        print("Choix invalide")
        return

    if not transcription.strip():
        print("Aucune transcription fournie")
        return

    print(f"\nAnalyse intelligente en cours...")
    # Run the matching and display the five best templates
    results = matcher.smart_match_transcription(transcription, transcription_filename, k=5)
    matcher.print_smart_results(results)

    # NOTE(review): get_best_fillable_match runs the matching a second time
    # (k=10); kept as is so the selection logic stays in one place.
    best_fillable = matcher.get_best_fillable_match(transcription, transcription_filename)
    if best_fillable:
        print(f"\nMEILLEUR TEMPLATE REMPLISSABLE:")
        print(f"{'='*60}")
        print(f"Template: {best_fillable.template_id}")
        print(f"Score global: {best_fillable.overall_score:.3f}")
        print(f"Score nom de fichier: {best_fillable.filename_match_score:.3f}")
        print(f"Indicateurs fichier: {', '.join(best_fillable.filename_indicators)}")
        print(f"Capacité de remplissage: {best_fillable.filling_percentage:.1f}%")
        print(f"Confiance: {best_fillable.confidence_level}")
        if best_fillable.extracted_data:
            print(f"\nTEMPLATE PRÊT À REMPLIR:")
            print(f"Sections avec données extraites:")
            for section, content in best_fillable.extracted_data.items():
                print(f"\n[{section.upper()}]")
                print(f"{content}")
        # Offer to show the full template details
        show_details = input(f"\nAfficher les détails complets du template? (y/n): ").strip().lower()
        if show_details == 'y':
            matcher.parser.print_template_summary(best_fillable.template_id)
        # Offer to generate the filled template
        generate_filled = input(f"\nGénérer le template rempli? (y/n): ").strip().lower()
        if generate_filled == 'y':
            generate_filled_template(matcher, best_fillable, transcription)
    else:
        print(f"\nAucun template ne peut être suffisamment rempli avec cette transcription")
        if results:
            # Still list the three best-scoring candidates for information.
            print(f"\nMeilleurs candidats (mais insuffisamment remplissables):")
            for i, result in enumerate(results[:3], 1):
                print(f"{i}. {result.template_id} - Score: {result.overall_score:.3f}")
                print(f" Score fichier: {result.filename_match_score:.3f}")
                print(f" Remplissage: {result.filling_percentage:.1f}%")
def generate_filled_template(matcher: SmartTranscriptionMatcher, best_match: TemplateMatch, transcription: str):
    """Fill the matched template with the extracted data and save the result.

    The template file is read as UTF-8 text. For each entry of
    ``best_match.extracted_data`` the first placeholder present in the text
    is replaced by the extracted content. Placeholders are tried in this
    order: ``[NAME]``, ``[name]``, ``{name}``, ``__name__``,
    ``<!-- name -->``, ``_name_``, then ``[WORD]``, ``{word}`` and
    ``__word__`` for every word of the section name. When no placeholder
    exists, the content is inserted right after the first line containing
    one of the section-name words. The result is written to
    ``template_rempli_<template_id>.txt`` in the current directory, a
    summary of integrated sections is printed, and a preview is offered.

    Args:
        matcher: Not used by this function; kept for call-site compatibility.
        best_match: Match providing ``template_info.filepath``,
            ``template_id`` and ``extracted_data``.
        transcription: Not used by this function; kept for call-site
            compatibility.

    Returns:
        None. Errors are printed and logged rather than raised.
    """
    print(f"\nGÉNÉRATION DU TEMPLATE REMPLI")
    print(f"{'='*80}")
    try:
        template_info = best_match.template_info
        if not os.path.exists(template_info.filepath):
            print(f"Fichier template non trouvé: {template_info.filepath}")
            return
        with open(template_info.filepath, 'r', encoding='utf-8') as f:
            template_content = f.read()

        filled_content = template_content
        replacement_count = 0

        for section_name, extracted_content in best_match.extracted_data.items():
            section_keywords = section_name.lower().split()

            # Placeholders for the full section name first, then per word.
            patterns = [
                f"[{section_name.upper()}]",
                f"[{section_name}]",
                f"{{{section_name}}}",
                f"__{section_name}__",
                f"<!-- {section_name} -->",
                f"_{section_name}_",
            ]
            for keyword in section_keywords:
                patterns.extend([
                    f"[{keyword.upper()}]",
                    f"{{{keyword}}}",
                    f"__{keyword}__"
                ])

            # 1) Explicit placeholder: only the first pattern found is used,
            #    and every occurrence of it is replaced.
            matched_pattern = next((p for p in patterns if p in filled_content), None)
            if matched_pattern is not None:
                filled_content = filled_content.replace(matched_pattern, extracted_content)
                replacement_count += 1
                print(f"Section '{section_name}' remplie ({matched_pattern})")
                continue

            # 2) Fallback: insert after the first line mentioning the section.
            lines = filled_content.split('\n')
            anchor_index = next(
                (
                    index
                    for index, line in enumerate(lines)
                    if any(keyword in line.lower() for keyword in section_keywords)
                ),
                None,
            )
            if anchor_index is None:
                print(f"Section '{section_name}' non intégrée - pattern non trouvé")
                continue
            lines.insert(anchor_index + 1, f"\n{extracted_content}\n")
            filled_content = '\n'.join(lines)
            replacement_count += 1
            print(f"Section '{section_name}' insérée après ligne similaire")

        # Report how many sections actually made it into the template.
        print(f"\nSections intégrées: {replacement_count}/{len(best_match.extracted_data)}")

        # Save the filled template; a failure here does not prevent the preview.
        output_filename = f"template_rempli_{best_match.template_id}.txt"
        try:
            with open(output_filename, 'w', encoding='utf-8') as f:
                f.write(filled_content)
            print(f"\nTemplate rempli sauvegardé: {output_filename}")
        except Exception as e:
            print(f"Erreur lors de la sauvegarde: {e}")

        # Offer a preview limited to the first 2000 characters.
        show_preview = input(f"\nAfficher un aperçu du template rempli? (y/n): ").strip().lower()
        if show_preview == 'y':
            print(f"\n{'='*80}")
            print(f"APERÇU DU TEMPLATE REMPLI")
            print(f"{'='*80}")
            preview = filled_content[:2000]
            if len(filled_content) > 2000:
                preview += "\n\n[... Tronqué pour l'aperçu ...]"
            print(preview)
            print(f"\n{'='*80}")
    except Exception as e:
        print(f"Erreur lors de la génération: {e}")
        # logging.exception keeps the traceback, which logging.error dropped.
        logging.exception(f"Erreur génération template: {e}")
def analyze_transcription_quality(transcription: str) -> Dict:
    """Compute quick heuristic quality metrics for a transcription.

    Args:
        transcription: Raw transcription text.

    Returns:
        A dict with the keys ``word_count``, ``sentence_count``,
        ``avg_sentence_length``, ``has_medical_terms``,
        ``has_measurements``, ``has_sections``, ``structure_score``
        (between 0 and 1) and ``overall_quality`` (``"excellent"``,
        ``"good"``, ``"fair"`` or ``"poor"``).
    """
    words = transcription.split()
    lowered = transcription.lower()

    # Splitting on "." leaves empty fragments (e.g. after a final period).
    # Only real sentences are counted, and the same count is used for the
    # average so both metrics agree.
    sentence_count = len([s for s in transcription.split('.') if s.strip()])

    # Structure score: share of expected report headings, capped at 1.0.
    structure_indicators = ['technique', 'résultat', 'conclusion', 'indication', 'observation']
    structure_count = sum(1 for indicator in structure_indicators if indicator in lowered)

    metrics = {
        "word_count": len(words),
        "sentence_count": sentence_count,
        "avg_sentence_length": len(words) / max(sentence_count, 1),
        # The text is lowercased, so every alternative must be lowercase too
        # (an uppercase "IRM" alternative could never match).
        "has_medical_terms": bool(re.search(r'\b(mm|cm|ml|irm|scanner|échographie|résultats?|conclusion)\b', lowered)),
        "has_measurements": bool(re.search(r'\d+\s*(mm|cm|ml)', transcription)),
        "has_sections": bool(re.search(r'\b(technique|résultats?|conclusion|indication)\b', lowered)),
        "structure_score": min(1.0, structure_count / 3.0)
    }

    # Overall grade, from the most to the least demanding criteria.
    if (metrics["word_count"] > 100 and
            metrics["has_medical_terms"] and
            metrics["has_sections"] and
            metrics["structure_score"] > 0.5):
        quality = "excellent"
    elif (metrics["word_count"] > 50 and
            metrics["has_medical_terms"] and
            metrics["structure_score"] > 0.3):
        quality = "good"
    elif metrics["word_count"] > 20 and metrics["has_medical_terms"]:
        quality = "fair"
    else:
        quality = "poor"
    metrics["overall_quality"] = quality
    return metrics
# Start the interactive console tester only when this file is run as a script.
if __name__ == "__main__":
    main()