"""Enrich the unified census CSV with ``adm3_pcode`` values from a GeoJSON layer.

Each CSV row is matched to a GeoJSON feature by its normalized
(province, district, corregimiento) names, and the matching feature's
``adm3_pcode`` property is written to a new first column of the output CSV.
"""
import csv
import json
import os
import unicodedata

# Define paths
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
CSV_PATH = os.path.join(BASE_DIR, "data/censo/censo_panama_2023_unificado.csv")
OUTPUT_PATH = os.path.join(BASE_DIR, "data/censo/censo_2023_enriched.csv")
GEOJSON_PATH = os.path.join(BASE_DIR, "data/base/pan_admin3.geojson")

# Province Mapping Heuristics: normalized province name found in the CSV ->
# normalized province name under which the GeoJSON lookup is searched.
PROV_MAPPING = {
    "panama oeste": "panama",
    "comarca naso tjer di": "bocas del toro",
}

# Name of the GeoJSON property copied into the CSV (and of the new column).
PCODE_FIELD = "adm3_pcode"

# Corregimiento names shorter than this are never fuzzy-matched; substring
# tests on very short names produce too many accidental hits.
MIN_FUZZY_NAME_LENGTH = 5


def normalize_text(text):
    """Return *text* as lower-case, stripped, accent-free ASCII.

    Falsy input (``None``, ``""``, ``0``) yields ``""``. Other non-string
    values are converted with ``str()`` first so that numeric property values
    do not raise.
    """
    if not text:
        return ""
    if not isinstance(text, str):
        text = str(text)
    # NFKD splits accented characters into base letter + combining mark; the
    # ASCII round-trip then drops the marks (and any other non-ASCII char).
    text = unicodedata.normalize('NFKD', text).encode('ASCII', 'ignore').decode('ASCII')
    return text.lower().strip()


def _load_csv_rows(path):
    """Read the UTF-8 CSV at *path* and return ``(headers, rows)``."""
    # newline='' lets the csv module handle line endings inside quoted fields.
    with open(path, mode='r', encoding='utf-8', newline='') as f:
        reader = csv.DictReader(f)
        rows = list(reader)
        # fieldnames is None for a completely empty file.
        headers = list(reader.fieldnames or [])
    return headers, rows


def _build_geojson_indexes(features):
    """Build the exact-match lookup and the per-province search indexes.

    Returns ``(lookup, by_prov_corr, by_prov)`` where ``lookup`` maps
    ``(prov, dist, corr)`` to the feature properties, ``by_prov_corr`` maps
    ``(prov, corr)`` to the list of matching keys and ``by_prov`` maps
    ``prov`` to the list of keys in that province. Lists keep the insertion
    order of ``lookup``.
    """
    lookup = {}
    for feature in features:
        # "properties" may legitimately be JSON null.
        props = feature.get('properties') or {}
        key = (
            normalize_text(props.get('adm1_name')),
            normalize_text(props.get('adm2_name')),
            normalize_text(props.get('adm3_name')),
        )
        # Store properties keyed by (Prov, Dist, Corr)
        lookup[key] = props

    by_prov_corr = {}
    by_prov = {}
    for key in lookup:
        by_prov_corr.setdefault((key[0], key[2]), []).append(key)
        by_prov.setdefault(key[0], []).append(key)
    return lookup, by_prov_corr, by_prov


