data / analyzer.py
Tracy André
updated
bc49a9e
"""
Module d'analyse des données agricoles et calcul des risques
"""
import pandas as pd
from config import OPTIONAL_GROUP_COLS, REQUIRED_COLUMNS, RISK_LEVELS
class AgricultureAnalyzer:
"""Classe responsable de l'analyse des données agricoles"""
def __init__(self, data=None):
self.df = data
self.risk_analysis = None
def set_data(self, data):
"""Définit les données à analyser"""
self.df = data
def analyze_data(self):
"""Analyse des données et calcul des risques"""
if self.df is None or len(self.df) == 0:
print("❌ Pas de données à analyser")
return "Erreur: Aucune donnée chargée"
try:
print(f"🔄 Début de l'analyse sur {len(self.df)} enregistrements...")
# Analyse générale
general_stats = self._calculate_general_stats()
# Analyse des herbicides
herbicide_stats = self._calculate_herbicide_stats()
# Calcul de l'analyse des risques
self.calculate_risk_analysis()
print("✅ Analyse terminée avec succès")
return general_stats, herbicide_stats
except Exception as e:
print(f"❌ Erreur lors de l'analyse: {str(e)}")
return None, None
def _calculate_general_stats(self):
"""Calcule les statistiques générales"""
return {
'total_parcelles': self.df['numparcell'].nunique(),
'total_interventions': len(self.df),
'surface_totale': self.df['surfparc'].sum(),
'surface_moyenne': self.df['surfparc'].mean(),
'periode': f"{self.df['millesime'].min()} - {self.df['millesime'].max()}"
}
def _calculate_herbicide_stats(self):
"""Calcule les statistiques sur les herbicides"""
if 'familleprod' in self.df.columns:
herbicides_df = self.df[self.df['familleprod'] == 'Herbicides'].copy()
return {
'nb_interventions_herbicides': len(herbicides_df),
'pourcentage_herbicides': (len(herbicides_df) / len(self.df)) * 100,
'parcelles_traitees': herbicides_df['numparcell'].nunique()
}
else:
return {
'nb_interventions_herbicides': 0,
'pourcentage_herbicides': 0,
'parcelles_traitees': 0
}
def calculate_risk_analysis(self):
"""Calcule l'analyse des risques par parcelle"""
try:
print("🔄 Calcul de l'analyse des risques...")
# Vérifier les colonnes nécessaires
required_group_cols = ['numparcell', 'surfparc']
# Construire la liste des colonnes de groupement disponibles
group_cols = [col for col in required_group_cols if col in self.df.columns]
group_cols.extend([col for col in OPTIONAL_GROUP_COLS if col in self.df.columns])
if len(group_cols) < 2:
print(f"❌ Colonnes insuffisantes pour le groupement: {group_cols}")
self.risk_analysis = pd.DataFrame()
return
# Construire l'agrégation selon les colonnes disponibles
agg_dict = self._build_aggregation_dict()
if not agg_dict:
print("❌ Aucune colonne disponible pour l'agrégation")
self.risk_analysis = pd.DataFrame()
return
# Groupement des données par parcelle
risk_analysis = self.df.groupby(group_cols).agg(agg_dict).round(2)
# Ajout des quantités d'herbicides spécifiques
risk_analysis = self._add_herbicide_quantities(risk_analysis, group_cols)
# Renommage des colonnes
risk_analysis = self._rename_columns(risk_analysis, agg_dict)
# Calcul de l'IFT approximatif
risk_analysis = self._calculate_ift(risk_analysis, group_cols)
# Classification du risque
risk_analysis['Risque_adventice'] = risk_analysis.apply(self._classify_risk, axis=1)
# Tri par risque
risk_analysis = self._sort_by_risk(risk_analysis)
self.risk_analysis = risk_analysis
print(f"✅ Analyse des risques terminée: {len(self.risk_analysis)} parcelles analysées")
except Exception as e:
print(f"❌ Erreur lors du calcul des risques: {str(e)}")
self.risk_analysis = pd.DataFrame()
def _build_aggregation_dict(self):
"""Construit le dictionnaire d'agrégation selon les colonnes disponibles"""
agg_dict = {}
if 'familleprod' in self.df.columns:
agg_dict['familleprod'] = lambda x: (x == 'Herbicides').sum()
if 'libevenem' in self.df.columns:
agg_dict['libevenem'] = lambda x: len(x.unique())
if 'produit' in self.df.columns:
agg_dict['produit'] = lambda x: len(x.unique())
if 'quantitetot' in self.df.columns:
agg_dict['quantitetot'] = 'sum'
return agg_dict
def _add_herbicide_quantities(self, risk_analysis, group_cols):
"""Ajoute les quantités d'herbicides spécifiques"""
if 'familleprod' in self.df.columns and 'quantitetot' in self.df.columns:
herbicides_df = self.df[self.df['familleprod'] == 'Herbicides']
if len(herbicides_df) > 0:
herbicide_quantities = herbicides_df.groupby(group_cols)['quantitetot'].sum().fillna(0)
risk_analysis['Quantite_herbicides'] = herbicide_quantities.reindex(risk_analysis.index, fill_value=0)
else:
risk_analysis['Quantite_herbicides'] = 0
else:
risk_analysis['Quantite_herbicides'] = 0
return risk_analysis
def _rename_columns(self, risk_analysis, agg_dict):
"""Renomme les colonnes de façon sécurisée"""
new_column_names = {}
if 'familleprod' in agg_dict:
new_column_names['familleprod'] = 'Nb_herbicides'
if 'libevenem' in agg_dict:
new_column_names['libevenem'] = 'Diversite_evenements'
if 'produit' in agg_dict:
new_column_names['produit'] = 'Diversite_produits'
if 'quantitetot' in agg_dict:
new_column_names['quantitetot'] = 'Quantite_totale'
return risk_analysis.rename(columns=new_column_names)
def _calculate_ift(self, risk_analysis, group_cols):
"""Calcule l'IFT approximatif"""
if 'surfparc' in group_cols:
risk_analysis['IFT_herbicide_approx'] = (
risk_analysis['Quantite_herbicides'] /
risk_analysis.index.get_level_values('surfparc')
).round(2)
else:
risk_analysis['IFT_herbicide_approx'] = 0
return risk_analysis
def _classify_risk(self, row):
"""Classification du risque pour une parcelle"""
ift = row.get('IFT_herbicide_approx', 0)
nb_herb = row.get('Nb_herbicides', 0)
if ift == 0 and nb_herb == 0:
return 'TRÈS FAIBLE'
elif ift < 1 and nb_herb <= 1:
return 'FAIBLE'
elif ift < 3 and nb_herb <= 3:
return 'MODÉRÉ'
elif ift < 5 and nb_herb <= 5:
return 'ÉLEVÉ'
else:
return 'TRÈS ÉLEVÉ'
def _sort_by_risk(self, risk_analysis):
"""Trie les résultats par niveau de risque"""
risk_order = {r: i for i, r in enumerate(RISK_LEVELS)}
risk_analysis['Risk_Score'] = risk_analysis['Risque_adventice'].map(risk_order)
return risk_analysis.sort_values(['Risk_Score', 'IFT_herbicide_approx'])
def get_summary_stats(self):
"""Retourne les statistiques de résumé avec gestion d'erreur"""
try:
if self.df is None:
return "❌ Aucune donnée disponible"
# Statistiques générales avec gestion d'erreur
try:
total_parcelles = self.df['numparcell'].nunique()
total_interventions = len(self.df)
surface_totale = self.df['surfparc'].sum()
surface_moyenne = self.df['surfparc'].mean()
periode_min = self.df['millesime'].min()
periode_max = self.df['millesime'].max()
stats_text = f"""
## 📊 Statistiques Générales
- **Nombre total de parcelles**: {total_parcelles}
- **Nombre d'interventions**: {total_interventions:,}
- **Surface totale**: {surface_totale:.2f} hectares
- **Surface moyenne par parcelle**: {surface_moyenne:.2f} hectares
- **Période**: {periode_min} - {periode_max}
## 🧪 Analyse Herbicides
"""
except Exception as e:
print(f"❌ Erreur dans les statistiques générales: {e}")
stats_text = """
## 📊 Statistiques Générales
❌ Erreur lors du calcul des statistiques générales
## 🧪 Analyse Herbicides
"""
# Analyse des herbicides avec gestion d'erreur
try:
if 'familleprod' in self.df.columns:
herbicides_df = self.df[self.df['familleprod'] == 'Herbicides']
if len(herbicides_df) > 0:
nb_herbicides = len(herbicides_df)
pct_herbicides = (nb_herbicides/len(self.df)*100)
parcelles_traitees = herbicides_df['numparcell'].nunique()
if 'produit' in herbicides_df.columns:
produits_uniques = herbicides_df['produit'].nunique()
stats_text += f"""
- **Interventions herbicides**: {nb_herbicides} ({pct_herbicides:.1f}%)
- **Parcelles traitées**: {parcelles_traitees}
- **Produits herbicides différents**: {produits_uniques}
"""
else:
stats_text += f"""
- **Interventions herbicides**: {nb_herbicides} ({pct_herbicides:.1f}%)
- **Parcelles traitées**: {parcelles_traitees}
"""
else:
stats_text += "\n- **Aucune intervention herbicide détectée**"
else:
stats_text += "\n- **Données d'herbicides non disponibles**"
except Exception as e:
print(f"❌ Erreur dans l'analyse des herbicides: {e}")
stats_text += "\n❌ Erreur lors de l'analyse des herbicides"
# Analyse des risques avec gestion d'erreur
try:
if self.risk_analysis is not None and len(self.risk_analysis) > 0:
risk_distribution = self.risk_analysis['Risque_adventice'].value_counts()
stats_text += f"""
## 🎯 Répartition des Risques Adventices
"""
for risk_level in RISK_LEVELS:
if risk_level in risk_distribution:
count = risk_distribution[risk_level]
pct = (count / len(self.risk_analysis)) * 100
stats_text += f"- **{risk_level}**: {count} parcelles ({pct:.1f}%)\n"
else:
stats_text += "\n\n❌ Analyse des risques non disponible"
except Exception as e:
print(f"❌ Erreur dans l'analyse des risques: {e}")
stats_text += "\n\n❌ Erreur lors de l'analyse des risques"
return stats_text
except Exception as e:
print(f"❌ Erreur critique dans get_summary_stats: {e}")
return "❌ Erreur critique lors de la génération des statistiques"
def get_low_risk_recommendations(self):
"""Retourne les recommandations pour les parcelles à faible risque avec gestion d'erreur"""
try:
if self.risk_analysis is None or len(self.risk_analysis) == 0:
return "❌ Analyse des risques non disponible"
try:
low_risk = self.risk_analysis[
self.risk_analysis['Risque_adventice'].isin(['TRÈS FAIBLE', 'FAIBLE'])
].head(10)
if len(low_risk) == 0:
return """## 🌾 Recommandations pour Cultures Sensibles
❌ Aucune parcelle à faible risque trouvée.
💡 **Suggestion**: Considérez une rotation plus longue ou des techniques alternatives pour réduire la pression adventice."""
recommendations = "## 🌾 TOP 10 - Parcelles Recommandées pour Cultures Sensibles (Pois, Haricot)\n\n"
for idx, row in low_risk.iterrows():
try:
if isinstance(idx, tuple) and len(idx) >= 4:
parcelle, nom, culture, surface = idx[:4]
else:
# Fallback si l'index n'est pas un tuple de 4 éléments
parcelle = str(idx)
nom = "N/A"
culture = "N/A"
surface = row.get('surfparc', 0) if hasattr(row, 'get') else 0
# Vérification des valeurs avec fallbacks
risque = row.get('Risque_adventice', 'N/A') if hasattr(row, 'get') else 'N/A'
ift = row.get('IFT_herbicide_approx', 0) if hasattr(row, 'get') else 0
nb_herb = row.get('Nb_herbicides', 0) if hasattr(row, 'get') else 0
# Conversion sécurisée pour les formats
try:
surface_formatted = f"{float(surface):.2f}" if surface != "N/A" else "N/A"
except (ValueError, TypeError):
surface_formatted = str(surface)
try:
ift_formatted = f"{float(ift):.2f}" if ift != "N/A" else "N/A"
except (ValueError, TypeError):
ift_formatted = str(ift)
recommendations += f"""
**Parcelle {parcelle}** ({nom})
- Culture actuelle: {culture}
- Surface: {surface_formatted} ha
- Niveau de risque: {risque}
- IFT herbicide: {ift_formatted}
- Nombre d'herbicides: {nb_herb}
---
"""
except Exception as e:
print(f"❌ Erreur lors du traitement d'une parcelle: {e}")
recommendations += f"""
**Parcelle {str(idx)}**
❌ Erreur lors du traitement des données de cette parcelle
---
"""
return recommendations
except Exception as e:
print(f"❌ Erreur lors de la génération des recommandations: {e}")
return """## 🌾 Recommandations pour Cultures Sensibles
❌ Erreur lors de la génération des recommandations.
💡 **Suggestion**: Vérifiez la qualité des données et relancez l'analyse."""
except Exception as e:
print(f"❌ Erreur critique dans get_low_risk_recommendations: {e}")
return "❌ Erreur critique lors de la génération des recommandations"
def get_risk_analysis(self):
"""Retourne l'analyse des risques"""
return self.risk_analysis
def get_available_parcels(self):
"""Retourne la liste des parcelles disponibles dans les données"""
if self.df is None or len(self.df) == 0:
return []
# Créer une liste avec numéro et nom de parcelle si disponible
parcels_info = []
if 'nomparc' in self.df.columns:
# Grouper par parcelle et prendre le premier nom (en cas de doublons)
parcels_data = self.df.groupby('numparcell')['nomparc'].first().reset_index()
for _, row in parcels_data.iterrows():
parcel_id = str(row['numparcell'])
parcel_name = str(row['nomparc']) if pd.notna(row['nomparc']) else ""
if parcel_name and parcel_name != "nan":
display_name = f"{parcel_id} - {parcel_name}"
else:
display_name = parcel_id
parcels_info.append((display_name, parcel_id))
else:
# Seulement les numéros de parcelles
unique_parcels = sorted(self.df['numparcell'].dropna().unique())
parcels_info = [(str(p), str(p)) for p in unique_parcels]
# Ajouter l'option "Toutes les parcelles" en premier
parcels_info.insert(0, ("Toutes les parcelles", "ALL"))
return parcels_info
def get_available_parcels_for_year(self, year):
"""Retourne la liste des parcelles disponibles pour une année donnée"""
if self.df is None or len(self.df) == 0:
return [("Toutes les parcelles", "ALL")]
# Filtrer par année si spécifiée
if year is not None:
year_data = self.df[self.df['millesime'] == year]
else:
year_data = self.df
if len(year_data) == 0:
return [("Toutes les parcelles", "ALL")]
# Créer une liste avec numéro et nom de parcelle si disponible
parcels_info = []
if 'nomparc' in year_data.columns:
# Grouper par parcelle et prendre le premier nom (en cas de doublons)
parcels_data = year_data.groupby('numparcell')['nomparc'].first().reset_index()
for _, row in parcels_data.iterrows():
parcel_id = str(row['numparcell'])
parcel_name = str(row['nomparc']) if pd.notna(row['nomparc']) else ""
if parcel_name and parcel_name != "nan":
display_name = f"{parcel_id} - {parcel_name}"
else:
display_name = parcel_id
parcels_info.append((display_name, parcel_id))
else:
# Seulement les numéros de parcelles
unique_parcels = sorted(year_data['numparcell'].dropna().unique())
parcels_info = [(str(p), str(p)) for p in unique_parcels]
# Ajouter l'option "Toutes les parcelles" en premier
parcels_info.insert(0, ("Toutes les parcelles", "ALL"))
return parcels_info
def get_available_years_for_parcel(self, parcel_id):
"""Retourne la liste des années disponibles pour une parcelle donnée"""
if self.df is None or len(self.df) == 0:
return ["Toutes les années"]
# Filtrer par parcelle si spécifiée
if parcel_id is not None and parcel_id != "ALL":
try:
# Convertir en entier si c'est une chaîne
if isinstance(parcel_id, str) and parcel_id.isdigit():
parcel_id_converted = int(parcel_id)
else:
parcel_id_converted = parcel_id
parcel_data = self.df[self.df['numparcell'] == parcel_id_converted]
except (ValueError, TypeError):
parcel_data = self.df[self.df['numparcell'] == parcel_id]
else:
parcel_data = self.df
if len(parcel_data) == 0:
return ["Toutes les années"]
# Récupérer les années disponibles et les trier
available_years = sorted(parcel_data['millesime'].dropna().unique())
year_choices = ["Toutes les années"] + [str(year) for year in available_years]
return year_choices
def filter_data_by_parcel(self, parcel_id):
"""Filtre les données par parcelle"""
if self.df is None or parcel_id is None or parcel_id == "ALL":
return self.df
parcel_data = self.df[self.df['numparcell'] == parcel_id].copy()
return parcel_data
def filter_data_by_year_and_parcel(self, year, parcel_id):
"""Filtre les données par année et parcelle"""
if self.df is None:
return None
filtered_data = self.df.copy()
# Filtrer par année si spécifiée
if year is not None:
filtered_data = filtered_data[filtered_data['millesime'] == year]
# Filtrer par parcelle si spécifiée (et différente de "ALL")
if parcel_id is not None and parcel_id != "ALL":
# Convertir parcel_id en type approprié (gérer string/int)
try:
# Essayer de convertir en entier si c'est une chaîne
if isinstance(parcel_id, str) and parcel_id.isdigit():
parcel_id_converted = int(parcel_id)
else:
parcel_id_converted = parcel_id
filtered_data = filtered_data[filtered_data['numparcell'] == parcel_id_converted]
except (ValueError, TypeError):
# En cas d'erreur de conversion, essayer tel quel
filtered_data = filtered_data[filtered_data['numparcell'] == parcel_id]
return filtered_data
def get_data_table_by_year_and_parcel(self, year, parcel_id=None, max_rows=1000):
"""Retourne un tableau des données pour une année et optionnellement une parcelle"""
try:
filtered_data = self.filter_data_by_year_and_parcel(year, parcel_id)
if filtered_data is None or len(filtered_data) == 0:
# Construire un message d'erreur informatif avec les données disponibles
available_years = sorted(self.df['millesime'].unique()) if self.df is not None else []
available_parcels = sorted(self.df['numparcell'].unique()) if self.df is not None else []
if parcel_id and parcel_id != "ALL":
error_msg = f"❌ Aucune donnée pour l'année {year} et la parcelle {parcel_id}\n\n"
error_msg += f"📅 **Années disponibles**: {', '.join(map(str, available_years))}\n"
error_msg += f"🏠 **Parcelles disponibles**: {', '.join(map(str, available_parcels[:10]))}"
if len(available_parcels) > 10:
error_msg += f" (et {len(available_parcels)-10} autres...)"
return None, error_msg
else:
error_msg = f"❌ Aucune donnée pour l'année {year}\n\n"
error_msg += f"📅 **Années disponibles**: {', '.join(map(str, available_years))}"
return None, error_msg
# Sélectionner les colonnes les plus importantes pour l'affichage
display_cols = []
important_cols = [
'millesime', 'numparcell', 'nomparc', 'surfparc',
'libelleusag', 'datedebut', 'datefin', 'libevenem',
'familleprod', 'produit', 'quantitetot', 'unite'
]
for col in important_cols:
if col in filtered_data.columns:
display_cols.append(col)
if not display_cols:
return None, "❌ Aucune colonne importante trouvée"
# Préparer les données pour l'affichage
display_df = filtered_data[display_cols].copy()
# Formater les colonnes pour un meilleur affichage
if 'surfparc' in display_df.columns:
display_df['surfparc'] = display_df['surfparc'].round(2)
if 'quantitetot' in display_df.columns:
display_df['quantitetot'] = pd.to_numeric(display_df['quantitetot'], errors='coerce').round(3)
# Trier par date si disponible, sinon par parcelle
if 'datedebut' in display_df.columns:
# Convertir les dates pour le tri
display_df['date_sort'] = pd.to_datetime(display_df['datedebut'], format='%d/%m/%y', errors='coerce')
display_df = display_df.sort_values(['numparcell', 'date_sort'])
display_df = display_df.drop('date_sort', axis=1)
else:
display_df = display_df.sort_values('numparcell')
# Limiter le nombre de lignes pour l'affichage
if len(display_df) > max_rows:
display_df = display_df.head(max_rows)
info_msg = f"📊 Affichage de {max_rows} premières lignes sur {len(filtered_data)} total"
else:
info_msg = f"📊 {len(display_df)} enregistrements au total"
# Renommer les colonnes pour l'affichage
column_mapping = {
'millesime': 'Année',
'numparcell': 'N° Parcelle',
'nomparc': 'Nom Parcelle',
'surfparc': 'Surface (ha)',
'libelleusag': 'Usage',
'datedebut': 'Date Début',
'datefin': 'Date Fin',
'libevenem': 'Type Intervention',
'familleprod': 'Famille Produit',
'produit': 'Produit',
'quantitetot': 'Quantité',
'unite': 'Unité'
}
# Appliquer le renommage seulement pour les colonnes présentes
rename_dict = {k: v for k, v in column_mapping.items() if k in display_df.columns}
display_df = display_df.rename(columns=rename_dict)
# Ajouter l'information sur la sélection
if year and parcel_id and parcel_id != "ALL":
info_msg = f"📊 Année {year} - Parcelle {parcel_id}: {len(display_df)} enregistrements"
elif year:
info_msg = f"📊 Année {year}: {len(display_df)} enregistrements"
elif parcel_id and parcel_id != "ALL":
info_msg = f"📊 Parcelle {parcel_id}: {len(display_df)} enregistrements"
return display_df, info_msg
except Exception as e:
print(f"❌ Erreur lors de la création du tableau: {e}")
return None, f"❌ Erreur lors de la création du tableau: {str(e)[:100]}..."