""" Module d'analyse des données agricoles et calcul des risques """ import pandas as pd from config import OPTIONAL_GROUP_COLS, REQUIRED_COLUMNS, RISK_LEVELS class AgricultureAnalyzer: """Classe responsable de l'analyse des données agricoles""" def __init__(self, data=None): self.df = data self.risk_analysis = None def set_data(self, data): """Définit les données à analyser""" self.df = data def analyze_data(self): """Analyse des données et calcul des risques""" if self.df is None or len(self.df) == 0: print("❌ Pas de données à analyser") return "Erreur: Aucune donnée chargée" try: print(f"🔄 Début de l'analyse sur {len(self.df)} enregistrements...") # Analyse générale general_stats = self._calculate_general_stats() # Analyse des herbicides herbicide_stats = self._calculate_herbicide_stats() # Calcul de l'analyse des risques self.calculate_risk_analysis() print("✅ Analyse terminée avec succès") return general_stats, herbicide_stats except Exception as e: print(f"❌ Erreur lors de l'analyse: {str(e)}") return None, None def _calculate_general_stats(self): """Calcule les statistiques générales""" return { 'total_parcelles': self.df['numparcell'].nunique(), 'total_interventions': len(self.df), 'surface_totale': self.df['surfparc'].sum(), 'surface_moyenne': self.df['surfparc'].mean(), 'periode': f"{self.df['millesime'].min()} - {self.df['millesime'].max()}" } def _calculate_herbicide_stats(self): """Calcule les statistiques sur les herbicides""" if 'familleprod' in self.df.columns: herbicides_df = self.df[self.df['familleprod'] == 'Herbicides'].copy() return { 'nb_interventions_herbicides': len(herbicides_df), 'pourcentage_herbicides': (len(herbicides_df) / len(self.df)) * 100, 'parcelles_traitees': herbicides_df['numparcell'].nunique() } else: return { 'nb_interventions_herbicides': 0, 'pourcentage_herbicides': 0, 'parcelles_traitees': 0 } def calculate_risk_analysis(self): """Calcule l'analyse des risques par parcelle""" try: print("🔄 Calcul de l'analyse des risques...") # Vérifier les colonnes nécessaires required_group_cols = ['numparcell', 'surfparc'] # Construire la liste des colonnes de groupement disponibles group_cols = [col for col in required_group_cols if col in self.df.columns] group_cols.extend([col for col in OPTIONAL_GROUP_COLS if col in self.df.columns]) if len(group_cols) < 2: print(f"❌ Colonnes insuffisantes pour le groupement: {group_cols}") self.risk_analysis = pd.DataFrame() return # Construire l'agrégation selon les colonnes disponibles agg_dict = self._build_aggregation_dict() if not agg_dict: print("❌ Aucune colonne disponible pour l'agrégation") self.risk_analysis = pd.DataFrame() return # Groupement des données par parcelle risk_analysis = self.df.groupby(group_cols).agg(agg_dict).round(2) # Ajout des quantités d'herbicides spécifiques risk_analysis = self._add_herbicide_quantities(risk_analysis, group_cols) # Renommage des colonnes risk_analysis = self._rename_columns(risk_analysis, agg_dict) # Calcul de l'IFT approximatif risk_analysis = self._calculate_ift(risk_analysis, group_cols) # Classification du risque risk_analysis['Risque_adventice'] = risk_analysis.apply(self._classify_risk, axis=1) # Tri par risque risk_analysis = self._sort_by_risk(risk_analysis) self.risk_analysis = risk_analysis print(f"✅ Analyse des risques terminée: {len(self.risk_analysis)} parcelles analysées") except Exception as e: print(f"❌ Erreur lors du calcul des risques: {str(e)}") self.risk_analysis = pd.DataFrame() def _build_aggregation_dict(self): """Construit le dictionnaire d'agrégation selon les colonnes disponibles""" agg_dict = {} if 'familleprod' in self.df.columns: agg_dict['familleprod'] = lambda x: (x == 'Herbicides').sum() if 'libevenem' in self.df.columns: agg_dict['libevenem'] = lambda x: len(x.unique()) if 'produit' in self.df.columns: agg_dict['produit'] = lambda x: len(x.unique()) if 'quantitetot' in self.df.columns: agg_dict['quantitetot'] = 'sum' return agg_dict def _add_herbicide_quantities(self, risk_analysis, group_cols): """Ajoute les quantités d'herbicides spécifiques""" if 'familleprod' in self.df.columns and 'quantitetot' in self.df.columns: herbicides_df = self.df[self.df['familleprod'] == 'Herbicides'] if len(herbicides_df) > 0: herbicide_quantities = herbicides_df.groupby(group_cols)['quantitetot'].sum().fillna(0) risk_analysis['Quantite_herbicides'] = herbicide_quantities.reindex(risk_analysis.index, fill_value=0) else: risk_analysis['Quantite_herbicides'] = 0 else: risk_analysis['Quantite_herbicides'] = 0 return risk_analysis def _rename_columns(self, risk_analysis, agg_dict): """Renomme les colonnes de façon sécurisée""" new_column_names = {} if 'familleprod' in agg_dict: new_column_names['familleprod'] = 'Nb_herbicides' if 'libevenem' in agg_dict: new_column_names['libevenem'] = 'Diversite_evenements' if 'produit' in agg_dict: new_column_names['produit'] = 'Diversite_produits' if 'quantitetot' in agg_dict: new_column_names['quantitetot'] = 'Quantite_totale' return risk_analysis.rename(columns=new_column_names) def _calculate_ift(self, risk_analysis, group_cols): """Calcule l'IFT approximatif""" if 'surfparc' in group_cols: risk_analysis['IFT_herbicide_approx'] = ( risk_analysis['Quantite_herbicides'] / risk_analysis.index.get_level_values('surfparc') ).round(2) else: risk_analysis['IFT_herbicide_approx'] = 0 return risk_analysis def _classify_risk(self, row): """Classification du risque pour une parcelle""" ift = row.get('IFT_herbicide_approx', 0) nb_herb = row.get('Nb_herbicides', 0) if ift == 0 and nb_herb == 0: return 'TRÈS FAIBLE' elif ift < 1 and nb_herb <= 1: return 'FAIBLE' elif ift < 3 and nb_herb <= 3: return 'MODÉRÉ' elif ift < 5 and nb_herb <= 5: return 'ÉLEVÉ' else: return 'TRÈS ÉLEVÉ' def _sort_by_risk(self, risk_analysis): """Trie les résultats par niveau de risque""" risk_order = {r: i for i, r in enumerate(RISK_LEVELS)} risk_analysis['Risk_Score'] = risk_analysis['Risque_adventice'].map(risk_order) return risk_analysis.sort_values(['Risk_Score', 'IFT_herbicide_approx']) def get_summary_stats(self): """Retourne les statistiques de résumé avec gestion d'erreur""" try: if self.df is None: return "❌ Aucune donnée disponible" # Statistiques générales avec gestion d'erreur try: total_parcelles = self.df['numparcell'].nunique() total_interventions = len(self.df) surface_totale = self.df['surfparc'].sum() surface_moyenne = self.df['surfparc'].mean() periode_min = self.df['millesime'].min() periode_max = self.df['millesime'].max() stats_text = f""" ## 📊 Statistiques Générales - **Nombre total de parcelles**: {total_parcelles} - **Nombre d'interventions**: {total_interventions:,} - **Surface totale**: {surface_totale:.2f} hectares - **Surface moyenne par parcelle**: {surface_moyenne:.2f} hectares - **Période**: {periode_min} - {periode_max} ## 🧪 Analyse Herbicides """ except Exception as e: print(f"❌ Erreur dans les statistiques générales: {e}") stats_text = """ ## 📊 Statistiques Générales ❌ Erreur lors du calcul des statistiques générales ## 🧪 Analyse Herbicides """ # Analyse des herbicides avec gestion d'erreur try: if 'familleprod' in self.df.columns: herbicides_df = self.df[self.df['familleprod'] == 'Herbicides'] if len(herbicides_df) > 0: nb_herbicides = len(herbicides_df) pct_herbicides = (nb_herbicides/len(self.df)*100) parcelles_traitees = herbicides_df['numparcell'].nunique() if 'produit' in herbicides_df.columns: produits_uniques = herbicides_df['produit'].nunique() stats_text += f""" - **Interventions herbicides**: {nb_herbicides} ({pct_herbicides:.1f}%) - **Parcelles traitées**: {parcelles_traitees} - **Produits herbicides différents**: {produits_uniques} """ else: stats_text += f""" - **Interventions herbicides**: {nb_herbicides} ({pct_herbicides:.1f}%) - **Parcelles traitées**: {parcelles_traitees} """ else: stats_text += "\n- **Aucune intervention herbicide détectée**" else: stats_text += "\n- **Données d'herbicides non disponibles**" except Exception as e: print(f"❌ Erreur dans l'analyse des herbicides: {e}") stats_text += "\n❌ Erreur lors de l'analyse des herbicides" # Analyse des risques avec gestion d'erreur try: if self.risk_analysis is not None and len(self.risk_analysis) > 0: risk_distribution = self.risk_analysis['Risque_adventice'].value_counts() stats_text += f""" ## 🎯 Répartition des Risques Adventices """ for risk_level in RISK_LEVELS: if risk_level in risk_distribution: count = risk_distribution[risk_level] pct = (count / len(self.risk_analysis)) * 100 stats_text += f"- **{risk_level}**: {count} parcelles ({pct:.1f}%)\n" else: stats_text += "\n\n❌ Analyse des risques non disponible" except Exception as e: print(f"❌ Erreur dans l'analyse des risques: {e}") stats_text += "\n\n❌ Erreur lors de l'analyse des risques" return stats_text except Exception as e: print(f"❌ Erreur critique dans get_summary_stats: {e}") return "❌ Erreur critique lors de la génération des statistiques" def get_low_risk_recommendations(self): """Retourne les recommandations pour les parcelles à faible risque avec gestion d'erreur""" try: if self.risk_analysis is None or len(self.risk_analysis) == 0: return "❌ Analyse des risques non disponible" try: low_risk = self.risk_analysis[ self.risk_analysis['Risque_adventice'].isin(['TRÈS FAIBLE', 'FAIBLE']) ].head(10) if len(low_risk) == 0: return """## 🌾 Recommandations pour Cultures Sensibles ❌ Aucune parcelle à faible risque trouvée. 💡 **Suggestion**: Considérez une rotation plus longue ou des techniques alternatives pour réduire la pression adventice.""" recommendations = "## 🌾 TOP 10 - Parcelles Recommandées pour Cultures Sensibles (Pois, Haricot)\n\n" for idx, row in low_risk.iterrows(): try: if isinstance(idx, tuple) and len(idx) >= 4: parcelle, nom, culture, surface = idx[:4] else: # Fallback si l'index n'est pas un tuple de 4 éléments parcelle = str(idx) nom = "N/A" culture = "N/A" surface = row.get('surfparc', 0) if hasattr(row, 'get') else 0 # Vérification des valeurs avec fallbacks risque = row.get('Risque_adventice', 'N/A') if hasattr(row, 'get') else 'N/A' ift = row.get('IFT_herbicide_approx', 0) if hasattr(row, 'get') else 0 nb_herb = row.get('Nb_herbicides', 0) if hasattr(row, 'get') else 0 # Conversion sécurisée pour les formats try: surface_formatted = f"{float(surface):.2f}" if surface != "N/A" else "N/A" except (ValueError, TypeError): surface_formatted = str(surface) try: ift_formatted = f"{float(ift):.2f}" if ift != "N/A" else "N/A" except (ValueError, TypeError): ift_formatted = str(ift) recommendations += f""" **Parcelle {parcelle}** ({nom}) - Culture actuelle: {culture} - Surface: {surface_formatted} ha - Niveau de risque: {risque} - IFT herbicide: {ift_formatted} - Nombre d'herbicides: {nb_herb} --- """ except Exception as e: print(f"❌ Erreur lors du traitement d'une parcelle: {e}") recommendations += f""" **Parcelle {str(idx)}** ❌ Erreur lors du traitement des données de cette parcelle --- """ return recommendations except Exception as e: print(f"❌ Erreur lors de la génération des recommandations: {e}") return """## 🌾 Recommandations pour Cultures Sensibles ❌ Erreur lors de la génération des recommandations. 💡 **Suggestion**: Vérifiez la qualité des données et relancez l'analyse.""" except Exception as e: print(f"❌ Erreur critique dans get_low_risk_recommendations: {e}") return "❌ Erreur critique lors de la génération des recommandations" def get_risk_analysis(self): """Retourne l'analyse des risques""" return self.risk_analysis def get_available_parcels(self): """Retourne la liste des parcelles disponibles dans les données""" if self.df is None or len(self.df) == 0: return [] # Créer une liste avec numéro et nom de parcelle si disponible parcels_info = [] if 'nomparc' in self.df.columns: # Grouper par parcelle et prendre le premier nom (en cas de doublons) parcels_data = self.df.groupby('numparcell')['nomparc'].first().reset_index() for _, row in parcels_data.iterrows(): parcel_id = str(row['numparcell']) parcel_name = str(row['nomparc']) if pd.notna(row['nomparc']) else "" if parcel_name and parcel_name != "nan": display_name = f"{parcel_id} - {parcel_name}" else: display_name = parcel_id parcels_info.append((display_name, parcel_id)) else: # Seulement les numéros de parcelles unique_parcels = sorted(self.df['numparcell'].dropna().unique()) parcels_info = [(str(p), str(p)) for p in unique_parcels] # Ajouter l'option "Toutes les parcelles" en premier parcels_info.insert(0, ("Toutes les parcelles", "ALL")) return parcels_info def get_available_parcels_for_year(self, year): """Retourne la liste des parcelles disponibles pour une année donnée""" if self.df is None or len(self.df) == 0: return [("Toutes les parcelles", "ALL")] # Filtrer par année si spécifiée if year is not None: year_data = self.df[self.df['millesime'] == year] else: year_data = self.df if len(year_data) == 0: return [("Toutes les parcelles", "ALL")] # Créer une liste avec numéro et nom de parcelle si disponible parcels_info = [] if 'nomparc' in year_data.columns: # Grouper par parcelle et prendre le premier nom (en cas de doublons) parcels_data = year_data.groupby('numparcell')['nomparc'].first().reset_index() for _, row in parcels_data.iterrows(): parcel_id = str(row['numparcell']) parcel_name = str(row['nomparc']) if pd.notna(row['nomparc']) else "" if parcel_name and parcel_name != "nan": display_name = f"{parcel_id} - {parcel_name}" else: display_name = parcel_id parcels_info.append((display_name, parcel_id)) else: # Seulement les numéros de parcelles unique_parcels = sorted(year_data['numparcell'].dropna().unique()) parcels_info = [(str(p), str(p)) for p in unique_parcels] # Ajouter l'option "Toutes les parcelles" en premier parcels_info.insert(0, ("Toutes les parcelles", "ALL")) return parcels_info def get_available_years_for_parcel(self, parcel_id): """Retourne la liste des années disponibles pour une parcelle donnée""" if self.df is None or len(self.df) == 0: return ["Toutes les années"] # Filtrer par parcelle si spécifiée if parcel_id is not None and parcel_id != "ALL": try: # Convertir en entier si c'est une chaîne if isinstance(parcel_id, str) and parcel_id.isdigit(): parcel_id_converted = int(parcel_id) else: parcel_id_converted = parcel_id parcel_data = self.df[self.df['numparcell'] == parcel_id_converted] except (ValueError, TypeError): parcel_data = self.df[self.df['numparcell'] == parcel_id] else: parcel_data = self.df if len(parcel_data) == 0: return ["Toutes les années"] # Récupérer les années disponibles et les trier available_years = sorted(parcel_data['millesime'].dropna().unique()) year_choices = ["Toutes les années"] + [str(year) for year in available_years] return year_choices def filter_data_by_parcel(self, parcel_id): """Filtre les données par parcelle""" if self.df is None or parcel_id is None or parcel_id == "ALL": return self.df parcel_data = self.df[self.df['numparcell'] == parcel_id].copy() return parcel_data def filter_data_by_year_and_parcel(self, year, parcel_id): """Filtre les données par année et parcelle""" if self.df is None: return None filtered_data = self.df.copy() # Filtrer par année si spécifiée if year is not None: filtered_data = filtered_data[filtered_data['millesime'] == year] # Filtrer par parcelle si spécifiée (et différente de "ALL") if parcel_id is not None and parcel_id != "ALL": # Convertir parcel_id en type approprié (gérer string/int) try: # Essayer de convertir en entier si c'est une chaîne if isinstance(parcel_id, str) and parcel_id.isdigit(): parcel_id_converted = int(parcel_id) else: parcel_id_converted = parcel_id filtered_data = filtered_data[filtered_data['numparcell'] == parcel_id_converted] except (ValueError, TypeError): # En cas d'erreur de conversion, essayer tel quel filtered_data = filtered_data[filtered_data['numparcell'] == parcel_id] return filtered_data def get_data_table_by_year_and_parcel(self, year, parcel_id=None, max_rows=1000): """Retourne un tableau des données pour une année et optionnellement une parcelle""" try: filtered_data = self.filter_data_by_year_and_parcel(year, parcel_id) if filtered_data is None or len(filtered_data) == 0: # Construire un message d'erreur informatif avec les données disponibles available_years = sorted(self.df['millesime'].unique()) if self.df is not None else [] available_parcels = sorted(self.df['numparcell'].unique()) if self.df is not None else [] if parcel_id and parcel_id != "ALL": error_msg = f"❌ Aucune donnée pour l'année {year} et la parcelle {parcel_id}\n\n" error_msg += f"📅 **Années disponibles**: {', '.join(map(str, available_years))}\n" error_msg += f"🏠 **Parcelles disponibles**: {', '.join(map(str, available_parcels[:10]))}" if len(available_parcels) > 10: error_msg += f" (et {len(available_parcels)-10} autres...)" return None, error_msg else: error_msg = f"❌ Aucune donnée pour l'année {year}\n\n" error_msg += f"📅 **Années disponibles**: {', '.join(map(str, available_years))}" return None, error_msg # Sélectionner les colonnes les plus importantes pour l'affichage display_cols = [] important_cols = [ 'millesime', 'numparcell', 'nomparc', 'surfparc', 'libelleusag', 'datedebut', 'datefin', 'libevenem', 'familleprod', 'produit', 'quantitetot', 'unite' ] for col in important_cols: if col in filtered_data.columns: display_cols.append(col) if not display_cols: return None, "❌ Aucune colonne importante trouvée" # Préparer les données pour l'affichage display_df = filtered_data[display_cols].copy() # Formater les colonnes pour un meilleur affichage if 'surfparc' in display_df.columns: display_df['surfparc'] = display_df['surfparc'].round(2) if 'quantitetot' in display_df.columns: display_df['quantitetot'] = pd.to_numeric(display_df['quantitetot'], errors='coerce').round(3) # Trier par date si disponible, sinon par parcelle if 'datedebut' in display_df.columns: # Convertir les dates pour le tri display_df['date_sort'] = pd.to_datetime(display_df['datedebut'], format='%d/%m/%y', errors='coerce') display_df = display_df.sort_values(['numparcell', 'date_sort']) display_df = display_df.drop('date_sort', axis=1) else: display_df = display_df.sort_values('numparcell') # Limiter le nombre de lignes pour l'affichage if len(display_df) > max_rows: display_df = display_df.head(max_rows) info_msg = f"📊 Affichage de {max_rows} premières lignes sur {len(filtered_data)} total" else: info_msg = f"📊 {len(display_df)} enregistrements au total" # Renommer les colonnes pour l'affichage column_mapping = { 'millesime': 'Année', 'numparcell': 'N° Parcelle', 'nomparc': 'Nom Parcelle', 'surfparc': 'Surface (ha)', 'libelleusag': 'Usage', 'datedebut': 'Date Début', 'datefin': 'Date Fin', 'libevenem': 'Type Intervention', 'familleprod': 'Famille Produit', 'produit': 'Produit', 'quantitetot': 'Quantité', 'unite': 'Unité' } # Appliquer le renommage seulement pour les colonnes présentes rename_dict = {k: v for k, v in column_mapping.items() if k in display_df.columns} display_df = display_df.rename(columns=rename_dict) # Ajouter l'information sur la sélection if year and parcel_id and parcel_id != "ALL": info_msg = f"📊 Année {year} - Parcelle {parcel_id}: {len(display_df)} enregistrements" elif year: info_msg = f"📊 Année {year}: {len(display_df)} enregistrements" elif parcel_id and parcel_id != "ALL": info_msg = f"📊 Parcelle {parcel_id}: {len(display_df)} enregistrements" return display_df, info_msg except Exception as e: print(f"❌ Erreur lors de la création du tableau: {e}") return None, f"❌ Erreur lors de la création du tableau: {str(e)[:100]}..."