def _find_pcode(p_name, d_name, c_name, lookup, by_prov_corr, by_prov):
    """Return the p-code for the normalized names, or ``None`` if unmatched."""
    # Strategy 1: Exact Match
    key = (p_name, d_name, c_name)
    if key in lookup:
        return lookup[key].get(PCODE_FIELD)

    # Strategy 2: Relaxed District Search - accept a corregimiento that is
    # unique within the province even if the district name differs.
    candidates = by_prov_corr.get((p_name, c_name), [])
    if len(candidates) == 1:
        return lookup[candidates[0]].get(PCODE_FIELD)

    # Strategy 3: Fuzzy startsWith check
    if len(c_name) < MIN_FUZZY_NAME_LENGTH:
        return None
    # NOTE(review): when several districts share the same corregimiento name
    # this picks the first one in GeoJSON order; confirm that is acceptable.
    for candidate in by_prov.get(p_name, []):
        geo_c = candidate[2]
        # An empty GeoJSON name is a substring of everything; skip it so a
        # feature without adm3_name cannot match every row.
        if not geo_c:
            continue
        # Check if names are "close enough" (contains or starts with)
        if c_name in geo_c or geo_c in c_name:
            return lookup[candidate].get(PCODE_FIELD)
    return None


def process_censo_data(csv_path=None, geojson_path=None, output_path=None):
    """Add an ``adm3_pcode`` column to the census CSV and save the result.

    Args:
        csv_path: Input CSV; defaults to ``CSV_PATH``. Rows are matched on
            the ``nomb_prov``, ``nomb_dist`` and ``nomb_corr`` columns.
        geojson_path: GeoJSON file whose feature properties provide
            ``adm1_name``, ``adm2_name``, ``adm3_name`` and ``adm3_pcode``;
            defaults to ``GEOJSON_PATH``.
        output_path: Destination CSV; defaults to ``OUTPUT_PATH``.

    Returns:
        None. Load and save failures are printed and end the run early
        instead of raising.
    """
    csv_path = CSV_PATH if csv_path is None else csv_path
    geojson_path = GEOJSON_PATH if geojson_path is None else geojson_path
    output_path = OUTPUT_PATH if output_path is None else output_path

    print(f"Loading CSV from {csv_path}...")
    try:
        headers, csv_data = _load_csv_rows(csv_path)
    except (OSError, ValueError, csv.Error) as e:
        print(f"Error loading CSV: {e}")
        return

    print(f"Loading GeoJSON from {geojson_path}...")
    try:
        # GeoJSON is UTF-8; do not depend on the platform default encoding.
        with open(geojson_path, 'r', encoding='utf-8') as f:
            geojson = json.load(f)
    except (OSError, ValueError) as e:
        print(f"Error loading GeoJSON: {e}")
        return

    features = geojson.get('features') if isinstance(geojson, dict) else None
    if not isinstance(features, list):
        print("Error loading GeoJSON: no 'features' list found")
        return

    # Build GeoJSON Lookup Map
    print("Building GeoJSON lookup table...")
    lookup, by_prov_corr, by_prov = _build_geojson_indexes(features)

    print("Enriching CSV data...")
    matches = 0
    for row in csv_data:
        p_name = normalize_text(row.get('nomb_prov'))
        d_name = normalize_text(row.get('nomb_dist'))
        c_name = normalize_text(row.get('nomb_corr'))
        search_p_name = PROV_MAPPING.get(p_name, p_name)

        found_code = _find_pcode(
            search_p_name, d_name, c_name, lookup, by_prov_corr, by_prov
        )

        # Assign found code or empty string
        if found_code:
            row[PCODE_FIELD] = found_code
            matches += 1
        else:
            row[PCODE_FIELD] = ""

    total = len(csv_data)
    # Guard the percentage: a header-only CSV has zero rows.
    percent = matches / total * 100 if total else 0.0
    print(f"Enrichment Complete. Matches: {matches}/{total} ({percent:.1f}%)")

    # Save Enriched CSV
    # Drop any pre-existing adm3_pcode header so the column is not duplicated.
    new_headers = [PCODE_FIELD] + [h for h in headers if h != PCODE_FIELD]
    print(f"Saving to {output_path}...")
    try:
        with open(output_path, mode='w', encoding='utf-8', newline='') as f:
            writer = csv.DictWriter(f, fieldnames=new_headers)
            writer.writeheader()
            writer.writerows(csv_data)
        print("File saved successfully.")
    except (OSError, ValueError, csv.Error) as e:
        print(f"Error saving CSV: {e}")


if __name__ == "__main__":
    process_censo_data()