import csv
import json
import os
import unicodedata

# Define paths relative to the project root (one level above this script)
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
CSV_PATH = os.path.join(BASE_DIR, "data/censo/censo_panama_2023_unificado.csv")
GEOJSON_PATH = os.path.join(BASE_DIR, "data/base/pan_admin3.geojson")


def normalize_text(text):
    if not text:
        return ""
    # Normalize unicode to ASCII (strip accents), then lowercase and trim
    text = unicodedata.normalize('NFKD', text).encode('ASCII', 'ignore').decode('ASCII')
    return text.lower().strip()


def validate_censo_integration():
    print(f"Loading CSV from {CSV_PATH}...")
    csv_data = []
    try:
        with open(CSV_PATH, mode='r', encoding='utf-8') as f:
            reader = csv.DictReader(f)
            for row in reader:
                csv_data.append(row)
    except Exception as e:
        print(f"Error loading CSV: {e}")
        return

    if not csv_data:
        print("CSV contains no rows; nothing to validate.")
        return

    print(f"Loading GeoJSON from {GEOJSON_PATH}...")
    try:
        with open(GEOJSON_PATH, 'r') as f:
            geojson = json.load(f)
    except Exception as e:
        print(f"Error loading GeoJSON: {e}")
        return

    # Build GeoJSON lookup map: (norm_prov, norm_dist, norm_corr) -> properties
    geojson_lookup = {}

    # Helper to handle common name variations found in Panama data
    # (more rules can be added as mismatches are discovered)
    def clean_name(name):
        n = normalize_text(name)
        # Placeholder: strip prefixes such as "distrito de" or "comarca" here if needed
        return n

    print("Building GeoJSON lookup table...")
    for feature in geojson['features']:
        props = feature.get('properties', {})
        p_name = clean_name(props.get('adm1_name'))
        d_name = clean_name(props.get('adm2_name'))
        c_name = clean_name(props.get('adm3_name'))
        key = (p_name, d_name, c_name)
        if key in geojson_lookup:
            print(f"Duplicate key in GeoJSON: {key}")
        geojson_lookup[key] = props

    print(f"GeoJSON lookup size: {len(geojson_lookup)}")

    # Heuristics for province mapping (new census names -> old GeoJSON names)
    PROV_MAPPING = {
        "panama oeste": "panama",
        "comarca naso tjer di": "bocas del toro",  # Naso was part of Bocas del Toro
    }

    print("\nValidating CSV via Name Matching with Heuristics...")
    matches = []
    mismatches = []

    for row in csv_data:
        # CSV headers: nomb_prov, nomb_dist, nomb_corr
        p_name = clean_name(row.get('nomb_prov'))
        d_name = clean_name(row.get('nomb_dist'))
        c_name = clean_name(row.get('nomb_corr'))

        # Apply province mapping
        search_p_name = PROV_MAPPING.get(p_name, p_name)

        # 1. Exact match (with mapped province)
        key = (search_p_name, d_name, c_name)
        if key in geojson_lookup:
            matches.append(row)
            row['geo_match_id'] = geojson_lookup[key].get('adm3_pcode')
            continue

        # 2. Relaxed district match: look for the same corregimiento name
        #    anywhere in the province
        candidates = [k for k in geojson_lookup
                      if k[0] == search_p_name and k[2] == c_name]
        if len(candidates) == 1:
            # Single match found under a different district name
            match_key = candidates[0]
            matches.append(row)
            row['geo_match_id'] = geojson_lookup[match_key].get('adm3_pcode')
            # print(f"Relaxed Match: {c_name} (CSV Dist: {d_name}) -> (Geo Dist: {match_key[1]})")
            continue
        elif len(candidates) > 1:
            # Ambiguous: same corregimiento name in multiple districts of the
            # same province (rare but possible); fall through to the next step
            # print(f"Ambiguous: {c_name} found in districts {[k[1] for k in candidates]}")
            pass

        # 3. Fuzzy/typo fixes, e.g. "El Hato de San Juan de Dios" vs
        #    "El Hato de San Juan": a primitive "contains" check
        best_candidate = None
        # All corregimientos in this province
        prov_corrs = [k for k in geojson_lookup if k[0] == search_p_name]
        for k in prov_corrs:
            geo_c = k[2]
            # One name must contain the other, and both must be long enough
            # to avoid spurious substring hits
            if (c_name in geo_c or geo_c in c_name) and len(c_name) > 4 and len(geo_c) > 4:
                # Additionally require that one name is a prefix of the other
                if c_name.startswith(geo_c) or geo_c.startswith(c_name):
                    best_candidate = k
                    break

        if best_candidate:
            matches.append(row)
            row['geo_match_id'] = geojson_lookup[best_candidate].get('adm3_pcode')
            # print(f"Fuzzy Match: '{c_name}' ~= '{best_candidate[2]}'")
            continue

        # No match
        mismatches.append(row)
        row['lookup_key'] = (search_p_name, d_name, c_name)

    print(f"Total rows in CSV: {len(csv_data)}")
    print(f"Matches found: {len(matches)}")
    print(f"Mismatches found: {len(mismatches)}")
    print(f"Match Rate: {len(matches) / len(csv_data) * 100:.1f}%")

    if mismatches:
        print("\nMismatch Details (First 20):")
        print(f"{'CSV Key (Prov, Dist, Corr)':<60} {'Closest Match?':<20}")
        print("-" * 85)
        for row in mismatches[:20]:
            key = row['lookup_key']
            print(f"{str(key):<60}")

        # Analyze remaining mismatches by province
        print("\nAnalyzing remaining mismatches by Province:")
        prov_mismatches = {}
        for row in mismatches:
            p = row['nomb_prov']
            prov_mismatches[p] = prov_mismatches.get(p, 0) + 1
        for p, count in prov_mismatches.items():
            print(f"{p}: {count}")


if __name__ == "__main__":
    validate_censo_integration()