""" Module de chargement des données depuis Hugging Face """ import os import traceback import pandas as pd from datasets import load_dataset, Dataset, DatasetDict import datasets as hf_datasets from huggingface_hub import HfApi, hf_hub_download import huggingface_hub as hf_hub import pyarrow as pa from config import HF_TOKEN, DATASET_ID, REQUIRED_COLUMNS, MESSAGES class DataLoader: """Classe responsable du chargement des données depuis différentes sources""" def __init__(self): self.df = None def load_data(self): """Charge les données du dataset avec gestion robuste des erreurs Arrow.""" try: print(MESSAGES["loading"]) print(f"📋 Dataset ID: {DATASET_ID}") print(f"📋 Token disponible: {'Oui' if HF_TOKEN else 'Non'}") self.df = None # Stratégie 1: Chargement direct Hugging Face avec gestion des erreurs Arrow print("🔄 Stratégie 1: chargement via datasets HF avec protection Arrow") hf_msg = self._safe_load_from_hf() if self.df is None: if hf_msg: print(f"❌ Chargement HF échoué: {hf_msg}") # Stratégie 2: Charger directement les fichiers du repo print("🔄 Stratégie 2: chargement via fichiers du dépôt Hugging Face") fallback_msg = self._fallback_load_from_repo_files() if self.df is None: if fallback_msg: print(f"❌ Chargement via fichiers du dépôt échoué: {fallback_msg}") # Stratégie 3: Dernier recours avec échantillon local print("🔄 Stratégie 3: chargement du fichier local de secours") local_msg = self._load_local_sample() if self.df is None: print(f"❌ Chargement local échoué: {local_msg}") return MESSAGES["no_data"] print(f"📊 Données chargées: {len(self.df)} lignes") print(f"📊 Colonnes disponibles: {list(self.df.columns)}") # Nettoyage et validation return self._clean_and_validate_data() except Exception as e: print(f"❌ Erreur critique dans load_data: {e}") import traceback traceback.print_exc() return f"❌ Erreur critique lors du chargement: {str(e)}" def _clean_and_validate_data(self): """Nettoie et valide les données chargées avec validation robuste""" if self.df is None or self.df.empty: print("❌ DataFrame vide ou None") return "❌ Aucune donnée à valider" print(f"📊 Validation des données: {len(self.df)} lignes, {len(self.df.columns)} colonnes") # Vérification des colonnes requises missing_cols = [col for col in REQUIRED_COLUMNS if col not in self.df.columns] if missing_cols: print(f"❌ Colonnes manquantes: {missing_cols}") print(f"📊 Colonnes disponibles: {list(self.df.columns)}") self.df = None return f"❌ Colonnes manquantes: {missing_cols}" # Validation et nettoyage des colonnes requises initial_len = len(self.df) # Nettoyer chaque colonne requise spécifiquement for col in REQUIRED_COLUMNS: if col in self.df.columns: original_count = self.df[col].notna().sum() if col == 'millesime': # Validation spéciale pour millesime (doit être une année valide) self.df[col] = self._validate_year_column(self.df[col]) elif col == 'surfparc': # Validation spéciale pour surfparc (doit être un nombre positif) self.df[col] = self._validate_numeric_positive(self.df[col]) elif col == 'numparcell': # Validation pour numéro de parcelle (string non vide) self.df[col] = self._validate_string_column(self.df[col]) valid_count = self.df[col].notna().sum() if valid_count < original_count: print(f"📊 Colonne {col}: {original_count} → {valid_count} valeurs valides") # Supprimer les lignes avec des valeurs manquantes dans les colonnes requises self.df = self.df.dropna(subset=REQUIRED_COLUMNS) # Validation supplémentaire self.df = self._additional_data_validation(self.df) final_len = len(self.df) print(f"📊 Avant validation: {initial_len} lignes") print(f"📊 Après validation: {final_len} lignes") if final_len == 0: print("❌ Aucune ligne valide après nettoyage") self.df = None return "❌ Aucune ligne valide après nettoyage" return MESSAGES["success"] def _validate_year_column(self, series): """Valide une colonne d'année (entre 1990 et 2030)""" try: numeric_series = pd.to_numeric(series, errors='coerce') # Filtrer les années valides valid_mask = (numeric_series >= 1990) & (numeric_series <= 2030) result = numeric_series.where(valid_mask) return result except Exception: return series def _validate_numeric_positive(self, series): """Valide une colonne numérique positive""" try: numeric_series = pd.to_numeric(series, errors='coerce') # Filtrer les valeurs positives valid_mask = numeric_series > 0 result = numeric_series.where(valid_mask) return result except Exception: return series def _validate_string_column(self, series): """Valide une colonne string (non vide, non null)""" try: # Convertir en string et nettoyer string_series = series.astype(str).str.strip() # Remplacer les valeurs vides par NaN string_series = string_series.replace(['', 'nan', 'null', 'NULL', 'None'], None) return string_series except Exception: return series def _additional_data_validation(self, df): """Validations supplémentaires sur le DataFrame""" if df is None or df.empty: return df try: # Supprimer les doublons complets initial_len = len(df) df = df.drop_duplicates() if len(df) < initial_len: print(f"📊 {initial_len - len(df)} doublons supprimés") # Nettoyer les colonnes texte problématiques for col in df.columns: if df[col].dtype == 'object': # Supprimer les lignes avec des valeurs contenant uniquement des caractères spéciaux problematic_mask = df[col].astype(str).str.match(r'^[^\w\s]*$', na=False) if problematic_mask.any(): print(f"📊 {problematic_mask.sum()} lignes avec caractères problématiques dans {col}") df.loc[problematic_mask, col] = None return df except Exception as e: print(f"❌ Erreur validation supplémentaire: {e}") return df def _safe_load_from_hf(self): """Charge les données depuis Hugging Face avec gestion des erreurs Arrow.""" try: # Tentative de chargement standard print("🔄 Tentative de chargement standard...") dataset = load_dataset(DATASET_ID, token=HF_TOKEN, trust_remote_code=True) # Conversion en DataFrame avec gestion des types if isinstance(dataset, DatasetDict): # Prendre le premier split disponible split_name = list(dataset.keys())[0] hf_dataset = dataset[split_name] else: hf_dataset = dataset self.df = self._safe_convert_to_pandas(hf_dataset) if self.df is not None: print(f"✅ Chargement standard réussi: {len(self.df)} lignes") return None else: return "Conversion en DataFrame échouée" except Exception as e: error_msg = str(e) print(f"❌ Erreur lors du chargement depuis Hugging Face: {error_msg}") # Logging détaillé pour les erreurs Arrow if "ArrowInvalid" in error_msg or "Failed to parse string" in error_msg: print(f"❌ Type d'erreur: {type(e).__name__}") print(f"❖ repr(e): {repr(e)}") if hasattr(e, '__cause__') and e.__cause__: print(f"❖ Cause: {e.__cause__}") if hasattr(e, '__context__') and e.__context__: print(f"❖ Contexte: {e.__context__}") print(f"❖ Args: {e.args}") print(f"❖ datasets version: {hf_datasets.__version__}") print(f"❖ huggingface_hub version: {hf_hub.__version__}") print(f"❖ Proxies détectés: {os.environ.get('HTTP_PROXY', 'aucun')}") print("❖ Traceback complet:") traceback.print_exc() return f"Erreur Arrow/parsing: {error_msg}" def _safe_convert_to_pandas(self, hf_dataset): """Convertit un dataset HF en DataFrame pandas avec gestion sécurisée des types.""" try: # Méthode 1: Conversion directe print("🔄 Tentative de conversion directe...") df = hf_dataset.to_pandas() return self._clean_data_types(df) except Exception as e1: print(f"❌ Conversion directe échouée: {e1}") try: # Méthode 2: Via Arrow Table avec schéma modifié print("🔄 Tentative via Arrow Table avec schéma string...") arrow_table = hf_dataset.data.table # Créer un nouveau schéma avec tous les champs en string string_schema = self._create_string_schema(arrow_table.schema) # Convertir les données en utilisant le schéma string string_table = arrow_table.cast(string_schema) df = string_table.to_pandas() return self._clean_data_types(df) except Exception as e2: print(f"❌ Conversion via Arrow échouée: {e2}") try: # Méthode 3: Chargement ligne par ligne print("🔄 Tentative de chargement ligne par ligne...") rows = [] for i, row in enumerate(hf_dataset): if i >= 10000: # Limite pour éviter les timeouts break # Convertir toutes les valeurs en string pour éviter les erreurs de type safe_row = {k: str(v) if v is not None else None for k, v in row.items()} rows.append(safe_row) if rows: df = pd.DataFrame(rows) return self._clean_data_types(df) except Exception as e3: print(f"❌ Chargement ligne par ligne échoué: {e3}") return None def _create_string_schema(self, original_schema): """Crée un schéma Arrow où tous les types sont convertis en string.""" fields = [] for field in original_schema: # Convertir tous les types en string pour éviter les erreurs de parsing string_field = pa.field(field.name, pa.string(), nullable=True) fields.append(string_field) return pa.schema(fields) def _clean_data_types(self, df): """Nettoie et convertit les types de données du DataFrame.""" if df is None or df.empty: return None print("🔄 Nettoyage et conversion des types...") # Colonnes numériques connues à convertir numeric_columns = ['surfparc', 'millesime', 'quantitetot', 'neffqte', 'peffqte', 'kqte', 'teneurn', 'teneurp', 'teneurk', 'keq', 'volumebo'] for col in numeric_columns: if col in df.columns: df[col] = self._safe_numeric_conversion(df[col]) # Nettoyer les valeurs problématiques for col in df.columns: if df[col].dtype == 'object': # Remplacer les valeurs vides problématiques df[col] = df[col].replace(['', 'null', 'NULL', 'None', 'nan'], None) # Nettoyer les chaînes avec des caractères problématiques df[col] = df[col].astype(str).replace(r'^[^\w\s-]+$', '', regex=True) df[col] = df[col].replace('nan', None) return df def _safe_numeric_conversion(self, series): """Convertit une série en numérique de manière sécurisée.""" try: # Nettoyer d'abord les valeurs non-numériques cleaned = series.astype(str).str.strip() cleaned = cleaned.replace(['', 'null', 'NULL', 'None', 'nan', '-'], None) # Supprimer les caractères non-numériques (sauf . et -) cleaned = cleaned.str.replace(r'[^\d.-]', '', regex=True) # Convertir en numérique numeric_series = pd.to_numeric(cleaned, errors='coerce') return numeric_series except Exception as e: print(f"❌ Erreur conversion numérique pour {series.name}: {e}") return series def _fallback_load_from_repo_files(self): """Fallback pour charger les données en téléchargeant directement les fichiers du repo HF.""" try: print("🔄 Tentative de chargement alternatif via fichiers du dépôt Hugging Face...") api = HfApi() files = api.list_repo_files(repo_id=DATASET_ID, repo_type="dataset", token=HF_TOKEN) if not files: print("❌ Aucun fichier dans le dépôt") return "Aucun fichier trouvé dans le dépôt." data_files = [ f for f in files if f.lower().endswith((".parquet", ".csv", ".tsv", ".json")) ] if not data_files: print("❌ Aucun fichier de données exploitable (csv/tsv/parquet/json)") return "Aucun fichier exploitable (csv/tsv/parquet/json)." # Priorité: parquet > csv > tsv > json for ext in [".parquet", ".csv", ".tsv", ".json"]: selected = [f for f in data_files if f.lower().endswith(ext)] if selected: chosen_ext = ext selected_files = selected break print(f"📂 Fichiers détectés ({chosen_ext}): {selected_files[:5]}{' ...' if len(selected_files) > 5 else ''}") local_paths = [] for f in selected_files: local_path = hf_hub_download( repo_id=DATASET_ID, repo_type="dataset", filename=f, token=HF_TOKEN, ) local_paths.append(local_path) frames = [] if chosen_ext == ".parquet": for p in local_paths: try: df = pd.read_parquet(p) frames.append(self._clean_data_types(df)) except Exception as e: print(f"❌ Erreur lecture parquet {p}: {e}") elif chosen_ext == ".csv": for p in local_paths: try: # Lecture avec tous les types en string pour éviter les erreurs de parsing df = pd.read_csv(p, dtype=str, na_values=['', 'NULL', 'null', 'None']) frames.append(self._clean_data_types(df)) except Exception as e: print(f"❌ Erreur lecture CSV {p}: {e}") elif chosen_ext == ".tsv": for p in local_paths: try: # Lecture avec tous les types en string pour éviter les erreurs de parsing df = pd.read_csv(p, sep="\t", dtype=str, na_values=['', 'NULL', 'null', 'None']) frames.append(self._clean_data_types(df)) except Exception as e: print(f"❌ Erreur lecture TSV {p}: {e}") elif chosen_ext == ".json": for p in local_paths: try: try: df = pd.read_json(p, lines=True, dtype=str) except Exception: df = pd.read_json(p, dtype=str) frames.append(self._clean_data_types(df)) except Exception as e: print(f"❌ Erreur lecture JSON {p}: {e}") # Filtrer les frames None et concaténer valid_frames = [f for f in frames if f is not None and not f.empty] if valid_frames: self.df = pd.concat(valid_frames, ignore_index=True) if len(valid_frames) > 1 else valid_frames[0] print(f"✅ Fallback réussi: {len(self.df)} lignes chargées depuis les fichiers du dépôt") return None else: return "Aucun fichier valide trouvé" except Exception as e: print(f"❌ Fallback échoué: {e}") # Dernier recours: fichier local d'exemple return self._load_local_sample() def _load_local_sample(self): """Charge un fichier local de secours avec conversion sécurisée""" sample_path = os.path.join(os.path.dirname(__file__), "sample_data.csv") if os.path.exists(sample_path): try: # Lecture avec tous les types en string pour éviter les erreurs df = pd.read_csv(sample_path, dtype=str, na_values=['', 'NULL', 'null', 'None']) self.df = self._clean_data_types(df) if self.df is not None and not self.df.empty: print(f"✅ Chargement du fichier local 'sample_data.csv' ({len(self.df)} lignes)") return "Chargement via fichier local de secours." else: print("❌ Fichier local vide après nettoyage") return "Fichier local vide après nettoyage" except Exception as e2: print(f"❌ Échec du chargement du fichier local: {e2}") return f"Erreur fichier local: {str(e2)}" return "Aucune source de données disponible." def get_data(self): """Retourne les données chargées""" return self.df def has_data(self): """Vérifie si des données sont disponibles""" return self.df is not None and len(self.df) > 0 def test_arrow_resilience(self): """Teste la résilience aux erreurs Arrow avec des données problématiques""" print("🧪 Test de résilience aux erreurs Arrow...") # Créer un DataFrame de test avec des valeurs problématiques test_data = { 'numparcell': ['P001', 'P002', 'Coué - ', ''], 'surfparc': [1.5, 2.0, 'Coué - ', '0'], 'millesime': [2014, 2015, 'Coué - ', 'invalid'], 'problematic_col': ['Coué - ', 'Normal', '', 'null'] } original_df = pd.DataFrame(test_data) print(f"📊 Données de test créées: {len(original_df)} lignes") print("📊 Données problématiques incluses: 'Coué - ', chaînes vides, valeurs invalides") # Appliquer le nettoyage cleaned_df = self._clean_data_types(original_df.copy()) if cleaned_df is not None: print(f"✅ Nettoyage réussi: {len(cleaned_df)} lignes") print("📊 Types après nettoyage:") for col in cleaned_df.columns: print(f" - {col}: {cleaned_df[col].dtype}") # Vérifier les colonnes numériques if 'surfparc' in cleaned_df.columns: valid_numeric = cleaned_df['surfparc'].notna().sum() print(f"📊 Valeurs numériques valides dans surfparc: {valid_numeric}") return True else: print("❌ Échec du test de nettoyage") return False