import os, shutil # Désactiver les analytics Gradio dès le débu os.environ["GRADIO_ANALYTICS_ENABLED"] = "False" # shutil.rmtree(os.path.expanduser("~/.cache/huggingface/datasets"), ignore_errors=True) import gradio as gr import pandas as pd import numpy as np import matplotlib.pyplot as plt import seaborn as sns import plotly.express as px import plotly.graph_objects as go from plotly.subplots import make_subplots import warnings from datasets import load_dataset import pandas as pd from huggingface_hub import HfApi, hf_hub_download import urllib.parse from herbicide_analyzer import HerbicideAnalyzer warnings.filterwarnings('ignore') # Configuration Hugging Face hf_token = os.environ.get("HF_TOKEN") dataset_id = "HackathonCRA/2024" # Configuration des graphiques plt.style.use('default') sns.set_palette("husl") class AgricultureAnalyzer: def __init__(self): self.df = None self.risk_analysis = None def load_data(self): """Charge les données du dataset Hugging Face""" print("🔄 Chargement des données depuis Hugging Face...") print(f"📋 Dataset ID: {dataset_id}") print(f"📋 Token disponible: {'Oui' if hf_token else 'Non'}") self.df = None # 1) Tentative de chargement direct via datasets.load_dataset try: dataset = load_dataset( dataset_id, split="train", token=hf_token, trust_remote_code=True, ) print(f"📊 Dataset chargé: {len(dataset)} exemples") try: self.df = dataset.to_pandas() print("✅ Conversion to_pandas() réussie") except Exception as pandas_error: print(f"❌ Erreur to_pandas(): {pandas_error}") print("🔄 Tentative de conversion manuelle...") data_list = [] for i, item in enumerate(dataset): data_list.append(item) if i < 5: print(f"📋 Exemple {i}: {list(item.keys())}") self.df = pd.DataFrame(data_list) print(f"✅ Conversion manuelle réussie: {len(self.df)} lignes") except Exception as e: print(f"❌ Erreur lors du chargement depuis Hugging Face: {str(e)}") print(f"❌ Type d'erreur: {type(e).__name__}") # 2) Fallback: récupérer directement les fichiers du repo (csv/parquet/tsv/json) fallback_msg = self._fallback_load_from_repo_files() if self.df is None: return f"❌ Erreur lors du chargement du dataset : {str(e)} | Fallback: {fallback_msg}" # Si on n'a toujours pas de dataframe, arrêter if self.df is None: return "❌ Impossible de charger les données" print(f"📊 Données chargées: {len(self.df)} lignes") print(f"📊 Colonnes disponibles: {list(self.df.columns)}") # Nettoyage et validation required_columns = ["numparcell", "surfparc", "millesime"] missing_cols = [col for col in required_columns if col not in self.df.columns] if missing_cols: print(f"❌ Colonnes manquantes: {missing_cols}") self.df = None return f"❌ Colonnes manquantes: {missing_cols}" # Nettoyage initial_len = len(self.df) self.df = self.df.dropna(subset=required_columns) print(f"📊 Avant nettoyage: {initial_len} lignes") print(f"📊 Après nettoyage: {len(self.df)} lignes") def _fallback_load_from_repo_files(self): """Fallback pour charger les données en téléchargeant directement les fichiers du repo HF.""" try: print("🔄 Tentative de chargement alternatif via fichiers du dépôt Hugging Face...") api = HfApi() files = api.list_repo_files(repo_id=dataset_id, repo_type="dataset", token=hf_token) if not files: print("❌ Aucun fichier dans le dépôt") return "Aucun fichier trouvé dans le dépôt." data_files = [ f for f in files if f.lower().endswith((".parquet", ".csv", ".tsv", ".json")) ] if not data_files: print("❌ Aucun fichier de données exploitable (csv/tsv/parquet/json)") return "Aucun fichier exploitable (csv/tsv/parquet/json)." # Priorité: parquet > csv > tsv > json for ext in [".parquet", ".csv", ".tsv", ".json"]: selected = [f for f in data_files if f.lower().endswith(ext)] if selected: chosen_ext = ext selected_files = selected break print(f"📂 Fichiers détectés ({chosen_ext}): {selected_files[:5]}{' ...' if len(selected_files) > 5 else ''}") local_paths = [] for f in selected_files: local_path = hf_hub_download( repo_id=dataset_id, repo_type="dataset", filename=f, token=hf_token, ) local_paths.append(local_path) frames = [] if chosen_ext == ".parquet": for p in local_paths: frames.append(pd.read_parquet(p)) elif chosen_ext == ".csv": for p in local_paths: frames.append(pd.read_csv(p)) elif chosen_ext == ".tsv": for p in local_paths: frames.append(pd.read_csv(p, sep="\t")) elif chosen_ext == ".json": for p in local_paths: try: frames.append(pd.read_json(p, lines=True)) except Exception: frames.append(pd.read_json(p)) self.df = pd.concat(frames, ignore_index=True) if len(frames) > 1 else frames[0] print(f"✅ Fallback réussi: {len(self.df)} lignes chargées depuis les fichiers du dépôt") return None except Exception as e: print(f"❌ Fallback échoué: {e}") # Dernier recours: fichier local d'exemple sample_path = os.path.join(os.path.dirname(__file__), "sample_data.csv") if os.path.exists(sample_path): try: self.df = pd.read_csv(sample_path) print(f"✅ Chargement du fichier local 'sample_data.csv' ({len(self.df)} lignes)") return "Chargement via fichier local de secours." except Exception as e2: print(f"❌ Échec du chargement du fichier local: {e2}") return f"Fallback échoué: {e}" def analyze_data(self): """Analyse des données et calcul des risques""" if self.df is None or len(self.df) == 0: print("❌ Pas de données à analyser") return "Erreur: Aucune donnée chargée" try: print(f"🔄 Début de l'analyse sur {len(self.df)} enregistrements...") # Analyse générale general_stats = { 'total_parcelles': self.df['numparcell'].nunique(), 'total_interventions': len(self.df), 'surface_totale': self.df['surfparc'].sum(), 'surface_moyenne': self.df['surfparc'].mean(), 'periode': f"{self.df['millesime'].min()} - {self.df['millesime'].max()}" } # Analyse des herbicides if 'familleprod' in self.df.columns: herbicides_df = self.df[self.df['familleprod'] == 'Herbicides'].copy() herbicide_stats = { 'nb_interventions_herbicides': len(herbicides_df), 'pourcentage_herbicides': (len(herbicides_df) / len(self.df)) * 100, 'parcelles_traitees': herbicides_df['numparcell'].nunique() } else: herbicide_stats = { 'nb_interventions_herbicides': 0, 'pourcentage_herbicides': 0, 'parcelles_traitees': 0 } # Calcul de l'analyse des risques self.calculate_risk_analysis() print("✅ Analyse terminée avec succès") return general_stats, herbicide_stats except Exception as e: print(f"❌ Erreur lors de l'analyse: {str(e)}") return None, None def calculate_risk_analysis(self): """Calcule l'analyse des risques par parcelle""" try: print("🔄 Calcul de l'analyse des risques...") # Vérifier les colonnes nécessaires required_group_cols = ['numparcell', 'surfparc'] optional_group_cols = ['nomparc', 'libelleusag'] # Construire la liste des colonnes de groupement disponibles group_cols = [col for col in required_group_cols if col in self.df.columns] group_cols.extend([col for col in optional_group_cols if col in self.df.columns]) if len(group_cols) < 2: print(f"❌ Colonnes insuffisantes pour le groupement: {group_cols}") self.risk_analysis = pd.DataFrame() return # Construire l'agrégation selon les colonnes disponibles agg_dict = {} if 'familleprod' in self.df.columns: agg_dict['familleprod'] = lambda x: (x == 'Herbicides').sum() if 'libevenem' in self.df.columns: agg_dict['libevenem'] = lambda x: len(x.unique()) if 'produit' in self.df.columns: agg_dict['produit'] = lambda x: len(x.unique()) if 'quantitetot' in self.df.columns: agg_dict['quantitetot'] = 'sum' if not agg_dict: print("❌ Aucune colonne disponible pour l'agrégation") self.risk_analysis = pd.DataFrame() return # Groupement des données par parcelle risk_analysis = self.df.groupby(group_cols).agg(agg_dict).round(2) # Quantités d'herbicides spécifiques (seulement si les colonnes existent) if 'familleprod' in self.df.columns and 'quantitetot' in self.df.columns: herbicides_df = self.df[self.df['familleprod'] == 'Herbicides'] if len(herbicides_df) > 0: herbicide_quantities = herbicides_df.groupby(group_cols)['quantitetot'].sum().fillna(0) risk_analysis['Quantite_herbicides'] = herbicide_quantities.reindex(risk_analysis.index, fill_value=0) else: risk_analysis['Quantite_herbicides'] = 0 else: risk_analysis['Quantite_herbicides'] = 0 # Renommer les colonnes de façon sécurisée new_column_names = {} if 'familleprod' in agg_dict: new_column_names['familleprod'] = 'Nb_herbicides' if 'libevenem' in agg_dict: new_column_names['libevenem'] = 'Diversite_evenements' if 'produit' in agg_dict: new_column_names['produit'] = 'Diversite_produits' if 'quantitetot' in agg_dict: new_column_names['quantitetot'] = 'Quantite_totale' risk_analysis = risk_analysis.rename(columns=new_column_names) # Calcul de l'IFT approximatif if 'surfparc' in group_cols: risk_analysis['IFT_herbicide_approx'] = (risk_analysis['Quantite_herbicides'] / risk_analysis.index.get_level_values('surfparc')).round(2) else: risk_analysis['IFT_herbicide_approx'] = 0 # Classification du risque def classify_risk(row): ift = row.get('IFT_herbicide_approx', 0) nb_herb = row.get('Nb_herbicides', 0) if ift == 0 and nb_herb == 0: return 'TRÈS FAIBLE' elif ift < 1 and nb_herb <= 1: return 'FAIBLE' elif ift < 3 and nb_herb <= 3: return 'MODÉRÉ' elif ift < 5 and nb_herb <= 5: return 'ÉLEVÉ' else: return 'TRÈS ÉLEVÉ' risk_analysis['Risque_adventice'] = risk_analysis.apply(classify_risk, axis=1) # Tri par risque risk_order = ['TRÈS FAIBLE', 'FAIBLE', 'MODÉRÉ', 'ÉLEVÉ', 'TRÈS ÉLEVÉ'] risk_analysis['Risk_Score'] = risk_analysis['Risque_adventice'].map({r: i for i, r in enumerate(risk_order)}) self.risk_analysis = risk_analysis.sort_values(['Risk_Score', 'IFT_herbicide_approx']) print(f"✅ Analyse des risques terminée: {len(self.risk_analysis)} parcelles analysées") except Exception as e: print(f"❌ Erreur lors du calcul des risques: {str(e)}") self.risk_analysis = pd.DataFrame() def get_summary_stats(self): """Retourne les statistiques de résumé""" if self.df is None: return "Aucune donnée disponible" stats_text = f""" ## 📊 Statistiques Générales - **Nombre total de parcelles**: {self.df['numparcell'].nunique()} - **Nombre d'interventions**: {len(self.df):,} - **Surface totale**: {self.df['surfparc'].sum():.2f} hectares - **Surface moyenne par parcelle**: {self.df['surfparc'].mean():.2f} hectares - **Période**: {self.df['millesime'].min()} - {self.df['millesime'].max()} ## 🧪 Analyse Herbicides """ herbicides_df = self.df[self.df['familleprod'] == 'Herbicides'] if len(herbicides_df) > 0: stats_text += f""" - **Interventions herbicides**: {len(herbicides_df)} ({(len(herbicides_df)/len(self.df)*100):.1f}%) - **Parcelles traitées**: {herbicides_df['numparcell'].nunique()} - **Produits herbicides différents**: {herbicides_df['produit'].nunique()} """ if self.risk_analysis is not None: risk_distribution = self.risk_analysis['Risque_adventice'].value_counts() stats_text += f""" ## 🎯 Répartition des Risques Adventices """ for risk_level in ['TRÈS FAIBLE', 'FAIBLE', 'MODÉRÉ', 'ÉLEVÉ', 'TRÈS ÉLEVÉ']: if risk_level in risk_distribution: count = risk_distribution[risk_level] pct = (count / len(self.risk_analysis)) * 100 stats_text += f"- **{risk_level}**: {count} parcelles ({pct:.1f}%)\n" return stats_text def get_low_risk_recommendations(self): """Retourne les recommandations pour les parcelles à faible risque""" if self.risk_analysis is None: return "Analyse des risques non disponible" low_risk = self.risk_analysis[ self.risk_analysis['Risque_adventice'].isin(['TRÈS FAIBLE', 'FAIBLE']) ].head(10) recommendations = "## 🌾 TOP 10 - Parcelles Recommandées pour Cultures Sensibles (Pois, Haricot)\n\n" for idx, row in low_risk.iterrows(): if isinstance(idx, tuple) and len(idx) >= 4: parcelle, nom, culture, surface = idx[:4] else: # Fallback si l'index n'est pas un tuple de 4 éléments parcelle = str(idx) nom = "N/A" culture = "N/A" surface = row.get('surfparc', 0) if 'surfparc' in row else 0 recommendations += f""" **Parcelle {parcelle}** ({nom}) - Culture actuelle: {culture} - Surface: {surface:.2f} ha - Niveau de risque: {row['Risque_adventice']} - IFT herbicide: {row['IFT_herbicide_approx']:.2f} - Nombre d'herbicides: {row.get('Nb_herbicides', 0)} --- """ return recommendations def create_risk_visualization(self): """Crée la visualisation des risques""" if self.risk_analysis is None or len(self.risk_analysis) == 0: # Créer un graphique vide avec message d'erreur fig = px.scatter(title="❌ Aucune donnée d'analyse des risques disponible") fig.add_annotation(text="Veuillez charger les données d'abord", xref="paper", yref="paper", x=0.5, y=0.5, showarrow=False) return fig risk_df = self.risk_analysis.reset_index() # Vérifier quelles colonnes sont disponibles pour hover_data available_hover_cols = [] for col in ['nomparc', 'libelleusag']: if col in risk_df.columns: available_hover_cols.append(col) fig = px.scatter(risk_df, x='surfparc', y='IFT_herbicide_approx', color='Risque_adventice', size='Nb_herbicides', hover_data=available_hover_cols if available_hover_cols else None, color_discrete_map={ 'TRÈS FAIBLE': 'green', 'FAIBLE': 'lightgreen', 'MODÉRÉ': 'orange', 'ÉLEVÉ': 'red', 'TRÈS ÉLEVÉ': 'darkred' }, title="🎯 Analyse du Risque Adventice par Parcelle", labels={ 'surfparc': 'Surface de la parcelle (ha)', 'IFT_herbicide_approx': 'IFT Herbicide (approximatif)', 'Risque_adventice': 'Niveau de risque' }) fig.update_layout(width=800, height=600, title_font_size=16) return fig def create_culture_analysis(self): """Analyse par type de culture""" if self.df is None or len(self.df) == 0: # Créer un graphique vide avec message d'erreur fig = px.pie(title="❌ Aucune donnée disponible") fig.add_annotation(text="Veuillez charger les données d'abord", xref="paper", yref="paper", x=0.5, y=0.5, showarrow=False) return fig culture_counts = self.df['libelleusag'].value_counts() fig = px.pie(values=culture_counts.values, names=culture_counts.index, title="🌱 Répartition des Cultures") fig.update_layout(width=700, height=500) return fig def create_risk_distribution(self): """Distribution des niveaux de risque""" if self.risk_analysis is None or len(self.risk_analysis) == 0: # Créer un graphique vide avec message d'erreur fig = px.bar(title="❌ Aucune analyse des risques disponible") fig.add_annotation(text="Veuillez charger les données d'abord", xref="paper", yref="paper", x=0.5, y=0.5, showarrow=False) return fig risk_counts = self.risk_analysis['Risque_adventice'].value_counts() fig = px.bar(x=risk_counts.index, y=risk_counts.values, color=risk_counts.index, color_discrete_map={ 'TRÈS FAIBLE': 'green', 'FAIBLE': 'lightgreen', 'MODÉRÉ': 'orange', 'ÉLEVÉ': 'red', 'TRÈS ÉLEVÉ': 'darkred' }, title="📊 Distribution des Niveaux de Risque Adventice", labels={'x': 'Niveau de risque', 'y': 'Nombre de parcelles'}) fig.update_layout(width=700, height=500, showlegend=False) return fig def get_available_years(self): """Retourne la liste des années disponibles dans les données""" if self.df is None or len(self.df) == 0: return [] years = sorted(self.df['millesime'].dropna().unique()) return [int(year) for year in years if pd.notna(year)] def get_available_parcels(self): """Retourne la liste des parcelles disponibles dans les données""" if self.df is None or len(self.df) == 0: return [] # Créer une liste avec numéro et nom de parcelle si disponible parcels_info = [] if 'nomparc' in self.df.columns: # Grouper par parcelle et prendre le premier nom (en cas de doublons) parcels_data = self.df.groupby('numparcell')['nomparc'].first().reset_index() for _, row in parcels_data.iterrows(): parcel_id = str(row['numparcell']) parcel_name = str(row['nomparc']) if pd.notna(row['nomparc']) else "" if parcel_name and parcel_name != "nan": display_name = f"{parcel_id} - {parcel_name}" else: display_name = parcel_id parcels_info.append((display_name, parcel_id)) else: # Seulement les numéros de parcelles unique_parcels = sorted(self.df['numparcell'].dropna().unique()) parcels_info = [(str(p), str(p)) for p in unique_parcels] # Ajouter l'option "Toutes les parcelles" en premier parcels_info.insert(0, ("Toutes les parcelles", "ALL")) return parcels_info def filter_data_by_year(self, year): """Filtre les données par année""" if self.df is None or year is None: return None year_data = self.df[self.df['millesime'] == year].copy() return year_data def filter_data_by_parcel(self, parcel_id): """Filtre les données par parcelle""" if self.df is None or parcel_id is None or parcel_id == "ALL": return self.df parcel_data = self.df[self.df['numparcell'] == parcel_id].copy() return parcel_data def filter_data_by_year_and_parcel(self, year, parcel_id): """Filtre les données par année et parcelle""" if self.df is None: return None filtered_data = self.df.copy() # Filtrer par année si spécifiée if year is not None: filtered_data = filtered_data[filtered_data['millesime'] == year] # Filtrer par parcelle si spécifiée (et différente de "ALL") if parcel_id is not None and parcel_id != "ALL": # Convertir parcel_id en type approprié (gérer string/int) try: # Essayer de convertir en entier si c'est une chaîne if isinstance(parcel_id, str) and parcel_id.isdigit(): parcel_id_converted = int(parcel_id) else: parcel_id_converted = parcel_id filtered_data = filtered_data[filtered_data['numparcell'] == parcel_id_converted] except (ValueError, TypeError): # En cas d'erreur de conversion, essayer tel quel filtered_data = filtered_data[filtered_data['numparcell'] == parcel_id] return filtered_data def get_year_summary_stats(self, year, parcel_id=None): """Retourne les statistiques de résumé pour une année spécifique et optionnellement une parcelle""" filtered_data = self.filter_data_by_year_and_parcel(year, parcel_id) if filtered_data is None or len(filtered_data) == 0: if parcel_id and parcel_id != "ALL": return f"❌ Aucune donnée disponible pour l'année {year} et la parcelle {parcel_id}" else: return f"❌ Aucune donnée disponible pour l'année {year}" # Titre dynamique selon le filtrage if parcel_id and parcel_id != "ALL": title = f"## 📊 Statistiques pour l'année {year} - Parcelle {parcel_id}" # Obtenir le nom de la parcelle si disponible if 'nomparc' in filtered_data.columns: parcel_name = filtered_data['nomparc'].iloc[0] if len(filtered_data) > 0 else "" if parcel_name and str(parcel_name) != "nan": title = f"## 📊 Statistiques pour l'année {year} - Parcelle {parcel_id} ({parcel_name})" else: title = f"## 📊 Statistiques pour l'année {year}" stats_text = f""" {title} - **Nombre de parcelles**: {filtered_data['numparcell'].nunique()} - **Nombre d'interventions**: {len(filtered_data):,} - **Surface totale**: {filtered_data['surfparc'].sum():.2f} hectares - **Surface moyenne par parcelle**: {filtered_data['surfparc'].mean():.2f} hectares ## 🧪 Analyse Herbicides """ if 'familleprod' in filtered_data.columns: herbicides_df = filtered_data[filtered_data['familleprod'] == 'Herbicides'] if len(herbicides_df) > 0: stats_text += f""" - **Interventions herbicides**: {len(herbicides_df)} ({(len(herbicides_df)/len(filtered_data)*100):.1f}%) - **Parcelles traitées**: {herbicides_df['numparcell'].nunique()} - **Produits herbicides différents**: {herbicides_df['produit'].nunique()} """ else: stats_text += "\n- **Aucune intervention herbicide cette période**" return stats_text def create_year_culture_analysis(self, year, parcel_id=None): """Analyse par type de culture pour une année spécifique et optionnellement une parcelle""" filtered_data = self.filter_data_by_year_and_parcel(year, parcel_id) # Titre dynamique if parcel_id and parcel_id != "ALL": title_suffix = f" - {year} (Parcelle {parcel_id})" else: title_suffix = f" - {year}" if filtered_data is None or len(filtered_data) == 0: fig = px.pie(title=f"❌ Aucune donnée disponible{title_suffix}") fig.add_annotation(text=f"Aucune donnée pour cette sélection", xref="paper", yref="paper", x=0.5, y=0.5, showarrow=False) return fig if 'libelleusag' not in filtered_data.columns: fig = px.pie(title=f"❌ Données de culture manquantes{title_suffix}") fig.add_annotation(text="Colonne 'libelleusag' manquante", xref="paper", yref="paper", x=0.5, y=0.5, showarrow=False) return fig culture_counts = filtered_data['libelleusag'].value_counts() fig = px.pie(values=culture_counts.values, names=culture_counts.index, title=f"🌱 Répartition des Cultures{title_suffix}") fig.update_layout(width=700, height=500) return fig def create_year_interventions_timeline(self, year, parcel_id=None): """Crée un graphique temporel des interventions pour une année et optionnellement une parcelle""" filtered_data = self.filter_data_by_year_and_parcel(year, parcel_id) # Titre dynamique if parcel_id and parcel_id != "ALL": title_suffix = f" - {year} (Parcelle {parcel_id})" else: title_suffix = f" - {year}" if filtered_data is None or len(filtered_data) == 0: fig = px.bar(title=f"❌ Aucune donnée disponible{title_suffix}") fig.add_annotation(text=f"Aucune donnée pour cette sélection", xref="paper", yref="paper", x=0.5, y=0.5, showarrow=False) return fig if 'datedebut' not in filtered_data.columns: # Fallback: graphique des types d'événements if 'libevenem' in filtered_data.columns: event_counts = filtered_data['libevenem'].value_counts() fig = px.bar(x=event_counts.index, y=event_counts.values, title=f"📊 Types d'Interventions{title_suffix}", labels={'x': 'Type d\'intervention', 'y': 'Nombre'}) fig.update_layout(width=800, height=500) fig.update_xaxis(tickangle=45) return fig else: fig = px.bar(title=f"❌ Données d'intervention manquantes{title_suffix}") fig.add_annotation(text="Colonnes de dates manquantes", xref="paper", yref="paper", x=0.5, y=0.5, showarrow=False) return fig # Convertir les dates et créer le graphique temporel try: filtered_data['datedebut_parsed'] = pd.to_datetime(filtered_data['datedebut'], format='%d/%m/%y', errors='coerce') filtered_data['mois'] = filtered_data['datedebut_parsed'].dt.month monthly_counts = filtered_data.groupby('mois').size().reset_index() monthly_counts.columns = ['mois', 'nb_interventions'] # Ajouter les noms des mois month_names = {1: 'Jan', 2: 'Fév', 3: 'Mar', 4: 'Avr', 5: 'Mai', 6: 'Jun', 7: 'Jul', 8: 'Aoû', 9: 'Sep', 10: 'Oct', 11: 'Nov', 12: 'Déc'} monthly_counts['mois_nom'] = monthly_counts['mois'].map(month_names) fig = px.bar(monthly_counts, x='mois_nom', y='nb_interventions', title=f"📅 Répartition Mensuelle des Interventions{title_suffix}", labels={'mois_nom': 'Mois', 'nb_interventions': 'Nombre d\'interventions'}) fig.update_layout(width=800, height=500) return fig except Exception as e: print(f"❌ Erreur lors de la création du graphique temporel: {e}") # Fallback vers le graphique des événements if 'libevenem' in filtered_data.columns: event_counts = filtered_data['libevenem'].value_counts() fig = px.bar(x=event_counts.index, y=event_counts.values, title=f"📊 Types d'Interventions{title_suffix}", labels={'x': 'Type d\'intervention', 'y': 'Nombre'}) fig.update_layout(width=800, height=500) fig.update_xaxis(tickangle=45) return fig else: fig = px.bar(title=f"❌ Erreur lors du traitement des données{title_suffix}") return fig def create_year_herbicide_analysis(self, year, parcel_id=None): """Analyse des herbicides pour une année spécifique et optionnellement une parcelle""" filtered_data = self.filter_data_by_year_and_parcel(year, parcel_id) # Titre dynamique if parcel_id and parcel_id != "ALL": title_suffix = f" - {year} (Parcelle {parcel_id})" else: title_suffix = f" - {year}" if filtered_data is None or len(filtered_data) == 0: fig = px.bar(title=f"❌ Aucune donnée disponible{title_suffix}") fig.add_annotation(text=f"Aucune donnée pour cette sélection", xref="paper", yref="paper", x=0.5, y=0.5, showarrow=False) return fig if 'familleprod' not in filtered_data.columns: fig = px.bar(title=f"❌ Données de produits manquantes{title_suffix}") fig.add_annotation(text="Colonne 'familleprod' manquante", xref="paper", yref="paper", x=0.5, y=0.5, showarrow=False) return fig herbicides_df = filtered_data[filtered_data['familleprod'] == 'Herbicides'] if len(herbicides_df) == 0: fig = px.bar(title=f"✅ Aucun herbicide utilisé{title_suffix}") fig.add_annotation(text=f"Aucune intervention herbicide pour cette sélection", xref="paper", yref="paper", x=0.5, y=0.5, showarrow=False) return fig if 'produit' in herbicides_df.columns: herbicide_counts = herbicides_df['produit'].value_counts().head(10) fig = px.bar(x=herbicide_counts.values, y=herbicide_counts.index, orientation='h', title=f"🧪 Top 10 Herbicides Utilisés{title_suffix}", labels={'x': 'Nombre d\'utilisations', 'y': 'Produit'}) fig.update_layout(width=800, height=500) return fig else: fig = px.bar(title=f"❌ Détails des produits manquants{title_suffix}") fig.add_annotation(text="Colonne 'produit' manquante", xref="paper", yref="paper", x=0.5, y=0.5, showarrow=False) return fig def get_data_table_by_year_and_parcel(self, year, parcel_id=None, max_rows=1000): """Retourne un tableau des données pour une année et optionnellement une parcelle""" try: filtered_data = self.filter_data_by_year_and_parcel(year, parcel_id) if filtered_data is None or len(filtered_data) == 0: if parcel_id and parcel_id != "ALL": return None, f"❌ Aucune donnée pour l'année {year} et la parcelle {parcel_id}" else: return None, f"❌ Aucune donnée pour l'année {year}" # Sélectionner les colonnes les plus importantes pour l'affichage display_cols = [] important_cols = [ 'millesime', 'numparcell', 'nomparc', 'surfparc', 'libelleusag', 'datedebut', 'datefin', 'libevenem', 'familleprod', 'produit', 'quantitetot', 'unite' ] for col in important_cols: if col in filtered_data.columns: display_cols.append(col) if not display_cols: return None, "❌ Aucune colonne importante trouvée" # Préparer les données pour l'affichage display_df = filtered_data[display_cols].copy() # Formater les colonnes pour un meilleur affichage if 'surfparc' in display_df.columns: display_df['surfparc'] = display_df['surfparc'].round(2) if 'quantitetot' in display_df.columns: display_df['quantitetot'] = pd.to_numeric(display_df['quantitetot'], errors='coerce').round(3) # Trier par date si disponible, sinon par parcelle if 'datedebut' in display_df.columns: # Convertir les dates pour le tri display_df['date_sort'] = pd.to_datetime(display_df['datedebut'], format='%d/%m/%y', errors='coerce') display_df = display_df.sort_values(['numparcell', 'date_sort']) display_df = display_df.drop('date_sort', axis=1) else: display_df = display_df.sort_values('numparcell') # Limiter le nombre de lignes pour l'affichage if len(display_df) > max_rows: display_df = display_df.head(max_rows) info_msg = f"📊 Affichage de {max_rows} premières lignes sur {len(filtered_data)} total" else: info_msg = f"📊 {len(display_df)} enregistrements au total" # Renommer les colonnes pour un affichage plus clair column_names = { 'millesime': 'Année', 'numparcell': 'N° Parcelle', 'nomparc': 'Nom Parcelle', 'surfparc': 'Surface (ha)', 'libelleusag': 'Culture', 'datedebut': 'Date Début', 'datefin': 'Date Fin', 'libevenem': 'Type Intervention', 'familleprod': 'Famille Produit', 'produit': 'Produit', 'quantitetot': 'Quantité', 'unite': 'Unité' } # Renommer seulement les colonnes qui existent rename_dict = {k: v for k, v in column_names.items() if k in display_df.columns} display_df = display_df.rename(columns=rename_dict) return display_df, info_msg except Exception as e: print(f"❌ Erreur dans get_data_table_by_year_and_parcel: {e}") return None, f"❌ Erreur lors de la génération du tableau: {str(e)[:100]}..." # Initialisation de l'analyseur analyzer = AgricultureAnalyzer() analyzer.load_data() analyzer.analyze_data() # Analyse des données après chargement # Initialisation de l'analyseur d'herbicides herbicide_analyzer = HerbicideAnalyzer(analyzer.df) # Interface Gradio def create_interface(): with gr.Blocks(title="🌾 Analyse Adventices Agricoles CRA", theme=gr.themes.Soft()) as demo: gr.Markdown(""" # 🌾 Analyse des Adventices Agricoles - CRA Bretagne **Objectif**: Anticiper et réduire la pression des adventices dans les parcelles agricoles bretonnes Cette application analyse les données historiques pour identifier les parcelles les plus adaptées à la culture de plantes sensibles comme le pois ou le haricot. """) with gr.Tabs(): with gr.TabItem("📊 Vue d'ensemble"): gr.Markdown("## Statistiques générales des données agricoles") stats_output = gr.Markdown(analyzer.get_summary_stats()) with gr.Row(): culture_plot = gr.Plot(analyzer.create_culture_analysis()) risk_dist_plot = gr.Plot(analyzer.create_risk_distribution()) with gr.TabItem("🎯 Analyse des Risques"): gr.Markdown("## Cartographie des risques adventices par parcelle") risk_plot = gr.Plot(analyzer.create_risk_visualization()) gr.Markdown(""" **Interprétation du graphique**: - **Axe X**: Surface de la parcelle (hectares) - **Axe Y**: IFT Herbicide approximatif - **Couleur**: Niveau de risque adventice - **Taille**: Nombre d'herbicides utilisés Les parcelles vertes (risque faible) sont idéales pour les cultures sensibles. """) with gr.TabItem("🌾 Recommandations"): reco_output = gr.Markdown(analyzer.get_low_risk_recommendations()) gr.Markdown(""" ## 💡 Conseils pour la gestion des adventices ### Parcelles à Très Faible Risque (Vertes) - ✅ **Idéales pour pois et haricot** - ✅ Historique d'usage herbicide minimal - ✅ Pression adventice faible attendue ### Parcelles à Faible Risque (Vert clair) - ⚠️ Surveillance légère recommandée - ✅ Conviennent aux cultures sensibles avec précautions ### Parcelles à Risque Modéré/Élevé (Orange/Rouge) - ❌ Éviter pour cultures sensibles - 🔍 Rotation nécessaire avant implantation - 📈 Surveillance renforcée des adventices ### Stratégies alternatives - **Rotation longue**: 3-4 ans avant cultures sensibles - **Cultures intermédiaires**: CIPAN pour réduire la pression - **Techniques mécaniques**: Hersage, binage - **Biostimulants**: Renforcement naturel des cultures """) with gr.TabItem("📅 Analyse par Année"): gr.Markdown("## Visualisation des données par année et par parcelle") # Contrôles de filtrage with gr.Row(): with gr.Column(): # Liste déroulante pour sélectionner l'année available_years = analyzer.get_available_years() if available_years: default_year = available_years[-1] # Dernière année par défaut year_dropdown = gr.Dropdown( choices=available_years, value=default_year, label="🗓️ Sélectionner une année", info="Choisissez l'année à analyser dans la liste déroulante" ) else: year_dropdown = gr.Dropdown( choices=[], value=None, label="🗓️ Sélectionner une année", info="Aucune année disponible" ) with gr.Column(): # Liste déroulante pour sélectionner la parcelle available_parcels = analyzer.get_available_parcels() parcel_choices = [item[0] for item in available_parcels] # Noms d'affichage parcel_values = [item[1] for item in available_parcels] # Valeurs réelles parcel_dropdown = gr.Dropdown( choices=list(zip(parcel_choices, parcel_values)), value="ALL" if available_parcels else None, label="🏠 Sélectionner une parcelle", info="Choisissez une parcelle spécifique ou 'Toutes les parcelles'" ) # Statistiques pour l'année sélectionnée year_stats = gr.Markdown("") # Graphiques pour l'année sélectionnée with gr.Row(): year_culture_plot = gr.Plot() year_timeline_plot = gr.Plot() with gr.Row(): year_herbicide_plot = gr.Plot() # Fonction de mise à jour quand l'année ou la parcelle change def update_year_analysis(selected_year, selected_parcel): if selected_year is None: empty_fig = px.bar(title="❌ Aucune année sélectionnée") return ( "❌ Veuillez sélectionner une année", empty_fig, empty_fig, empty_fig ) stats = analyzer.get_year_summary_stats(selected_year, selected_parcel) culture_fig = analyzer.create_year_culture_analysis(selected_year, selected_parcel) timeline_fig = analyzer.create_year_interventions_timeline(selected_year, selected_parcel) herbicide_fig = analyzer.create_year_herbicide_analysis(selected_year, selected_parcel) return stats, culture_fig, timeline_fig, herbicide_fig # Connecter les listes déroulantes aux mises à jour year_dropdown.change( update_year_analysis, inputs=[year_dropdown, parcel_dropdown], outputs=[year_stats, year_culture_plot, year_timeline_plot, year_herbicide_plot] ) parcel_dropdown.change( update_year_analysis, inputs=[year_dropdown, parcel_dropdown], outputs=[year_stats, year_culture_plot, year_timeline_plot, year_herbicide_plot] ) # Initialiser avec l'année et la parcelle par défaut if available_years: default_parcel = "ALL" initial_stats, initial_culture, initial_timeline, initial_herbicide = update_year_analysis(default_year, default_parcel) year_stats.value = initial_stats year_culture_plot.value = initial_culture year_timeline_plot.value = initial_timeline year_herbicide_plot.value = initial_herbicide gr.Markdown(""" **Informations sur cet onglet:** - 🗓️ **Filtre Année**: Sélectionnez l'année à analyser - 🏠 **Filtre Parcelle**: Choisissez une parcelle spécifique ou toutes les parcelles - 📊 **Statistiques**: Résumé pour la sélection (année + parcelle) - 🌱 **Cultures**: Répartition des types de cultures - 📅 **Timeline**: Répartition temporelle des interventions - 🧪 **Herbicides**: Top 10 des herbicides utilisés 💡 **Utilisation**: Combinez les filtres pour explorer les données en détail """) with gr.TabItem("📋 Données par Année"): gr.Markdown("## Exploration des données tabulaires par année et parcelle") # Contrôles de filtrage identiques à l'onglet précédent with gr.Row(): with gr.Column(): # Liste déroulante pour sélectionner l'année data_year_dropdown = gr.Dropdown( choices=available_years, value=available_years[-1] if available_years else None, label="🗓️ Sélectionner une année", info="Choisissez l'année à analyser" ) with gr.Column(): # Liste déroulante pour sélectionner la parcelle data_parcel_dropdown = gr.Dropdown( choices=list(zip(parcel_choices, parcel_values)), value="ALL" if available_parcels else None, label="🏠 Sélectionner une parcelle", info="Choisissez une parcelle spécifique ou 'Toutes les parcelles'" ) # Contrôles supplémentaires with gr.Row(): with gr.Column(): max_rows_slider = gr.Slider( minimum=100, maximum=5000, value=1000, step=100, label="📊 Nombre maximum de lignes à afficher" ) with gr.Column(): update_data_btn = gr.Button("🔄 Actualiser les données", variant="primary") export_info = gr.Markdown("💡 **Astuce**: Vous pouvez copier les données du tableau ci-dessous") # Message d'information data_info_msg = gr.Markdown("") # Tableau des données data_table = gr.Dataframe( label="📋 Données détaillées", interactive=False, wrap=True, height=500 ) # Fonction de mise à jour des données def update_data_table(selected_year, selected_parcel, max_rows): if selected_year is None: return "❌ Veuillez sélectionner une année", None try: data_df, info_msg = analyzer.get_data_table_by_year_and_parcel( selected_year, selected_parcel, max_rows ) if data_df is None: return info_msg, None return info_msg, data_df except Exception as e: return f"❌ Erreur: {str(e)}", None # Connecter les contrôles aux mises à jour update_data_btn.click( update_data_table, inputs=[data_year_dropdown, data_parcel_dropdown, max_rows_slider], outputs=[data_info_msg, data_table] ) data_year_dropdown.change( update_data_table, inputs=[data_year_dropdown, data_parcel_dropdown, max_rows_slider], outputs=[data_info_msg, data_table] ) data_parcel_dropdown.change( update_data_table, inputs=[data_year_dropdown, data_parcel_dropdown, max_rows_slider], outputs=[data_info_msg, data_table] ) # Initialiser avec les valeurs par défaut if available_years: default_data_parcel = "ALL" initial_info, initial_data = update_data_table(default_year, default_data_parcel, 1000) data_info_msg.value = initial_info if initial_data is not None: data_table.value = initial_data gr.Markdown(""" **Informations sur cet onglet:** - 📋 **Vue tabulaire**: Exploration détaillée des données ligne par ligne - 🔍 **Filtrage avancé**: Combinez année et parcelle pour cibler vos recherches - 📊 **Contrôle du volume**: Ajustez le nombre de lignes affichées - 📋 **Colonnes optimisées**: Affichage des informations les plus pertinentes - 🗓️ **Tri intelligent**: Données triées par parcelle et date 💡 **Cas d'usage**: - Vérification détaillée des interventions - Export de données pour analyse externe - Traçabilité parcelle par parcelle - Audit des pratiques agricoles """) with gr.TabItem("🔍 Requêtes Herbicides"): gr.Markdown("## Requêtes avancées pour l'analyse des herbicides") with gr.Tabs(): with gr.TabItem("🏆 Top IFT par Année"): gr.Markdown("### Parcelles avec les IFT herbicides les plus élevés") with gr.Row(): with gr.Column(): year_select_ift = gr.Dropdown( choices=analyzer.get_available_years(), value=analyzer.get_available_years()[-1] if analyzer.get_available_years() else None, label="📅 Année à analyser" ) n_parcels_ift = gr.Slider( minimum=5, maximum=50, value=10, step=5, label="🔢 Nombre de parcelles à afficher" ) btn_ift = gr.Button("🔍 Analyser les IFT", variant="primary") with gr.Column(): ift_chart = gr.Plot() ift_table = gr.Dataframe( headers=["Parcelle", "Surface", "IFT Herbicide", "Quantité Totale", "Nb Produits"], label="📊 Détail des parcelles avec IFT élevé" ) def analyze_ift(year, n_parcels): if year is None: return None, "❌ Veuillez sélectionner une année" data, message = herbicide_analyzer.get_top_ift_parcels_by_year(year, n_parcels) chart = herbicide_analyzer.create_ift_ranking_chart(year, n_parcels) return chart, data btn_ift.click( analyze_ift, inputs=[year_select_ift, n_parcels_ift], outputs=[ift_chart, ift_table] ) with gr.TabItem("📖 Historique Parcelle"): gr.Markdown("### Produits utilisés sur une parcelle spécifique") with gr.Row(): with gr.Column(): parcel_id_input = gr.Textbox( label="🏠 Numéro de parcelle", placeholder="Ex: 21, 22, etc." ) n_years_history = gr.Slider( minimum=1, maximum=15, value=5, step=1, label="📅 Nombre d'années à analyser" ) btn_history = gr.Button("🔍 Analyser l'historique", variant="primary") with gr.Column(): history_chart = gr.Plot() history_table = gr.Dataframe( headers=["Année", "Produit", "Quantité", "Date début", "Date fin"], label="📊 Historique des produits utilisés" ) def analyze_parcel_history(parcel_id, n_years): if not parcel_id or parcel_id.strip() == "": return None, "❌ Veuillez entrer un numéro de parcelle" data, message = herbicide_analyzer.get_parcel_products_history(parcel_id.strip(), n_years) chart = herbicide_analyzer.create_product_timeline_chart(parcel_id.strip(), n_years) return chart, data btn_history.click( analyze_parcel_history, inputs=[parcel_id_input, n_years_history], outputs=[history_chart, history_table] ) with gr.TabItem("🎯 Recherche de Produits"): gr.Markdown("### Parcelles ayant reçu des produits spécifiques") with gr.Row(): with gr.Column(): products_input = gr.Textbox( label="🧪 Noms de produits (séparés par des virgules)", placeholder="Ex: Minarex, Chardex, Aligator", lines=2 ) n_years_search = gr.Slider( minimum=1, maximum=15, value=10, step=1, label="📅 Période d'analyse (années)" ) with gr.Row(): btn_with_products = gr.Button("✅ Parcelles AVEC ces produits", variant="primary") btn_without_products = gr.Button("❌ Parcelles SANS ces produits", variant="secondary") search_results_table = gr.Dataframe( label="📊 Résultats de la recherche" ) search_message = gr.Markdown("") def search_with_products(products_text, n_years): if not products_text or products_text.strip() == "": return "❌ Veuillez entrer au moins un nom de produit", None products_list = [p.strip() for p in products_text.split(",") if p.strip()] data, message = herbicide_analyzer.find_parcels_with_products(products_list, n_years) return message, data def search_without_products(products_text, n_years): if not products_text or products_text.strip() == "": return "❌ Veuillez entrer au moins un nom de produit", None products_list = [p.strip() for p in products_text.split(",") if p.strip()] data, message = herbicide_analyzer.find_parcels_without_products(products_list, n_years) return message, data btn_with_products.click( search_with_products, inputs=[products_input, n_years_search], outputs=[search_message, search_results_table] ) btn_without_products.click( search_without_products, inputs=[products_input, n_years_search], outputs=[search_message, search_results_table] ) with gr.TabItem("🗓️ Périodes d'Intervention"): gr.Markdown("### Analyse des périodes d'interventions herbicides") with gr.Row(): with gr.Column(): n_years_periods = gr.Slider( minimum=1, maximum=15, value=10, step=1, label="📅 Période d'analyse (années)" ) btn_periods = gr.Button("🔍 Analyser les périodes", variant="primary") with gr.Column(): periods_chart = gr.Plot() with gr.Row(): heatmap_chart = gr.Plot() periods_table = gr.Dataframe( label="📊 Analyse des périodes par parcelle" ) def analyze_periods(n_years): data, message = herbicide_analyzer.analyze_intervention_periods(n_years) chart = herbicide_analyzer.create_intervention_periods_chart(n_years) heatmap = herbicide_analyzer.create_monthly_interventions_heatmap(n_years) return chart, heatmap, data btn_periods.click( analyze_periods, inputs=[n_years_periods], outputs=[periods_chart, heatmap_chart, periods_table] ) gr.Markdown(""" **Guide d'interprétation :** - 🎯 **Graphique scatter** : Position des interventions dans l'année vs intensité - 🔥 **Heatmap** : Répartition mensuelle des interventions par année - 📊 **Tableau** : Détail par parcelle avec périodes et quantités """) with gr.TabItem("ℹ️ À propos"): gr.Markdown(""" ## 🎯 Méthodologie Cette analyse se base sur : ### Calcul de l'IFT (Indice de Fréquence de Traitement) - **IFT ≈ Quantité appliquée / Surface de parcelle** - Indicateur de l'intensité des traitements herbicides ### Classification des risques - **TRÈS FAIBLE**: IFT = 0, aucun herbicide - **FAIBLE**: IFT < 1, usage minimal - **MODÉRÉ**: IFT < 3, usage modéré - **ÉLEVÉ**: IFT < 5, usage important - **TRÈS ÉLEVÉ**: IFT ≥ 5, usage intensif ### Données analysées - **Source**: Station Expérimentale de Kerguéhennec - **Période**: Campagne 2025 - **Variables**: Interventions, produits, quantités, surfaces ## 🔍 Nouvelles Fonctionnalités - Requêtes Herbicides ### 🏆 Top IFT par Année - Identifie les parcelles avec les IFT herbicides les plus élevés - Aide à cibler les parcelles à surveiller prioritairement - Graphique de classement et tableau détaillé ### 📖 Historique Parcelle - Trace l'historique complet des herbicides sur une parcelle - Permet de voir l'évolution des pratiques - Graphique chronologique des utilisations ### 🎯 Recherche de Produits - **Recherche positive**: Trouve les parcelles ayant reçu des produits spécifiques - **Recherche négative**: Identifie les parcelles n'ayant PAS reçu certains produits - Supporte la recherche partielle (ex: "Chardex" trouve "Chardex 500") ### 🗓️ Périodes d'Intervention - Analyse les patterns temporels d'application des herbicides - Graphique scatter des périodes vs intensité - Heatmap mensuelle/annuelle des interventions ### 💡 Exemples d'utilisation - **Rotation des cultures**: Identifier les parcelles sans Chardex pour y planter des cultures sensibles - **Suivi réglementaire**: Tracer l'usage de produits spécifiques - **Optimisation temporelle**: Analyser les meilleures périodes d'intervention - **Benchmarking**: Comparer les IFT entre parcelles similaires --- **Développé pour le Hackathon CRA Bretagne** 🏆 *Application d'aide à la décision pour une agriculture durable* """) # Bouton de rafraîchissement refresh_btn = gr.Button("🔄 Actualiser les données", variant="secondary") def refresh_data(): analyzer.load_data() analyzer.analyze_data() # Recalculer l'analyse après rechargement # Mettre à jour l'analyseur d'herbicides avec les nouvelles données herbicide_analyzer.set_data(analyzer.df) # Mettre à jour la liste des années disponibles updated_years = analyzer.get_available_years() year_dropdown.choices = updated_years if updated_years: year_dropdown.value = updated_years[-1] return ( analyzer.get_summary_stats(), analyzer.create_culture_analysis(), analyzer.create_risk_distribution(), analyzer.create_risk_visualization(), analyzer.get_low_risk_recommendations() ) refresh_btn.click( refresh_data, outputs=[stats_output, culture_plot, risk_dist_plot, risk_plot, reco_output] ) return demo # Lancement de l'application if __name__ == "__main__": demo = create_interface() # Configuration pour Hugging Face Spaces demo.launch( server_name="0.0.0.0", server_port=7860, share=False # Pas besoin de share sur HF Spaces )