|
|
import csv |
|
|
import json |
|
|
import os |
|
|
import unicodedata |
|
|
|
|
|
|
|
|
# Directory one level above the folder that contains this script; the data
# paths below are resolved relative to it.
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))

# Census CSV and administrative-boundary GeoJSON read by the validator.
CSV_PATH, GEOJSON_PATH = (
    os.path.join(BASE_DIR, relative_path)
    for relative_path in (
        "data/censo/censo_panama_2023_unificado.csv",
        "data/base/pan_admin3.geojson",
    )
)
|
|
|
|
|
def normalize_text(text):
    """Return an accent-free, lower-cased, trimmed copy of ``text``.

    Falsy input (``None``, empty string, ...) yields ``""``. Otherwise the
    text is NFKD-decomposed and every character that cannot be encoded as
    ASCII (combining accents included) is dropped before lower-casing and
    stripping surrounding whitespace.
    """
    if text:
        decomposed = unicodedata.normalize('NFKD', text)
        ascii_only = decomposed.encode('ASCII', 'ignore').decode('ASCII')
        return ascii_only.lower().strip()
    return ""
|
|
|
|
|
def _find_geo_key(key, geojson_lookup, keys_by_prov_corr, keys_by_prov):
    """Return the GeoJSON lookup key that matches ``key``, or ``None``.

    ``key`` is a normalized ``(province, district, corregimiento)`` tuple.
    The heuristics are tried in order:

    1. Exact match on the full tuple.
    2. Exactly one GeoJSON entry with the same province and corregimiento
       (the district is ignored).
    3. First GeoJSON entry of the province, in file order, whose
       corregimiento name is a prefix of the CSV name or vice versa, with
       both names longer than 4 characters. Ambiguous results of step 2
       (several districts sharing the name) also end up here.
    """
    province, _district, corregimiento = key

    if key in geojson_lookup:
        return key

    same_name = keys_by_prov_corr.get((province, corregimiento), ())
    if len(same_name) == 1:
        return same_name[0]

    # Very short names are excluded so that a short name cannot match as a
    # prefix of many unrelated longer ones.
    if len(corregimiento) > 4:
        for candidate in keys_by_prov.get(province, ()):
            geo_corr = candidate[2]
            if len(geo_corr) > 4 and (
                corregimiento.startswith(geo_corr)
                or geo_corr.startswith(corregimiento)
            ):
                return candidate

    return None


def validate_censo_integration():
    """Check how many census CSV rows can be matched to a GeoJSON feature.

    Reads ``CSV_PATH`` and ``GEOJSON_PATH``, indexes the GeoJSON features by
    their normalized ``(adm1_name, adm2_name, adm3_name)`` properties and
    tries to match every CSV row through its ``nomb_prov`` / ``nomb_dist`` /
    ``nomb_corr`` columns (see ``_find_geo_key`` for the heuristics).
    Matched rows get a ``geo_match_id`` entry holding the feature's
    ``adm3_pcode``; unmatched rows get a ``lookup_key`` entry.

    Everything is reported with ``print``; the function returns ``None``.
    A file that cannot be loaded is reported and ends the run early.
    """
    print(f"Loading CSV from {CSV_PATH}...")
    try:
        # newline='' lets the csv module handle line endings itself, as the
        # csv documentation requires for quoted fields with line breaks.
        with open(CSV_PATH, mode='r', encoding='utf-8', newline='') as f:
            csv_data = list(csv.DictReader(f))
    except Exception as e:
        # Deliberately broad: any load failure is reported and aborts the run.
        print(f"Error loading CSV: {e}")
        return

    print(f"Loading GeoJSON from {GEOJSON_PATH}...")
    try:
        # Explicit UTF-8 so accented names do not depend on the OS locale.
        with open(GEOJSON_PATH, 'r', encoding='utf-8') as f:
            geojson = json.load(f)
    except Exception as e:
        print(f"Error loading GeoJSON: {e}")
        return

    features = geojson.get('features') if isinstance(geojson, dict) else None
    if not isinstance(features, list):
        print("Error loading GeoJSON: missing 'features' list")
        return

    print("Building GeoJSON lookup table...")
    geojson_lookup = {}
    for feature in features:
        # "properties" may legally be null in GeoJSON; treat it as empty.
        props = feature.get('properties') or {}
        key = (
            normalize_text(props.get('adm1_name')),
            normalize_text(props.get('adm2_name')),
            normalize_text(props.get('adm3_name')),
        )
        if key in geojson_lookup:
            print(f"Duplicate key in GeoJSON: {key}")
        # On duplicates the last feature wins.
        geojson_lookup[key] = props

    print(f"GeoJSON lookup size: {len(geojson_lookup)}")

    # Secondary indexes, built once so the heuristics do not rescan every
    # key for every CSV row. Iterating the dict keeps the file order that
    # the prefix heuristic relies on.
    keys_by_prov_corr = {}
    keys_by_prov = {}
    for key in geojson_lookup:
        keys_by_prov_corr.setdefault((key[0], key[2]), []).append(key)
        keys_by_prov.setdefault(key[0], []).append(key)

    # CSV province names that are searched under a different province name
    # in the GeoJSON.
    # NOTE(review): presumably the boundary file predates these divisions -
    # confirm against the data source.
    PROV_MAPPING = {
        "panama oeste": "panama",
        "comarca naso tjer di": "bocas del toro",
    }

    print("\nValidating CSV via Name Matching with Heuristics...")

    matches = []
    mismatches = []

    for row in csv_data:
        p_name = normalize_text(row.get('nomb_prov'))
        d_name = normalize_text(row.get('nomb_dist'))
        c_name = normalize_text(row.get('nomb_corr'))

        lookup_key = (PROV_MAPPING.get(p_name, p_name), d_name, c_name)
        match_key = _find_geo_key(
            lookup_key, geojson_lookup, keys_by_prov_corr, keys_by_prov
        )

        if match_key is None:
            row['lookup_key'] = lookup_key
            mismatches.append(row)
        else:
            row['geo_match_id'] = geojson_lookup[match_key].get('adm3_pcode')
            matches.append(row)

    # An empty CSV must not crash the report with a division by zero.
    match_rate = len(matches) / len(csv_data) * 100 if csv_data else 0.0

    print(f"Total rows in CSV: {len(csv_data)}")
    print(f"Matches found: {len(matches)}")
    print(f"Mismatches found: {len(mismatches)}")
    print(f"Match Rate: {match_rate:.1f}%")

    if mismatches:
        print("\nMismatch Details (First 20):")
        print(f"{'CSV Key (Prov, Dist, Corr)':<60} {'Closest Match?':<20}")
        print("-" * 85)
        for row in mismatches[:20]:
            print(f"{str(row['lookup_key']):<60}")

        # The per-province breakdown is skipped when nothing is unmatched.
        print("\nAnalyzing remaining mismatches by Province:")
        prov_mismatches = {}
        for row in mismatches:
            # .get mirrors the matching loop, so a CSV without the column
            # is reported under None instead of raising KeyError.
            p = row.get('nomb_prov')
            prov_mismatches[p] = prov_mismatches.get(p, 0) + 1
        for p, count in prov_mismatches.items():
            print(f"{p}: {count}")
|
|
|
|
|
# Run the validation only when this file is executed as a script, not when
# it is imported as a module.
if __name__ == "__main__":
    validate_censo_integration()
|
|
|